turtleBot/flower_game_env.py

398 lines
15 KiB
Python

import gymnasium as gym
from gymnasium import spaces
import numpy as np
import mss
import cv2
import pyautogui
import time
from typing import Tuple, Optional, List
# ---------------- Hilfsfunktionen ----------------
def _centroid_from_mask(mask: np.ndarray) -> Tuple[Optional[int], Optional[int], int]:
    """Locate the centroid of a mask via its image moments.

    Args:
        mask: Single-channel mask as accepted by ``cv2.countNonZero`` and
            ``cv2.moments``.

    Returns:
        ``(cx, cy, count)`` where ``count`` is the number of non-zero pixels
        and ``cx``/``cy`` are ``m10/m00`` and ``m01/m00`` truncated to ``int``.
        ``cx`` and ``cy`` are ``None`` when the mask has no non-zero pixel or
        when the zeroth moment is zero.
    """
    nonzero_count = int(cv2.countNonZero(mask))
    if not nonzero_count:
        return None, None, 0

    moments = cv2.moments(mask)
    mass = moments["m00"]
    if mass == 0:
        # Defensive: no usable centroid even though pixels were counted.
        return None, None, nonzero_count

    centroid_x = int(moments["m10"] / mass)
    centroid_y = int(moments["m01"] / mass)
    return centroid_x, centroid_y, nonzero_count
def _centroids_from_contours(
    mask: np.ndarray,
    ui_exclude_rects: List[Tuple[float, float, float, float]],
    min_area_px: int,
    circ_min: float,
    aspect_tol: float,
    extent_min: float,
    solidity_min: float,
) -> List[Tuple[int, int, int]]:
    """Return ``(cx, cy, area)`` for every contour that passes the bomb shape filters.

    NOTE: ``mask`` is modified in place -- every UI exclusion rectangle is
    zeroed before contours are extracted. Pass a copy if the caller still
    needs the original mask.

    Args:
        mask: Two-dimensional binary mask (``mask.shape`` must be ``(h, w)``).
        ui_exclude_rects: Rectangles ``(x0, y0, x1, y1)`` to blank out. A
            rectangle whose four values all lie in ``[0, 1]`` is treated as
            normalised and scaled to the mask size; any other rectangle is
            treated as pixel coordinates. May be empty or ``None``.
        min_area_px: Minimum contour area in pixels.
        circ_min: Minimum circularity ``4*pi*area / perimeter**2``.
        aspect_tol: Allowed deviation of the bounding-box aspect ratio
            (width / height) from 1.0.
        extent_min: Minimum ratio of contour area to bounding-box area.
        solidity_min: Minimum ratio of contour area to convex-hull area.

    Returns:
        List of ``(cx, cy, area)`` tuples with integer centroid coordinates
        and the contour area truncated to ``int``.
    """
    frame_h, frame_w = mask.shape

    def _clamp(value: int, upper: int) -> int:
        # Keep a coordinate inside [0, upper].
        return max(0, min(upper, value))

    # Blank out UI regions (pixel or normalised rectangles are both accepted).
    for left, top, right, bottom in ui_exclude_rects or ():
        is_normalised = all(0.0 <= v <= 1.0 for v in (left, right, top, bottom))
        if is_normalised:
            col_a = int(round(left * frame_w))
            col_b = int(round(right * frame_w))
            row_a = int(round(top * frame_h))
            row_b = int(round(bottom * frame_h))
        else:
            col_a, row_a, col_b, row_b = int(left), int(top), int(right), int(bottom)

        col_a, col_b = _clamp(col_a, frame_w), _clamp(col_b, frame_w)
        row_a, row_b = _clamp(row_a, frame_h), _clamp(row_b, frame_h)
        if row_a < row_b and col_a < col_b:
            mask[row_a:row_b, col_a:col_b] = 0

    contours, _ = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

    area_floor = float(min_area_px)
    aspect_lo = 1.0 - aspect_tol
    aspect_hi = 1.0 + aspect_tol

    accepted = []
    for contour in contours:
        area = float(cv2.contourArea(contour))
        if area < area_floor:
            continue

        # Bounding box must be roughly square.
        _, _, box_w, box_h = cv2.boundingRect(contour)
        if box_w == 0 or box_h == 0:
            continue
        ratio = box_w / float(box_h)
        if ratio < aspect_lo or ratio > aspect_hi:
            continue

        # Circularity: 1.0 for a perfect circle, lower for ragged outlines.
        perimeter = float(cv2.arcLength(contour, True))
        if perimeter <= 0:
            continue
        if 4.0 * np.pi * area / (perimeter * perimeter) < circ_min:
            continue

        # Solidity (vs. convex hull) and extent (vs. bounding box).
        hull_area = float(cv2.contourArea(cv2.convexHull(contour)))
        if hull_area <= 0:
            continue
        fills_hull = area / hull_area >= solidity_min
        fills_box = area / float(box_w * box_h) >= extent_min
        if not (fills_hull and fills_box):
            continue

        moments = cv2.moments(contour)
        mass = moments["m00"]
        if mass == 0:
            continue
        accepted.append(
            (int(moments["m10"] / mass), int(moments["m01"] / mass), int(area))
        )
    return accepted
