Add initial implementation of AI agent with mouse and keyboard control features
This commit is contained in:
parent
ed34ebca6a
commit
7e612c1af7
16
.vscode/launch.json
vendored
Normal file
16
.vscode/launch.json
vendored
Normal file
@ -0,0 +1,16 @@
|
||||
{
|
||||
// Use IntelliSense to learn about possible attributes.
|
||||
// Hover to view descriptions of existing attributes.
|
||||
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
|
||||
"version": "0.2.0",
|
||||
"configurations": [
|
||||
|
||||
{
|
||||
"name": "Python Debugger: Current File",
|
||||
"type": "debugpy",
|
||||
"request": "launch",
|
||||
"program": "main.py",
|
||||
"console": "integratedTerminal"
|
||||
}
|
||||
]
|
||||
}
|
29
ai/compute.py
Normal file
29
ai/compute.py
Normal file
@ -0,0 +1,29 @@
|
||||
import pyautogui
|
||||
from objects.inputs import MouseInput, KeyboardInput, ButtonType
|
||||
|
||||
def press_mouse(mouse_input: MouseInput) -> None:
    """Press a mouse button at the position described by *mouse_input*.

    Dispatches to the matching pyautogui call based on the click type.
    Side effect only; returns nothing.
    """
    x, y = mouse_input.x, mouse_input.y
    button = mouse_input.click_type
    # click_type may arrive as a raw string (parsed straight from the
    # model's JSON tool-call arguments in _execute); normalize it to the
    # ButtonType enum so the comparisons below actually match. Previously
    # a string value matched no branch and the click was silently dropped.
    if not isinstance(button, ButtonType):
        button = ButtonType(button)
    if button == ButtonType.LEFT:
        pyautogui.click(x, y, button='left')
    elif button == ButtonType.DOUBLE_LEFT:
        pyautogui.doubleClick(x, y)
    elif button == ButtonType.RIGHT:
        pyautogui.click(x, y, button='right')
    elif button == ButtonType.MIDDLE:
        pyautogui.click(x, y, button='middle')
|
||||
|
||||
def press_keyboard(keyboard_input: KeyboardInput) -> None:
    """Type the requested text, optionally followed by the Enter key."""
    # Only type when there is actual text; an empty string is a no-op.
    if keyboard_input.text:
        pyautogui.typewrite(keyboard_input.text)
    if keyboard_input.press_enter:
        pyautogui.press('enter')
|
||||
|
||||
def _execute(name, args):
    """Dispatch a model tool call by *name* to the matching input handler.

    *args* comes straight from json.loads of the tool-call arguments, so
    every value is a plain JSON type (str/int/bool).
    """
    if name == "click_button":
        # The model sends click_type as a string ("left", "right", ...);
        # convert it to the ButtonType enum that MouseInput/press_mouse
        # expect. Copy first so the caller's dict is not mutated.
        args = dict(args)
        args["click_type"] = ButtonType(args["click_type"])
        press_mouse(MouseInput(**args))
    elif name == "type_text":
        press_keyboard(KeyboardInput(**args))
    # Unknown tool names are deliberately ignored (best-effort execution).
