12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925 |
- import copy
- import json
- import logging
- import os
- import time
- import uuid
- from typing import (
- Any,
- AsyncGenerator,
- Generator,
- Optional,
- )
- from anthropic import Anthropic, AsyncAnthropic
- from anthropic.types import (
- ContentBlockStopEvent,
- Message,
- MessageStopEvent,
- RawContentBlockDeltaEvent,
- RawContentBlockStartEvent,
- RawMessageStartEvent,
- ToolUseBlock,
- )
- from core.base.abstractions import GenerationConfig, LLMChatCompletion
- from core.base.providers.llm import CompletionConfig, CompletionProvider
- from .utils import resize_base64_image
- logger = logging.getLogger(__name__)
def generate_tool_id() -> str:
    """Return a fresh identifier of the form ``tool_<12 hex characters>``.

    The hexadecimal suffix is the first 12 characters of a random UUID4.
    """
    suffix = uuid.uuid4().hex[:12]
    return "tool_" + suffix
def process_images_in_message(message: dict) -> dict:
    """Shrink base64 images carried by *message* via ``resize_base64_image``.

    The message is modified in place and the same object is returned.
    Two layouts are recognised:

    * the legacy ``image_data`` field (``{"data": ..., "media_type": ...}``)
      on a message that has a ``role``; such a message is returned right
      after that field was handled, its ``content`` is not inspected;
    * a ``content`` list holding ``{"type": "image", "source": {...}}``
      blocks whose source ``type`` is ``"base64"``.

    String content that merely looks like it embeds a base64 image is left
    untouched and only reported with a warning. Falsy or non-dict input is
    returned unchanged.
    """
    if not message or not isinstance(message, dict):
        return message

    # Legacy layout: a single image stored under "image_data".
    image_data = message.get("image_data")
    if message.get("role") and image_data and isinstance(image_data, dict):
        if image_data.get("data") and image_data.get("media_type"):
            image_data["data"] = resize_base64_image(image_data["data"])
        return message

    content = message.get("content")
    if content and isinstance(content, list):
        for block in content:
            if not isinstance(block, dict) or block.get("type") != "image":
                continue
            source = block.get("source")
            # A malformed image block (missing or non-dict source) is skipped
            # instead of raising AttributeError on ``None.get``.
            if not isinstance(source, dict):
                continue
            if source.get("type") == "base64" and source.get("data"):
                source["data"] = resize_base64_image(source["data"])
    elif content and isinstance(content, str) and ";base64," in content:
        # Only a heuristic: the string is not parsed, so nothing is resized.
        logger.warning(
            "Detected potential base64 image in string content - not auto-resizing"
        )
    return message
def openai_message_to_anthropic_block(msg: dict) -> dict:
    """Convert one OpenAI-style message into one Anthropic-style message.

    Keys read from ``msg``:
      - ``role``: "system" | "assistant" | "user" | "function" | "tool"
      - ``content``: text, or a list of already formatted content blocks
      - ``tool_call_id``: id of the tool call a function/tool message answers
      - ``image_url`` / ``image_data``: legacy single-image fields

    Conversion rules:
      - system messages are returned unchanged (the very same dict); an
        attached image only triggers a warning;
      - function/tool messages become a ``user`` message holding a single
        ``tool_result`` block;
      - user/assistant messages whose content is a list are passed through
        as ``{"role", "content"}``; legacy image fields are ignored then;
      - user/assistant messages with a legacy image get a content list with
        the image block first, followed by the text (if it is a non-empty
        string); ``image_url`` takes precedence over ``image_data``;
      - anything else is reduced to ``{"role", "content"}``.
    """
    role = msg.get("role", "")
    content = msg.get("content", "")

    if role == "system":
        if msg.get("image_url") or msg.get("image_data"):
            logger.warning(
                "Found image in system message - images should be in user messages only"
            )
        return msg

    if role in ("function", "tool"):
        return {
            "role": "user",
            "content": [
                {
                    "type": "tool_result",
                    "tool_use_id": msg.get("tool_call_id"),
                    "content": content,
                }
            ],
        }

    # List content is taken as already being in Anthropic's block format, so
    # the legacy image fields are only considered for non-list content.
    if role in ("user", "assistant") and not isinstance(content, list):
        image_url = msg.get("image_url")
        image_data = msg.get("image_data")
        if image_url or image_data:
            if image_url:
                image_block = {
                    "type": "image",
                    "source": {"type": "url", "url": image_url},
                }
            else:
                encoded = image_data.get("data", "")
                if encoded:
                    encoded = resize_base64_image(encoded)
                image_block = {
                    "type": "image",
                    "source": {
                        "type": "base64",
                        "media_type": image_data.get(
                            "media_type", "image/jpeg"
                        ),
                        "data": encoded,
                    },
                }
            # Image first, text second.
            formatted_content = [image_block]
            if content and isinstance(content, str):
                formatted_content.append({"type": "text", "text": content})
            return {"role": role, "content": formatted_content}

    return {"role": role, "content": content}
