from functools import partial
import logging
import json
from typing import List, Optional
from concurrent.futures import Executor

from sqlalchemy.orm import Session

from config.config import settings
from config.llm import llm_settings, tool_settings
from app.core.runner.llm_backend import LLMBackend
from app.core.runner.llm_callback_handler import LLMCallbackHandler
from app.core.runner.memory import Memory, find_memory
from app.core.runner.pub_handler import StreamEventHandler
from app.core.runner.utils import message_util as msg_util
from app.core.runner.utils.tool_call_util import (
    tool_call_recognize,
    internal_tool_call_invoke,
    tool_call_request,
    tool_call_id,
    tool_call_output,
)
from app.core.tools import find_tools, BaseTool
from app.libs.thread_executor import get_executor_for_config, run_with_executor
from app.models.message import Message, MessageUpdate
from app.models.run import Run
from app.models.run_step import RunStep
from app.models.token_relation import RelationType
from app.services.assistant.assistant import AssistantService
from app.services.file.file import FileService
from app.services.message.message import MessageService
from app.services.run.run import RunService
from app.services.run.run_step import RunStepService
from app.services.token.token import TokenService
from app.services.token.token_relation import TokenRelationService

# Default system instructions, used when neither the run nor the assistant
# provides any, and as placeholder content for empty assistant messages.
DEFAULT_INSTRUCTIONS = (
    "You are a multilingual AI assistant.\n"
    "- Detect user language; reply in same language unless told otherwise.\n"
    "- Default to English if detection is unclear.\n"
    "- Give concise, accurate, and safe answers; admit when unsure.\n"
    "- Keep tone and style consistent; adapt examples to user's context.\n"
    "- For code, include explanations and comments in user's language.\n"
    "- If a question is ambiguous, ask for clarification.\n"
)


class ThreadRunner:
    """
    ThreadRunner encapsulates the execution logic of a single run.
    """

    tool_executor: Executor = get_executor_for_config(
        tool_settings.TOOL_WORKER_NUM, "tool_worker_"
    )
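    # Usage (a hypothetical sketch, not part of this module; `SessionLocal`
    # and the surrounding task dispatch are assumptions):
    #
    #     with SessionLocal() as session:
    #         runner = ThreadRunner(
    #             run_id="run_123", token_id=None, session=session, stream=True
    #         )
    #         runner.run()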
 
    def __init__(
        self, run_id: str, token_id: str, session: Session, stream: bool = False
    ):
        self.run_id = run_id
        self.token_id = token_id
        self.session = session
        self.stream = stream
        self.max_step = llm_settings.LLM_MAX_STEP
        self.event_handler: Optional[StreamEventHandler] = None
 
    def run(self):
        """
        Execute a single run. The basic steps are:
        1. Initialize: load the run and its tools, and build the system instructions;
        2. Loop: query the existing run steps and generate chat messages;
        3. Call the LLM and parse the response;
        4. From the response, create a new run step (tool-call handling) or a message.
        """
        # TODO: refactor, move run state transitions into RunService
        run = RunService.get_run_sync(session=self.session, run_id=self.run_id)
        self.event_handler = StreamEventHandler(
            run_id=self.run_id, is_stream=self.stream
        )
        run = RunService.to_in_progress(session=self.session, run_id=self.run_id)
        self.event_handler.pub_run_in_progress(run)
        logging.info("processing ThreadRunner task, run_id: %s", self.run_id)

        # get memory config from assistant metadata
        # format: {"memory": {"type": "window", "window_size": 20, "max_token_size": 4000}}
        ast = AssistantService.get_assistant_sync(
            session=self.session, assistant_id=run.assistant_id
        )
        metadata = ast.metadata_ or {}
        memory = find_memory(metadata.get("memory", {}))
        instructions = (
            [run.instructions or ""] if run.instructions else [ast.instructions or ""]
        )
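        # If the assistant's file_search tool has vector-store folders or files
        # attached, collect their ids and switch the run over to the internal
        # knowledge_search tool (appended explicitly when the run itself
        # carries file_ids).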
 
        asst_ids = []
        if ast.tool_resources and "file_search" in ast.tool_resources:
            ids = (
                ast.tool_resources.get("file_search")
                .get("vector_stores")[0]
                .get("folder_ids")
            )
            if ids:
                asst_ids += ids
            ids = (
                ast.tool_resources.get("file_search")
                .get("vector_stores")[0]
                .get("file_ids")
            )
            if ids:
                asst_ids += ids
        if len(asst_ids) > 0:
            if len(run.file_ids) > 0:
                run.tools.append({"type": "knowledge_search"})
            else:
                for tool in run.tools:
                    if tool.get("type") == "file_search":
                        tool["type"] = "knowledge_search"

        tools = find_tools(run, self.session)
        for tool in tools:
            tool.configure(session=self.session, run=run)
            instruction_supplement = tool.instruction_supplement()
            if instruction_supplement:
                instructions += [instruction_supplement or ""]
        instruction = "\n".join(instructions)

        llm = self.__init_llm_backend(run.assistant_id)

        loop = True
        while loop:
            logging.debug("run loop iteration, run_id: %s", self.run_id)
            run_steps = RunStepService.get_run_step_list(
                session=self.session, run_id=self.run_id, thread_id=run.thread_id
            )
            loop = self.__run_step(llm, run, run_steps, instruction, tools, memory)

        # run finished
        self.event_handler.pub_run_completed(run)
        self.event_handler.pub_done()
 
    def __run_step(
        self,
        llm: LLMBackend,
        run: Run,
        run_steps: List[RunStep],
        instruction: str,
        tools: List[BaseTool],
        memory: Memory,
    ):
        """
        Execute one run step.
        """
        logging.info("step %d is running", len(run_steps) + 1)

        if instruction == "":
            instruction = DEFAULT_INSTRUCTIONS
        assistant_system_message = [msg_util.system_message(instruction)]

        # fetch the existing message context
        chat_messages = self.__generate_chat_messages(
            MessageService.get_message_list(
                session=self.session, thread_id=run.thread_id
            ),
            run,
        )
        tool_call_messages = []
        for step in run_steps:
            if step.type == "tool_calls" and step.status == "completed":
                tool_call_messages += (
                    self.__convert_assistant_tool_calls_to_chat_messages(step)
                )

        # assemble: system message + memory-integrated context + tool-call messages
        messages = (
            assistant_system_message
            + memory.integrate_context(chat_messages)
            + tool_call_messages
        )
        logging.info("run: %s", run)
        logging.debug("messages: %s", messages)
        logging.debug("tools: %s", tools)

        response_stream = llm.run(
            messages=messages,
            model=run.model,
            tools=[tool.openai_function for tool in tools],
            tool_choice="auto" if len(run_steps) < self.max_step else "none",
            stream=self.stream,
            stream_options=run.stream_options,
            extra_body=run.extra_body,
            temperature=run.temperature,
            top_p=run.top_p,
            response_format=run.response_format,
            parallel_tool_calls=run.parallel_tool_calls,
            audio=run.audio,
            modalities=run.modalities,
        )
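        # tool_choice drops to "none" once the step count reaches LLM_MAX_STEP,
        # which caps the tool-call loop and guarantees the run terminates with
        # a plain message.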
 
        # create-message callback
        create_message_callback = partial(
            MessageService.new_message,
            session=self.session,
            assistant_id=run.assistant_id,
            thread_id=run.thread_id,
            run_id=run.id,
            role="assistant",
        )

        # 'message creation' run step callback
        def _create_message_creation_run_step(message_id):
            return RunStepService.new_run_step(
                session=self.session,
                type="message_creation",
                assistant_id=run.assistant_id,
                thread_id=run.thread_id,
                run_id=run.id,
                step_details={
                    "type": "message_creation",
                    "message_creation": {"message_id": message_id},
                },
            )

        llm_callback_handler = LLMCallbackHandler(
            run_id=run.id,
            on_step_create_func=_create_message_creation_run_step,
            on_message_create_func=create_message_callback,
            event_handler=self.event_handler,
        )
        # in non-stream mode the backend returns a single response object;
        # wrap it so the handler can treat both modes as an iterable
        if not self.stream and hasattr(response_stream, "choices"):
            response_stream = [response_stream]
        response_msg = llm_callback_handler.handle_llm_response(response_stream)
        message_creation_run_step = llm_callback_handler.step
        logging.info("chat_response_message: %s", response_msg)

        if msg_util.is_tool_call(response_msg):
            # pair each tool call with its tool definition (None for external tools)
            tool_calls = [
                tool_call_recognize(tool_call, tools)
                for tool_call in response_msg.tool_calls
            ]
            # new run step for the tool calls
            new_run_step = RunStepService.new_run_step(
                session=self.session,
                type="tool_calls",
                assistant_id=run.assistant_id,
                thread_id=run.thread_id,
                run_id=run.id,
                step_details={
                    "type": "tool_calls",
                    "tool_calls": [tool_call_dict for _, tool_call_dict in tool_calls],
                },
            )
            self.event_handler.pub_run_step_created(new_run_step)
            self.event_handler.pub_run_step_in_progress(new_run_step)

            internal_tool_calls = list(
                filter(lambda _tool_call: _tool_call[0] is not None, tool_calls)
            )
            external_tool_call_dict = [
                tool_call_dict for tool, tool_call_dict in tool_calls if tool is None
            ]
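            # Internal tool calls (a matching tool was resolved in-process) run
            # on the thread pool; external ones are handed back to the caller
            # through a requires_action state below.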
 
            # To keep thread-synchronization logic simple, internal and
            # external tool calls are handled sequentially.
            if internal_tool_calls:
                try:
                    # NOTE: thread execution here is fragile and could be made
                    # async; if the tool is file_search, make sure it only
                    # executes once
                    tool_calls_with_outputs = run_with_executor(
                        executor=ThreadRunner.tool_executor,
                        func=internal_tool_call_invoke,
                        tasks=internal_tool_calls,
                        timeout=tool_settings.TOOL_WORKER_EXECUTION_TIMEOUT,
                    )
                    new_run_step = RunStepService.update_step_details(
                        session=self.session,
                        run_step_id=new_run_step.id,
                        step_details={
                            "type": "tool_calls",
                            "tool_calls": tool_calls_with_outputs,
                        },
                        completed=not external_tool_call_dict,
                    )
                    self.event_handler.pub_message_delta_tool(
                        message_id=llm_callback_handler.message.id,
                        index=0,
                        content=json.dumps(tool_calls_with_outputs),
                    )
                except Exception as e:
                    RunStepService.to_failed(
                        session=self.session, run_step_id=new_run_step.id, last_error=e
                    )
                    raise

            if external_tool_call_dict:
                # set the run to requires_action and wait for the caller to
                # submit tool outputs and resume the run
                run = RunService.to_requires_action(
                    session=self.session,
                    run_id=run.id,
                    required_action={
                        "type": "submit_tool_outputs",
                        "submit_tool_outputs": {"tool_calls": external_tool_call_dict},
                    },
                )
                self.event_handler.pub_run_step_delta(
                    step_id=new_run_step.id,
                    step_details={
                        "type": "tool_calls",
                        "tool_calls": external_tool_call_dict,
                    },
                )
                self.event_handler.pub_run_requires_action(run)
            else:
                self.event_handler.pub_run_step_completed(new_run_step)
                return True
        else:
            if response_msg.content == "":
                response_msg.content = (
                    '[{"text": {"value": "", "annotations": []}, "type": "text"}]'
                )
            # no tool calls: message generation is finished, update the status
            new_message = MessageService.modify_message_sync(
                session=self.session,
                thread_id=run.thread_id,
                message_id=llm_callback_handler.message.id,
                body=MessageUpdate(content=response_msg.content),
            )
            self.event_handler.pub_message_completed(new_message)
            new_step = RunStepService.update_step_details(
                session=self.session,
                run_step_id=message_creation_run_step.id,
                step_details={
                    "type": "message_creation",
                    "message_creation": {"message_id": new_message.id},
                },
                completed=True,
            )
            RunService.to_completed(session=self.session, run_id=run.id)
            self.event_handler.pub_run_step_completed(new_step)
        return False
 
    def __init_llm_backend(self, assistant_id):
        if settings.AUTH_ENABLE:
            # init llm backend with token id
            if self.token_id:
                token_id = self.token_id
            else:
                token_id = TokenRelationService.get_token_id_by_relation(
                    session=self.session,
                    relation_type=RelationType.Assistant,
                    relation_id=assistant_id,
                )
            logging.debug("resolved token_id: %s", token_id)
            try:
                if token_id is not None and len(token_id) > 0:
                    token = TokenService.get_token_by_id(self.session, token_id)
                    return LLMBackend(
                        base_url=token.llm_base_url, api_key=token.llm_api_key
                    )
            except Exception:
                logging.exception("failed to load token %s", token_id)
            # fall back to the globally configured endpoint rather than
            # hardcoding credentials in source
            return LLMBackend(
                base_url=llm_settings.OPENAI_API_BASE,
                api_key=llm_settings.OPENAI_API_KEY,
            )
        else:
            # init llm backend with llm settings
            return LLMBackend(
                base_url=llm_settings.OPENAI_API_BASE,
                api_key=llm_settings.OPENAI_API_KEY,
            )
 
    def __generate_chat_messages(self, messages: List[Message], run: Run):
        """
        Build chat messages from the message history.
        """
        chat_messages = []
        is_audio_num = 0
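        # Content filtering below: image_url parts are only forwarded when the
        # run is not in audio mode, and at most two input_audio parts are kept
        # per request (tracked by is_audio_num).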
 
        for message in messages:
            role = message.role
            if role == "user":
                message_content = []
                # Disabled: attached files used to be surfaced as reference
                # hints, e.g.:
                #
                #     if message.file_ids:
                #         files = FileService.get_file_list_by_ids(
                #             session=self.session, file_ids=message.file_ids
                #         )
                #         for file in files:
                #             chat_messages.append(
                #                 msg_util.new_message(
                #                     role,
                #                     f'The file "{file.filename}" can be used as a reference',
                #                 )
                #             )
                for content in message.content:
                    if content["type"] == "text":
                        message_content.append(
                            {"type": "text", "text": content["text"]["value"]}
                        )
                    elif content["type"] == "image_url" and run.audio is None:
                        message_content.append(content)
                    elif (
                        content.get("type") == "input_audio"
                        and run.audio is not None
                        and is_audio_num < 2
                    ):
                        message_content.append(content)
                        is_audio_num += 1
                chat_messages.append(msg_util.new_message(role, message_content))
            elif role == "assistant":
                message_content = ""
                for content in message.content:
                    if content["type"] == "text":
                        message_content += content["text"]["value"]
                if message_content == "":
                    message_content = DEFAULT_INSTRUCTIONS
                chat_messages.append(msg_util.new_message(role, message_content))
        chat_messages.reverse()  # reverse order: the newest messages come first
        return chat_messages  # for now only 5 messages are supported; TODO: add a token-count cap later
 
    def __convert_assistant_tool_calls_to_chat_messages(self, run_step: RunStep):
        """
        Build chat messages from a run step's execution result.
        Each tool-call run step has two parts: the request and the result
        (the result may consist of multiple messages).
        """
 
        tool_calls = run_step.step_details["tool_calls"]
        tool_call_requests = [
            msg_util.tool_calls(
                [tool_call_request(tool_call) for tool_call in tool_calls]
            )
        ]
        tool_call_outputs = [
            msg_util.tool_call_result(
                tool_call_id(tool_call), tool_call_output(tool_call)
            )
            for tool_call in tool_calls
        ]
        return tool_call_requests + tool_call_outputs
 
 