import os
import sqlite3
import time
import traceback
from typing import Dict

import discord
import openai
import tiktoken
from discord import app_commands
from discord.ext import tasks
from dotenv import load_dotenv

import copeai_backend
import views.GenerationState

load_dotenv()

openai.api_key = os.environ['OPENAI_API_KEY']
db = sqlite3.connect('tokens.db')

# Channels that `typing_loop` keeps sending a typing indicator to.
# NOTE: this is a plain list, not the stdlib `typing` module (only `Dict` is
# imported from it above).
typing = []

intents = discord.Intents.default()
intents.members = True
intents.presences = True
intents.dm_messages = True

# In-memory conversation per user; rebuilt from `message_history` on first use.
cached_conversations: Dict[discord.User, copeai_backend.conversation.Conversation] = {}


class App(discord.Client):
    """Discord client that also owns an application-command tree."""

    def __init__(self):
        super().__init__(intents=intents)
        self.tree = app_commands.CommandTree(client=self)
        print('Running')

    async def setup_hook(self) -> None:
        """No setup work is performed."""
        return


app = App()


@tasks.loop(seconds=5)
async def typing_loop():
    """Every 5 seconds, send a typing indicator to each channel in `typing`.

    A channel whose typing call fails is dropped from the list.
    """
    # Iterate over a snapshot: removing from the list being iterated would
    # skip the element that follows the removed one.
    for channel in list(typing):
        try:
            await channel.typing()
        except Exception:
            # Best effort. `Exception` (not a bare except) so task
            # cancellation still propagates. The membership check covers the
            # case where `on_message` removed the channel while we awaited.
            if channel in typing:
                typing.remove(channel)


@app.event
async def on_ready():
    """Announce the login, set the bot presence and start `typing_loop`."""
    print('We have logged in as {0.user}'.format(app))
    await app.change_presence(
        activity=discord.Activity(
            type=discord.ActivityType.watching,
            name="your direct messages!",
        )
    )
    # on_ready may be dispatched more than once (e.g. after a reconnect);
    # starting a loop that is already running raises RuntimeError.
    if not typing_loop.is_running():
        typing_loop.start()


encoding = tiktoken.get_encoding('cl100k_base')


def num_tokens_from_string(string: str) -> int:
    """Returns the number of tokens in a text string."""
    return len(encoding.encode(string))


def _find_member(user):
    """Return `user` as a member of their first mutual guild, or None."""
    guilds = user.mutual_guilds
    if not guilds:
        return None
    return guilds[0].get_member(user.id)


def _describe_activities(member) -> str:
    """Render the member's activities as a newline-separated bullet list.

    Returns an empty string when `member` is None or has no activities.
    """
    if member is None:
        return ""
    lines = []
    for activity in member.activities:
        # Order matters: the specific types are tested before the generic
        # `discord.Activity` fallback.
        if isinstance(activity, discord.Spotify):
            lines.append(f"- Listening to {activity.title} by {activity.artist} on Spotify\n")
        elif isinstance(activity, discord.Streaming):
            lines.append(f"- Streaming {activity.name}\n")
        elif isinstance(activity, discord.Game):
            lines.append(f"- Playing {activity.name}\n")
        elif isinstance(activity, discord.CustomActivity):
            lines.append(f"- Custom Activity: {activity.name}\n")
        elif isinstance(activity, discord.Activity):
            lines.append(f"- {activity.type.name.capitalize()} {activity.name}\n")
    return ''.join(lines).strip('\n')


def _build_system_prompt(user) -> str:
    """Read base-prompt.txt and fill its |username|, |status|, |activities| slots."""
    with open('base-prompt.txt', 'r', encoding='utf-8') as f:
        prompt = f.read()

    member = _find_member(user)
    arguments = {
        "username": user.name,
        # Without a mutual guild (or a cached member) no presence is available.
        "status": member.raw_status if member is not None else "unknown",
        "activities": _describe_activities(member),
    }
    for key, value in arguments.items():
        prompt = prompt.replace(f'|{key}|', value)
    return prompt


def _select_history(rows, used_tokens: int, max_token: int) -> list:
    """Pick the most recent stored messages that still fit the token budget.

    `rows` are `message_history` rows ordered newest first, laid out as
    (message_id, user_id, content, token, role, timestamp). Rows are taken
    until adding the next one would reach `max_token`. The result is returned
    oldest first.
    """
    selected = []
    for row in rows:
        if used_tokens + row[3] >= max_token:
            break
        used_tokens += row[3]
        selected.append(row)
    selected.reverse()
    return selected