|
73
ai/processor.py
Normal file
73
ai/processor.py
Normal file
@ -0,0 +1,73 @@
|
||||
import traceback
|
||||
import json # new
|
||||
import openai
|
||||
from objects import aic
|
||||
import ai.compute
|
||||
|
||||
class AIProcessor:
|
||||
def __init__(self, api_key: str, model: str = "gpt-4.1"):
|
||||
self.oai = openai.Client(api_key=api_key)
|
||||
self.model = model
|
||||
self.session = aic.Session(messages=[aic.Message(role="system", content=aic.SYSTEM_PROMPT)], model=model) # type: ignore
|
||||
self._tools_map = { # local binding of python callables
|
||||
"click_button": self._click_button,
|
||||
"type_text": self._type_text,
|
||||
}
|
||||
|
||||
# --------------------- tool implementations --------------------- #
|
||||
def _click_button(self, x: int, y: int, click_type: str) -> str:
|
||||
# TODO: integrate real mouse automation.
|
||||
return f"Performed {click_type} click at ({x}, {y})."
|
||||
|
||||
def _type_text(self, text: str) -> str:
|
||||
# TODO: integrate real typing automation.
|
||||
return f'Typed text: "{text}"'
|
||||
|
||||
def _execute_tool(self, name: str, arguments: dict) -> str:
|
||||
func = self._tools_map.get(name)
|
||||
if not func:
|
||||
return f"Unknown tool: {name}"
|
||||
try:
|
||||
return func(**arguments)
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
return f"Error executing {name}: {e}"
|
||||
|
||||
# -------------------------- main entry -------------------------- #
|
||||
def process(self, prompt: str, img_data: str | bytes | None = None) -> str | list[dict]:
|
||||
try:
|
||||
self.session.messages.append(
|
||||
aic.Message(role="user", content=prompt, image=img_data)
|
||||
)
|
||||
response = self.oai.chat.completions.create(
|
||||
model=self.model,
|
||||
messages=self.session.messages_dict(),
|
||||
tools=aic.FUNCTIONS, # type: ignore
|
||||
)
|
||||
|
||||
# return tool call requests if any
|
||||
tool_calls = getattr(response.choices[0].message, "tool_calls", None)
|
||||
if tool_calls:
|
||||
for tc in tool_calls:
|
||||
ai.compute._execute(
|
||||
name=tc.function.name,
|
||||
args=json.loads(tc.function.arguments)
|
||||
)
|
||||
return [
|
||||
{
|
||||
"name": tc.function.name,
|
||||
"arguments": json.loads(tc.function.arguments),
|
||||
}
|
||||
for tc in tool_calls
|
||||
]
|
||||
|
||||
# otherwise return final assistant content
|
||||
print(f"Response: {json.dumps(response.to_dict(), indent=4)}") # debug
|
||||
output_text: str = response.choices[0].message.content # type: ignore
|
||||
self.session.messages.append(
|
||||
aic.Message(role="assistant", content=output_text)
|
||||
)
|
||||
return output_text
|
||||
except Exception as e:
|
||||
traceback.print_exc()
|
||||
return f"Error processing request: {str(e)}"
|
18
main.py
Normal file
18
main.py
Normal file
@ -0,0 +1,18 @@
|
||||
from dotenv import load_dotenv
|
||||
import os
|
||||
|
||||
import ai.processor
|
||||
import webserver.web
|
||||
|
||||
load_dotenv()
|
||||
|
||||
def main():
    """Wire the AI processor to the web server and start serving."""
    processor = ai.processor.AIProcessor(
        api_key=os.getenv("OPENAI_API_KEY", ""),
        model=os.getenv("OPENAI_MODEL", "gpt-4.1"),
    )
    server = webserver.web.WebServerApp(processor)
    server.run()


if __name__ == "__main__":
    main()
|
97
objects/aic.py
Normal file
97
objects/aic.py
Normal file
@ -0,0 +1,97 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional, Union
|
||||
from typing_extensions import Literal
|
||||
|
||||
# Chat models this agent is known to work with (tool calling + vision).
compatible_models = ("gpt-4.1", "gpt-4.1-mini", 'o4-mini', 'gpt-4.1-nano')

# System prompt injected as the first message of every Session.
# NOTE(review): the 1920x1080 resolution is hard-coded into the prompt —
# confirm it matches the machine actually being controlled.
SYSTEM_PROMPT = """
You are CopeAI Windows Agent. You are currently controlling a Windows 11 machine. \
You are capable to see the screen, click buttons, type text, and interact with the system. \
You will use the functions provided. The resolution of the machine is 1920x1080. \
Your text response must indicate what you are doing."""

