import copy
import json
import logging
import os
import time
import uuid
from typing import (
    Any,
    AsyncGenerator,
    Generator,
    Optional,
)

from anthropic import Anthropic, AsyncAnthropic
from anthropic.types import (
    ContentBlockStopEvent,
    Message,
    MessageStopEvent,
    RawContentBlockDeltaEvent,
    RawContentBlockStartEvent,
    RawMessageStartEvent,
    ToolUseBlock,
)

from core.base.abstractions import GenerationConfig, LLMChatCompletion
from core.base.providers.llm import CompletionConfig, CompletionProvider

from .utils import resize_base64_image

logger = logging.getLogger(__name__)


def generate_tool_id() -> str:
    """Generate a unique tool ID using UUID4."""
    return f"tool_{uuid.uuid4().hex[:12]}"


def process_images_in_message(message: dict) -> dict:
    """Process all images in a message to ensure they're within Anthropic's
    recommended limits."""
    if not message or not isinstance(message, dict):
        return message

    # Handle nested image_data (old format)
    if (
        message.get("role")
        and message.get("image_data")
        and isinstance(message["image_data"], dict)
    ):
        if message["image_data"].get("data") and message["image_data"].get(
            "media_type"
        ):
            message["image_data"]["data"] = resize_base64_image(
                message["image_data"]["data"]
            )
        return message

    # Handle standard content list format
    if message.get("content") and isinstance(message["content"], list):
        for i, block in enumerate(message["content"]):
            if isinstance(block, dict) and block.get("type") == "image":
                if block.get("source", {}).get("type") == "base64" and block[
                    "source"
                ].get("data"):
                    message["content"][i]["source"]["data"] = (
                        resize_base64_image(block["source"]["data"])
                    )

    # Handle string content with base64 image detection (less common)
    elif (
        message.get("content")
        and isinstance(message["content"], str)
        and ";base64," in message["content"]
    ):
        # Basic detection of base64 images embedded in text; may need more
        # robust handling.
        logger.warning(
            "Detected potential base64 image in string content - not auto-resizing"
        )

    return message
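
# Illustrative sketch (not executed): the standard content-list format this
# function rewrites in place. The image payload and text below are placeholders.
#
#   message = {
#       "role": "user",
#       "content": [
#           {
#               "type": "image",
#               "source": {
#                   "type": "base64",
#                   "media_type": "image/jpeg",
#                   "data": "<base64 payload>",
#               },
#           },
#           {"type": "text", "text": "What is in this picture?"},
#       ],
#   }
#   message = process_images_in_message(message)  # resizes the base64 payload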


def openai_message_to_anthropic_block(msg: dict) -> dict:
    """Converts a single OpenAI-style message (including function/tool calls)
    into one Anthropic-style message.

    Expected keys in `msg` can include:
    - role: "system" | "assistant" | "user" | "function" | "tool"
    - content: str (possibly JSON arguments or the final text)
    - name: str (tool/function name)
    - tool_call_id or function_call arguments
    - function_call: {"name": ..., "arguments": "..."}
    """
    role = msg.get("role", "")
    content = msg.get("content", "")
    tool_call_id = msg.get("tool_call_id")

    # Handle old-style image_data field
    image_data = msg.get("image_data")
    # Handle nested image_url (less common)
    image_url = msg.get("image_url")

    if role == "system":
        # System messages should not carry images; warn if one is present
        if image_url or image_data:
            logger.warning(
                "Found image in system message - images should be in user messages only"
            )
        return msg

    if role in ["user", "assistant"]:
        # If content is already a list, assume it's properly formatted
        if isinstance(content, list):
            return {"role": role, "content": content}

        # Process old-style image_data or image_url
        if image_url or image_data:
            formatted_content = []
            # Add image content first (as recommended by Anthropic)
            if image_url:
                formatted_content.append(
                    {
                        "type": "image",
                        "source": {"type": "url", "url": image_url},
                    }
                )
            elif image_data:
                # Resize the image data if needed
                resized_data = image_data.get("data", "")
                if resized_data:
                    resized_data = resize_base64_image(resized_data)
                formatted_content.append(
                    {
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": image_data.get(
                                "media_type", "image/jpeg"
                            ),
                            "data": resized_data,
                        },
                    }
                )
            # Add text content after the image
            if content:
                if isinstance(content, str):
                    formatted_content.append({"type": "text", "text": content})
                elif isinstance(content, list):
                    # If it's already a list, extend with it
                    formatted_content.extend(content)
            return {"role": role, "content": formatted_content}

    if role in ["function", "tool"]:
        return {
            "role": "user",
            "content": [
                {
                    "type": "tool_result",
                    "tool_use_id": tool_call_id,
                    "content": content,
                }
            ],
        }

    return {"role": role, "content": content}
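
# Illustrative sketch (not executed): converting an OpenAI-style tool-result
# message; the tool_call_id value is a placeholder.
#
#   openai_message_to_anthropic_block(
#       {"role": "tool", "tool_call_id": "toolu_abc123", "content": "42"}
#   )
#   # -> {
#   #     "role": "user",
#   #     "content": [
#   #         {"type": "tool_result", "tool_use_id": "toolu_abc123", "content": "42"}
#   #     ],
#   # }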


class AnthropicCompletionProvider(CompletionProvider):
    def __init__(self, config: CompletionConfig, *args, **kwargs) -> None:
        super().__init__(config)
        self.client = Anthropic()
        self.async_client = AsyncAnthropic()
        logger.debug("AnthropicCompletionProvider initialized successfully")

    def _get_base_args(
        self, generation_config: GenerationConfig
    ) -> dict[str, Any]:
        """Build the arguments dictionary for Anthropic's messages.create().

        Tools are converted to Anthropic's schema, which expects
        'input_schema' rather than OpenAI's 'parameters':
        {
            "name": "tool_name",
            "description": "tool description",
            "input_schema": {
                "type": "object",
                "properties": {...},
                "required": [...]
            }
        }
        """
        model_str = generation_config.model or ""
        model_name = (
            model_str.split("anthropic/")[-1]
            if model_str
            else "claude-3-opus-20240229"
        )

        args: dict[str, Any] = {
            "model": model_name,
            "temperature": generation_config.temperature,
            "max_tokens": generation_config.max_tokens_to_sample,
            "stream": generation_config.stream,
        }
        if generation_config.top_p:
            args["top_p"] = generation_config.top_p

        if generation_config.tools is not None:
            # Convert OpenAI-style tool definitions to Anthropic's format
            anthropic_tools: list[dict[str, Any]] = []
            for tool in generation_config.tools:
                tool_def = {
                    "name": tool["function"]["name"],
                    "description": tool["function"]["description"],
                    "input_schema": tool["function"]["parameters"],
                }
                anthropic_tools.append(tool_def)
            args["tools"] = anthropic_tools

            if hasattr(generation_config, "tool_choice"):
                tool_choice = generation_config.tool_choice
                if isinstance(tool_choice, str):
                    if tool_choice == "auto":
                        args["tool_choice"] = {"type": "auto"}
                    elif tool_choice == "any":
                        args["tool_choice"] = {"type": "any"}
                elif isinstance(tool_choice, dict):
                    if tool_choice.get("type") == "function":
                        # Anthropic forces a specific tool with
                        # {"type": "tool", "name": ...}
                        args["tool_choice"] = {
                            "type": "tool",
                            "name": tool_choice.get("name"),
                        }

                if hasattr(generation_config, "disable_parallel_tool_use"):
                    args["tool_choice"] = args.get(
                        "tool_choice", {"type": "auto"}
                    )
                    args["tool_choice"]["disable_parallel_tool_use"] = (
                        generation_config.disable_parallel_tool_use
                    )

        # --- Extended Thinking Support ---
        if getattr(generation_config, "extended_thinking", False):
            if (
                not hasattr(generation_config, "thinking_budget")
                or generation_config.thinking_budget is None
            ):
                raise ValueError(
                    "Extended thinking is enabled but no thinking_budget is provided."
                )
            if (
                generation_config.thinking_budget
                >= generation_config.max_tokens_to_sample
            ):
                raise ValueError(
                    "thinking_budget must be less than max_tokens_to_sample."
                )
            args["thinking"] = {
                "type": "enabled",
                "budget_tokens": generation_config.thinking_budget,
            }
        return args
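
    # Illustrative sketch (not executed) of the args dict this method can
    # produce when extended thinking is enabled; all values are placeholders.
    #
    #   {
    #       "model": "claude-3-opus-20240229",
    #       "temperature": 0.7,
    #       "max_tokens": 4096,
    #       "stream": False,
    #       "thinking": {"type": "enabled", "budget_tokens": 2048},
    #   }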

    def _preprocess_messages(self, messages: list[dict]) -> list[dict]:
        """Preprocess all messages to optimize images before sending to the
        Anthropic API."""
        if not messages or not isinstance(messages, list):
            return messages
        processed_messages = []
        for message in messages:
            processed_message = process_images_in_message(message)
            processed_messages.append(processed_message)
        return processed_messages

    def _create_openai_style_message(self, content_blocks, tool_calls=None):
        """Create an OpenAI-style message from Anthropic content blocks while
        preserving the original structure."""
        display_content = ""
        structured_content: list[Any] = []
        for block in content_blocks:
            if block.type == "text":
                display_content += block.text
            elif block.type == "thinking" and hasattr(block, "thinking"):
                # Store the complete thinking block
                structured_content.append(
                    {
                        "type": "thinking",
                        "thinking": block.thinking,
                        "signature": block.signature,
                    }
                )
                # For display/logging:
                # display_content += f"<think>{block.thinking}</think>"
            elif block.type == "redacted_thinking" and hasattr(block, "data"):
                # Store the complete redacted thinking block
                structured_content.append(
                    {"type": "redacted_thinking", "data": block.data}
                )
                # Add a placeholder for display/logging
                display_content += "<redacted thinking block>"
            elif block.type == "tool_use":
                # Tool use blocks are handled separately via tool_calls
                pass

        # If we have structured content (thinking blocks), use that
        if structured_content:
            # Append any text blocks after the thinking blocks
            for block in content_blocks:
                if block.type == "text":
                    structured_content.append(
                        {"type": "text", "text": block.text}
                    )
            return {
                "content": display_content or None,
                "structured_content": structured_content,
            }
        else:
            # With no structured content, just return the display text
            return {"content": display_content or None}
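
    # Illustrative sketch (not executed): with one thinking block and one text
    # block, the method returns both a flat display string and the preserved
    # structure (the signature value is a placeholder):
    #
    #   {
    #       "content": "The answer is 42.",
    #       "structured_content": [
    #           {"type": "thinking", "thinking": "...", "signature": "sig_abc"},
    #           {"type": "text", "text": "The answer is 42."},
    #       ],
    #   }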

    def _convert_to_chat_completion(self, anthropic_msg: Message) -> dict:
        """Convert a non-streaming Anthropic Message into an OpenAI-style dict.

        Preserves thinking blocks for proper handling.
        """
        tool_calls: list[Any] = []
        message_data: dict[str, Any] = {"role": anthropic_msg.role}
        if anthropic_msg.content:
            # First, extract any tool use blocks
            for block in anthropic_msg.content:
                if hasattr(block, "type") and block.type == "tool_use":
                    tool_calls.append(
                        {
                            "index": len(tool_calls),
                            "id": block.id,
                            "type": "function",
                            "function": {
                                "name": block.name,
                                "arguments": json.dumps(block.input),
                            },
                        }
                    )
            # Then create the message with appropriate content
            message_data.update(
                self._create_openai_style_message(
                    anthropic_msg.content, tool_calls
                )
            )
            # If we have tool calls, add them
            if tool_calls:
                message_data["tool_calls"] = tool_calls

        finish_reason = (
            "stop"
            if anthropic_msg.stop_reason == "end_turn"
            else anthropic_msg.stop_reason
        )
        finish_reason = (
            "tool_calls"
            if anthropic_msg.stop_reason == "tool_use"
            else finish_reason
        )

        model_str = anthropic_msg.model or ""
        model_name = model_str.split("anthropic/")[-1] if model_str else ""

        input_tokens = (
            anthropic_msg.usage.input_tokens if anthropic_msg.usage else 0
        )
        output_tokens = (
            anthropic_msg.usage.output_tokens if anthropic_msg.usage else 0
        )

        return {
            "id": anthropic_msg.id,
            "object": "chat.completion",
            "created": int(time.time()),
            "model": model_name,
            "usage": {
                "prompt_tokens": input_tokens,
                "completion_tokens": output_tokens,
                "total_tokens": input_tokens + output_tokens,
            },
            "choices": [
                {
                    "index": 0,
                    "message": message_data,
                    "finish_reason": finish_reason,
                }
            ],
        }

    def _split_system_messages(
        self, messages: list[dict]
    ) -> tuple[list[dict], Optional[str]]:
        """Process messages for the Anthropic API, ensuring the proper format
        for tool use and thinking blocks, and optimizing any images."""
        # First preprocess to resize any images
        messages = self._preprocess_messages(messages)

        system_msg = None
        filtered: list[dict[str, Any]] = []
        pending_tool_results: list[dict[str, Any]] = []

        # Walk the messages, pairing tool_use blocks with their tool_result
        i = 0
        while i < len(messages):
            m = copy.deepcopy(messages[i])

            # Handle system message
            if m["role"] == "system" and system_msg is None:
                system_msg = m["content"]
                i += 1
                continue

            # Case 1: Message with list-format content (thinking or tool blocks)
            if (
                isinstance(m.get("content"), list)
                and len(m["content"]) > 0
                and isinstance(m["content"][0], dict)
            ):
                filtered.append({"role": m["role"], "content": m["content"]})
                i += 1
                continue

            # Case 2: Assistant message with a structured_content field
            elif m.get("structured_content") and m["role"] == "assistant":
                filtered.append(
                    {"role": "assistant", "content": m["structured_content"]}
                )
                i += 1
                continue

            # Case 3: Tool calls in an assistant message
            elif m.get("tool_calls") and m["role"] == "assistant":
                # Add content if it exists
                if m.get("content") and not isinstance(m["content"], list):
                    content_to_add = m["content"]

                    # Handle content with thinking tags
                    if "<think>" in content_to_add:
                        thinking_start = content_to_add.find("<think>")
                        thinking_end = content_to_add.find("</think>")
                        if (
                            thinking_start >= 0
                            and thinking_end > thinking_start
                        ):
                            thinking_content = content_to_add[
                                thinking_start + 7 : thinking_end
                            ]
                            text_content = content_to_add[
                                thinking_end + 8 :
                            ].strip()
                            filtered.append(
                                {
                                    "role": "assistant",
                                    "content": [
                                        {
                                            "type": "thinking",
                                            "thinking": thinking_content,
                                            "signature": "placeholder_signature",  # This is a placeholder
                                        },
                                        {"type": "text", "text": text_content},
                                    ],
                                }
                            )
                        else:
                            filtered.append(
                                {
                                    "role": "assistant",
                                    "content": content_to_add,
                                }
                            )
                    else:
                        filtered.append(
                            {"role": "assistant", "content": content_to_add}
                        )

                # Add tool use blocks
                tool_uses = []
                for call in m["tool_calls"]:
                    tool_uses.append(
                        {
                            "type": "tool_use",
                            "id": call["id"],
                            "name": call["function"]["name"],
                            "input": json.loads(call["function"]["arguments"]),
                        }
                    )
                filtered.append({"role": "assistant", "content": tool_uses})

                # Check if the next message is a tool result for this tool call
                if i + 1 < len(messages) and messages[i + 1]["role"] in [
                    "function",
                    "tool",
                ]:
                    next_m = copy.deepcopy(messages[i + 1])
                    # Make sure this is a tool result for the current tool use
                    if next_m.get("tool_call_id") in [
                        call["id"] for call in m["tool_calls"]
                    ]:
                        # Add the tool result as a user message
                        filtered.append(
                            {
                                "role": "user",
                                "content": [
                                    {
                                        "type": "tool_result",
                                        "tool_use_id": next_m["tool_call_id"],
                                        "content": next_m["content"],
                                    }
                                ],
                            }
                        )
                        i += 2  # Skip both the tool call and its result
                        continue
                i += 1
                continue

            # Case 4: Direct tool result (might be missing its paired tool call)
            elif m["role"] in ["function", "tool"] and m.get("tool_call_id"):
                # Add a user message with the tool result
                filtered.append(
                    {
                        "role": "user",
                        "content": [
                            {
                                "type": "tool_result",
                                "tool_use_id": m["tool_call_id"],
                                "content": m["content"],
                            }
                        ],
                    }
                )
                i += 1
                continue

            # Case 5: Tool result without a tool_call_id - collect results so
            # they can be combined into one user message
            elif m["role"] in ["function", "tool"]:
                pending_tool_results.append(
                    {
                        "type": "tool_result",
                        "tool_use_id": m.get("tool_call_id"),
                        "content": m["content"],
                    }
                )
                # If we have all expected results, add them as one message
                if len(filtered) > 0 and len(
                    filtered[-1].get("content", [])
                ) == len(pending_tool_results):
                    filtered.append(
                        {"role": "user", "content": pending_tool_results}
                    )
                    pending_tool_results = []

            # Default case: normal message
            else:
                filtered.append(openai_message_to_anthropic_block(m))
            i += 1

        # Final validation: ensure no tool_use is left at the end of the
        # conversation without a matching tool_result
        if filtered and len(filtered) > 1:
            last_msg = filtered[-1]
            if (
                last_msg["role"] == "assistant"
                and isinstance(last_msg.get("content"), list)
                and any(
                    block.get("type") == "tool_use"
                    for block in last_msg["content"]
                )
            ):
                logger.warning(
                    "Found tool_use at end of conversation without tool_result - removing it"
                )
                filtered.pop()  # Remove the problematic message

        return filtered, system_msg
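
    # Illustrative sketch (not executed): an OpenAI-style assistant tool call
    # followed by its tool result (ids and arguments are placeholders) becomes
    #
    #   [{"role": "assistant", "content": [
    #        {"type": "tool_use", "id": "toolu_1", "name": "search",
    #         "input": {"query": "weather"}}]},
    #    {"role": "user", "content": [
    #        {"type": "tool_result", "tool_use_id": "toolu_1",
    #         "content": "Sunny, 21C"}]}]
    #
    # with any leading system message returned separately as `system_msg`.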

    async def _execute_task(self, task: dict[str, Any]):
        """Async entry point.

        Decide if streaming or not, then call the appropriate helper.
        """
        api_key = os.getenv("ANTHROPIC_API_KEY")
        if not api_key:
            logger.error("Missing ANTHROPIC_API_KEY in environment.")
            raise ValueError(
                "Anthropic API key not found. Set ANTHROPIC_API_KEY env var."
            )

        messages = task["messages"]
        generation_config = task["generation_config"]
        extra_kwargs = task["kwargs"]

        base_args = self._get_base_args(generation_config)
        filtered_messages, system_msg = self._split_system_messages(messages)
        base_args["messages"] = filtered_messages
        if system_msg:
            base_args["system"] = system_msg

        args = {**base_args, **extra_kwargs}
        logger.debug(f"Anthropic async call with args={args}")

        if generation_config.stream:
            return self._execute_task_async_streaming(args)
        else:
            return await self._execute_task_async_nonstreaming(args)

    async def _execute_task_async_nonstreaming(
        self, args: dict[str, Any]
    ) -> LLMChatCompletion:
        api_key = os.getenv("ANTHROPIC_API_KEY")
        if not api_key:
            logger.error("Missing ANTHROPIC_API_KEY in environment.")
            raise ValueError(
                "Anthropic API key not found. Set ANTHROPIC_API_KEY env var."
            )
        try:
            logger.debug(f"Anthropic API request: {args}")
            response = await self.async_client.messages.create(**args)
            logger.debug(f"Anthropic API response: {response}")
            return LLMChatCompletion(
                **self._convert_to_chat_completion(response)
            )
        except Exception as e:
            logger.error(f"Anthropic async non-stream call failed: {e}")
            logger.error(f"message payload = {args}")
            raise

    async def _execute_task_async_streaming(
        self, args: dict
    ) -> AsyncGenerator[dict[str, Any], None]:
        """Streaming call (async): yields partial tokens in an OpenAI-like SSE
        format."""
        # `stream=True` is carried over in the original args, but we remove it
        # to avoid conflicts and rely on `messages.stream()` instead.
        args.pop("stream", None)
        try:
            async with self.async_client.messages.stream(**args) as stream:
                # Track partial JSON for function calls in buffer_data
                buffer_data: dict[str, Any] = {
                    "tool_json_buffer": "",
                    "tool_name": None,
                    "tool_id": None,
                    "is_collecting_tool": False,
                    "thinking_buffer": "",
                    "is_collecting_thinking": False,
                    "thinking_signature": None,
                    "message_id": f"chatcmpl-{int(time.time())}",
                }
                model_name = args.get("model", "claude-2")
                if isinstance(model_name, str):
                    model_name = model_name.split("anthropic/")[-1]
                async for event in stream:
                    chunks = self._process_stream_event(
                        event=event,
                        buffer_data=buffer_data,
                        model_name=model_name,
                    )
                    for chunk in chunks:
                        yield chunk
        except Exception as e:
            logger.error(f"Failed to execute streaming Anthropic task: {e}")
            logger.error(f"message payload = {args}")
            raise
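
    # Illustrative sketch (not executed), assuming a running event loop and a
    # prepared `args` dict; chunk shapes follow the OpenAI
    # chat.completion.chunk format emitted by _process_stream_event:
    #
    #   async for chunk in provider._execute_task_async_streaming(args):
    #       delta = chunk["choices"][0]["delta"]
    #       if "content" in delta:
    #           print(delta["content"], end="")
    #       elif "thinking" in delta:
    #           pass  # incremental extended-thinking text
    #       elif "tool_calls" in delta:
    #           pass  # a complete tool call with name and JSON arguments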

    def _execute_task_sync(self, task: dict[str, Any]):
        """Synchronous entry point."""
        messages = task["messages"]
        generation_config = task["generation_config"]
        extra_kwargs = task["kwargs"]

        base_args = self._get_base_args(generation_config)
        filtered_messages, system_msg = self._split_system_messages(messages)
        base_args["messages"] = filtered_messages
        if system_msg:
            base_args["system"] = system_msg

        args = {**base_args, **extra_kwargs}
        logger.debug(f"Anthropic sync call with args={args}")

        if generation_config.stream:
            return self._execute_task_sync_streaming(args)
        else:
            return self._execute_task_sync_nonstreaming(args)

    def _execute_task_sync_nonstreaming(
        self, args: dict[str, Any]
    ):  # -> LLMChatCompletion: # FIXME: LLMChatCompletion is an object from the OpenAI API, which causes a validation error
        """Non-streaming synchronous call."""
        try:
            response = self.client.messages.create(**args)
            logger.debug("Anthropic sync non-stream call succeeded.")
            return LLMChatCompletion(
                **self._convert_to_chat_completion(response)
            )
        except Exception as e:
            logger.error(f"Anthropic sync call failed: {e}")
            raise

    def _execute_task_sync_streaming(
        self, args: dict[str, Any]
    ) -> Generator[dict[str, Any], None, None]:
        """Synchronous streaming call: yields partial tokens in a generator."""
        args.pop("stream", None)
        try:
            with self.client.messages.stream(**args) as stream:
                buffer_data: dict[str, Any] = {
                    "tool_json_buffer": "",
                    "tool_name": None,
                    "tool_id": None,
                    "is_collecting_tool": False,
                    "thinking_buffer": "",
                    "is_collecting_thinking": False,
                    "thinking_signature": None,
                    "message_id": f"chatcmpl-{int(time.time())}",
                }
                model_name = args.get("model", "anthropic/claude-2")
                if isinstance(model_name, str):
                    model_name = model_name.split("anthropic/")[-1]
                for event in stream:
                    yield from self._process_stream_event(
                        event=event,
                        buffer_data=buffer_data,
                        model_name=model_name,
                    )
        except Exception as e:
            logger.error(f"Anthropic sync streaming call failed: {e}")
            raise

    def _process_stream_event(
        self, event: Any, buffer_data: dict[str, Any], model_name: str
    ) -> list[dict[str, Any]]:
        chunks: list[dict[str, Any]] = []

        def make_base_chunk() -> dict[str, Any]:
            return {
                "id": buffer_data["message_id"],
                "object": "chat.completion.chunk",
                "created": int(time.time()),
                "model": model_name,
                "choices": [{"index": 0, "delta": {}, "finish_reason": None}],
            }

        if isinstance(event, RawMessageStartEvent):
            buffer_data["message_id"] = event.message.id
            chunk = make_base_chunk()
            input_tokens = (
                event.message.usage.input_tokens if event.message.usage else 0
            )
            chunk["usage"] = {
                "prompt_tokens": input_tokens,
                "completion_tokens": 0,
                "total_tokens": input_tokens,
            }
            chunks.append(chunk)

        elif isinstance(event, RawContentBlockStartEvent):
            if hasattr(event.content_block, "type"):
                block_type = event.content_block.type
                if block_type == "thinking":
                    buffer_data["is_collecting_thinking"] = True
                    buffer_data["thinking_buffer"] = ""
                    # Don't emit anything yet
                elif block_type == "tool_use" or isinstance(
                    event.content_block, ToolUseBlock
                ):
                    buffer_data["tool_name"] = event.content_block.name  # type: ignore
                    buffer_data["tool_id"] = event.content_block.id  # type: ignore
                    buffer_data["tool_json_buffer"] = ""
                    buffer_data["is_collecting_tool"] = True

        elif isinstance(event, RawContentBlockDeltaEvent):
            delta_obj = getattr(event, "delta", None)
            delta_type = getattr(delta_obj, "type", None)

            # Handle thinking deltas
            if delta_type == "thinking_delta" and hasattr(
                delta_obj, "thinking"
            ):
                thinking_chunk = delta_obj.thinking  # type: ignore
                if buffer_data["is_collecting_thinking"]:
                    buffer_data["thinking_buffer"] += thinking_chunk
                    # Stream thinking chunks as they come in
                    chunk = make_base_chunk()
                    chunk["choices"][0]["delta"] = {"thinking": thinking_chunk}
                    chunks.append(chunk)

            # Handle signature deltas for thinking blocks
            elif delta_type == "signature_delta" and hasattr(
                delta_obj, "signature"
            ):
                if buffer_data["is_collecting_thinking"]:
                    buffer_data["thinking_signature"] = delta_obj.signature  # type: ignore
                    # Emit the signature so consumers can reattach it to the
                    # completed thinking block
                    chunk = make_base_chunk()
                    chunk["choices"][0]["delta"] = {
                        "thinking_signature": delta_obj.signature  # type: ignore
                    }
                    chunks.append(chunk)

            # Handle text deltas
            elif delta_type == "text_delta" and hasattr(delta_obj, "text"):
                text_chunk = delta_obj.text  # type: ignore
                if not buffer_data["is_collecting_tool"] and text_chunk:
                    chunk = make_base_chunk()
                    chunk["choices"][0]["delta"] = {"content": text_chunk}
                    chunks.append(chunk)

            # Handle partial JSON for tools
            elif hasattr(delta_obj, "partial_json"):
                if buffer_data["is_collecting_tool"]:
                    buffer_data["tool_json_buffer"] += delta_obj.partial_json  # type: ignore

        elif isinstance(event, ContentBlockStopEvent):
            # Handle the end of a thinking block
            if buffer_data.get("is_collecting_thinking"):
                # Emit a "structured_content" delta with the complete
                # thinking block
                if (
                    buffer_data["thinking_buffer"]
                    and buffer_data["thinking_signature"]
                ):
                    chunk = make_base_chunk()
                    chunk["choices"][0]["delta"] = {
                        "structured_content": [
                            {
                                "type": "thinking",
                                "thinking": buffer_data["thinking_buffer"],
                                "signature": buffer_data["thinking_signature"],
                            }
                        ]
                    }
                    chunks.append(chunk)
                # Reset thinking collection
                buffer_data["is_collecting_thinking"] = False
                buffer_data["thinking_buffer"] = ""
                buffer_data["thinking_signature"] = None

            # Handle the end of a tool use block
            elif buffer_data.get("is_collecting_tool"):
                try:
                    json.loads(buffer_data["tool_json_buffer"])
                    chunk = make_base_chunk()
                    chunk["choices"][0]["delta"] = {
                        "tool_calls": [
                            {
                                "index": 0,
                                "type": "function",
                                "id": buffer_data["tool_id"]
                                or f"call_{generate_tool_id()}",
                                "function": {
                                    "name": buffer_data["tool_name"],
                                    "arguments": buffer_data[
                                        "tool_json_buffer"
                                    ],
                                },
                            }
                        ]
                    }
                    chunks.append(chunk)
                    buffer_data["is_collecting_tool"] = False
                    buffer_data["tool_json_buffer"] = ""
                    buffer_data["tool_name"] = None
                    buffer_data["tool_id"] = None
                except json.JSONDecodeError:
                    logger.warning(
                        "Incomplete JSON in tool call, skipping chunk"
                    )

        elif isinstance(event, MessageStopEvent):
            # Check that the event has a message attribute before accessing it
            stop_reason = getattr(event, "message", None)
            if stop_reason and hasattr(stop_reason, "stop_reason"):
                stop_reason = stop_reason.stop_reason
                chunk = make_base_chunk()
                if stop_reason == "tool_use":
                    chunk["choices"][0]["delta"] = {}
                    chunk["choices"][0]["finish_reason"] = "tool_calls"
                else:
                    chunk["choices"][0]["delta"] = {}
                    chunk["choices"][0]["finish_reason"] = "stop"
                chunks.append(chunk)
            else:
                # Message info not available; default to a normal stop
                chunk = make_base_chunk()
                chunk["choices"][0]["delta"] = {}
                chunk["choices"][0]["finish_reason"] = "stop"
                chunks.append(chunk)

        return chunks
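

# Illustrative usage sketch (not executed). CompletionConfig's constructor
# arguments and GenerationConfig's keyword arguments are assumptions here;
# only the field names used above (model, temperature, max_tokens_to_sample,
# stream, tools, ...) are known from this module.
#
#   os.environ["ANTHROPIC_API_KEY"] = "sk-ant-..."  # placeholder
#   provider = AnthropicCompletionProvider(CompletionConfig(provider="anthropic"))
#   config = GenerationConfig(
#       model="anthropic/claude-3-opus-20240229",
#       temperature=0.7,
#       max_tokens_to_sample=1024,
#       stream=False,
#   )
#   task = {
#       "messages": [{"role": "user", "content": "Hello!"}],
#       "generation_config": config,
#       "kwargs": {},
#   }
#   completion = provider._execute_task_sync(task)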