Compare commits


38 Commits

SHA1 Message Date
6b13586154 wip 2025-05-19 20:45:50 +02:00
7192f4bc18 fix: update tool call content formatting and enhance mouse click description 2025-05-19 20:42:18 +02:00
36cfeffe9c fix: correct center-point calculation in OCR results 2025-05-19 20:35:11 +02:00
7f5296b2ef fix: increase wait time in search_pc function for improved reliability 2025-05-19 20:33:54 +02:00
e5b3ea8b57 fix: logger 2025-05-19 20:30:01 +02:00
ff7c362cfe fix 2025-05-19 20:28:01 +02:00
b035bee682 fix 2025-05-19 20:21:30 +02:00
c2fb041285 fix: truncate message content and image data to improve processing efficiency 2025-05-19 20:20:00 +02:00
4369611610 fix: add debug logging and visual indicators for OCR results 2025-05-19 20:19:36 +02:00
93a01b792b fix: update OCR result coordinates to center position 2025-05-19 20:11:56 +02:00
3d5f71ec84 fix 2025-05-19 17:19:24 +02:00
20f05ca991 fix: emphasize priority in search_pc function description 2025-05-19 17:09:21 +02:00
859e1c2f0b fix: missing bracket 2025-05-19 17:07:03 +02:00
d9a9eba4c7 updated win func 2025-05-19 17:05:38 +02:00
b89051a37f fix 2025-05-19 17:02:48 +02:00
72a876410c more context to gpt 2025-05-19 16:51:46 +02:00
46a5bce956 refactor: Update function descriptions for clarity and consistency 2025-05-19 16:41:02 +02:00
e639e1edd3 refactor: Rename press_windows_key to windows_key for consistency 2025-05-19 16:33:59 +02:00
9bd15d45c5 feat: Add functionality to press Windows key and update function registry 2025-05-19 16:32:09 +02:00
105ab4a04b feat: wip: give OCR+positions 2025-05-19 16:10:02 +02:00
5be7f9aadb feat: Add OCR functionality to process method; integrate Tesseract for text extraction from screenshots 2025-05-19 15:59:46 +02:00
20764d5d19 fix: Simplify click position extraction for screenshot crosshair in tool execution 2025-05-19 13:43:04 +02:00
158529a2bd fix: Parse tool call arguments as JSON for improved handling in process method 2025-05-19 13:41:25 +02:00
b583094e20 fix: Enhance screenshot functionality; add crosshair drawing and save screenshot to file 2025-05-19 13:39:26 +02:00
d7c4f9b0cb fix: Update image handling in process method; ensure only the last two messages retain images and improve debugging output 2025-05-19 13:27:16 +02:00
035252c146 fix: Enhance logging for tool calls in process method; handle potential errors in next steps assignment 2025-05-19 13:21:15 +02:00
892f41f78a fix: Shorten image data in message copies for better debugging; update logging to reflect changes 2025-05-19 13:17:51 +02:00
0af7dc7699 fix: bug 2025-05-19 13:14:38 +02:00
2bcddedca5 fix: Adjust message handling in process method; ensure correct image assignment and add next steps output 2025-05-19 13:13:28 +02:00
b881f04acc fix: Update process method return type and handle image attribute correctly; improve error handling 2025-05-19 13:10:46 +02:00
670066100f feat: Implement logging functionality; add logger configuration and retrieval 2025-05-19 13:05:36 +02:00
52c455b20c fix: Remove unused PyQt5 and tkinter overlay code; simplify click indicator function 2025-05-19 12:58:34 +02:00
a4e078bc19 tempfix: remove mouse overlay 2025-05-19 12:51:59 +02:00
1925a77d85 Add screenshot re-execution logic in AIProcessor; append outputs from tool calls 2025-05-19 09:34:21 +02:00
e573ecb553 Add confirmation function and re-execution logic in AIProcessor; clean up web server request handling 2025-05-19 09:30:58 +02:00
f7feb12946 Add screenshot functionality and new commands for wait and reprompt 2025-05-19 09:15:08 +02:00
66330bfc73 Implement click indicator with red circle display; update server run parameters 2025-05-19 09:00:39 +02:00
41f7d0e210 Refactor mouse button handling to use string literals instead of ButtonType constants; add debug print for screenshot action in web server 2025-05-19 08:53:46 +02:00
7 changed files with 310 additions and 22 deletions

View File

@@ -1,18 +1,99 @@
 import pyautogui
+import threading
+import pytesseract
+import time, io, base64
+import sys
 from objects.inputs import MouseInput, KeyboardInput, ButtonType
+from PIL import ImageGrab, ImageDraw  # type: ignore
+from objects import logger as logger_module
+import logging
+
+logger: logging.Logger = logger_module.get_logger(__name__)
+
+
+def take_screenshot(cross_position: list[tuple[int, int]] | None = None) -> bytes:
+    """Take a screenshot of the current screen and return it as bytes."""
+    screenshot = ImageGrab.grab()
+    buf = io.BytesIO()
+    # Optionally draw a red crosshair at each specified position
+    if cross_position:
+        for pos in cross_position:
+            x, y = pos
+            draw = ImageDraw.Draw(screenshot)
+            size = 20  # half-length of each arm
+            color = (255, 0, 0)
+            width = 2
+            # horizontal line
+            draw.line((x - size, y, x + size, y), fill=color, width=width)
+            # vertical line
+            draw.line((x, y - size, x, y + size), fill=color, width=width)
+    screenshot.save(buf, format='PNG')
+    # also save to a file
+    screenshot.save("screenshot.png", format='PNG')
+    return buf.getvalue()
+
+
+def perform_ocr(screenshot: bytes) -> list[dict]:
+    """Perform OCR on screenshot bytes and return a list of text blocks with positions."""
+    from PIL import Image  # type: ignore
+    import io
+    # open image from bytes
+    img = Image.open(io.BytesIO(screenshot))
+    # perform OCR, get data dictionary
+    data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT)
+    results = []
+    n = len(data.get('level', []))
+    for i in range(n):
+        text = data['text'][i]
+        if text and text.strip():
+            # center of the bounding box: left edge plus half the width
+            results.append({
+                'text': text,
+                'x': data['left'][i] + data['width'][i] // 2,
+                'y': data['top'][i] + data['height'][i] // 2
+            })
+    # if debug logging is enabled, save a copy with a blue cross on each result
+    if logger.isEnabledFor(logging.DEBUG):
+        screenshot_with_circles = Image.open(io.BytesIO(screenshot))
+        draw = ImageDraw.Draw(screenshot_with_circles)
+        for result in results:
+            x, y = result['x'], result['y']
+            size = 10
+            color = (0, 0, 255)  # blue
+            width = 2
+            # horizontal line
+            draw.line((x - size, y, x + size, y), fill=color, width=width)
+            # vertical line
+            draw.line((x, y - size, x, y + size), fill=color, width=width)
+        screenshot_with_circles.save("screenshot_with_circles.png", format='PNG')
+        logger.debug("Debug enabled, saving OCR results screenshot with markers")
+        screenshot_with_circles.save("ocr_results.png", format='PNG')
+    return results
+
+
+def screenshot_to_base64(screenshot: bytes) -> str:
+    """Convert screenshot bytes to a base64-encoded string."""
+    return base64.b64encode(screenshot).decode('utf-8')
+
+
+def show_click_indicator(x: int, y: int, duration: float = 2.0, size: int = 50) -> None:
+    """Display a click-through red circle at (x, y) for the given duration (currently a stub)."""
+    pass
+
+
 def press_mouse(mouse_input: MouseInput) -> None:
     """Presses mouse buttons at the given position."""
     x, y = mouse_input.x, mouse_input.y
     button = mouse_input.click_type
-    if button == ButtonType.LEFT:
+    if button == "left":
         pyautogui.click(x, y, button='left')
-    elif button == ButtonType.DOUBLE_LEFT:
+    elif button == "double_left":
         pyautogui.doubleClick(x, y)
-    elif button == ButtonType.RIGHT:
+    elif button == "right":
         pyautogui.click(x, y, button='right')
-    elif button == ButtonType.MIDDLE:
+    elif button == "middle":
         pyautogui.click(x, y, button='middle')
+    # Show red circle indicator at click position for 2 seconds
+    threading.Thread(target=show_click_indicator, args=(x, y), daemon=True).start()
 
 def press_keyboard(keyboard_input: KeyboardInput) -> None:
     """Types the given sequence of keys."""
@@ -22,8 +103,29 @@ def press_keyboard(keyboard_input: KeyboardInput) -> None:
     if keyboard_input.press_enter:
         pyautogui.press('enter')
 
-def _execute(name, args):
+
+def wait(duration: float) -> None:
+    """Waits for the specified duration in seconds."""
+    time.sleep(duration)
+
+
+def search_pc(query: str) -> None:
+    """Opens the Start menu and types a search query."""
+    pyautogui.hotkey('win')
+    wait(4)
+    press_keyboard(KeyboardInput(text=query))
+
+
+def reprompt(nextsteps: str, processor) -> list[str | dict]:
+    """Re-execute GPT with a fresh screenshot."""
+    scr = screenshot_to_base64(take_screenshot())
+    return processor.process(nextsteps, img_data=scr)
+
+
+def _execute(name, args={}, processor=None):
     if name == "click_button":
         press_mouse(MouseInput(**args))
     elif name == "type_text":
         press_keyboard(KeyboardInput(**args))
+    elif name == "wait":
+        wait(**args)
+    elif name == "search_pc":
+        search_pc(**args)
+    elif name == "reprompt":
+        reprompt(**args, processor=processor)
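
A quick way to sanity-check the center-point arithmetic in perform_ocr is to run it against a hand-built image_to_data-style dictionary. A minimal sketch with synthetic data; no Tesseract install is required:

# Hand-built stand-in for pytesseract.Output.DICT output: one word whose
# bounding box starts at x=100 with width 60, so its center is x=130.
data = {
    'level': [5],
    'text': ['OK'],
    'left': [100],
    'top': [40],
    'width': [60],
    'height': [20],
}

results = []
for i in range(len(data.get('level', []))):
    text = data['text'][i]
    if text and text.strip():
        results.append({
            'text': text,
            # precedence: width // 2 is evaluated first, then added to left,
            # giving the center of the box rather than (left + width) // 2
            'x': data['left'][i] + data['width'][i] // 2,
            'y': data['top'][i] + data['height'][i] // 2,
        })

assert results == [{'text': 'OK', 'x': 130, 'y': 50}]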

View File

@@ -1,8 +1,14 @@
 import traceback
-import json # new
+import json
 import openai
+import base64
+from flask import jsonify
 from objects import aic
 import ai.compute
+from objects import logger as logger_module
+import logging
+
+logger: logging.Logger = logger_module.get_logger(__name__)
 
 class AIProcessor:
     def __init__(self, api_key: str, model: str = "gpt-4.1"):
@@ -34,11 +40,17 @@ class AIProcessor:
return f"Error executing {name}: {e}" return f"Error executing {name}: {e}"
# -------------------------- main entry -------------------------- # # -------------------------- main entry -------------------------- #
def process(self, prompt: str, img_data: str | bytes | None = None) -> str | list[dict]: def process(self, prompt: str, img_data: str | bytes | None = None) -> list[str | dict]:
outputs = [] # type: list[str | dict]
reexec = True
click_positions = [] # used for screenshot crosshair position
nextsteps = ""
try: try:
# append user prompt with optional image
self.session.messages.append( self.session.messages.append(
aic.Message(role="user", content=prompt, image=img_data) aic.Message(role="user", content=prompt, image=img_data)
) )
# if image provided, perform OCR and include text positions
response = self.oai.chat.completions.create( response = self.oai.chat.completions.create(
model=self.model, model=self.model,
messages=self.session.messages_dict(), messages=self.session.messages_dict(),
@@ -49,25 +61,84 @@ class AIProcessor:
             tool_calls = getattr(response.choices[0].message, "tool_calls", None)
             if tool_calls:
                 for tc in tool_calls:
-                    ai.compute._execute(
-                        name=tc.function.name,
-                        args=json.loads(tc.function.arguments)
-                    )
+                    ags = json.loads(tc.function.arguments)
+                    logger.debug(
+                        "Processing tool call: %s with arguments: %s",
+                        tc.function.name,
+                        tc.function.arguments,
+                    )
+                    if tc.function.name == "confirm":
+                        reexec = False
+                        try:
+                            nextsteps = ags.get("goal", "")
+                        except Exception:
+                            nextsteps = str(tc.function.arguments)
+                            logger.error("next steps is a plain string: %s", nextsteps)
+                    if tc.function.name == "click_button":
+                        # extract click position for the screenshot crosshair
+                        click_positions.append((ags.get("x", 0), ags.get("y", 0)))
+                    r = ai.compute._execute(
+                        name=tc.function.name,
+                        args=ags,
+                        processor=self,
+                    )
+                    if r:
+                        outputs.append(r)
+                # Strip images from all but the most recent messages to keep payloads small
+                for msg in self.session.messages[:-3]:
+                    if msg.image and not msg.disable_image:
+                        msg.image = None
+                # copy of self.session.messages with content and image data shortened for debugging
+                cps = [
+                    aic.Message(
+                        role=msg.role,
+                        content=msg.content[:80],
+                        image=msg.image[:20] if isinstance(msg.image, str) else None,  # type: ignore
+                        disable_image=msg.disable_image,
+                        name=msg.name,
+                    )
+                    for msg in self.session.messages
+                ]
+                logger.debug("Shortened message copies for processing: %s", cps)
+                if reexec:
+                    img_bytes = ai.compute.take_screenshot(cross_position=click_positions)
+                    img = ai.compute.screenshot_to_base64(img_bytes)
+                    ocr_results = []
+                    try:
+                        ocr_results = ai.compute.perform_ocr(img_bytes)
+                    except Exception as e:
+                        traceback.print_exc()
+                        logger.debug("OCR failed: %s", e)
+                    self.session.messages.append(
+                        aic.Message(
+                            role="assistant",
+                            content=str([(tc.function.name, tc.function.arguments) for tc in tool_calls]),
+                        )
+                    )
+                    outputs.extend(self.process(nextsteps + f"\nOCR Positions: {ocr_results}", img))
                 return [
                     {
                         "name": tc.function.name,
                         "arguments": json.loads(tc.function.arguments),
                     }
                     for tc in tool_calls
-                ]
+                ] + outputs  # type: ignore
             # otherwise return final assistant content
             print(f"Response: {json.dumps(response.to_dict(), indent=4)}")  # debug
             output_text: str = response.choices[0].message.content  # type: ignore
+            outputs.append(output_text)
             self.session.messages.append(
-                aic.Message(role="assistant", content=output_text)
+                aic.Message(role="assistant", content="Executed: " + str(outputs))
             )
-            return output_text
+            return [*outputs]
         except Exception as e:
             traceback.print_exc()
-            return f"Error processing request: {str(e)}"
+            return [f"Error processing request: {str(e)}"]

View File

@@ -12,7 +12,7 @@ def main():
         model=os.getenv("OPENAI_MODEL", "gpt-4.1")
     )
     server = webserver.web.WebServerApp(aip)
-    server.run()
+    server.run(host="0.0.0.0", port=int(os.getenv("PORT", 5000)), debug=int(os.getenv("DEBUG", 0)) > 0)
 
 if __name__ == "__main__":
     main()
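
The run parameters are now environment-driven. Note that the debug flag only turns on when DEBUG parses as an integer greater than zero; non-numeric values raise ValueError. A small check of that parsing rule:

import os

# DEBUG must be an integer string; anything > 0 enables Flask debug mode,
# and an unset variable falls back to 0 (off).
for raw, expected in ((None, False), ("0", False), ("1", True), ("2", True)):
    if raw is None:
        os.environ.pop("DEBUG", None)
    else:
        os.environ["DEBUG"] = raw
    assert (int(os.getenv("DEBUG", 0)) > 0) is expected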

View File

@@ -8,7 +8,8 @@ SYSTEM_PROMPT = """
 You are CopeAI Windows Agent. You are currently controlling a Windows 11 machine. \
 You are capable to see the screen, click buttons, type text, and interact with the system. \
 You will use the functions provided. The resolution of the machine is 1920x1080. \
-Your text response must indicate what you are doing."""
+Your text response must indicate what you are doing. If the place where you clicked seems incorrect, \
+you will use everything you can to find the position of the location of the goal and click again. You will see a red cross on where you previously clicked."""
 
 FUNCTIONS = [
     {
@@ -30,7 +31,7 @@ FUNCTIONS = [
"click_type": { "click_type": {
"type": "string", "type": "string",
"enum": ["left", "double_left", "middle", "right"], "enum": ["left", "double_left", "middle", "right"],
"description": "The type of mouse click to perform." "description": "The type of mouse click to perform. `double_left` is a double click, used to open apps or files."
} }
}, },
"required": ["click_type", "x", "y"], "required": ["click_type", "x", "y"],
@@ -58,7 +59,75 @@ FUNCTIONS = [
"required": ["text", "press_enter"], "required": ["text", "press_enter"],
} }
} }
} },
{
"type": "function",
"function": {
"name": "wait",
"description": "Wait for a specified amount of time.",
"parameters": {
"type": "object",
"properties": {
"duration": {
"type": "number",
"description": "The duration to wait in seconds."
}
},
"required": ["duration"],
}
}
},
{
"type": "function",
"function": {
"name": "reprompt",
"description": "After doing what you had to do, re-execute once again with a new screenshot.",
"parameters": {
"type": "object",
"properties": {
"nextsteps": {
"type": "string",
"description": "The new steps to perform."
}
},
"required": ["nextsteps"],
}
}
},
{
"type": "function",
"function": {
"name": "confirm",
"description": "Confirm that the task is completed and no further actions are needed. ONLY execute this when you fulfilled the user's request. This can be the only function called.",
"parameters": {
"type": "object",
"properties": {
"goal": {
"type": "string",
"description": "The goal that was achieved."
}
},
"required": ["goal"],
}
}
},
{
"type": "function",
"function": {
"name": "search_pc",
"description": "Open the start menu, then searches for content. Use to open apps, open file explorer, or search the web. Use this in priority!!!",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "The search query to perform."
}
},
"required": ["query"],
}
}
},
] ]
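
These schemas presumably reach the model through the chat completions call in AIProcessor.process. The exact call site is outside this diff, so the tools= wiring below is an assumption based on the standard OpenAI function-calling API, and the import path is hypothetical:

import json
import openai
from ai.prompts import FUNCTIONS  # hypothetical module path for the list above

client = openai.OpenAI(api_key="sk-...")  # placeholder key
response = client.chat.completions.create(
    model="gpt-4.1",
    messages=[{"role": "user", "content": "Open the file explorer."}],
    tools=FUNCTIONS,  # the schema list defined above
)

# Tool calls come back with JSON-encoded arguments, decoded the same way
# process() does with json.loads(tc.function.arguments).
for tc in response.choices[0].message.tool_calls or []:
    print(tc.function.name, json.loads(tc.function.arguments))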

objects/logger.py Normal file
View File

@@ -0,0 +1,44 @@
+import logging
+import os
+from logging.handlers import RotatingFileHandler
+from dotenv import load_dotenv
+
+load_dotenv()
+
+# Configuration values
+LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
+LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
+LOG_DIR = os.getenv("LOG_DIR", os.path.join(os.getcwd(), "logs"))
+
+# Ensure log directory exists
+os.makedirs(LOG_DIR, exist_ok=True)
+
+# Log file path
+LOG_FILE = os.path.join(LOG_DIR, "app.log")
+
+# Create root logger
+logger = logging.getLogger("gpt-agent")
+logger.setLevel(LOG_LEVEL)
+
+# Formatter
+formatter = logging.Formatter(LOG_FORMAT)
+
+# Console handler
+console_handler = logging.StreamHandler()
+console_handler.setLevel(LOG_LEVEL)
+console_handler.setFormatter(formatter)
+logger.addHandler(console_handler)
+
+# Rotating file handler
+file_handler = RotatingFileHandler(LOG_FILE, maxBytes=5*1024*1024, backupCount=5)
+file_handler.setLevel(LOG_LEVEL)
+file_handler.setFormatter(formatter)
+logger.addHandler(file_handler)
+
+def get_logger(name: str | None = None) -> logging.Logger:
+    """
+    Retrieve a configured logger instance. If name is provided,
+    returns a child logger of the configured root logger.
+    """
+    if name:
+        return logger.getChild(name)
+    return logger
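
Every caller gets a namespaced child of the single configured "gpt-agent" root, so one LOG_LEVEL / LOG_DIR setting governs console and file output alike. A minimal usage sketch:

from objects import logger as logger_module

log = logger_module.get_logger(__name__)

# Records propagate to the "gpt-agent" root configured above, so they reach
# both the console handler and logs/app.log with the shared format.
log.info("processor started")
log.debug("emitted only when LOG_LEVEL=DEBUG is set in the environment")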

View File

@@ -5,4 +5,7 @@ python-dotenv
 # libraries to control mouse+keyboard+see screen
 pyautogui
 pynput
-Pillow
+pillow
+pytesseract
+
+# --index-url https://mirrors.sustech.edu.cn/pypi/simple
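
Worth noting: pytesseract is only a wrapper, and the Tesseract engine must be installed on the host separately. If the binary is not on PATH, the wrapper exposes a hook for it; the path below is the default Windows installer location (an assumption, not something pinned by this repo):

import pytesseract

# pytesseract shells out to the tesseract executable; point the wrapper at it
# explicitly when it is not on PATH. Default Windows install path assumed.
pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"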

View File

@@ -2,7 +2,7 @@ from flask import Flask, request, jsonify
 import os, ai.processor
 from dotenv import load_dotenv
 import io
-from PIL import ImageGrab
+from PIL import ImageGrab  # type: ignore
 
 load_dotenv()
@@ -27,8 +27,6 @@ class WebServerApp:
             # Process the data as needed
             prompt = data.get('prompt', '')
             if not prompt:
                 return jsonify({"error": "No prompt provided"}), 400
-
-
             img_data = None
@@ -40,6 +38,7 @@ class WebServerApp:
                 img_data = None
             else:
                 if 'host_screenshot' in data:
+                    print('Taking screenshot...')
                     # take a screenshot right here
                     # capture the full screen
                     screenshot_img = ImageGrab.grab()
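
To exercise the handler above, a client needs a JSON body with prompt and, optionally, host_screenshot to make the server capture its own screen. A hedged sketch; the route path and port are assumptions, since neither is visible in this diff:

import requests

# Hypothetical client: the /process route and port 5000 are assumptions;
# only the 'prompt' and 'host_screenshot' request keys appear in this diff.
resp = requests.post(
    "http://localhost:5000/process",
    json={"prompt": "Open Notepad", "host_screenshot": True},
    timeout=120,  # tool-call loops can take a while
)
print(resp.json())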