class AnthropicCompletionProvider(CompletionProvider):
    """Completion provider backed by Anthropic's Messages API.

    Tasks arrive with OpenAI-style messages and a ``GenerationConfig``. This
    class converts them to Anthropic request arguments, performs the call
    (sync or async, streaming or not) and converts the answer back into an
    OpenAI-style completion or a sequence of OpenAI-style chunks.
    """

    def __init__(self, config: CompletionConfig, *args, **kwargs) -> None:
        """Create the synchronous and asynchronous Anthropic clients.

        Extra positional and keyword arguments are accepted but not used.
        """
        super().__init__(config)
        self.client = Anthropic()
        self.async_client = AsyncAnthropic()
        logger.debug("AnthropicCompletionProvider initialized successfully")

    # ------------------------------------------------------------------
    # Request construction
    # ------------------------------------------------------------------
    def _get_base_args(
        self, generation_config: GenerationConfig
    ) -> dict[str, Any]:
        """Build the arguments dictionary for Anthropic's messages.create().

        OpenAI-style tool definitions (``{"function": {...}}``) are converted
        to the shape sent to Anthropic::

            {
                "name": "tool_name",
                "description": "tool description",
                "input_schema": {
                    "type": "object",
                    "properties": {...},
                    "required": [...]
                }
            }

        When tools are present, an optional ``tool_choice`` attribute on the
        config is translated as follows: the strings ``"auto"`` / ``"any"``
        become ``{"type": ...}``; a dict of type ``"function"`` (or
        ``"tool"``) becomes ``{"type": "tool", "name": ...}`` where the name
        is read from ``name`` or from the nested ``function.name``. An
        optional, non-None ``disable_parallel_tool_use`` attribute is merged
        into the tool choice, which defaults to ``{"type": "auto"}``.

        Raises:
            ValueError: if extended thinking is enabled without a
                ``thinking_budget``, or if the budget is not smaller than
                ``max_tokens_to_sample``.
        """
        model_str = generation_config.model or ""
        model_name = (
            model_str.split("anthropic/")[-1]
            if model_str
            else "claude-3-opus-20240229"
        )
        args: dict[str, Any] = {
            "model": model_name,
            "temperature": generation_config.temperature,
            "max_tokens": generation_config.max_tokens_to_sample,
            "stream": generation_config.stream,
        }
        if generation_config.top_p:
            args["top_p"] = generation_config.top_p

        if generation_config.tools is not None:
            args["tools"] = [
                {
                    "name": tool["function"]["name"],
                    "description": tool["function"]["description"],
                    "input_schema": tool["function"]["parameters"],
                }
                for tool in generation_config.tools
            ]

            requested = getattr(generation_config, "tool_choice", None)
            tool_choice: Optional[dict[str, Any]] = None
            if isinstance(requested, str):
                if requested in ("auto", "any"):
                    tool_choice = {"type": requested}
            elif isinstance(requested, dict):
                if requested.get("type") in ("function", "tool"):
                    # Forcing one specific tool is spelled type "tool" in
                    # Anthropic's schema; "function" is the OpenAI spelling.
                    nested = requested.get("function")
                    name = requested.get("name")
                    if name is None and isinstance(nested, dict):
                        name = nested.get("name")
                    tool_choice = {"type": "tool", "name": name}

            disable_parallel = getattr(
                generation_config, "disable_parallel_tool_use", None
            )
            if disable_parallel is not None:
                # The flag lives inside tool_choice, which needs a "type".
                if tool_choice is None:
                    tool_choice = {"type": "auto"}
                tool_choice["disable_parallel_tool_use"] = disable_parallel

            if tool_choice is not None:
                args["tool_choice"] = tool_choice

        # --- Extended Thinking Support ---
        if getattr(generation_config, "extended_thinking", False):
            budget = getattr(generation_config, "thinking_budget", None)
            if budget is None:
                raise ValueError(
                    "Extended thinking is enabled but no thinking_budget is provided."
                )
            if budget >= generation_config.max_tokens_to_sample:
                raise ValueError(
                    "thinking_budget must be less than max_tokens_to_sample."
                )
            args["thinking"] = {
                "type": "enabled",
                "budget_tokens": budget,
            }
        return args

    def _preprocess_messages(self, messages: list[dict]) -> list[dict]:
        """Run ``process_images_in_message`` over every message.

        Falsy or non-list input is returned unchanged. Otherwise a new list
        is returned.
        """
        if not messages or not isinstance(messages, list):
            return messages
        return [process_images_in_message(message) for message in messages]

    @staticmethod
    def _require_api_key() -> None:
        """Raise ``ValueError`` if ``ANTHROPIC_API_KEY`` is unset or empty."""
        if not os.getenv("ANTHROPIC_API_KEY"):
            logger.error("Missing ANTHROPIC_API_KEY in environment.")
            raise ValueError(
                "Anthropic API key not found. Set ANTHROPIC_API_KEY env var."
            )

    @staticmethod
    def _tool_result_message(tool_use_id: Any, content: Any) -> dict[str, Any]:
        """Wrap a tool result into a ``user`` message with one block."""
        return {
            "role": "user",
            "content": [
                {
                    "type": "tool_result",
                    "tool_use_id": tool_use_id,
                    "content": content,
                }
            ],
        }

    @staticmethod
    def _parse_tool_arguments(arguments: Any) -> Any:
        """Decode the ``arguments`` of an OpenAI-style tool call.

        Dicts are returned as they are, ``None`` and blank strings decode to
        an empty dict, everything else goes through ``json.loads`` (which
        raises on invalid JSON).
        """
        if isinstance(arguments, dict):
            return arguments
        if arguments is None:
            return {}
        if isinstance(arguments, str) and not arguments.strip():
            return {}
        return json.loads(arguments)

    def _build_request_args(self, task: dict[str, Any]) -> dict[str, Any]:
        """Turn a task into the keyword arguments of an Anthropic call.

        Reads ``messages``, ``generation_config`` and ``kwargs`` from the
        task. Entries of ``kwargs`` override the computed arguments.
        """
        messages = task["messages"]
        generation_config = task["generation_config"]
        extra_kwargs = task["kwargs"]

        base_args = self._get_base_args(generation_config)
        filtered_messages, system_msg = self._split_system_messages(messages)
        base_args["messages"] = filtered_messages
        if system_msg:
            base_args["system"] = system_msg
        return {**base_args, **extra_kwargs}

    # ------------------------------------------------------------------
    # Response conversion (non-streaming)
    # ------------------------------------------------------------------
    def _create_openai_style_message(self, content_blocks, tool_calls=None):
        """
        Create an OpenAI-style message from Anthropic content blocks
        while preserving the original structure.

        Returns a dict with ``content`` (all text blocks concatenated, plus
        a ``<redacted thinking block>`` placeholder per redacted block, or
        ``None`` when empty). If thinking / redacted-thinking blocks are
        present, ``structured_content`` additionally lists those blocks
        followed by every text block. ``tool_calls`` is accepted but unused:
        ``tool_use`` blocks are ignored here.
        """
        display_parts: list[str] = []
        structured_content: list[Any] = []
        for block in content_blocks:
            if block.type == "text":
                display_parts.append(block.text)
            elif block.type == "thinking" and hasattr(block, "thinking"):
                # Keep the complete thinking block, signature included.
                structured_content.append(
                    {
                        "type": "thinking",
                        "thinking": block.thinking,
                        "signature": block.signature,
                    }
                )
            elif block.type == "redacted_thinking" and hasattr(block, "data"):
                structured_content.append(
                    {"type": "redacted_thinking", "data": block.data}
                )
                display_parts.append("<redacted thinking block>")

        message: dict[str, Any] = {"content": "".join(display_parts) or None}
        if structured_content:
            # Text blocks go after the thinking blocks.
            structured_content.extend(
                {"type": "text", "text": block.text}
                for block in content_blocks
                if block.type == "text"
            )
            message["structured_content"] = structured_content
        return message

    def _convert_to_chat_completion(self, anthropic_msg: Message) -> dict:
        """
        Convert a non-streaming Anthropic Message into an OpenAI-style dict.
        Preserves thinking blocks for proper handling.

        ``tool_use`` blocks become ``tool_calls`` entries whose arguments are
        the JSON-encoded tool input. Stop reasons are mapped to OpenAI
        finish reasons (``end_turn`` / ``stop_sequence`` -> ``stop``,
        ``max_tokens`` -> ``length``, ``tool_use`` -> ``tool_calls``); any
        other value is passed through unchanged.
        """
        finish_reasons = {
            "end_turn": "stop",
            "stop_sequence": "stop",
            "max_tokens": "length",
            "tool_use": "tool_calls",
        }

        tool_calls: list[Any] = []
        message_data: dict[str, Any] = {"role": anthropic_msg.role}
        if anthropic_msg.content:
            for block in anthropic_msg.content:
                if hasattr(block, "type") and block.type == "tool_use":
                    tool_calls.append(
                        {
                            "index": len(tool_calls),
                            "id": block.id,
                            "type": "function",
                            "function": {
                                "name": block.name,
                                "arguments": json.dumps(block.input),
                            },
                        }
                    )
            message_data.update(
                self._create_openai_style_message(
                    anthropic_msg.content, tool_calls
                )
            )
            if tool_calls:
                message_data["tool_calls"] = tool_calls

        stop_reason = anthropic_msg.stop_reason
        finish_reason = finish_reasons.get(stop_reason, stop_reason)

        model_str = anthropic_msg.model or ""
        model_name = model_str.split("anthropic/")[-1] if model_str else ""

        usage = anthropic_msg.usage
        prompt_tokens = usage.input_tokens if usage else 0
        completion_tokens = usage.output_tokens if usage else 0

        return {
            "id": anthropic_msg.id,
            "object": "chat.completion",
            "created": int(time.time()),
            "model": model_name,
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens,
            },
            "choices": [
                {
                    "index": 0,
                    "message": message_data,
                    "finish_reason": finish_reason,
                }
            ],
        }

    # ------------------------------------------------------------------
    # Message conversion
    # ------------------------------------------------------------------
    def _split_system_messages(
        self, messages: list[dict]
    ) -> tuple[list[dict], Optional[str]]:
        """
        Process messages for Anthropic API, ensuring proper format for tool use and thinking blocks.
        Now with image optimization.

        Returns ``(messages, system)``: the converted conversation and the
        content of the first system message (``None`` if there is none).

        Per message, the first matching rule applies:
          1. content that is a non-empty list starting with a dict is
             forwarded as is;
          2. an assistant message with ``structured_content`` is forwarded
             with that as its content;
          3. an assistant message with ``tool_calls`` yields its text (a
             ``<think>...</think>`` section becomes a thinking block with a
             placeholder signature) and a message of ``tool_use`` blocks; if
             the next message is the matching tool result it is consumed too;
          4. a function/tool message with ``tool_call_id`` becomes a
             ``tool_result`` user message;
          5. function/tool messages without id are collected and emitted
             together once their count equals the length of the previous
             message's content;
          6. everything else goes through
             ``openai_message_to_anthropic_block``.

        A trailing assistant message containing ``tool_use`` blocks is
        removed (unless it is the only message), with a warning.
        """
        # First preprocess to resize any images
        messages = self._preprocess_messages(messages)

        tool_roles = ("function", "tool")
        open_tag, close_tag = "<think>", "</think>"

        system_msg = None
        filtered: list[dict[str, Any]] = []
        pending_tool_results: list[dict[str, Any]] = []

        i = 0
        while i < len(messages):
            m = copy.deepcopy(messages[i])
            role = m["role"]
            content = m.get("content")
            step = 1  # how many input messages this iteration consumes

            if role == "system" and system_msg is None:
                system_msg = m["content"]

            # Case 1: list content (thinking blocks, tool blocks, images...)
            elif (
                isinstance(content, list)
                and len(content) > 0
                and isinstance(content[0], dict)
            ):
                filtered.append({"role": role, "content": content})

            # Case 2: assistant message carrying structured_content
            elif m.get("structured_content") and role == "assistant":
                filtered.append(
                    {"role": "assistant", "content": m["structured_content"]}
                )

            # Case 3: assistant message carrying tool calls
            elif m.get("tool_calls") and role == "assistant":
                if content and not isinstance(content, list):
                    start = content.find(open_tag) if open_tag in content else -1
                    end = content.find(close_tag) if start >= 0 else -1
                    if start >= 0 and end > start:
                        filtered.append(
                            {
                                "role": "assistant",
                                "content": [
                                    {
                                        "type": "thinking",
                                        "thinking": content[
                                            start + len(open_tag) : end
                                        ],
                                        "signature": "placeholder_signature",  # This is a placeholder
                                    },
                                    {
                                        "type": "text",
                                        "text": content[
                                            end + len(close_tag) :
                                        ].strip(),
                                    },
                                ],
                            }
                        )
                    else:
                        filtered.append(
                            {"role": "assistant", "content": content}
                        )

                filtered.append(
                    {
                        "role": "assistant",
                        "content": [
                            {
                                "type": "tool_use",
                                "id": call["id"],
                                "name": call["function"]["name"],
                                "input": self._parse_tool_arguments(
                                    call["function"]["arguments"]
                                ),
                            }
                            for call in m["tool_calls"]
                        ],
                    }
                )

                # Pair the tool call with a directly following result.
                if (
                    i + 1 < len(messages)
                    and messages[i + 1]["role"] in tool_roles
                ):
                    next_m = copy.deepcopy(messages[i + 1])
                    call_ids = [call["id"] for call in m["tool_calls"]]
                    if next_m.get("tool_call_id") in call_ids:
                        filtered.append(
                            self._tool_result_message(
                                next_m["tool_call_id"], next_m["content"]
                            )
                        )
                        step = 2  # the result has been consumed as well

            # Case 4: tool result that identifies its tool call
            elif role in tool_roles and m.get("tool_call_id"):
                filtered.append(
                    self._tool_result_message(m["tool_call_id"], m["content"])
                )

            # Case 5: tool result without id, batched with its siblings
            elif role in tool_roles:
                pending_tool_results.append(
                    {
                        "type": "tool_result",
                        "tool_use_id": m.get("tool_call_id"),
                        "content": m["content"],
                    }
                )
                if len(filtered) > 0 and len(
                    filtered[-1].get("content", [])
                ) == len(pending_tool_results):
                    filtered.append(
                        {"role": "user", "content": pending_tool_results}
                    )
                    pending_tool_results = []

            # Case 6: ordinary message
            else:
                filtered.append(openai_message_to_anthropic_block(m))

            i += step

        if pending_tool_results:
            # These never reached the expected count and are not sent.
            logger.warning(
                "Dropping %d tool result(s) without tool_call_id that could not be matched",
                len(pending_tool_results),
            )

        # Final validation: ensure no tool_use is at the end without a tool_result
        if filtered and len(filtered) > 1:
            last_msg = filtered[-1]
            if (
                last_msg["role"] == "assistant"
                and isinstance(last_msg.get("content"), list)
                and any(
                    isinstance(block, dict)
                    and block.get("type") == "tool_use"
                    for block in last_msg["content"]
                )
            ):
                logger.warning(
                    "Found tool_use at end of conversation without tool_result - removing it"
                )
                filtered.pop()
        return filtered, system_msg

    # ------------------------------------------------------------------
    # Execution (async)
    # ------------------------------------------------------------------
    async def _execute_task(self, task: dict[str, Any]):
        """Async entry point.

        Decide if streaming or not, then call the appropriate helper.
        Returns an async generator of chunks when ``generation_config.stream``
        is set, otherwise the awaited completion.

        Raises:
            ValueError: if ``ANTHROPIC_API_KEY`` is not set.
        """
        self._require_api_key()
        args = self._build_request_args(task)
        logger.debug("Anthropic async call with args=%s", args)
        if task["generation_config"].stream:
            return self._execute_task_async_streaming(args)
        return await self._execute_task_async_nonstreaming(args)

    async def _execute_task_async_nonstreaming(
        self, args: dict[str, Any]
    ) -> LLMChatCompletion:
        """Non-streaming asynchronous call.

        Failures are logged together with the request payload and re-raised.

        Raises:
            ValueError: if ``ANTHROPIC_API_KEY`` is not set.
        """
        self._require_api_key()
        try:
            logger.debug("Anthropic API request: %s", args)
            response = await self.async_client.messages.create(**args)
            logger.debug("Anthropic API response: %s", response)
            return LLMChatCompletion(
                **self._convert_to_chat_completion(response)
            )
        except Exception as e:
            logger.error("Anthropic async non-stream call failed: %s", e)
            logger.error("message payload = %s", args)
            raise

    async def _execute_task_async_streaming(
        self, args: dict
    ) -> AsyncGenerator[dict[str, Any], None]:
        """Streaming call (async): yields partial tokens in OpenAI-like SSE
        format.

        Failures are logged together with the request payload and re-raised.
        """
        # `messages.stream()` is used instead of `create(stream=True)`, so the
        # flag is removed from the arguments (in place) to avoid a conflict.
        args.pop("stream", None)
        try:
            async with self.async_client.messages.stream(**args) as stream:
                buffer_data = self._new_stream_state()
                model_name = self._stream_model_name(args)
                async for event in stream:
                    for chunk in self._process_stream_event(
                        event=event,
                        buffer_data=buffer_data,
                        model_name=model_name,
                    ):
                        yield chunk
        except Exception as e:
            logger.error("Failed to execute streaming Anthropic task: %s", e)
            logger.error("message payload = %s", args)
            raise

    # ------------------------------------------------------------------
    # Execution (sync)
    # ------------------------------------------------------------------
    def _execute_task_sync(self, task: dict[str, Any]):
        """Synchronous entry point.

        Returns a generator of chunks when ``generation_config.stream`` is
        set, otherwise the completion.
        """
        args = self._build_request_args(task)
        logger.debug("Anthropic sync call with args=%s", args)
        if task["generation_config"].stream:
            return self._execute_task_sync_streaming(args)
        return self._execute_task_sync_nonstreaming(args)

    def _execute_task_sync_nonstreaming(
        self, args: dict[str, Any]
    ):  # -> LLMChatCompletion: # FIXME: LLMChatCompletion is an object from the OpenAI API, which causes a validation error
        """Non-streaming synchronous call. Failures are logged and re-raised."""
        try:
            response = self.client.messages.create(**args)
            logger.debug("Anthropic sync non-stream call succeeded.")
            return LLMChatCompletion(
                **self._convert_to_chat_completion(response)
            )
        except Exception as e:
            logger.error("Anthropic sync call failed: %s", e)
            raise

    def _execute_task_sync_streaming(
        self, args: dict[str, Any]
    ) -> Generator[dict[str, Any], None, None]:
        """
        Synchronous streaming call: yields partial tokens in a generator.

        ``stream`` is removed from ``args`` in place. Failures are logged
        and re-raised.
        """
        args.pop("stream", None)
        try:
            with self.client.messages.stream(**args) as stream:
                buffer_data = self._new_stream_state()
                model_name = self._stream_model_name(args)
                for event in stream:
                    yield from self._process_stream_event(
                        event=event,
                        buffer_data=buffer_data,
                        model_name=model_name,
                    )
        except Exception as e:
            logger.error("Anthropic sync streaming call failed: %s", e)
            raise

    # ------------------------------------------------------------------
    # Streaming helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _new_stream_state() -> dict[str, Any]:
        """Return the initial mutable state for ``_process_stream_event``."""
        return {
            "tool_json_buffer": "",
            "tool_name": None,
            "tool_id": None,
            "is_collecting_tool": False,
            "tool_call_index": 0,
            "thinking_buffer": "",
            "is_collecting_thinking": False,
            "thinking_signature": None,
            # Provisional id, replaced by the message-start event's id.
            "message_id": f"chatcmpl-{int(time.time())}",
        }

    @staticmethod
    def _stream_model_name(args: dict[str, Any]) -> Any:
        """Return the model to report in chunks, without ``anthropic/``."""
        model_name = args.get("model", "claude-2")
        if isinstance(model_name, str):
            model_name = model_name.split("anthropic/")[-1]
        return model_name

    def _process_stream_event(
        self, event: Any, buffer_data: dict[str, Any], model_name: str
    ) -> list[dict[str, Any]]:
        """Translate one Anthropic stream event into OpenAI-style chunks.

        ``buffer_data`` is the per-stream state; it is updated in place.
        Returns the (possibly empty) list of chunks produced by the event:

        - message start: a chunk carrying prompt-token usage;
        - thinking / signature deltas: ``thinking`` / ``thinking_signature``
          deltas, also accumulated in the state;
        - text deltas: ``content`` deltas (suppressed while a tool call is
          being collected);
        - tool JSON deltas: buffered only;
        - content block stop: the complete thinking block as
          ``structured_content`` (when both text and signature are present),
          or the complete tool call as ``tool_calls``;
        - message stop: an empty delta with ``finish_reason`` set to
          ``tool_calls`` for a ``tool_use`` stop reason, else ``stop``.
        """
        chunks: list[dict[str, Any]] = []

        def make_base_chunk() -> dict[str, Any]:
            return {
                "id": buffer_data["message_id"],
                "object": "chat.completion.chunk",
                "created": int(time.time()),
                "model": model_name,
                "choices": [{"index": 0, "delta": {}, "finish_reason": None}],
            }

        if isinstance(event, RawMessageStartEvent):
            buffer_data["message_id"] = event.message.id
            usage = event.message.usage
            input_tokens = usage.input_tokens if usage else 0
            chunk = make_base_chunk()
            chunk["usage"] = {
                "prompt_tokens": input_tokens,
                "completion_tokens": 0,
                "total_tokens": input_tokens,
            }
            chunks.append(chunk)

        elif isinstance(event, RawContentBlockStartEvent):
            block = event.content_block
            if hasattr(block, "type"):
                if block.type == "thinking":
                    # Nothing is emitted yet; deltas will follow.
                    buffer_data["is_collecting_thinking"] = True
                    buffer_data["thinking_buffer"] = ""
                elif block.type == "tool_use" or isinstance(
                    block, ToolUseBlock
                ):
                    buffer_data["tool_name"] = block.name  # type: ignore
                    buffer_data["tool_id"] = block.id  # type: ignore
                    buffer_data["tool_json_buffer"] = ""
                    buffer_data["is_collecting_tool"] = True

        elif isinstance(event, RawContentBlockDeltaEvent):
            delta_obj = getattr(event, "delta", None)
            delta_type = getattr(delta_obj, "type", None)

            if delta_type == "thinking_delta" and hasattr(
                delta_obj, "thinking"
            ):
                thinking_chunk = delta_obj.thinking  # type: ignore
                if buffer_data["is_collecting_thinking"]:
                    buffer_data["thinking_buffer"] += thinking_chunk
                    # Thinking text is streamed as it arrives.
                    chunk = make_base_chunk()
                    chunk["choices"][0]["delta"] = {"thinking": thinking_chunk}
                    chunks.append(chunk)

            elif delta_type == "signature_delta" and hasattr(
                delta_obj, "signature"
            ):
                if buffer_data["is_collecting_thinking"]:
                    signature = delta_obj.signature  # type: ignore
                    buffer_data["thinking_signature"] = signature
                    chunk = make_base_chunk()
                    chunk["choices"][0]["delta"] = {
                        "thinking_signature": signature
                    }
                    chunks.append(chunk)

            elif delta_type == "text_delta" and hasattr(delta_obj, "text"):
                text_chunk = delta_obj.text  # type: ignore
                if not buffer_data["is_collecting_tool"] and text_chunk:
                    chunk = make_base_chunk()
                    chunk["choices"][0]["delta"] = {"content": text_chunk}
                    chunks.append(chunk)

            elif hasattr(delta_obj, "partial_json"):
                if buffer_data["is_collecting_tool"]:
                    buffer_data["tool_json_buffer"] += delta_obj.partial_json  # type: ignore

        elif isinstance(event, ContentBlockStopEvent):
            if buffer_data.get("is_collecting_thinking"):
                # Emit the complete thinking block once text and signature
                # have both been received.
                if (
                    buffer_data["thinking_buffer"]
                    and buffer_data["thinking_signature"]
                ):
                    chunk = make_base_chunk()
                    chunk["choices"][0]["delta"] = {
                        "structured_content": [
                            {
                                "type": "thinking",
                                "thinking": buffer_data["thinking_buffer"],
                                "signature": buffer_data["thinking_signature"],
                            }
                        ]
                    }
                    chunks.append(chunk)
                buffer_data["is_collecting_thinking"] = False
                buffer_data["thinking_buffer"] = ""
                buffer_data["thinking_signature"] = None

            elif buffer_data.get("is_collecting_tool"):
                # A tool block that streamed no JSON is treated as a call
                # with an empty input object instead of being dropped.
                arguments = buffer_data["tool_json_buffer"].strip() or "{}"
                try:
                    json.loads(arguments)
                except json.JSONDecodeError:
                    logger.warning(
                        "Incomplete JSON in tool call, skipping chunk"
                    )
                else:
                    # Each tool call of a message gets its own index.
                    index = buffer_data.get("tool_call_index", 0)
                    chunk = make_base_chunk()
                    chunk["choices"][0]["delta"] = {
                        "tool_calls": [
                            {
                                "index": index,
                                "type": "function",
                                "id": buffer_data["tool_id"]
                                or f"call_{generate_tool_id()}",
                                "function": {
                                    "name": buffer_data["tool_name"],
                                    "arguments": arguments,
                                },
                            }
                        ]
                    }
                    chunks.append(chunk)
                    buffer_data["tool_call_index"] = index + 1
                finally:
                    # Always leave tool-collection mode; otherwise later text
                    # deltas would stay suppressed after a bad tool block.
                    buffer_data["is_collecting_tool"] = False
                    buffer_data["tool_json_buffer"] = ""
                    buffer_data["tool_name"] = None
                    buffer_data["tool_id"] = None

        elif isinstance(event, MessageStopEvent):
            # The event may or may not carry the final message.
            message = getattr(event, "message", None)
            stop_reason = (
                getattr(message, "stop_reason", None) if message else None
            )
            chunk = make_base_chunk()
            chunk["choices"][0]["finish_reason"] = (
                "tool_calls" if stop_reason == "tool_use" else "stop"
            )
            chunks.append(chunk)

        return chunks
|