def _load_conversation(message: discord.Message):
    """Build a new conversation: system prompt plus stored history that fits."""
    system_prompt = _build_system_prompt(message.author)

    rows = db.execute(
        'SELECT * FROM message_history WHERE user_id = ? ORDER BY timestamp DESC',
        (message.author.id,),
    ).fetchall()

    max_token = int(os.environ['MAX_TOKEN_PER_REQUEST'])
    # NOTE(review): len(system_prompt) is a character count that gets added to
    # token counts; kept as in the original budget formula -- confirm intent.
    used_tokens = 200 + len(system_prompt) + num_tokens_from_string(message.content)

    conversation = copeai_backend.conversation.Conversation()
    conversation.add_message(
        role=copeai_backend.conversation.Role.SYSTEM,
        message=system_prompt,
    )
    for row in _select_history(rows, used_tokens, max_token):
        conversation.add_message(
            role=copeai_backend.conversation.Role(row[4]),
            message=row[2],
        )
    return conversation


def _trim_conversation(conversation) -> None:
    """Drop the oldest messages until the conversation fits the token budget."""
    budget = int(os.environ['MAX_TOKEN_PER_REQUEST']) - 400
    # NOTE(review): pop(0) removes the oldest entry, which may be the system
    # prompt -- confirm that is intended.
    # The `messages` check prevents an IndexError once the list is empty.
    while (
        conversation.messages
        and copeai_backend.conversation.text_to_tokens(conversation) > budget
    ):
        conversation.messages.pop(0)


@app.event
async def on_message(message: discord.Message):
    """Answer a direct message with a streamed model reply.

    Ignores anything that is not a DM and the bot's own messages. Otherwise
    it loads (or trims) the author's conversation, streams the generation
    into a reply that is edited at most every 1.5 seconds, and stores both
    the user message and the final answer in `message_history`. Any failure
    is printed and reported back to the user in the DM.
    """
    if not isinstance(message.channel, discord.DMChannel):
        return
    if message.author.id == app.user.id:
        return

    try:
        conversation = cached_conversations.get(message.author)
        if conversation is None:
            # Cache only after the conversation is fully built, so a failure
            # while loading never leaves a prompt-less conversation behind.
            conversation = _load_conversation(message)
            cached_conversations[message.author] = conversation
        else:
            _trim_conversation(conversation)

        conversation.add_message(
            role=copeai_backend.conversation.Role.USER,
            message=message.content,
        )

        await message.channel.typing()
        # NOTE(review): nothing is awaited between append and remove, so
        # `typing_loop` never sees this channel; kept as in the original.
        typing.append(message.channel)
        req = copeai_backend.generate.process_text_streaming(
            conversation=conversation,
            model=copeai_backend.models.GPT_3,
            new_message=message.content,
            additional_args={
                "max_tokens": int(os.environ['MAX_TOKEN_PER_REQUEST']),
            },
        )
        typing.remove(message.channel)

        reply = await message.reply(
            '** **',
            view=views.GenerationState.GenerationStateView(
                views.GenerationState.GenerationState.GENERATING
            ),
        )

        next_edit_at = 0
        chunks = []
        final_text = None
        async for response in req:
            if isinstance(response, copeai_backend.ConversationResponse):
                final_text = ''.join(response.text)
            else:
                chunks.append(response.text)
                # Throttle intermediate edits to one per 1.5 seconds.
                if next_edit_at < time.time():
                    partial = ''.join(chunks)
                    next_edit_at = time.time() + 1.5
                    # Text longer than 2000 characters goes into an embed.
                    if len(partial) > 2000:
                        await reply.edit(
                            content=None,
                            embed=discord.Embed(description=partial, color=0xfce75d),
                        )
                    else:
                        await reply.edit(content=partial)

        if final_text is None:
            # The stream ended without a final ConversationResponse.
            final_text = ''.join(chunks)

        finished_view = views.GenerationState.GenerationStateView(
            views.GenerationState.GenerationState.FINISHED
        )
        if len(final_text) > 2000:
            await reply.edit(
                content=None,
                embed=discord.Embed(description=final_text, color=0xfce75d),
                view=finished_view,
            )
        else:
            await reply.edit(content=final_text, view=finished_view)

        # `with db` commits on success and rolls back on error.
        with db:
            db.executemany(
                'INSERT INTO message_history VALUES (?, ?, ?, ?, ?, ?)',
                [
                    (
                        message.id,
                        message.author.id,
                        message.content,
                        copeai_backend.conversation.text_to_tokens(message.content),
                        'user',
                        int(message.created_at.timestamp()),
                    ),
                    (
                        reply.id,
                        message.author.id,
                        final_text,
                        copeai_backend.conversation.text_to_tokens(final_text),
                        'assistant',
                        int(time.time()),
                    ),
                ],
            )
    except Exception as e:
        traceback.print_exc()
        if message.channel in typing:
            typing.remove(message.channel)
        await message.reply('I just encountered an issue. Can you please report this problem to the administrator of the bot, or try again later?\n```py\n'+str(e)+'```')


app.run(os.environ['TOKEN'])