123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297 |
- from builtins import list as _list
- from pathlib import Path
- from typing import Any, Optional
- from uuid import UUID
- import aiofiles
- from shared.api.models.base import WrappedBooleanResponse
- from shared.api.models.management.responses import (
- WrappedConversationMessagesResponse,
- WrappedConversationResponse,
- WrappedConversationsResponse,
- WrappedMessageResponse,
- )
- class ConversationsSDK:
- def __init__(self, client):
- self.client = client
- async def create(
- self,
- name: Optional[str] = None,
- ) -> WrappedConversationResponse:
- """
- Create a new conversation.
- Returns:
- dict: Created conversation information
- """
- data: dict[str, Any] = {}
- if name:
- data["name"] = name
- return await self.client._make_request(
- "POST",
- "conversations",
- data=data,
- version="v3",
- )
- async def list(
- self,
- ids: Optional[list[str | UUID]] = None,
- offset: Optional[int] = 0,
- limit: Optional[int] = 100,
- ) -> WrappedConversationsResponse:
- """
- List conversations with pagination and sorting options.
- Args:
- ids (Optional[list[str | UUID]]): List of conversation IDs to retrieve
- offset (int, optional): Specifies the number of objects to skip. Defaults to 0.
- limit (int, optional): Specifies a limit on the number of objects to return, ranging between 1 and 100. Defaults to 100.
- Returns:
- dict: List of conversations and pagination information
- """
- params: dict = {
- "offset": offset,
- "limit": limit,
- }
- if ids:
- params["ids"] = ids
- return await self.client._make_request(
- "GET",
- "conversations",
- params=params,
- version="v3",
- )
- async def retrieve(
- self,
- id: str | UUID,
- ) -> WrappedConversationMessagesResponse:
- """
- Get detailed information about a specific conversation.
- Args:
- id (str | UUID): The ID of the conversation to retrieve
- Returns:
- dict: Detailed conversation information
- """
- return await self.client._make_request(
- "GET",
- f"conversations/{str(id)}",
- version="v3",
- )
- async def update(
- self,
- id: str | UUID,
- name: str,
- ) -> WrappedConversationResponse:
- """
- Update an existing conversation.
- Args:
- id (str | UUID): The ID of the conversation to update
- name (str): The new name of the conversation
- Returns:
- dict: The updated conversation
- """
- data: dict[str, Any] = {
- "name": name,
- }
- return await self.client._make_request(
- "POST",
- f"conversations/{str(id)}",
- json=data,
- version="v3",
- )
- async def delete(
- self,
- id: str | UUID,
- ) -> WrappedBooleanResponse:
- """
- Delete a conversation.
- Args:
- id (str | UUID): The ID of the conversation to delete
- Returns:
- bool: True if deletion was successful
- """
- return await self.client._make_request(
- "DELETE",
- f"conversations/{str(id)}",
- version="v3",
- )
- async def add_message(
- self,
- id: str | UUID,
- content: str,
- role: str,
- metadata: Optional[dict] = None,
- parent_id: Optional[str] = None,
- ) -> WrappedMessageResponse:
- """
- Add a new message to a conversation.
- Args:
- id (str | UUID): The ID of the conversation to add the message to
- content (str): The content of the message
- role (str): The role of the message (e.g., "user" or "assistant")
- parent_id (Optional[str]): The ID of the parent message
- metadata (Optional[dict]): Additional metadata to attach to the message
- Returns:
- dict: Result of the operation, including the new message ID
- """
- data: dict[str, Any] = {
- "content": content,
- "role": role,
- }
- if parent_id:
- data["parent_id"] = parent_id
- if metadata:
- data["metadata"] = metadata
- return await self.client._make_request(
- "POST",
- f"conversations/{str(id)}/messages",
- json=data,
- version="v3",
- )
- async def update_message(
- self,
- id: str | UUID,
- message_id: str,
- content: Optional[str] = None,
- metadata: Optional[dict] = None,
- ) -> dict:
- """
- Update an existing message in a conversation.
- Args:
- id (str | UUID): The ID of the conversation containing the message
- message_id (str): The ID of the message to update
- content (str): The new content of the message
- metadata (dict): Additional metadata to attach to the message
- Returns:
- dict: Result of the operation, including the new message ID and branch ID
- """
- data: dict[str, Any] = {"content": content}
- if metadata:
- data["metadata"] = metadata
- return await self.client._make_request(
- "POST",
- f"conversations/{str(id)}/messages/{message_id}",
- json=data,
- version="v3",
- )
- async def export(
- self,
- output_path: str | Path,
- columns: Optional[_list[str]] = None,
- filters: Optional[dict] = None,
- include_header: bool = True,
- ) -> None:
- """
- Export conversations to a CSV file, streaming the results directly to disk.
- Args:
- output_path (str | Path): Local path where the CSV file should be saved
- columns (Optional[list[str]]): Specific columns to export. If None, exports default columns
- filters (Optional[dict]): Optional filters to apply when selecting conversations
- include_header (bool): Whether to include column headers in the CSV (default: True)
- """
- # Convert path to string if it's a Path object
- output_path = (
- str(output_path) if isinstance(output_path, Path) else output_path
- )
- # Prepare request data
- data: dict[str, Any] = {"include_header": include_header}
- if columns:
- data["columns"] = columns
- if filters:
- data["filters"] = filters
- # Stream response directly to file
- async with aiofiles.open(output_path, "wb") as f:
- async with self.client.session.post(
- f"{self.client.base_url}/v3/conversations/export",
- json=data,
- headers={
- "Accept": "text/csv",
- **self.client._get_auth_headers(),
- },
- ) as response:
- if response.status != 200:
- raise ValueError(
- f"Export failed with status {response.status}",
- response,
- )
- async for chunk in response.content.iter_chunks():
- if chunk:
- await f.write(chunk[0])
- async def export_messages(
- self,
- output_path: str | Path,
- columns: Optional[_list[str]] = None,
- filters: Optional[dict] = None,
- include_header: bool = True,
- ) -> None:
- """
- Export messages to a CSV file, streaming the results directly to disk.
- Args:
- output_path (str | Path): Local path where the CSV file should be saved
- columns (Optional[list[str]]): Specific columns to export. If None, exports default columns
- filters (Optional[dict]): Optional filters to apply when selecting messages
- include_header (bool): Whether to include column headers in the CSV (default: True)
- """
- # Convert path to string if it's a Path object
- output_path = (
- str(output_path) if isinstance(output_path, Path) else output_path
- )
- # Prepare request data
- data: dict[str, Any] = {"include_header": include_header}
- if columns:
- data["columns"] = columns
- if filters:
- data["filters"] = filters
- # Stream response directly to file
- async with aiofiles.open(output_path, "wb") as f:
- async with self.client.session.post(
- f"{self.client.base_url}/v3/conversations/export_messages",
- json=data,
- headers={
- "Accept": "text/csv",
- **self.client._get_auth_headers(),
- },
- ) as response:
- if response.status != 200:
- raise ValueError(
- f"Export failed with status {response.status}",
- response,
- )
- async for chunk in response.content.iter_chunks():
- if chunk:
- await f.write(chunk[0])
|