| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183 | 
							- import logging
 
- from openai import Stream
 
- from openai.types.chat import ChatCompletionChunk, ChatCompletionMessage
 
- from app.core.runner.pub_handler import StreamEventHandler
 
- from app.core.runner.utils import message_util
 
class LLMCallbackHandler:
    """
    LLM chat callback handler, handling message sending and message merging.

    The handler consumes a streamed chat completion, publishes the
    corresponding events through ``event_handler`` and merges the received
    deltas into a single assistant message.
    """

    def __init__(
        self,
        run_id: str,
        on_step_create_func,
        on_message_create_func,
        event_handler: StreamEventHandler,
    ) -> None:
        """
        Store the callbacks and the event publisher used while streaming.

        :param run_id: identifier kept on the handler as ``run_id``
        :param on_step_create_func: callable invoked with the id of the
            created message; its result is kept as ``step``
        :param on_message_create_func: callable invoked as
            ``on_message_create_func(content="")``; its result is kept as
            ``message``
        :param event_handler: publisher used for step, message, delta and
            usage events
        """
        super().__init__()
        self.run_id = run_id
        # Flipped to True once the message/step pair has been created, so the
        # creation callbacks and "created"/"in progress" events fire only once
        # per handler instance.
        self.final_message_started = False
        self.on_step_create_func = on_step_create_func
        self.step = None
        self.on_message_create_func = on_message_create_func
        self.message = None
        self.event_handler: StreamEventHandler = event_handler

    def handle_llm_response(
        self,
        response_stream: Stream[ChatCompletionChunk],
    ) -> ChatCompletionMessage:
        """
        Handle LLM response stream

        For every chunk that carries a delta, the message/step pair is created
        on first use and the delta is either merged (tool calls), appended and
        published (content), or only published (reasoning content, audio).
        Usage is published for every chunk that reports it, after the chunk's
        delta has been handled.

        :param response_stream: ChatCompletionChunk stream
        :return: ChatCompletionMessage with role ``assistant`` holding the
            concatenated content deltas and the merged tool calls; reasoning
            and audio deltas are published but not stored on it
        :raises Exception: any error raised while consuming the stream is
            logged with its traceback and re-raised unchanged
        """
        message = ChatCompletionMessage(
            content="", role="assistant", tool_calls=[], audio=None
        )
        # All deltas are published with this fixed index value.
        index = 0
        # Content deltas are collected here and joined once at the end instead
        # of re-assigning the growing string on every chunk.
        content_parts = []

        try:
            for chunk in response_stream:
                logging.debug(chunk)

                delta = self._extract_delta(chunk)
                if delta:
                    logging.debug("delta.tool_calls: %s", delta.tool_calls)
                    self._start_message_once()
                    self._dispatch_delta(delta, message, content_parts, index)

                # Usage can be reported on chunks with or without a delta, so
                # it is checked for every chunk, after delta handling.
                if chunk.usage:
                    self.event_handler.pub_message_usage(chunk)
        except Exception as e:
            # logging.exception keeps the traceback in the log; the bare raise
            # propagates the original exception untouched.
            logging.exception("handle_llm_response error: %s", e)
            raise

        message.content = "".join(content_parts)
        logging.debug("handle_llm_response result: %s", message)
        return message

    @staticmethod
    def _extract_delta(chunk):
        """Return the delta (or message) of the chunk's first choice, else None."""
        if not chunk.choices:
            return None
        choice = chunk.choices[0]
        if hasattr(choice, "delta"):
            return choice.delta
        if hasattr(choice, "message"):
            return choice.message
        return None

    def _start_message_once(self) -> None:
        """Create the message and step and publish their start events, once."""
        if self.final_message_started:
            return
        self.final_message_started = True

        self.message = self.on_message_create_func(content="")
        self.step = self.on_step_create_func(self.message.id)
        logging.debug(
            "create message and step (%s), (%s)",
            self.message,
            self.step,
        )

        self.event_handler.pub_run_step_created(self.step)
        self.event_handler.pub_run_step_in_progress(self.step)
        self.event_handler.pub_message_created(self.message)
        self.event_handler.pub_message_in_progress(self.message)

    def _dispatch_delta(self, delta, message, content_parts, index) -> None:
        """Merge or publish one delta; only the first matching kind is handled."""
        # Tool calls take precedence: a delta carrying tool calls is merged and
        # nothing is published for it.
        if delta.tool_calls:
            for tool_call_delta in delta.tool_calls:
                message_util.merge_tool_call_delta(
                    message.tool_calls, tool_call_delta
                )
            return

        if delta.content is not None:
            content_parts.append(delta.content)
            self.event_handler.pub_message_delta(
                self.message.id, index, delta.content, delta.role
            )
            return

        # reasoning_content / audio are looked up defensively because not
        # every delta object defines these attributes.
        reasoning_content = getattr(delta, "reasoning_content", None)
        if reasoning_content is not None:
            self.event_handler.pub_message_delta(
                self.message.id,
                index,
                delta.content,
                delta.role,
                reasoning_content,
            )
            return

        audio = getattr(delta, "audio", None)
        if audio is not None:
            self.event_handler.pub_message_delta(
                self.message.id,
                index,
                delta.content,
                delta.role,
                None,
                audio,
            )
 
 
  |