# Tool (function-calling) schemas passed to the chat completions API as
# the `tools` parameter. Names, parameter names, and the click_type enum
# values must stay in sync with the handlers in ai/compute.py and with
# objects/inputs.py.
FUNCTIONS = [
    {
        "type": "function",
        "function": {
            "name": "click_button",
            "description": "Click a button at the specified (x, y) position with the given click type.",
            "parameters": {
                "type": "object",
                "properties": {
                    "x": {
                        "type": "integer",
                        "description": "The X coordinate of the button."
                    },
                    "y": {
                        "type": "integer",
                        "description": "The Y coordinate of the button."
                    },
                    "click_type": {
                        "type": "string",
                        "enum": ["left", "double_left", "middle", "right"],
                        "description": "The type of mouse click to perform."
                    }
                },
                "required": ["click_type", "x", "y"],
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "type_text",
            "description": "Type the given text at the current cursor location.",
            "parameters": {
                "type": "object",
                "properties": {
                    "text": {
                        "type": "string",
                        "description": "The text to type."
                    },
                    "press_enter": {
                        "type": "boolean",
                        "default": False,
                        "description": "Whether to press Enter after typing the text."
                    }
                },
                "required": ["text", "press_enter"],
            }
        }
    }
]
|
||||
|
||||
|
||||
@dataclass
class Message:
    """A single chat message, optionally carrying a base64-encoded image."""

    role: Literal['user', 'assistant', 'system', 'tool']
    content: str
    image: Optional[Union[str, bytes]] = None  # base64 PNG payload, if any
    disable_image: bool = False  # force text-only serialization
    name: Optional[str] = None  # only meaningful for tool messages

    def to_dict(self) -> dict:
        """Serialize to the OpenAI chat-completions message format.

        The image is attached as a data-URL part only when one is actually
        present AND images are not disabled; otherwise content stays a
        plain string. (Fixes the inverted condition that emitted an image
        part — even a bogus "base64,None" one — whenever disable_image
        was set.)
        """
        if self.image and not self.disable_image:
            content = [
                {"type": "text", "text": self.content},
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{self.image}"}},
            ]
        else:
            content = self.content
        base = {
            "role": self.role,
            "content": content,
        }
        if self.role == "tool" and self.name:  # include tool name if present
            base["name"] = self.name
        return base
|
||||
|
||||
@dataclass
class Session:
    """An ordered chat history plus the model used to continue it."""

    messages: list[Message]
    model: str = "gpt-4.1"

    def messages_dict(self) -> list:
        """Serialize every message to the API wire format."""
        return [m.to_dict() for m in self.messages]

    def to_dict(self) -> dict:
        """Serialize the whole session: messages and model together."""
        return {
            "messages": self.messages_dict(),
            "model": self.model,
        }
|
20
objects/inputs.py
Normal file
20
objects/inputs.py
Normal file
@ -0,0 +1,20 @@
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum
|
||||
|
||||
class ButtonType(Enum):
    """Mouse click types the agent can perform.

    Values match the `click_type` enum strings declared in the tool
    schema (objects/aic.py) and the branches in ai/compute.press_mouse.
    """
    LEFT = "left"
    DOUBLE_LEFT = "double_left"  # double left-click
    RIGHT = "right"
    MIDDLE = "middle"
|
||||
|
||||
@dataclass
class KeyboardInput:
    """A typing action: text to enter, optionally followed by Enter."""
    text: str  # text to type at the current cursor location
    press_enter: bool = False  # press the Enter key after typing
|
||||
|
||||
@dataclass
class MouseInput:
    """A single mouse action: one click type applied at screen (x, y)."""

    x: int  # screen X coordinate in pixels
    y: int  # screen Y coordinate in pixels
    # A single click type (the old list[ButtonType] annotation was wrong:
    # every consumer treats this as one value, not a list).
    click_type: ButtonType

    def __post_init__(self):
        # Callers may pass the raw string from the model's JSON tool-call
        # arguments ("left", "double_left", ...); coerce it to the enum so
        # downstream comparisons against ButtonType members work.
        if not isinstance(self.click_type, ButtonType):
            self.click_type = ButtonType(self.click_type)
|
||||
|
7
requirements.txt
Normal file
7
requirements.txt
Normal file
@ -0,0 +1,7 @@
|
||||
openai
|
||||
requests
|
||||
flask
|
||||
python-dotenv
|
||||
# libraries to control mouse+keyboard+see screen
|
||||
pyautogui
|
||||
pynput
|
60
webserver/web.py
Normal file
60
webserver/web.py
Normal file
@ -0,0 +1,60 @@
|
||||
from flask import Flask, request, jsonify
|
||||
import os, ai.processor
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
|
||||
class WebServerApp:
    """Flask front-end that forwards prompt (+ optional screenshot)
    requests to an AIProcessor."""

    def __init__(self, aip):
        """Build the Flask app and bind it to the given processor."""
        self.app = Flask(__name__)
        # Bind the processor before registering routes so a handler can
        # never observe a partially-initialized instance.
        self.aip: ai.processor.AIProcessor = aip
        self._register_routes()

    def _register_routes(self):
        """Attach all HTTP endpoints to the Flask app."""

        @self.app.route('/api/test')
        def test():
            return jsonify({"message": "Hello, World!"})

        @self.app.route('/api/request', methods=['POST'])
        def handle_request():
            import base64  # stdlib; local import kept from the original

            # Payload is sent as form-data.
            data = request.form.to_dict()
            if not data:
                return jsonify({"error": "No data provided"}), 400

            prompt = data.get('prompt', '')
            if not prompt:
                return jsonify({"error": "No prompt provided"}), 400

            # Optional screenshot upload; read the raw bytes if present.
            img_data = None
            if 'img' in request.files:
                img_file = request.files['img']
                if img_file:
                    img_data = img_file.read()

            # Forward the image as base64 text (the processor embeds it
            # in a data URL). img_data is only ever bytes or None here —
            # the old str branch was unreachable.
            if img_data:
                img_data = base64.b64encode(img_data).decode('utf-8')

            response = self.aip.process(prompt, img_data)
            return jsonify({"response": response}), 200

        @self.app.route('/api/health')
        def health():
            return jsonify({"status": "healthy"})

    def run(self, *args, **kwargs):
        """Start the Flask server; all arguments pass through to Flask.run."""
        self.app.run(*args, **kwargs)
|
||||
|
||||
# Example usage:
|
||||
# if __name__ == "__main__":
|
||||
# server = WebServerApp()
|
||||
# server.run()
|
Loading…
Reference in New Issue
Block a user