396 lines
15 KiB
Python
396 lines
15 KiB
Python
import gymnasium as gym
|
|
from gymnasium import spaces
|
|
import numpy as np
|
|
import mss
|
|
import cv2
|
|
import pyautogui
|
|
import time
|
|
from typing import Tuple, Optional, List
|
|
|
|
|
|
# ---------------- Helper functions ----------------
|
|
def _centroid_from_mask(mask: np.ndarray) -> Tuple[Optional[int], Optional[int], int]:
    """Return the centroid of ``mask`` together with its non-zero pixel count.

    The centroid is taken from the image moments reported by ``cv2.moments``
    (``m10 / m00`` and ``m01 / m00``) and truncated to integers.

    Returns:
        ``(cx, cy, count)``. ``cx`` and ``cy`` are ``None`` when the mask has
        no non-zero pixel or when its zeroth moment is 0; ``count`` is the
        number of non-zero pixels in either case.
    """
    nonzero_pixels = int(cv2.countNonZero(mask))
    # Only compute moments when there is something to measure.
    moments = cv2.moments(mask) if nonzero_pixels else None
    if moments is None or moments["m00"] == 0:
        return None, None, nonzero_pixels

    total = moments["m00"]
    centroid_x = int(moments["m10"] / total)
    centroid_y = int(moments["m01"] / total)
    return centroid_x, centroid_y, nonzero_pixels
|
|
|
|
|
|
def _centroids_from_contours(
    mask: np.ndarray,
    ui_exclude_rects: List[Tuple[float, float, float, float]],
    min_area_px: int,
    circ_min: float,
    aspect_tol: float,
    extent_min: float,
    solidity_min: float,
) -> List[Tuple[int, int, int]]:
    """Return ``(cx, cy, area)`` for every external contour that passes the bomb shape filters.

    Args:
        mask: Single-channel 2-D mask (``mask.shape`` is unpacked as ``(h, w)``).
            The caller's array is not modified.
        ui_exclude_rects: Rectangles ``(x0, y0, x1, y1)`` that are blanked
            before contour detection. If all four coordinates lie in
            ``[0, 1]`` the rectangle is treated as normalised and scaled to
            the mask size; otherwise it is taken as pixel coordinates. A
            pixel rectangle that lies entirely inside ``[0, 1]`` is therefore
            read as normalised. Rectangles are clamped to the mask; empty or
            inverted ones are ignored. May be empty or ``None``.
        min_area_px: Minimum contour area in pixels.
        circ_min: Minimum circularity ``4 * pi * area / perimeter ** 2``.
        aspect_tol: Allowed deviation of the bounding-box aspect ratio
            (width / height) from 1.0.
        extent_min: Minimum ratio of contour area to bounding-box area.
        solidity_min: Minimum ratio of contour area to convex-hull area.

    Returns:
        A list of ``(cx, cy, area)`` tuples with the contour centroid
        (from contour moments, truncated to int) and the integer contour area.
    """
    h, w = mask.shape

    # Blank the UI zones (pixel or normalised coordinates are both supported).
    if ui_exclude_rects:
        # Work on a private copy so the caller's mask is never altered.
        mask = mask.copy()
        for (x0, y0, x1, y1) in ui_exclude_rects:
            # All coordinates inside [0, 1] -> interpret as normalised input.
            if 0.0 <= x0 <= 1.0 and 0.0 <= x1 <= 1.0 and 0.0 <= y0 <= 1.0 and 0.0 <= y1 <= 1.0:
                px0 = int(round(x0 * w))
                px1 = int(round(x1 * w))
                py0 = int(round(y0 * h))
                py1 = int(round(y1 * h))
            else:
                px0, py0, px1, py1 = int(x0), int(y0), int(x1), int(y1)

            # Clamp to the mask bounds.
            px0 = max(0, min(w, px0))
            px1 = max(0, min(w, px1))
            py0 = max(0, min(h, py0))
            py1 = max(0, min(h, py1))
            if py0 < py1 and px0 < px1:
                mask[py0:py1, px0:px1] = 0

    # findContours returns (contours, hierarchy) on some OpenCV versions and
    # (image, contours, hierarchy) on others; the contours are always the
    # second-to-last element.
    contours = cv2.findContours(mask, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)[-2]

    out = []
    for c in contours:
        area = float(cv2.contourArea(c))
        if area < float(min_area_px):
            continue

        # Bounding box must be roughly square.
        _x, _y, w_b, h_b = cv2.boundingRect(c)
        if h_b == 0 or w_b == 0:
            continue
        aspect = w_b / float(h_b)
        if not (1.0 - aspect_tol <= aspect <= 1.0 + aspect_tol):
            continue

        # Circularity is 1.0 for a perfect circle and drops for other shapes.
        per = float(cv2.arcLength(c, True))
        if per <= 0:
            continue
        circularity = 4.0 * np.pi * area / (per * per)
        if circularity < circ_min:
            continue

        hull = cv2.convexHull(c)
        hull_area = float(cv2.contourArea(hull))
        if hull_area <= 0:
            continue
        solidity = area / hull_area
        extent = area / float(w_b * h_b)
        if solidity < solidity_min or extent < extent_min:
            continue

        M = cv2.moments(c)
        if M["m00"] == 0:
            continue
        cx = int(M["m10"] / M["m00"])
        cy = int(M["m01"] / M["m00"])
        out.append((cx, cy, int(area)))
    return out
