feat: wip, v2
This commit is contained in:
3
copeai_backend/__init__.py
Normal file
3
copeai_backend/__init__.py
Normal file
@@ -0,0 +1,3 @@
|
||||
from .conversation import Conversation, ConversationResponse, Role
|
||||
from .generate import process_text_streaming, simple_process_text
|
||||
from .models import Model, Service, GPT_3, GPT_4
|
102
copeai_backend/conversation.py
Normal file
102
copeai_backend/conversation.py
Normal file
@@ -0,0 +1,102 @@
|
||||
from dataclasses import dataclass
|
||||
import typing
|
||||
from openai import AsyncStream
|
||||
from openai.types.chat import ChatCompletionChunk, ChatCompletion
|
||||
import tiktoken
|
||||
from enum import Enum
|
||||
|
||||
from copeai_backend.exception import ConversationLockedException
|
||||
|
||||
from . import models
|
||||
|
||||
encoding = tiktoken.get_encoding("cl100k_base")
|
||||
|
||||
BASE_PROMPT = ""
|
||||
|
||||
class Role(Enum):
|
||||
SYSTEM = "system"
|
||||
USER = "user"
|
||||
ASSISTANT = "assistant"
|
||||
|
||||
|
||||
@dataclass
|
||||
class GeneratingResponseChunk:
|
||||
"""A chunk of a response from the model. You receive this when the **generation is still going on**, and streamed."""
|
||||
|
||||
text: str
|
||||
raw: ChatCompletionChunk
|
||||
|
||||
|
||||
class Conversation:
|
||||
def __init__(self, add_base_prompt: bool = True, storage: dict = {}) -> None:
|
||||
self.messages = []
|
||||
self.last_used_model: models.Model | None = None
|
||||
self.locked = False
|
||||
self.interruput = False
|
||||
self.store = storage
|
||||
|
||||
if add_base_prompt and BASE_PROMPT:
|
||||
self.messages.append({"role": Role.SYSTEM.value, "content": BASE_PROMPT})
|
||||
|
||||
def add_message(self, role: Role, message, username: str | None = None):
|
||||
if not self.locked:
|
||||
d = {"role": role.value, "content": message}
|
||||
if username:
|
||||
d["name"] = username
|
||||
self.messages.append(d)
|
||||
else:
|
||||
raise ConversationLockedException()
|
||||
|
||||
def interrupt(self):
|
||||
"""Interrupts any conversations going on."""
|
||||
self.interruput = True
|
||||
|
||||
def get_tokens(self):
|
||||
return text_to_tokens(self.messages)
|
||||
|
||||
def last_role(self):
|
||||
return Role(self.messages[-1]["role"])
|
||||
|
||||
def last_message(self):
|
||||
return self.messages[-1]["content"]
|
||||
|
||||
|
||||
@dataclass
|
||||
class ConversationResponse:
|
||||
"""A response from the generation. You receive this when the **generation is done**, or non-streamed requests."""
|
||||
|
||||
conversation: Conversation
|
||||
text: str | list[str]
|
||||
raw_response: list[ChatCompletion] | list[ChatCompletionChunk]
|
||||
|
||||
def text_to_tokens(string_or_messages: str | list[str | dict | list] | Conversation) -> int:
|
||||
"""Returns the number of tokens in a text string."""
|
||||
num_tokens = 0
|
||||
|
||||
messages = []
|
||||
if isinstance(string_or_messages, str):
|
||||
messages = [{"role": "user", "content": string_or_messages}]
|
||||
else:
|
||||
messages = string_or_messages
|
||||
|
||||
for message in messages:
|
||||
# every message follows <im_start>{role/name}\n{content}<im_end>\n
|
||||
num_tokens += 4
|
||||
|
||||
if isinstance(message, dict):
|
||||
for key, value in message.items():
|
||||
num_tokens += len(encoding.encode(str(value)))
|
||||
if key == "name": # if there's a name, the role is omitted
|
||||
num_tokens += 1 # role is always required and always 1 token
|
||||
elif isinstance(message, list):
|
||||
for item in message:
|
||||
if item["type"] == "text":
|
||||
num_tokens += len(encoding.encode(item["text"]))
|
||||
elif isinstance(message, str):
|
||||
num_tokens += len(encoding.encode(message))
|
||||
elif isinstance(messages, Conversation):
|
||||
for message in messages.messages:
|
||||
num_tokens += text_to_tokens(message["content"])
|
||||
num_tokens += 2 # every reply is primed with <im_start>assistant
|
||||
|
||||
return num_tokens
|
7
copeai_backend/exception/LockedConversationException.py
Normal file
7
copeai_backend/exception/LockedConversationException.py
Normal file
@@ -0,0 +1,7 @@
|
||||
class ConversationLockedException(Exception):
|
||||
"""Raised when there is already an ongoing conversation."""
|
||||
|
||||
def __init__(self):
|
||||
super().__init__(
|
||||
"There is already an ongoing conversation. Please wait until it is finished."
|
||||
)
|
1
copeai_backend/exception/__init__.py
Normal file
1
copeai_backend/exception/__init__.py
Normal file
@@ -0,0 +1 @@
|
||||
from .LockedConversationException import ConversationLockedException
|
91
copeai_backend/generate.py
Normal file
91
copeai_backend/generate.py
Normal file
@@ -0,0 +1,91 @@
|
||||
import json
|
||||
import traceback
|
||||
from typing import Any, AsyncGenerator, Coroutine, Generator
|
||||
import requests
|
||||
import openai
|
||||
|
||||
import asyncio
|
||||
from dotenv import load_dotenv
|
||||
import os
|
||||
from .conversation import (
|
||||
Conversation,
|
||||
Role,
|
||||
ConversationResponse,
|
||||
GeneratingResponseChunk,
|
||||
)
|
||||
from .models import Model
|
||||
from .exception import ConversationLockedException
|
||||
|
||||
load_dotenv()
|
||||
|
||||
oclient = openai.AsyncOpenAI(api_key=os.environ.get("OPENAI_KEY"))
|
||||
|
||||
|
||||
async def simple_process_text(
|
||||
conversation: Conversation,
|
||||
model: Model,
|
||||
new_message: str,
|
||||
additional_args: dict = {},
|
||||
) -> ConversationResponse:
|
||||
conversation.add_message(Role.USER, new_message)
|
||||
conversation.last_used_model = model
|
||||
r = await oclient.chat.completions.create(
|
||||
model=model.id, messages=conversation.messages, **additional_args
|
||||
)
|
||||
conversation.add_message(Role.ASSISTANT, r.choices[0].message.content)
|
||||
return ConversationResponse(conversation, r.choices[0].message.content, r)
|
||||
|
||||
|
||||
async def process_text_streaming(
|
||||
conversation: Conversation,
|
||||
model: Model,
|
||||
new_message: str,
|
||||
additional_args: dict = {},
|
||||
) -> [ConversationResponse, GeneratingResponseChunk]: # FIXME change type
|
||||
if conversation.locked:
|
||||
raise ConversationLockedException()
|
||||
|
||||
try:
|
||||
text_parts = []
|
||||
resp_parts = []
|
||||
|
||||
conversation.add_message(
|
||||
Role.USER,
|
||||
new_message,
|
||||
(additional_args["userid"] if "userid" in additional_args else "unknown"),
|
||||
)
|
||||
conversation.last_used_model = model
|
||||
conversation.locked = True
|
||||
if model.service == "openai":
|
||||
response = await oclient.chat.completions.create(
|
||||
model=model.id,
|
||||
messages=conversation.messages,
|
||||
temperature=0.9,
|
||||
top_p=1.0,
|
||||
presence_penalty=0.6,
|
||||
frequency_penalty=0.0,
|
||||
max_tokens=4096,
|
||||
stream=True,
|
||||
)
|
||||
|
||||
async for chunk in response:
|
||||
partition = chunk.choices[0].delta
|
||||
if (
|
||||
"content"
|
||||
in json.loads(chunk.model_dump_json())["choices"][0]["delta"].keys()
|
||||
):
|
||||
if partition.content is not None:
|
||||
text_parts.append(partition.content)
|
||||
resp_parts.append(chunk)
|
||||
yield GeneratingResponseChunk(partition.content, chunk)
|
||||
|
||||
if conversation.interruput:
|
||||
yield ConversationResponse(conversation, text_parts, resp_parts)
|
||||
|
||||
conversation.locked = False
|
||||
|
||||
conversation.add_message(Role.ASSISTANT, ''.join(text_parts))
|
||||
yield ConversationResponse(conversation, text_parts, resp_parts)
|
||||
except Exception as e:
|
||||
conversation.locked = False
|
||||
raise e
|
17
copeai_backend/models.py
Normal file
17
copeai_backend/models.py
Normal file
@@ -0,0 +1,17 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import Literal
|
||||
|
||||
|
||||
Service = Literal["openai", "bard"]
|
||||
|
||||
|
||||
@dataclass
|
||||
class Model:
|
||||
id: str
|
||||
usage_name: str
|
||||
service: Service
|
||||
|
||||
|
||||
GPT_3 = Model(id="gpt-3.5-turbo-16k-0613", usage_name="GPT-3", service="openai")
|
||||
|
||||
GPT_4 = Model(id="gpt-4-16k-0613", usage_name="GPT-4", service="openai")
|
Reference in New Issue
Block a user