Compare commits


38 Commits

SHA1 Message Date
6b13586154 wip 2025-05-19 20:45:50 +02:00
7192f4bc18 fix: update tool call content formatting and enhance mouse click description 2025-05-19 20:42:18 +02:00
36cfeffe9c fix: correct center-point calculation in OCR results 2025-05-19 20:35:11 +02:00
7f5296b2ef fix: increase wait time in search_pc function for improved reliability 2025-05-19 20:33:54 +02:00
e5b3ea8b57 fix: logger 2025-05-19 20:30:01 +02:00
ff7c362cfe fix 2025-05-19 20:28:01 +02:00
b035bee682 fix 2025-05-19 20:21:30 +02:00
c2fb041285 fix: truncate message content and image data to improve processing efficiency 2025-05-19 20:20:00 +02:00
4369611610 fix: add debug logging and visual indicators for OCR results 2025-05-19 20:19:36 +02:00
93a01b792b fix: update OCR result coordinates to center position 2025-05-19 20:11:56 +02:00
3d5f71ec84 fix 2025-05-19 17:19:24 +02:00
20f05ca991 fix: emphasize priority in search_pc function description 2025-05-19 17:09:21 +02:00
859e1c2f0b fix: missing bracket 2025-05-19 17:07:03 +02:00
d9a9eba4c7 updated win func 2025-05-19 17:05:38 +02:00
b89051a37f fix 2025-05-19 17:02:48 +02:00
72a876410c more context to gpt 2025-05-19 16:51:46 +02:00
46a5bce956 refactor: Update function descriptions for clarity and consistency 2025-05-19 16:41:02 +02:00
e639e1edd3 refactor: Rename press_windows_key to windows_key for consistency 2025-05-19 16:33:59 +02:00
9bd15d45c5 feat: Add functionality to press Windows key and update function registry 2025-05-19 16:32:09 +02:00
105ab4a04b feat: wip: give OCR+positions 2025-05-19 16:10:02 +02:00
5be7f9aadb feat: Add OCR functionality to process method; integrate Tesseract for text extraction from screenshots 2025-05-19 15:59:46 +02:00
20764d5d19 fix: Simplify click position extraction for screenshot crosshair in tool execution 2025-05-19 13:43:04 +02:00
158529a2bd fix: Parse tool call arguments as JSON for improved handling in process method 2025-05-19 13:41:25 +02:00
b583094e20 fix: Enhance screenshot functionality; add crosshair drawing and save screenshot to file 2025-05-19 13:39:26 +02:00
d7c4f9b0cb fix: Update image handling in process method; ensure only the last two messages retain images and improve debugging output 2025-05-19 13:27:16 +02:00
035252c146 fix: Enhance logging for tool calls in process method; handle potential errors in next steps assignment 2025-05-19 13:21:15 +02:00
892f41f78a fix: Shorten image data in message copies for better debugging; update logging to reflect changes 2025-05-19 13:17:51 +02:00
0af7dc7699 fix: bug 2025-05-19 13:14:38 +02:00
2bcddedca5 fix: Adjust message handling in process method; ensure correct image assignment and add next steps output 2025-05-19 13:13:28 +02:00
b881f04acc fix: Update process method return type and handle image attribute correctly; improve error handling 2025-05-19 13:10:46 +02:00
670066100f feat: Implement logging functionality; add logger configuration and retrieval 2025-05-19 13:05:36 +02:00
52c455b20c fix: Remove unused PyQt5 and tkinter overlay code; simplify click indicator function 2025-05-19 12:58:34 +02:00
a4e078bc19 tempfix: remove mouse overlay 2025-05-19 12:51:59 +02:00
1925a77d85 Add screenshot re-execution logic in AIProcessor; append outputs from tool calls 2025-05-19 09:34:21 +02:00
e573ecb553 Add confirmation function and re-execution logic in AIProcessor; clean up web server request handling 2025-05-19 09:30:58 +02:00
f7feb12946 Add screenshot functionality and new commands for wait and reprompt 2025-05-19 09:15:08 +02:00
66330bfc73 Implement click indicator with red circle display; update server run parameters 2025-05-19 09:00:39 +02:00
41f7d0e210 Refactor mouse button handling to use string literals instead of ButtonType constants; add debug print for screenshot action in web server 2025-05-19 08:53:46 +02:00
7 changed files with 310 additions and 22 deletions

View File

@@ -1,18 +1,99 @@
import pyautogui
import threading
import pytesseract
import time, io, base64
import sys
from objects.inputs import MouseInput, KeyboardInput, ButtonType
from PIL import ImageGrab, ImageDraw # type: ignore
from objects import logger as logger_module
import logging
logger: logging.Logger = logger_module.get_logger(__name__)
def take_screenshot(cross_position: list[tuple[int, int]] | None = None) -> bytes:
"""Take a screenshot of the current screen and return it as bytes."""
screenshot = ImageGrab.grab()
buf = io.BytesIO()
# Optionally draw a crosshair at the specified position
if cross_position:
for pos in cross_position:
x, y = pos
draw = ImageDraw.Draw(screenshot)
size = 20 # halflength of each arm
color = (255, 0, 0)
width = 2
# horizontal line
draw.line((x - size, y, x + size, y), fill=color, width=width)
# vertical line
draw.line((x, y - size, x, y + size), fill=color, width=width)
screenshot.save(buf, format='PNG')
# save in a file
screenshot.save("screenshot.png", format='PNG')
return buf.getvalue()
def perform_ocr(screenshot: bytes) -> list[dict]:
"""Perform OCR on screenshot bytes and return list of text blocks with positions."""
from PIL import Image # type: ignore
import io
# open image from bytes
img = Image.open(io.BytesIO(screenshot))
# perform OCR, get data dictionary
data = pytesseract.image_to_data(img, output_type=pytesseract.Output.DICT)
results = []
n = len(data.get('level', []))
for i in range(n):
text = data['text'][i]
if text and text.strip():
# Fix the center-point calculation (add first, then divide)
results.append({
'text': text,
'x': data['left'][i] + data['width'][i] // 2,
'y': data['top'][i] + data['height'][i] // 2
})
# check if debug is enabled
if logger.isEnabledFor(logging.DEBUG):
# take screenshot + put blue circle with x, y on screenshot for each component
screenshot_with_circles = Image.open(io.BytesIO(screenshot))
draw = ImageDraw.Draw(screenshot_with_circles)
for result in results:
x, y = result['x'], result['y']
size = 10
color = (0, 0, 255) # blue
width = 2
# horizontal line
draw.line((x - size, y, x + size, y), fill=color, width=width)
# vertical line
draw.line((x, y - size, x, y + size), fill=color, width=width)
screenshot_with_circles.save("screenshot_with_circles.png", format='PNG')
# save in a file
logger.debug("Debug, saving ocr results screenshot with circles")
screenshot_with_circles.save("ocr_results.png", format='PNG')
return results
def screenshot_to_base64(screenshot: bytes) -> str:
"""Convert screenshot bytes to a base64 encoded string."""
return base64.b64encode(screenshot).decode('utf-8')
def show_click_indicator(x: int, y: int, duration: float = 2.0, size: int = 50) -> None:
"""Display a red circle at (x, y) for the given duration, can be clicked through."""
pass
def press_mouse(mouse_input: MouseInput) -> None:
"""Presses mouse buttons at the given position."""
x, y = mouse_input.x, mouse_input.y
button = mouse_input.click_type
if button == ButtonType.LEFT:
if button == "left":
pyautogui.click(x, y, button='left')
elif button == ButtonType.DOUBLE_LEFT:
elif button == "double_left":
pyautogui.doubleClick(x, y)
elif button == ButtonType.RIGHT:
elif button == "right":
pyautogui.click(x, y, button='right')
elif button == ButtonType.MIDDLE:
elif button == "middle":
pyautogui.click(x, y, button='middle')
# Show red circle indicator at click position for 2 seconds
threading.Thread(target=show_click_indicator, args=(x, y), daemon=True).start()
def press_keyboard(keyboard_input: KeyboardInput) -> None:
    """Types the given sequence of keys."""
@@ -22,8 +103,29 @@ def press_keyboard(keyboard_input: KeyboardInput) -> None:
    if keyboard_input.press_enter:
        pyautogui.press('enter')
-def _execute(name, args):
def wait(duration: float) -> None:
    """Waits for the specified duration in seconds."""
    time.sleep(duration)
def search_pc(query: str) -> None:
    """Opens the Start menu and types the given search query."""
    pyautogui.hotkey('win')
    wait(4)
    press_keyboard(KeyboardInput(text=query))
def reprompt(nextsteps: str, processor):
    """Re-execute GPT with a new screenshot and return its output."""
    scr = screenshot_to_base64(take_screenshot())
    return processor.process(nextsteps, img_data=scr)
+def _execute(name, args=None, processor=None):
    args = args or {}
    if name == "click_button":
        press_mouse(MouseInput(**args))
    elif name == "type_text":
        press_keyboard(KeyboardInput(**args))
    elif name == "wait":
        wait(**args)
    elif name == "search_pc":
        search_pc(**args)
    elif name == "reprompt":
        return reprompt(**args, processor=processor)
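Taken together, these helpers form a small screenshot-to-OCR-to-base64 pipeline. A quick smoke test of that flow, as a sketch: the module path ai.compute is assumed from the processor's import, and Tesseract must be installed and on PATH for perform_ocr to work.

from ai import compute  # module path assumed from the processor's `import ai.compute`

img_bytes = compute.take_screenshot(cross_position=[(960, 540)])  # red crosshair at screen center
blocks = compute.perform_ocr(img_bytes)        # [{'text': ..., 'x': ..., 'y': ...}, ...]
print(blocks[:5])                              # first few recognized words with center coordinates
b64 = compute.screenshot_to_base64(img_bytes)  # string ready to attach to a chat message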

View File

@@ -1,8 +1,14 @@
import traceback
-import json # new
+import json
import openai
import base64
from flask import jsonify
from objects import aic
import ai.compute
from objects import logger as logger_module
import logging
logger: logging.Logger = logger_module.get_logger(__name__)
class AIProcessor:
    def __init__(self, api_key: str, model: str = "gpt-4.1"):
@@ -34,11 +40,17 @@ class AIProcessor:
return f"Error executing {name}: {e}"
    # -------------------------- main entry -------------------------- #
-   def process(self, prompt: str, img_data: str | bytes | None = None) -> str | list[dict]:
+   def process(self, prompt: str, img_data: str | bytes | None = None) -> list[str | dict]:
        outputs = []  # type: list[str | dict]
        reexec = True
        click_positions = []  # used for screenshot crosshair positions
        nextsteps = ""
        try:
            # append user prompt with optional image
            self.session.messages.append(
                aic.Message(role="user", content=prompt, image=img_data)
            )
            # OCR text positions are appended on re-execution further below
            response = self.oai.chat.completions.create(
                model=self.model,
                messages=self.session.messages_dict(),
@@ -49,25 +61,84 @@ class AIProcessor:
            tool_calls = getattr(response.choices[0].message, "tool_calls", None)
            if tool_calls:
                for tc in tool_calls:
-                   ai.compute._execute(
-                       name=tc.function.name,
-                       args=json.loads(tc.function.arguments)
+                   ags = json.loads(tc.function.arguments)
+                   logger.debug(
+                       "Processing tool call: %s with arguments: %s",
+                       tc.function.name,
+                       tc.function.arguments,
                    )
                    if tc.function.name == "confirm":
                        reexec = False
                        try:
                            nextsteps = ags.get("goal", "")
                        except Exception:
                            nextsteps = str(tc.function.arguments)
                            logger.error("nextsteps fell back to the raw argument string: %s", nextsteps)
                    if tc.function.name == "click_button":
                        # record the click position for the screenshot crosshair
                        click_positions.append((ags.get("x", 0), ags.get("y", 0)))
                    r = ai.compute._execute(
                        name=tc.function.name,
                        args=ags,
                        processor=self,
                    )
                    if r:
                        outputs.append(r)
                # keep images only on the most recent messages; strip the rest
                for msg in self.session.messages[:-3]:
                    if msg.image and not msg.disable_image:
                        msg.image = None
                # debug copy of the session messages with content and image
                # data truncated so the log stays readable
                cps = [
                    aic.Message(
                        role=msg.role,
                        content=msg.content[:80],
                        image=msg.image[:20] if isinstance(msg.image, str) else None,  # type: ignore
                        disable_image=msg.disable_image,
                        name=msg.name,
                    )
                    for msg in self.session.messages
                ]
                logger.debug("Shortened message copies for processing: %s", cps)
                if reexec:
                    img_bytes = ai.compute.take_screenshot(cross_position=click_positions)
                    img = ai.compute.screenshot_to_base64(img_bytes)
                    ocr_results = []
                    try:
                        ocr_results = ai.compute.perform_ocr(img_bytes)
                    except Exception:
                        logger.exception("OCR failed")
                    self.session.messages.append(
                        aic.Message(
                            role="assistant",
                            content=str([(tc.function.name, tc.function.arguments) for tc in tool_calls]),
                        )
                    )
                    outputs.extend(self.process(nextsteps + f"\nOCR Positions: {ocr_results}", img))
                return [
                    {
                        "name": tc.function.name,
                        "arguments": json.loads(tc.function.arguments),
                    }
                    for tc in tool_calls
-               ]
+               ] + outputs  # type: ignore
            # otherwise return the final assistant content
            logger.debug("Response: %s", json.dumps(response.to_dict(), indent=4))
            output_text: str = response.choices[0].message.content  # type: ignore
            outputs.append(output_text)
            self.session.messages.append(
-               aic.Message(role="assistant", content=output_text)
+               aic.Message(role="assistant", content="Executed: " + str(outputs))
            )
-           return output_text
+           return [*outputs]
        except Exception as e:
            traceback.print_exc()
-           return f"Error processing request: {str(e)}"
+           return [f"Error processing request: {str(e)}"]

View File

@@ -12,7 +12,7 @@ def main():
model=os.getenv("OPENAI_MODEL", "gpt-4.1")
)
server = webserver.web.WebServerApp(aip)
server.run()
server.run(host="0.0.0.0", port=int(os.getenv("PORT", 5000)), debug=int(os.getenv("DEBUG", 0)) > 0)
if __name__ == "__main__":
main()
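The entry point now takes its whole configuration from the environment. A sample .env covering the variables referenced in this diff; the OPENAI_API_KEY name is an assumption, the rest appear in main.py and objects/logger.py:

# API access (the key variable name is assumed; it is not visible in this diff)
OPENAI_API_KEY=sk-your-key-here
OPENAI_MODEL=gpt-4.1
# web server binding; any DEBUG value greater than 0 enables Flask debug mode
PORT=5000
DEBUG=0
# logging; LOG_LEVEL=DEBUG also unlocks the OCR debug screenshots
LOG_LEVEL=INFO
LOG_DIR=./logs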

View File

@@ -8,7 +8,8 @@ SYSTEM_PROMPT = """
You are CopeAI Windows Agent. You are currently controlling a Windows 11 machine. \
You are capable of seeing the screen, clicking buttons, typing text, and interacting with the system. \
You will use the functions provided. The resolution of the machine is 1920x1080. \
-Your text response must indicate what you are doing."""
+Your text response must indicate what you are doing. If the place where you clicked seems incorrect, \
+use everything at your disposal to find the correct position of the target and click again. A red cross marks where you previously clicked."""
FUNCTIONS = [
    {
@@ -30,7 +31,7 @@ FUNCTIONS = [
"click_type": {
"type": "string",
"enum": ["left", "double_left", "middle", "right"],
"description": "The type of mouse click to perform."
"description": "The type of mouse click to perform. `double_left` is a double click, used to open apps or files."
}
},
"required": ["click_type", "x", "y"],
@@ -58,7 +59,75 @@ FUNCTIONS = [
"required": ["text", "press_enter"],
}
}
}
},
    {
        "type": "function",
        "function": {
            "name": "wait",
            "description": "Wait for a specified amount of time.",
            "parameters": {
                "type": "object",
                "properties": {
                    "duration": {
                        "type": "number",
                        "description": "The duration to wait in seconds."
                    }
                },
                "required": ["duration"],
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "reprompt",
            "description": "After completing the current actions, execute once more with a fresh screenshot.",
            "parameters": {
                "type": "object",
                "properties": {
                    "nextsteps": {
                        "type": "string",
                        "description": "The next steps to perform."
                    }
                },
                "required": ["nextsteps"],
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "confirm",
            "description": "Confirm that the task is completed and no further actions are needed. ONLY call this once the user's request is fulfilled. It can be the only function called.",
            "parameters": {
                "type": "object",
                "properties": {
                    "goal": {
                        "type": "string",
                        "description": "The goal that was achieved."
                    }
                },
                "required": ["goal"],
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "search_pc",
            "description": "Opens the Start menu and searches for content. Use it to open apps, open File Explorer, or search the web. Prefer this function whenever possible!",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "The search query to perform."
                    }
                },
                "required": ["query"],
            }
        }
    },
]
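A registry in this shape is exactly what the OpenAI chat completions API takes as its tools parameter. A minimal sketch of the wiring; the processor's real call site is truncated in the hunk above, so the import path of this file and the tool_choice setting are assumptions:

import json
import openai
from objects.prompt import SYSTEM_PROMPT, FUNCTIONS  # import path is an assumption

client = openai.OpenAI()
response = client.chat.completions.create(
    model="gpt-4.1",
    messages=[
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": "Open Notepad"},
    ],
    tools=FUNCTIONS,     # the registry defined above
    tool_choice="auto",  # let the model decide which function to call
)
for tc in (response.choices[0].message.tool_calls or []):
    print(tc.function.name, json.loads(tc.function.arguments))  # arguments arrive as a JSON string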

objects/logger.py (new file, 44 lines)
View File

@@ -0,0 +1,44 @@
import logging
import os
from logging.handlers import RotatingFileHandler
from dotenv import load_dotenv
load_dotenv()
# Configuration values
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
LOG_FORMAT = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
LOG_DIR = os.getenv("LOG_DIR", os.path.join(os.getcwd(), "logs"))
# Ensure log directory exists
os.makedirs(LOG_DIR, exist_ok=True)
# Log file path
LOG_FILE = os.path.join(LOG_DIR, "app.log")
# Create root logger
logger = logging.getLogger("gpt-agent")
logger.setLevel(LOG_LEVEL)
# Formatter
formatter = logging.Formatter(LOG_FORMAT)
# Console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(LOG_LEVEL)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
# Rotating file handler
file_handler = RotatingFileHandler(LOG_FILE, maxBytes=5*1024*1024, backupCount=5)
file_handler.setLevel(LOG_LEVEL)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
def get_logger(name: str | None = None) -> logging.Logger:
"""
Retrieve a configured logger instance. If name is provided,
returns a child logger of the configured root logger.
"""
if name:
return logger.getChild(name)
return logger
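Usage is a two-liner anywhere in the codebase, as the compute and processor modules above already demonstrate:

from objects import logger as logger_module

log = logger_module.get_logger(__name__)  # child of the "gpt-agent" root logger
log.info("agent started")                 # written to the console and to logs/app.log (rotating, 5 MB x 5)
log.debug("verbose detail")               # only emitted when LOG_LEVEL=DEBUG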

View File

@@ -5,4 +5,7 @@ python-dotenv
# libraries to control mouse+keyboard+see screen
pyautogui
pynput
-Pillow
+pillow
pytesseract
# --index-url https://mirrors.sustech.edu.cn/pypi/simple
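One caveat the requirements file cannot express: pytesseract is only a wrapper, and the Tesseract OCR engine must be installed separately. On Windows, if tesseract.exe is not on PATH, point the wrapper at the binary explicitly; the path below is the installer default and is an assumption:

import pytesseract

# only needed when tesseract.exe is not already on PATH
pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"  # assumed install path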

View File

@@ -2,7 +2,7 @@ from flask import Flask, request, jsonify
import os, ai.processor
from dotenv import load_dotenv
import io
-from PIL import ImageGrab
+from PIL import ImageGrab  # type: ignore
load_dotenv()
@@ -27,8 +27,6 @@ class WebServerApp:
        # Process the data as needed
        prompt = data.get('prompt', '')
        if not prompt:
            return jsonify({"error": "No prompt provided"}), 400
        img_data = None
@@ -40,6 +38,7 @@ class WebServerApp:
            img_data = None
        else:
            if 'host_screenshot' in data:
+               print('Taking screenshot...')
                # take a screenshot right here
                # capture the full screen
                screenshot_img = ImageGrab.grab()
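With the server bound to 0.0.0.0:5000 by main.py, the handler can be exercised from the standard library alone. The route path is not visible in this diff, so "/prompt" below is an assumption; the payload keys come from the handler itself:

import json
import urllib.request

payload = {"prompt": "Open Notepad", "host_screenshot": True}  # keys taken from the handler above
req = urllib.request.Request(
    "http://localhost:5000/prompt",  # route path is an assumption
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
)
with urllib.request.urlopen(req) as resp:
    print(json.loads(resp.read()))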