12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576 |
- import os
- from pathlib import Path
- import time
- import openai
# Endpoint and credential for the OpenAI-compatible assistants service.
# Both may be overridden through the environment so the key does not have to
# be edited in (or committed with) this file; the fallbacks are the values the
# script has always used, so behavior is unchanged when the variables are unset.
base_url = os.environ.get(
    "ASSISTANT_API_BASE_URL", "https://assistantapi.cocorobo.cn/api/v1"
)
api_key = os.environ.get("ASSISTANT_API_KEY", "cocorobo-xjw-admin")

# Module-level client shared by the demo below.
client = openai.OpenAI(base_url=base_url, api_key=api_key)
if __name__ == "__main__":
    # Demo flow: upload test.txt, create a file_search assistant over it, ask
    # one question in a fresh thread, poll the run until it finishes, print the
    # thread's messages, and delete the assistant again.

    # Run states after which the run can no longer reach "completed" (per the
    # OpenAI Assistants run lifecycle; assumes this OpenAI-compatible server
    # reports the same status names -- TODO confirm). Polling must stop on all
    # of them, otherwise the loop below would never terminate.
    UNSUCCESSFUL_STATES = ("failed", "cancelled", "expired", "incomplete")
    POLL_INTERVAL_SECONDS = 5

    # Locate test.txt next to this script. Building the path with pathlib
    # avoids the "/test.txt" (filesystem root) result that string concatenation
    # produces when dirname(__file__) is empty.
    file_path = Path(__file__).resolve().parent / "test.txt"
    print(file_path)

    uploaded_file = client.files.create(file=file_path, purpose="assistants")
    print(uploaded_file)

    assistant = client.beta.assistants.create(
        name="Assistant Demo",
        instructions="you are a personal assistant, file content could be retrieved to assist the conversation.",
        model="gpt-4o",
        tools=[
            {"type": "file_search"},
        ],
        tool_resources={
            "file_search": {"vector_stores": [{"file_ids": [uploaded_file.id]}]}
        },
    )

    # From here on the assistant exists remotely, so always delete it, even
    # when the run fails or one of the API calls raises.
    try:
        print(f"=====> : {assistant}\n")

        thread = client.beta.threads.create()
        print(f"=====> : {thread}\n")

        # The file is reachable through the assistant's vector store, so the
        # user message itself carries no attachments.
        user_message = client.beta.threads.messages.create(
            thread_id=thread.id,
            role="user",
            content="内容有什么",
        )
        print(f"=====> : {user_message}\n")

        run = client.beta.threads.runs.create(
            thread_id=thread.id,
            assistant_id=assistant.id,
        )
        print(f"=====> : {run}\n")

        print("checking assistant status. \n")
        while True:
            run = client.beta.threads.runs.retrieve(
                thread_id=thread.id, run_id=run.id
            )

            # Dump every step recorded for the run so far on each poll.
            run_steps = client.beta.threads.runs.steps.list(
                run_id=run.id, thread_id=thread.id
            ).data
            for run_step in run_steps:
                print(f"=====> : {run_step}\n")

            if run.status == "completed":
                messages = client.beta.threads.messages.list(thread_id=thread.id)
                print("=====> messages:")
                for reply in messages:
                    # Print only the text parts; skipping other part types
                    # (and empty content) replaces the former assert on
                    # content[0], which would abort the whole dump.
                    text_parts = [
                        part.text.value
                        for part in reply.content
                        if part.type == "text"
                    ]
                    print({"role": reply.role, "message": "\n".join(text_parts)})
                break

            if run.status in UNSUCCESSFUL_STATES:
                print(f"run {run.status} {run.last_error}\n")
                break

            print("in progress...\n")
            time.sleep(POLL_INTERVAL_SECONDS)
    finally:
        # delete asst
        client.beta.assistants.delete(assistant.id)
|