|
|
|
|
|
|
# ---------------- Environment ----------------
|
|
class FlowerGameEnv(gym.Env):
    """Gymnasium environment that plays the flower game via screen capture and key presses.

    Observation (position-based only; no image is exposed to the learner):
        ``obs = {"state": [tx, ty, fx, fy, bx, by]}``
        All values are normalised by the current frame width / height.
        ``(tx, ty)`` is the turtle centroid, ``(fx, fy)`` the flower centroid
        and ``(bx, by)`` the centroid of the detected bomb closest to the
        turtle. Anything that was not detected is reported as 0; the bomb
        entry is also 0 when the turtle was not found.

    Actions: 0=W, 1=A, 2=S, 3=D (one key press per step).

    Rewards, as implemented in ``_calculate_reward``:
        ``+ shaping_gain * d`` where ``d`` is the decrease of the Euclidean
            turtle-flower distance since the previous step, divided by the
            frame diagonal.
        ``+ eat_reward`` when the flower centroid moved by at least ``w / 20``
            horizontally AND ``h / 15`` vertically since the last evaluated
            frame; this is treated as "flower eaten and respawned".
        ``- collision_penalty`` when any pixel of the raw capture matches the
            game-over colour within the tolerance.

    Size invariance:
        - The ``*_nd`` thresholds are the pixel defaults divided by the width
          or height of a fixed reference size (``ref_size``, default
          ``(1900, 1263)``).
        - The minimum bomb area is stored as a fraction of the reference area
          and converted to pixels for every frame.
        - UI exclusion rectangles may be normalised (0..1) or in pixels.

    NOTE(review): ``eat_*_nd`` and ``collision_*_nd`` are computed and
    reported in ``info`` but are not used by the reward code in this class.
    """

    metadata = {"render_modes": []}

    # Key sent for each discrete action index.
    _ACTION_KEYS = ("w", "a", "s", "d")

    # Colour looked for in the first three channels of the raw capture
    # (BGR order, since the grab is BGRA) and the per-channel tolerance.
    # Presumably the colour of the game-over screen; confirm against the game.
    _GAME_OVER_COLOR = (114, 111, 84)
    _GAME_OVER_TOLERANCE = 10

    def __init__(
        self,
        monitor_area,
        ui_exclude_rects: Optional[List[Tuple[float, float, float, float]]] = None,
        ref_size: Optional[Tuple[int, int]] = (1900, 1263),  # fixed baseline
    ):
        """Create the environment.

        Args:
            monitor_area: Capture region handed to ``mss`` ``grab``. Its
                ``"width"`` / ``"height"`` entries are read when ``ref_size``
                is ``None`` (assumes a dict-like object; confirm with caller).
            ui_exclude_rects: Optional rectangles ``(x0, y0, x1, y1)``, in
                pixels or normalised to 0..1, excluded from bomb detection.
            ref_size: Reference ``(width, height)`` the pixel thresholds are
                normalised against. ``None`` uses the size of ``monitor_area``.
        """
        super().__init__()
        self.monitor_area = monitor_area
        self.sct = mss.mss()

        # --- Observation & actions (state vector only) ---
        self.observation_space = spaces.Dict(
            {
                "state": spaces.Box(low=0.0, high=1.0, shape=(6,), dtype=np.float32)
            }
        )
        self.action_space = spaces.Discrete(4)

        # --- HSV bounds ---
        self.yellow_lower = np.array([15, 40, 200], dtype=np.uint8)
        self.yellow_upper = np.array([25, 120, 255], dtype=np.uint8)
        self.white_lower = np.array([0, 0, 220], dtype=np.uint8)
        self.white_upper = np.array([180, 50, 255], dtype=np.uint8)
        self.black_lower = np.array([0, 0, 0], dtype=np.uint8)
        self.black_upper = np.array([180, 80, 60], dtype=np.uint8)
        self.green1_lower = np.array([30, 80, 80], dtype=np.uint8)
        self.green1_upper = np.array([45, 255, 255], dtype=np.uint8)
        self.green2_lower = np.array([65, 100, 80], dtype=np.uint8)
        self.green2_upper = np.array([90, 255, 255], dtype=np.uint8)

        # Structuring element for the morphological clean-up of the masks.
        self.kernel = np.ones((3, 3), np.uint8)

        # --- Rectangular default thresholds (full width / height in pixels) ---
        self._eat_x_px_default = 320
        self._eat_y_px_default = 220
        self._collision_x_px_default = 320
        self._collision_y_px_default = 220
        self._bomb_min_area_px_default = 400

        # --- Fixed reference size (size-invariant fractions) ---
        if ref_size is not None:
            w_ref, h_ref = int(ref_size[0]), int(ref_size[1])
        else:
            w_ref = int(self.monitor_area.get("width", 1))
            h_ref = int(self.monitor_area.get("height", 1))
        # Guard against a zero or negative reference so the divisions are safe.
        self.w_ref = max(1, w_ref)
        self.h_ref = max(1, h_ref)

        # --- Fractions relative to w_ref / h_ref (normalised per axis) ---
        # The values describe the FULL rectangle width / height.
        self.eat_x_nd = float(self._eat_x_px_default) / float(self.w_ref)
        self.eat_y_nd = float(self._eat_y_px_default) / float(self.h_ref)
        self.collision_x_nd = float(self._collision_x_px_default) / float(self.w_ref)
        self.collision_y_nd = float(self._collision_y_px_default) / float(self.h_ref)
        self.bomb_min_area_frac = float(self._bomb_min_area_px_default) / float(self.w_ref * self.h_ref)

        # Reward / heuristic parameters (dimensionless).
        self.shaping_gain = 1.0
        self.eat_reward = 1.0
        self.collision_penalty = 5.0

        # Event cooldown and per-episode tracking state.
        self.contact_cooldown_frames = 8
        self._cooldown = 0
        self.prev_dist_to_flower_nd = None
        self.prev_flower_x = None
        self.prev_flower_y = None
        self.flowers_eaten = 0

        # Bomb shape filters (constant; the minimum area is derived per frame).
        self.bomb_circ_min = 0.60
        self.bomb_aspect_tol = 0.35
        self.bomb_extent_min = 0.60
        self.bomb_solidity_min = 0.85

        # UI exclusion zones (pixels or normalised), relative to monitor_area.
        self.ui_exclude_rects = ui_exclude_rects or []

        # Result of the most recent detection pass.
        self._last_cache = {
            "turtle_xy": (None, None),
            "flower_xy": (None, None),
            "bombs_xy": [],
            "turtle_found": False,
            "flower_found": False,
            "frame_hw": (1, 1),
        }

        # Most recent raw capture; set by _grab_bgr.
        self.raw = None

    # ---------------- Gymnasium API ----------------
    def reset(self, seed=None, options=None):
        """Reset the per-episode tracking state and return ``(obs, info)``.

        No key press is sent here; only a fresh frame is grabbed to build the
        first observation.
        """
        super().reset(seed=seed)
        self.prev_dist_to_flower_nd = None
        # Forget the last flower position too; otherwise the first evaluated
        # frame of the new episode could be scored as an "eaten" event.
        self.prev_flower_x = None
        self.prev_flower_y = None
        self._cooldown = 0
        self.flowers_eaten = 0
        obs = self._build_observation()
        return obs, {}

    def step(self, action):
        """Press the key for ``action``, grab a frame and score it.

        Returns:
            ``(obs, reward, terminated, truncated, info)``. ``terminated`` and
            ``truncated`` are always ``False``.
        """
        # An action that matches no index presses nothing.
        for code, key in enumerate(self._ACTION_KEYS):
            if action == code:
                pyautogui.press(key)
                break

        # Short pause before capturing the next frame.
        time.sleep(0.01)

        obs = self._build_observation()
        reward = self._calculate_reward()
        if self._cooldown > 0:
            self._cooldown -= 1

        info = {
            "flowers_eaten": self.flowers_eaten,
            "bombs_expected": self._bombs_expected(self.flowers_eaten),
            "bombs_detected": len(self._last_cache["bombs_xy"]),
            # *_nd values are fractions of w or h (not of the diagonal).
            "eat_x_nd": self.eat_x_nd,
            "eat_y_nd": self.eat_y_nd,
            "collision_x_nd": self.collision_x_nd,
            "collision_y_nd": self.collision_y_nd,
            # Fraction of the reference area; pixels per frame = frac * (w * h).
            "bomb_min_area_frac": self.bomb_min_area_frac,
            "ref_size": (self.w_ref, self.h_ref),
        }
        return obs, reward, False, False, info

    def close(self):
        """Release the screen-capture handle opened in ``__init__``."""
        self.sct.close()
        super().close()

    # ---------------- Detection & observation ----------------
    def _grab_bgr(self) -> np.ndarray:
        """Capture ``monitor_area``, remember the raw BGRA frame and return it as BGR."""
        raw = np.array(self.sct.grab(self.monitor_area))  # BGRA
        self.raw = raw
        return cv2.cvtColor(raw, cv2.COLOR_BGRA2BGR)

    def _detect_entities(self, frame_bgr):
        """Locate flower, bombs and turtle in ``frame_bgr`` by HSV colour masks.

        Returns:
            A dict with the normalised state vector (``"state_norm"``), the
            pixel centroids, the list of bomb ``(cx, cy, area)`` tuples, the
            found flags and the frame size as ``(h, w)``.
        """
        h, w, _ = frame_bgr.shape
        hsv = cv2.cvtColor(frame_bgr, cv2.COLOR_BGR2HSV)

        # Flower: pixels where the dilated white and yellow masks overlap.
        mw = cv2.inRange(hsv, self.white_lower, self.white_upper)
        my = cv2.inRange(hsv, self.yellow_lower, self.yellow_upper)
        mw = cv2.morphologyEx(mw, cv2.MORPH_DILATE, self.kernel, iterations=1)
        my = cv2.morphologyEx(my, cv2.MORPH_DILATE, self.kernel, iterations=1)
        mf = cv2.bitwise_and(mw, my)
        mf = cv2.morphologyEx(mf, cv2.MORPH_CLOSE, self.kernel, iterations=1)
        fx, fy, _ = _centroid_from_mask(mf)
        flower_found = (fx is not None and fy is not None)

        # Bombs: dark blobs that pass the shape filters, UI zones excluded.
        # The minimum area in pixels is derived from the CURRENT frame area.
        mb = cv2.inRange(hsv, self.black_lower, self.black_upper)
        min_area_px = max(1, int(round(self.bomb_min_area_frac * h * w)))
        bombs_xy = _centroids_from_contours(
            mb.copy(),
            self.ui_exclude_rects,
            min_area_px=min_area_px,
            circ_min=self.bomb_circ_min,
            aspect_tol=self.bomb_aspect_tol,
            extent_min=self.bomb_extent_min,
            solidity_min=self.bomb_solidity_min,
        )

        # Turtle: union of the two green ranges, opened then dilated.
        g1 = cv2.inRange(hsv, self.green1_lower, self.green1_upper)
        g2 = cv2.inRange(hsv, self.green2_lower, self.green2_upper)
        mg = cv2.bitwise_or(g1, g2)
        mg = cv2.morphologyEx(mg, cv2.MORPH_OPEN, self.kernel, iterations=1)
        mg = cv2.morphologyEx(mg, cv2.MORPH_DILATE, self.kernel, iterations=1)
        tx, ty, _ = _centroid_from_mask(mg)
        turtle_found = (tx is not None and ty is not None)

        def nxy(x, y):
            """Normalise a pixel position by the frame size; missing -> (0, 0)."""
            if x is None or y is None:
                return 0.0, 0.0
            return x / float(w), y / float(h)

        # Bomb entry of the state: the bomb closest to the turtle (pixel
        # distance), reported as a normalised frame position.
        nbx, nby = 0.0, 0.0
        if turtle_found and bombs_xy:
            txf, tyf = float(tx), float(ty)
            nearest = min(bombs_xy, key=lambda b: np.hypot(b[0] - txf, b[1] - tyf))
            nbx, nby = nxy(nearest[0], nearest[1])

        n_tx, n_ty = nxy(tx, ty)
        n_fx, n_fy = nxy(fx, fy)

        return {
            "state_norm": np.array([n_tx, n_ty, n_fx, n_fy, nbx, nby], dtype=np.float32),
            "turtle_xy": (tx, ty),
            "flower_xy": (fx, fy),
            "bombs_xy": bombs_xy,
            "turtle_found": turtle_found,
            "flower_found": flower_found,
            "frame_hw": (h, w),
        }

    def _build_observation(self):
        """Grab a frame, run detection, cache the result and return the observation dict."""
        frame_bgr = self._grab_bgr()
        det = self._detect_entities(frame_bgr)
        self._last_cache = det
        # Only the position-based state is exposed as the observation.
        return {"state": det["state_norm"]}

    # ---------------- Rewards ----------------
    def _game_over_visible(self) -> bool:
        """Return True if any pixel of the last raw capture matches the game-over colour.

        A pixel matches when each of its first three channels is within
        ``_GAME_OVER_TOLERANCE`` of ``_GAME_OVER_COLOR``. Returns ``False``
        when no frame has been captured yet.
        """
        if self.raw is None:
            return False
        pixels = self.raw[:, :, :3]
        target = np.asarray(self._GAME_OVER_COLOR, dtype=np.int16)
        # Compare against bounds in the capture's own dtype instead of
        # subtracting, which would build a widened copy of the whole frame.
        lower = np.clip(target - self._GAME_OVER_TOLERANCE, 0, 255).astype(pixels.dtype)
        upper = np.clip(target + self._GAME_OVER_TOLERANCE, 0, 255).astype(pixels.dtype)
        return bool(np.any(np.all((pixels >= lower) & (pixels <= upper), axis=-1)))

    def _calculate_reward(self) -> float:
        """Score the most recent detection result stored in ``_last_cache``.

        Combines distance shaping, the flower-respawn ("eaten") bonus and the
        game-over penalty. Also sends key presses as side effects: ``r`` when
        the turtle is visible but no flower is, and ``Ctrl+P`` after a
        detected game over (presumably respawn / restart keys of the game;
        confirm against the game).
        """
        det = self._last_cache
        reward = 0.0

        tx, ty = det["turtle_xy"]
        fx, fy = det["flower_xy"]
        tf = det["turtle_found"]
        ff = det["flower_found"]
        h, w = det["frame_hw"]

        # Distance shaping (Euclidean, normalised by the frame diagonal).
        diag = float(np.hypot(h, w)) if (h > 0 and w > 0) else 1.0
        if tf and ff:
            dist_px = float(np.hypot(tx - fx, ty - fy))
            dist_nd = dist_px / diag
            if self.prev_dist_to_flower_nd is not None:
                delta = self.prev_dist_to_flower_nd - dist_nd
                reward += self.shaping_gain * delta
            self.prev_dist_to_flower_nd = dist_nd
        else:
            # Without both positions there is no distance to compare next step.
            self.prev_dist_to_flower_nd = None

        # Eat event: the flower shows up somewhere else.
        # NOTE(review): the shaping term above is not reset on this frame, so
        # the distance jump caused by the respawn is subtracted from the bonus.
        if self._cooldown == 0 and tf:
            if not ff:
                pyautogui.press("r")
            else:
                if self.prev_flower_x is None:
                    self.prev_flower_x = fx
                if self.prev_flower_y is None:
                    self.prev_flower_y = fy
                min_dx = float(w / 20 if w > 0 else 1)
                min_dy = float(h / 15 if h > 0 else 1)
                # Both axes must jump; a respawn along one axis only is not counted.
                if abs(self.prev_flower_x - fx) >= min_dx and abs(self.prev_flower_y - fy) >= min_dy:
                    print("Blume gegessen!")
                    reward += self.eat_reward
                    self._cooldown = self.contact_cooldown_frames
                    self.flowers_eaten += 1
                self.prev_flower_x = fx
                self.prev_flower_y = fy

        # Bomb collision is inferred from the game-over screen colour.
        if self._game_over_visible():
            print("In Bombe gelaufen!")
            reward -= self.collision_penalty
            time.sleep(0.5)
            pyautogui.hotkey("ctrl", "p")
            time.sleep(0.5)

        print(reward)

        return float(reward)

    # ---------------- Helper info ----------------
    @staticmethod
    def _bombs_expected(flowers_eaten: int) -> int:
        """Return the expected bomb count: one per five flowers eaten, never negative."""
        return max(0, flowers_eaten // 5)
|