Add screenshot functionality and new commands for wait and reprompt
This commit is contained in:
parent
66330bfc73
commit
f7feb12946
@ -1,8 +1,20 @@
|
|||||||
import pyautogui
|
import pyautogui
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time, io, base64
|
||||||
import tkinter as tk
|
import tkinter as tk
|
||||||
from objects.inputs import MouseInput, KeyboardInput, ButtonType
|
from objects.inputs import MouseInput, KeyboardInput, ButtonType
|
||||||
|
from PIL import ImageGrab # type: ignore
|
||||||
|
|
||||||
|
def take_screenshot() -> bytes:
    """Grab the current screen and return the image encoded as PNG bytes."""
    # The in-memory buffer lets the PNG encoder write without touching disk.
    with io.BytesIO() as png_stream:
        ImageGrab.grab().save(png_stream, format='PNG')
        return png_stream.getvalue()
|
||||||
|
|
||||||
|
def screenshot_to_base64(screenshot: bytes) -> str:
    """Return the base64 text form of raw screenshot bytes."""
    encoded = base64.b64encode(screenshot)
    # b64encode yields bytes; decode so the result can be embedded as text.
    return encoded.decode('utf-8')
|
||||||
|
|
||||||
def show_click_indicator(x: int, y: int, duration: float = 2.0, size: int = 50) -> None:
|
def show_click_indicator(x: int, y: int, duration: float = 2.0, size: int = 50) -> None:
|
||||||
"""Display a red circle at (x, y) for the given duration."""
|
"""Display a red circle at (x, y) for the given duration."""
|
||||||
@ -51,8 +63,22 @@ def press_keyboard(keyboard_input: KeyboardInput) -> None:
|
|||||||
if keyboard_input.press_enter:
|
if keyboard_input.press_enter:
|
||||||
pyautogui.press('enter')
|
pyautogui.press('enter')
|
||||||
|
|
||||||
def _execute(name, args):
|
def wait(duration: float) -> None:
    """Pause execution for ``duration`` seconds.

    Args:
        duration: Number of seconds to sleep. Non-positive or NaN values
            are treated as zero instead of raising, because the value is
            supplied by a tool call and ``time.sleep`` rejects negative
            and NaN arguments with ``ValueError``.
    """
    # ``duration > 0`` is False for negatives, zero and NaN alike, so all of
    # them collapse to a zero-length sleep.
    time.sleep(duration if duration > 0 else 0)
|
||||||
|
|
||||||
|
def reprompt(nextsteps: str, processor) -> str | list[str | dict]:
    """Capture a fresh screenshot and re-run the processor with it.

    Args:
        nextsteps: Prompt text passed to the processor as the new request.
        processor: Object exposing ``process(prompt, img_data=...)``; the
            base64-encoded screenshot is passed as ``img_data``.

    Returns:
        Whatever ``processor.process`` returns for the follow-up request.
    """
    # NOTE(review): nothing here bounds how many times the processor may
    # request another reprompt in turn -- confirm a limit exists upstream.
    screenshot_b64 = screenshot_to_base64(take_screenshot())
    return processor.process(nextsteps, img_data=screenshot_b64)
