# llm_callback_handler.py
import logging

from openai import Stream
from openai.types.chat import ChatCompletionChunk, ChatCompletionMessage

from app.core.runner.pub_handler import StreamEventHandler
from app.core.runner.utils import message_util
  6. class LLMCallbackHandler:
  7. """
  8. LLM chat callback handler, handling message sending and message merging
  9. """
  10. def __init__(
  11. self, run_id: str, on_step_create_func, on_message_create_func, event_handler: StreamEventHandler
  12. ) -> None:
  13. super().__init__()
  14. self.run_id = run_id
  15. self.final_message_started = False
  16. self.on_step_create_func = on_step_create_func
  17. self.step = None
  18. self.on_message_create_func = on_message_create_func
  19. self.message = None
  20. self.event_handler: StreamEventHandler = event_handler
  21. def handle_llm_response(
  22. self,
  23. response_stream: Stream[ChatCompletionChunk],
  24. ) -> ChatCompletionMessage:
  25. """
  26. Handle LLM response stream
  27. :param response_stream: ChatCompletionChunk stream
  28. :return: ChatCompletionMessage
  29. """
  30. message = ChatCompletionMessage(content="", role="assistant", tool_calls=[])
  31. index = 0
  32. try:
  33. for chunk in response_stream:
  34. logging.debug(chunk)
  35. if chunk.usage:
  36. self.event_handler.pub_message_usage(chunk)
  37. continue
  38. if not chunk.choices:
  39. continue
  40. choice = chunk.choices[0]
  41. delta = choice.delta
  42. if not delta:
  43. continue
  44. # merge tool call delta
  45. if delta.tool_calls:
  46. for tool_call_delta in delta.tool_calls:
  47. message_util.merge_tool_call_delta(message.tool_calls, tool_call_delta)
  48. elif delta.content:
  49. # call on delta message received
  50. if not self.final_message_started:
  51. self.final_message_started = True
  52. self.message = self.on_message_create_func(content="")
  53. self.step = self.on_step_create_func(self.message.id)
  54. logging.debug("create message and step (%s), (%s)", self.message, self.step)
  55. self.event_handler.pub_run_step_created(self.step)
  56. self.event_handler.pub_run_step_in_progress(self.step)
  57. self.event_handler.pub_message_created(self.message)
  58. self.event_handler.pub_message_in_progress(self.message)
  59. # append message content delta
  60. message.content += delta.content
  61. self.event_handler.pub_message_delta(self.message.id, index, delta.content, delta.role)
  62. except Exception as e:
  63. logging.error("handle_llm_response error: %s", e)
  64. raise e
  65. return message