gpt-agent/ai/compute.py
2025-05-19 20:21:30 +02:00

131 lines
4.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import pyautogui
import threading
import pytesseract
import time, io, base64
import sys
from objects.inputs import MouseInput, KeyboardInput, ButtonType
from PIL import ImageGrab, ImageDraw # type: ignore
from objects import logger as logger_module
import logging
# Module-level logger for this file, obtained through the project's logger helper.
logger: logging.Logger = logger_module.get_logger(__name__)
def take_screenshot(cross_position: list[tuple[int, int]] | None = None) -> bytes:
    """Grab the screen and return the image encoded as PNG bytes.

    Args:
        cross_position: Optional list of ``(x, y)`` points. A red crosshair
            is drawn on the captured image at each of them.

    Returns:
        The PNG-encoded image, including any crosshairs.

    The same image is also written to ``screenshot.png`` in the current
    working directory on every call.
    """
    arm = 20  # half-length of each crosshair arm, in pixels
    red = (255, 0, 0)
    stroke = 2

    image = ImageGrab.grab()

    # ``None`` and an empty list both mean "no crosshairs".
    for cx, cy in cross_position or ():
        pen = ImageDraw.Draw(image)
        pen.line((cx - arm, cy, cx + arm, cy), fill=red, width=stroke)  # horizontal arm
        pen.line((cx, cy - arm, cx, cy + arm), fill=red, width=stroke)  # vertical arm

    png = io.BytesIO()
    image.save(png, format='PNG')
    # Keep a copy on disk as well.
    image.save("screenshot.png", format='PNG')
    return png.getvalue()
def perform_ocr(screenshot: bytes) -> list[dict]:
    """Run OCR on encoded screenshot bytes and locate each recognised text block.

    Args:
        screenshot: Encoded image bytes (for example the PNG produced by
            ``take_screenshot``).

    Returns:
        One dict per non-blank text entry reported by Tesseract, with keys
        ``'text'`` (the recognised string) and ``'x'`` / ``'y'`` (the centre
        of the entry's bounding box, in image pixels).

    When the root logger is enabled for DEBUG, a copy of the image with a blue
    cross on every detected centre is written to ``screenshot_with_circles.png``
    and ``ocr_results.png`` in the current working directory.
    """
    from PIL import Image  # type: ignore

    img = Image.open(io.BytesIO(screenshot))
    data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT)

    results = []
    # The DICT output holds parallel lists, one entry per detected element.
    columns = zip(
        data.get('text', []),
        data.get('left', []),
        data.get('top', []),
        data.get('width', []),
        data.get('height', []),
    )
    for text, left, top, box_width, box_height in columns:
        if text and text.strip():
            results.append({
                'text': text,
                # Centre = box origin + half its extent. The previous
                # ``width + left // 2`` halved the wrong term.
                'x': left + box_width // 2,
                'y': top + box_height // 2,
            })

    # Debug aid: mark every detected centre on a copy of the screenshot.
    if logging.getLogger().isEnabledFor(logging.DEBUG):
        overlay = Image.open(io.BytesIO(screenshot))
        draw = ImageDraw.Draw(overlay)
        arm = 10  # half-length of each cross arm, in pixels
        blue = (0, 0, 255)
        stroke = 2
        for block in results:
            cx, cy = block['x'], block['y']
            draw.line((cx - arm, cy, cx + arm, cy), fill=blue, width=stroke)  # horizontal arm
            draw.line((cx, cy - arm, cx, cy + arm), fill=blue, width=stroke)  # vertical arm
        # Both file names are kept because either may be read by existing tooling.
        overlay.save("screenshot_with_circles.png", format='PNG')
        logger.debug("Debug, saving ocr results screenshot with circles")
        overlay.save("ocr_results.png", format='PNG')

    return results
def screenshot_to_base64(screenshot: bytes) -> str:
    """Return the base64 text representation of the given image bytes."""
    encoded = base64.b64encode(screenshot)
    return str(encoded, 'utf-8')
def show_click_indicator(x: int, y: int, duration: float = 2.0, size: int = 50) -> None:
    """Placeholder for a click-through red circle shown at (x, y).

    The indicator is not implemented yet: the arguments are accepted and
    ignored, and the function returns immediately.
    """
    return None
def press_mouse(mouse_input: MouseInput) -> None:
"""Presses mouse buttons at the given position."""
x, y = mouse_input.x, mouse_input.y
button = mouse_input.click_type
if button == "left":
pyautogui.click(x, y, button='left')
elif button == "double_left":
pyautogui.doubleClick(x, y)
elif button == "right":
pyautogui.click(x, y, button='right')
elif button == "middle":
pyautogui.click(x, y, button='middle')
# Show red circle indicator at click position for 2 seconds
threading.Thread(target=show_click_indicator, args=(x, y), daemon=True).start()
def press_keyboard(keyboard_input: KeyboardInput) -> None:
    """Type ``keyboard_input.text`` and optionally press Enter afterwards.

    Nothing is typed when the text is empty or ``None``. Enter is pressed only
    when ``keyboard_input.press_enter`` is truthy.
    """
    to_type = keyboard_input.text
    wants_enter = keyboard_input.press_enter

    if to_type:
        pyautogui.typewrite(to_type)
    if wants_enter:
        pyautogui.press('enter')
def wait(duration: float) -> None:
    """Block the calling thread for ``duration`` seconds."""
    seconds = duration
    time.sleep(seconds)
def search_pc(query: str) -> None:
    """Press the ``win`` key, pause two seconds, then type ``query``.

    Whether Enter is pressed afterwards depends on ``KeyboardInput``'s default
    for ``press_enter``, which is not set here.
    """
    settle_seconds = 2  # pause between the key press and the typing

    pyautogui.hotkey('win')
    wait(settle_seconds)

    typed_query = KeyboardInput(text=query)
    press_keyboard(typed_query)
def reprompt(nextsteps: str, processor) -> object:
    """Capture a fresh screenshot and hand it, with ``nextsteps``, to the processor.

    Args:
        nextsteps: Text passed as the first argument to ``processor.process``.
        processor: Object exposing ``process(text, img_data=...)``.

    Returns:
        Whatever ``processor.process`` returns. The previous ``-> None``
        annotation contradicted this.
    """
    encoded_screenshot = screenshot_to_base64(take_screenshot())
    return processor.process(nextsteps, img_data=encoded_screenshot)
def _execute(name, args=[], processor=None):
if name == "click_button":
press_mouse(MouseInput(**args))
elif name == "type_text":
press_keyboard(KeyboardInput(**args))
elif name == "wait":
wait(**args)
elif name == "search_pc":
search_pc(**args)
elif name == "reprompt":
reprompt(**args, processor=processor)