|
||||||
|
|
||||||
|
|
||||||
|
def _execute(name, args, processor):
|
||||||
if name == "click_button":
|
if name == "click_button":
|
||||||
press_mouse(MouseInput(**args))
|
press_mouse(MouseInput(**args))
|
||||||
elif name == "type_text":
|
elif name == "type_text":
|
||||||
press_keyboard(KeyboardInput(**args))
|
press_keyboard(KeyboardInput(**args))
|
||||||
|
elif name == "wait":
|
||||||
|
wait(**args)
|
||||||
|
elif name == "reprompt":
|
||||||
|
reprompt(**args, processor=processor)
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
import traceback
|
import traceback
|
||||||
import json # new
|
import json
|
||||||
import openai
|
import openai
|
||||||
|
from flask import jsonify
|
||||||
from objects import aic
|
from objects import aic
|
||||||
import ai.compute
|
import ai.compute
|
||||||
|
|
||||||
@ -34,7 +35,8 @@ class AIProcessor:
|
|||||||
return f"Error executing {name}: {e}"
|
return f"Error executing {name}: {e}"
|
||||||
|
|
||||||
# -------------------------- main entry -------------------------- #
|
# -------------------------- main entry -------------------------- #
|
||||||
def process(self, prompt: str, img_data: str | bytes | None = None) -> str | list[dict]:
|
def process(self, prompt: str, img_data: str | bytes | None = None) -> str | list[str | dict]:
|
||||||
|
outputs = [] # type: list[str | dict]
|
||||||
try:
|
try:
|
||||||
self.session.messages.append(
|
self.session.messages.append(
|
||||||
aic.Message(role="user", content=prompt, image=img_data)
|
aic.Message(role="user", content=prompt, image=img_data)
|
||||||
@ -49,10 +51,12 @@ class AIProcessor:
|
|||||||
tool_calls = getattr(response.choices[0].message, "tool_calls", None)
|
tool_calls = getattr(response.choices[0].message, "tool_calls", None)
|
||||||
if tool_calls:
|
if tool_calls:
|
||||||
for tc in tool_calls:
|
for tc in tool_calls:
|
||||||
ai.compute._execute(
|
r = ai.compute._execute(
|
||||||
name=tc.function.name,
|
name=tc.function.name,
|
||||||
args=json.loads(tc.function.arguments)
|
args=json.loads(tc.function.arguments),
|
||||||
|
processor=self,
|
||||||
)
|
)
|
||||||
|
outputs.append(r) if r else None
|
||||||
return [
|
return [
|
||||||
{
|
{
|
||||||
"name": tc.function.name,
|
"name": tc.function.name,
|
||||||
@ -64,10 +68,11 @@ class AIProcessor:
|
|||||||
# otherwise return final assistant content
|
# otherwise return final assistant content
|
||||||
print(f"Response: {json.dumps(response.to_dict(), indent=4)}") # debug
|
print(f"Response: {json.dumps(response.to_dict(), indent=4)}") # debug
|
||||||
output_text: str = response.choices[0].message.content # type: ignore
|
output_text: str = response.choices[0].message.content # type: ignore
|
||||||
|
outputs.append(output_text)
|
||||||
self.session.messages.append(
|
self.session.messages.append(
|
||||||
aic.Message(role="assistant", content=output_text)
|
aic.Message(role="assistant", content=output_text)
|
||||||
)
|
)
|
||||||
return output_text
|
return outputs
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
return f"Error processing request: {str(e)}"
|
return f"Error processing request: {str(e)}"
|
||||||
|
@ -58,6 +58,40 @@ FUNCTIONS = [
|
|||||||
"required": ["text", "press_enter"],
|
"required": ["text", "press_enter"],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "function",
|
||||||
|
"function": {
|
||||||
|
"name": "wait",
|
||||||
|
"description": "Wait for a specified amount of time.",
|
||||||
|
"parameters": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"duration": {
|
||||||
|
"type": "number",
|
||||||
|
"description": "The duration to wait in seconds."
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": ["duration"],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"type": "function",
|
||||||
|
"function": {
|
||||||
|
"name": "reprompt",
|
||||||
|
"description": "After executing what you asked for, re-perform a screenshot to determine the next steps. Best combined with a wait.",
|
||||||
|
"parameters": {
|
||||||
|
"type": "object",
|
||||||
|
"properties": {
|
||||||
|
"nextsteps": {
|
||||||
|
"type": "string",
|
||||||
|
"description": "The next steps to take after the screenshot."
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"required": ["nextsteps"],
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -5,4 +5,6 @@ python-dotenv
|
|||||||
# libraries to control mouse+keyboard+see screen
|
# libraries to control mouse+keyboard+see screen
|
||||||
pyautogui
|
pyautogui
|
||||||
pynput
|
pynput
|
||||||
Pillow
|
pillow
|
||||||
|
|
||||||
|
# --index-url https://mirrors.sustech.edu.cn/pypi/simple
|
||||||
|
Loading…
Reference in New Issue
Block a user