# ---------------- Environment ----------------
class FlowerGameEnv(gym.Env):
    """Screen-scraping Gymnasium environment for the turtle/flower game.

    Every step presses one movement key, grabs ``monitor_area`` from the
    screen and locates the turtle, the flower and the bombs by HSV colour
    thresholding.

    Observation (position based only, no image is exposed to the learner):
        ``{"state": [tx, ty, fx, fy, bx, by]}`` as ``float32``. Each value is
        a pixel coordinate divided by the current frame width (x) or height
        (y). ``(tx, ty)`` is the turtle, ``(fx, fy)`` the flower and
        ``(bx, by)`` the accepted bomb closest to the turtle. An entity that
        was not found is reported as ``0.0, 0.0``; the bomb entry is also
        zero when the turtle was not found.

    Actions: ``0=W``, ``1=A``, ``2=S``, ``3=D``.

    Rewards (see ``_calculate_reward``):
        * ``shaping_gain * (previous - current)`` turtle-to-flower distance,
          where the distance is divided by the frame diagonal.
        * ``+eat_reward`` when the flower centroid jumps by at least
          ``w/20`` horizontally AND ``h/15`` vertically between two
          evaluated frames (the flower respawned somewhere else).
        * ``-collision_penalty`` when the game-over colour is visible
          anywhere in the last captured frame.

    Size invariance:
        * ``eat_*_nd`` / ``collision_*_nd`` are the default pixel thresholds
          divided by the reference width/height (``ref_size``). In this class
          they are only reported through ``info``; the reward computation
          does not read them.
        * ``bomb_min_area_frac`` is the default minimum bomb area divided by
          the reference area and is converted back to pixels for each frame.
        * ``ui_exclude_rects`` may be normalised (0..1) or given in pixels.
    """

    metadata = {"render_modes": []}

    # Key pressed for each discrete action index.
    _ACTION_KEYS = ("w", "a", "s", "d")

    # Colour (in the B, G, R channel order of the captured frame) that marks
    # the game-over screen, and the per-channel tolerance used to match it.
    _GAME_OVER_BGR = (114, 111, 84)
    _GAME_OVER_TOLERANCE = 10

    def __init__(
        self,
        monitor_area,
        ui_exclude_rects: Optional[List[Tuple[float, float, float, float]]] = None,
        ref_size: Optional[Tuple[int, int]] = (1900, 1263),  # fixed baseline
    ):
        """Create the environment and open the screen-capture handle.

        Args:
            monitor_area: Capture region handed unchanged to ``mss`` ``grab``.
                When ``ref_size`` is ``None`` its ``"width"`` / ``"height"``
                entries are read with ``.get`` as the reference size.
            ui_exclude_rects: Optional rectangles ``(x0, y0, x1, y1)``,
                normalised (0..1) or in pixels, that are ignored during bomb
                detection.
            ref_size: Reference ``(width, height)`` used to turn the default
                pixel thresholds into dimensionless fractions.
        """
        super().__init__()
        self.monitor_area = monitor_area
        self.sct = mss.mss()

        # --- Observation & actions (state vector only) ---
        self.observation_space = spaces.Dict(
            {
                "state": spaces.Box(low=0.0, high=1.0, shape=(6,), dtype=np.float32)
            }
        )
        self.action_space = spaces.Discrete(4)

        # --- HSV bounds ---
        self.yellow_lower = np.array([15, 40, 200], dtype=np.uint8)
        self.yellow_upper = np.array([25, 120, 255], dtype=np.uint8)
        self.white_lower = np.array([0, 0, 220], dtype=np.uint8)
        self.white_upper = np.array([180, 50, 255], dtype=np.uint8)
        self.black_lower = np.array([0, 0, 0], dtype=np.uint8)
        self.black_upper = np.array([180, 80, 60], dtype=np.uint8)
        self.green1_lower = np.array([30, 80, 80], dtype=np.uint8)
        self.green1_upper = np.array([45, 255, 255], dtype=np.uint8)
        self.green2_lower = np.array([65, 100, 80], dtype=np.uint8)
        self.green2_upper = np.array([90, 255, 255], dtype=np.uint8)
        self.kernel = np.ones((3, 3), np.uint8)

        # --- Default rectangular thresholds (full width/height in pixels) ---
        self._eat_x_px_default = 320
        self._eat_y_px_default = 220
        self._collision_x_px_default = 320
        self._collision_y_px_default = 220
        self._bomb_min_area_px_default = 400

        # --- Fixed reference size (basis of the size-invariant fractions) ---
        if ref_size is not None:
            w_ref, h_ref = int(ref_size[0]), int(ref_size[1])
        else:
            w_ref = int(self.monitor_area.get("width", 1))
            h_ref = int(self.monitor_area.get("height", 1))
        # Guard against zero/negative sizes so the divisions below are safe.
        self.w_ref = max(1, w_ref)
        self.h_ref = max(1, h_ref)

        # --- Fractions relative to w_ref / h_ref (normalised per axis) ---
        # The values describe the FULL rectangle width/height.
        self.eat_x_nd = float(self._eat_x_px_default) / float(self.w_ref)
        self.eat_y_nd = float(self._eat_y_px_default) / float(self.h_ref)
        self.collision_x_nd = float(self._collision_x_px_default) / float(self.w_ref)
        self.collision_y_nd = float(self._collision_y_px_default) / float(self.h_ref)
        self.bomb_min_area_frac = float(self._bomb_min_area_px_default) / float(
            self.w_ref * self.h_ref
        )

        # Reward parameters (dimensionless)
        self.shaping_gain = 1.0
        self.eat_reward = 1.0
        self.collision_penalty = 5.0

        # Event cooldown (in evaluated frames)
        self.contact_cooldown_frames = 8
        self._cooldown = 0
        self.prev_dist_to_flower_nd = None
        self.prev_flower_x = None
        self.prev_flower_y = None
        self.flowers_eaten = 0

        # Bomb shape filter (constant; the minimum area is derived per frame
        # from bomb_min_area_frac)
        self.bomb_circ_min = 0.60
        self.bomb_aspect_tol = 0.35
        self.bomb_extent_min = 0.60
        self.bomb_solidity_min = 0.85

        # UI exclusion zones (pixels or normalised), relative to monitor_area
        self.ui_exclude_rects = ui_exclude_rects or []

        # Result of the most recent detection pass
        self._last_cache = {
            "turtle_xy": (None, None),
            "flower_xy": (None, None),
            "bombs_xy": [],
            "turtle_found": False,
            "flower_found": False,
            "frame_hw": (1, 1),
        }
        # Last raw BGRA screenshot; None until the first grab.
        self.raw = None

    # ---------------- Gymnasium API ----------------
    def reset(self, seed=None, options=None):
        """Clear all per-episode tracking state and return the first observation.

        Returns:
            ``(obs, info)`` where ``info`` is an empty dict.
        """
        super().reset(seed=seed)
        self.prev_dist_to_flower_nd = None
        # Forget the last flower position as well; otherwise a position left
        # over from the previous episode could be mistaken for a respawn and
        # yield a spurious eat reward.
        self.prev_flower_x = None
        self.prev_flower_y = None
        self._cooldown = 0
        self.flowers_eaten = 0
        obs = self._build_observation()
        return obs, {}

    def step(self, action):
        """Press the key for ``action``, capture a frame and score it.

        An ``action`` that matches none of the four indices presses no key.

        Returns:
            ``(obs, reward, terminated, truncated, info)``; ``terminated`` and
            ``truncated`` are always ``False``.
        """
        for index, key in enumerate(self._ACTION_KEYS):
            if action == index:
                pyautogui.press(key)
                break
        time.sleep(0.01)

        obs = self._build_observation()
        reward = self._calculate_reward()
        if self._cooldown > 0:
            self._cooldown -= 1

        info = {
            "flowers_eaten": self.flowers_eaten,
            "bombs_expected": self._bombs_expected(self.flowers_eaten),
            "bombs_detected": len(self._last_cache["bombs_xy"]),
            # *_nd are fractions of width resp. height (not of the diagonal)
            "eat_x_nd": self.eat_x_nd,
            "eat_y_nd": self.eat_y_nd,
            "collision_x_nd": self.collision_x_nd,
            "collision_y_nd": self.collision_y_nd,
            # Fraction of the reference area; pixels per frame = frac * (w*h)
            "bomb_min_area_frac": self.bomb_min_area_frac,
            "ref_size": (self.w_ref, self.h_ref),
        }
        return obs, reward, False, False, info

    def close(self):
        """Release the screen-capture handle; safe to call more than once."""
        sct = getattr(self, "sct", None)
        if sct is not None:
            sct.close()
            self.sct = None
        super().close()

    # ---------------- Detection & observation ----------------
    def _grab_bgr(self) -> np.ndarray:
        """Capture ``monitor_area``, keep the raw BGRA frame and return it as BGR."""
        raw = np.array(self.sct.grab(self.monitor_area))  # BGRA
        self.raw = raw
        return cv2.cvtColor(raw, cv2.COLOR_BGRA2BGR)

    def _detect_entities(self, frame_bgr):
        """Locate turtle, flower and bombs in ``frame_bgr``.

        Returns:
            Dict with the normalised state vector (``"state_norm"``), the
            pixel centroids (``"turtle_xy"``, ``"flower_xy"``, ``"bombs_xy"``),
            the found flags and the frame size as ``"frame_hw"`` = ``(h, w)``.
        """
        frame_h, frame_w, _ = frame_bgr.shape
        hsv = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2HSV)

        # Flower: overlap of the dilated white and yellow masks.
        white = cv2.inRange(hsv, self.white_lower, self.white_upper)
        yellow = cv2.inRange(hsv, self.yellow_lower, self.yellow_upper)
        white = cv2.morphologyEx(white, cv2.MORPH_DILATE, self.kernel, iterations=1)
        yellow = cv2.morphologyEx(yellow, cv2.MORPH_DILATE, self.kernel, iterations=1)
        flower_mask = cv2.bitwise_and(white, yellow)
        flower_mask = cv2.morphologyEx(
            flower_mask, cv2.MORPH_CLOSE, self.kernel, iterations=1
        )
        fx, fy, _ = _centroid_from_mask(flower_mask)
        flower_found = fx is not None and fy is not None

        # Bombs: dark blobs filtered by shape, with UI zones excluded. The
        # minimum area is derived from the CURRENT frame area.
        bomb_mask = cv2.inRange(hsv, self.black_lower, self.black_upper)
        min_area_px = max(1, int(round(self.bomb_min_area_frac * frame_h * frame_w)))
        bombs_xy = _centroids_from_contours(
            bomb_mask.copy(),  # the helper zeroes UI regions in place
            self.ui_exclude_rects,
            min_area_px=min_area_px,
            circ_min=self.bomb_circ_min,
            aspect_tol=self.bomb_aspect_tol,
            extent_min=self.bomb_extent_min,
            solidity_min=self.bomb_solidity_min,
        )

        # Turtle: union of two green ranges, opened then dilated.
        green = cv2.bitwise_or(
            cv2.inRange(hsv, self.green1_lower, self.green1_upper),
            cv2.inRange(hsv, self.green2_lower, self.green2_upper),
        )
        green = cv2.morphologyEx(green, cv2.MORPH_OPEN, self.kernel, iterations=1)
        green = cv2.morphologyEx(green, cv2.MORPH_DILATE, self.kernel, iterations=1)
        tx, ty, _ = _centroid_from_mask(green)
        turtle_found = tx is not None and ty is not None

        def normalise(x, y):
            # Missing entities are encoded as the origin.
            if x is None or y is None:
                return 0.0, 0.0
            return x / float(frame_w), y / float(frame_h)

        # Bomb nearest to the turtle (absolute normalised coordinates).
        nbx, nby = 0.0, 0.0
        if turtle_found and bombs_xy:
            turtle_x, turtle_y = float(tx), float(ty)
            near_x, near_y, _area = min(
                bombs_xy,
                key=lambda bomb: np.hypot(bomb[0] - turtle_x, bomb[1] - turtle_y),
            )
            nbx, nby = normalise(near_x, near_y)

        n_tx, n_ty = normalise(tx, ty)
        n_fx, n_fy = normalise(fx, fy)
        return {
            "state_norm": np.array([n_tx, n_ty, n_fx, n_fy, nbx, nby], dtype=np.float32),
            "turtle_xy": (tx, ty),
            "flower_xy": (fx, fy),
            "bombs_xy": bombs_xy,
            "turtle_found": turtle_found,
            "flower_found": flower_found,
            "frame_hw": (frame_h, frame_w),
        }

    def _build_observation(self):
        """Grab a frame, run detection, cache the result and return the observation."""
        frame_bgr = self._grab_bgr()
        det = self._detect_entities(frame_bgr)
        self._last_cache = det
        # Only the position-based state is exposed as the observation.
        return {"state": det["state_norm"]}

    # ---------------- Rewards ----------------
    def _game_over_visible(self) -> bool:
        """Return True when any pixel of the last raw frame matches the game-over colour."""
        if self.raw is None:
            # No frame captured yet, so nothing can be detected.
            return False
        # int16 holds the signed differences of uint8 channels without
        # wrap-around and avoids a full-frame 64-bit temporary.
        bgr = self.raw[:, :, :3].astype(np.int16)
        target = np.asarray(self._GAME_OVER_BGR, dtype=np.int16)
        within_tolerance = np.abs(bgr - target) <= self._GAME_OVER_TOLERANCE
        return bool(np.any(np.all(within_tolerance, axis=-1)))

    def _calculate_reward(self) -> float:
        """Score the most recent detection result (``self._last_cache``).

        Combines distance shaping towards the flower, the eat bonus (flower
        respawned elsewhere) and the collision penalty (game-over colour on
        screen). Updates the tracking state and may press keys as a side
        effect.
        """
        det = self._last_cache
        reward = 0.0
        tx, ty = det["turtle_xy"]
        fx, fy = det["flower_xy"]
        turtle_found = det["turtle_found"]
        flower_found = det["flower_found"]
        frame_h, frame_w = det["frame_hw"]

        # Distance shaping (Euclidean, normalised by the frame diagonal).
        diag = float(np.hypot(frame_h, frame_w)) if (frame_h > 0 and frame_w > 0) else 1.0
        if turtle_found and flower_found:
            dist_nd = float(np.hypot(tx - fx, ty - fy)) / diag
            if self.prev_dist_to_flower_nd is not None:
                reward += self.shaping_gain * (self.prev_dist_to_flower_nd - dist_nd)
            self.prev_dist_to_flower_nd = dist_nd
        else:
            # Without both positions there is no distance to compare against.
            self.prev_dist_to_flower_nd = None

        # Eat event: the flower shows up at a clearly different position.
        if self._cooldown == 0 and turtle_found:
            if not flower_found:
                # NOTE(review): "r" presumably asks the game for a new
                # flower -- confirm against the game's key bindings.
                pyautogui.press("r")
                print("Flower Reset")
                self._cooldown = self.contact_cooldown_frames
            else:
                if self.prev_flower_x is None:
                    self.prev_flower_x = fx
                if self.prev_flower_y is None:
                    self.prev_flower_y = fy
                min_jump_x = float(frame_w / 20 if frame_w > 0 else 1)
                min_jump_y = float(frame_h / 15 if frame_h > 0 else 1)
                moved_x = abs(self.prev_flower_x - fx) >= min_jump_x
                moved_y = abs(self.prev_flower_y - fy) >= min_jump_y
                if moved_x and moved_y:
                    print("Blume gegessen!")
                    reward += self.eat_reward
                    self._cooldown = self.contact_cooldown_frames
                    self.flowers_eaten += 1
                self.prev_flower_x = fx
                self.prev_flower_y = fy

        # Bomb collision, detected through the game-over screen colour.
        if self._game_over_visible():
            print("In Bombe gelaufen!")
            reward -= self.collision_penalty
            time.sleep(0.5)
            # NOTE(review): ctrl+p presumably restarts the game -- confirm.
            pyautogui.hotkey("ctrl", "p")
            time.sleep(0.5)

        print(reward)
        return float(reward)

    # ---------------- Auxiliary info ----------------
    @staticmethod
    def _bombs_expected(flowers_eaten: int) -> int:
        """Return ``flowers_eaten // 5``, never less than zero."""
        return max(0, flowers_eaten // 5)