# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.

from __future__ import annotations

import asyncio
from typing import List, Iterable
from typing_extensions import Literal
from concurrent.futures import Future, ThreadPoolExecutor, as_completed

import httpx
import sniffio

from .... import _legacy_response
from ....types import FileObject
from ...._types import NOT_GIVEN, Body, Query, Headers, NotGiven, FileTypes
from ...._utils import (
    is_given,
    maybe_transform,
    async_maybe_transform,
)
from ...._compat import cached_property
from ...._resource import SyncAPIResource, AsyncAPIResource
from ...._response import to_streamed_response_wrapper, async_to_streamed_response_wrapper
from ....pagination import SyncCursorPage, AsyncCursorPage
from ....types.beta import FileChunkingStrategyParam
from ...._base_client import AsyncPaginator, make_request_options
from ....types.beta.vector_stores import file_batch_create_params, file_batch_list_files_params
from ....types.beta.file_chunking_strategy_param import FileChunkingStrategyParam
from ....types.beta.vector_stores.vector_store_file import VectorStoreFile
from ....types.beta.vector_stores.vector_store_file_batch import VectorStoreFileBatch

__all__ = ["FileBatches", "AsyncFileBatches"]

class FileBatches(SyncAPIResource):
    @cached_property
    def with_raw_response(self) -> FileBatchesWithRawResponse:
        """
        This property can be used as a prefix for any HTTP method call to return
        the raw response object instead of the parsed content.

        For more information, see https://www.github.com/openai/openai-python#accessing-raw-response-data-eg-headers
        """
        return FileBatchesWithRawResponse(self)

    @cached_property
    def with_streaming_response(self) -> FileBatchesWithStreamingResponse:
        """
        An alternative to `.with_raw_response` that doesn't eagerly read the response body.

        For more information, see https://www.github.com/openai/openai-python#with_streaming_response
        """
        return FileBatchesWithStreamingResponse(self)

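    # Illustrative sketch (not part of the generated module): `.with_raw_response`
    # wraps every method below so the raw `httpx` response is available alongside
    # the parsed model. The client and IDs here are assumptions:
    #
    #   client = OpenAI()
    #   response = client.beta.vector_stores.file_batches.with_raw_response.retrieve(
    #       "vsfb_abc123",
    #       vector_store_id="vs_abc123",
    #   )
    #   print(response.headers)
    #   batch = response.parse()  # -> VectorStoreFileBatch
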
    def create(
        self,
        vector_store_id: str,
        *,
        file_ids: List[str],
        chunking_strategy: FileChunkingStrategyParam | NotGiven = NOT_GIVEN,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """
        Create a vector store file batch.

        Args:
          file_ids: A list of [File](https://platform.openai.com/docs/api-reference/files) IDs that
              the vector store should use. Useful for tools like `file_search` that can access
              files.

          chunking_strategy: The chunking strategy used to chunk the file(s). If not set, will use the `auto`
              strategy. Only applicable if `file_ids` is non-empty.

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return self._post(
            f"/vector_stores/{vector_store_id}/file_batches",
            body=maybe_transform(
                {
                    "file_ids": file_ids,
                    "chunking_strategy": chunking_strategy,
                },
                file_batch_create_params.FileBatchCreateParams,
            ),
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=VectorStoreFileBatch,
        )

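    # A minimal usage sketch for `create` (illustrative; the client and IDs are
    # assumptions, not values from this module):
    #
    #   client = OpenAI()
    #   batch = client.beta.vector_stores.file_batches.create(
    #       "vs_abc123",
    #       file_ids=["file-abc123", "file-def456"],
    #   )
    #   print(batch.id, batch.status)
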
    def retrieve(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """
        Retrieves a vector store file batch.

        Args:
          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        if not batch_id:
            raise ValueError(f"Expected a non-empty value for `batch_id` but received {batch_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return self._get(
            f"/vector_stores/{vector_store_id}/file_batches/{batch_id}",
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=VectorStoreFileBatch,
        )

    def cancel(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """Cancel a vector store file batch.

        This attempts to cancel the processing of
        files in this batch as soon as possible.

        Args:
          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        if not batch_id:
            raise ValueError(f"Expected a non-empty value for `batch_id` but received {batch_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return self._post(
            f"/vector_stores/{vector_store_id}/file_batches/{batch_id}/cancel",
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=VectorStoreFileBatch,
        )

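    # Sketch: retrieval and cancellation follow the same shape (illustrative IDs,
    # assuming the `client` from the earlier sketch):
    #
    #   batch = client.beta.vector_stores.file_batches.retrieve(
    #       "vsfb_abc123",
    #       vector_store_id="vs_abc123",
    #   )
    #   if batch.status == "in_progress":
    #       batch = client.beta.vector_stores.file_batches.cancel(
    #           batch.id,
    #           vector_store_id="vs_abc123",
    #       )
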
    def create_and_poll(
        self,
        vector_store_id: str,
        *,
        file_ids: List[str],
        poll_interval_ms: int | NotGiven = NOT_GIVEN,
        chunking_strategy: FileChunkingStrategyParam | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """Create a vector store batch and poll until all files have been processed."""
        batch = self.create(
            vector_store_id=vector_store_id,
            file_ids=file_ids,
            chunking_strategy=chunking_strategy,
        )
        # TODO: don't poll unless necessary??
        return self.poll(
            batch.id,
            vector_store_id=vector_store_id,
            poll_interval_ms=poll_interval_ms,
        )

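    # Sketch: create a batch and block until processing finishes (illustrative IDs;
    # `poll_interval_ms` is optional and otherwise falls back to the server hint):
    #
    #   batch = client.beta.vector_stores.file_batches.create_and_poll(
    #       "vs_abc123",
    #       file_ids=["file-abc123"],
    #       poll_interval_ms=500,
    #   )
    #   assert batch.status != "in_progress"
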
    def list_files(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        after: str | NotGiven = NOT_GIVEN,
        before: str | NotGiven = NOT_GIVEN,
        filter: Literal["in_progress", "completed", "failed", "cancelled"] | NotGiven = NOT_GIVEN,
        limit: int | NotGiven = NOT_GIVEN,
        order: Literal["asc", "desc"] | NotGiven = NOT_GIVEN,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> SyncCursorPage[VectorStoreFile]:
        """
        Returns a list of vector store files in a batch.

        Args:
          after: A cursor for use in pagination. `after` is an object ID that defines your place
              in the list. For instance, if you make a list request and receive 100 objects,
              ending with obj_foo, your subsequent call can include after=obj_foo in order to
              fetch the next page of the list.

          before: A cursor for use in pagination. `before` is an object ID that defines your place
              in the list. For instance, if you make a list request and receive 100 objects,
              starting with obj_foo, your subsequent call can include before=obj_foo in order
              to fetch the previous page of the list.

          filter: Filter by file status. One of `in_progress`, `completed`, `failed`, `cancelled`.

          limit: A limit on the number of objects to be returned. Limit can range between 1 and
              100, and the default is 20.

          order: Sort order by the `created_at` timestamp of the objects. `asc` for ascending
              order and `desc` for descending order.

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        if not batch_id:
            raise ValueError(f"Expected a non-empty value for `batch_id` but received {batch_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return self._get_api_list(
            f"/vector_stores/{vector_store_id}/file_batches/{batch_id}/files",
            page=SyncCursorPage[VectorStoreFile],
            options=make_request_options(
                extra_headers=extra_headers,
                extra_query=extra_query,
                extra_body=extra_body,
                timeout=timeout,
                query=maybe_transform(
                    {
                        "after": after,
                        "before": before,
                        "filter": filter,
                        "limit": limit,
                        "order": order,
                    },
                    file_batch_list_files_params.FileBatchListFilesParams,
                ),
            ),
            model=VectorStoreFile,
        )

    def poll(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        poll_interval_ms: int | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """Wait for the given file batch to be processed.

        Note: this will return even if one of the files failed to process; you need to
        check `batch.file_counts.failed` to handle that case.
        """
        headers: dict[str, str] = {"X-Stainless-Poll-Helper": "true"}
        if is_given(poll_interval_ms):
            headers["X-Stainless-Custom-Poll-Interval"] = str(poll_interval_ms)

        while True:
            response = self.with_raw_response.retrieve(
                batch_id,
                vector_store_id=vector_store_id,
                extra_headers=headers,
            )

            batch = response.parse()
            if batch.file_counts.in_progress > 0:
                if not is_given(poll_interval_ms):
                    from_header = response.headers.get("openai-poll-after-ms")
                    if from_header is not None:
                        poll_interval_ms = int(from_header)
                    else:
                        poll_interval_ms = 1000

                self._sleep(poll_interval_ms / 1000)
                continue

            return batch

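    # Sketch: poll an existing batch, then check per-file failures yourself, since
    # `poll` does not raise for them (see the note in its docstring):
    #
    #   batch = client.beta.vector_stores.file_batches.poll(
    #       "vsfb_abc123",
    #       vector_store_id="vs_abc123",
    #   )
    #   if batch.file_counts.failed > 0:
    #       print(f"{batch.file_counts.failed} file(s) failed to process")
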
    def upload_and_poll(
        self,
        vector_store_id: str,
        *,
        files: Iterable[FileTypes],
        max_concurrency: int = 5,
        file_ids: List[str] = [],
        poll_interval_ms: int | NotGiven = NOT_GIVEN,
        chunking_strategy: FileChunkingStrategyParam | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """Uploads the given files concurrently and then creates a vector store file batch.

        If you've already uploaded certain files that you want to include in this batch
        then you can pass their IDs through the `file_ids` argument.

        By default, if any file upload fails then an exception will be eagerly raised.

        The number of concurrent uploads is configurable using the `max_concurrency`
        parameter.
        """
        results: list[FileObject] = []

        with ThreadPoolExecutor(max_workers=max_concurrency) as executor:
            futures: list[Future[FileObject]] = [
                executor.submit(
                    self._client.files.create,
                    file=file,
                    purpose="assistants",
                )
                for file in files
            ]

            for future in as_completed(futures):
                exc = future.exception()
                if exc:
                    raise exc
                results.append(future.result())

        batch = self.create_and_poll(
            vector_store_id=vector_store_id,
            file_ids=[*file_ids, *(f.id for f in results)],
            poll_interval_ms=poll_interval_ms,
            chunking_strategy=chunking_strategy,
        )
        return batch

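    # Sketch: upload local files on a thread pool and create the batch in one call
    # (the file paths are assumptions):
    #
    #   batch = client.beta.vector_stores.file_batches.upload_and_poll(
    #       "vs_abc123",
    #       files=[open("a.pdf", "rb"), open("b.md", "rb")],
    #       max_concurrency=5,
    #   )
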

class AsyncFileBatches(AsyncAPIResource):
    @cached_property
    def with_raw_response(self) -> AsyncFileBatchesWithRawResponse:
        """
        This property can be used as a prefix for any HTTP method call to return
        the raw response object instead of the parsed content.

        For more information, see https://www.github.com/openai/openai-python#accessing-raw-response-data-eg-headers
        """
        return AsyncFileBatchesWithRawResponse(self)

    @cached_property
    def with_streaming_response(self) -> AsyncFileBatchesWithStreamingResponse:
        """
        An alternative to `.with_raw_response` that doesn't eagerly read the response body.

        For more information, see https://www.github.com/openai/openai-python#with_streaming_response
        """
        return AsyncFileBatchesWithStreamingResponse(self)

    async def create(
        self,
        vector_store_id: str,
        *,
        file_ids: List[str],
        chunking_strategy: FileChunkingStrategyParam | NotGiven = NOT_GIVEN,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """
        Create a vector store file batch.

        Args:
          file_ids: A list of [File](https://platform.openai.com/docs/api-reference/files) IDs that
              the vector store should use. Useful for tools like `file_search` that can access
              files.

          chunking_strategy: The chunking strategy used to chunk the file(s). If not set, will use the `auto`
              strategy. Only applicable if `file_ids` is non-empty.

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return await self._post(
            f"/vector_stores/{vector_store_id}/file_batches",
            body=await async_maybe_transform(
                {
                    "file_ids": file_ids,
                    "chunking_strategy": chunking_strategy,
                },
                file_batch_create_params.FileBatchCreateParams,
            ),
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=VectorStoreFileBatch,
        )

    async def retrieve(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """
        Retrieves a vector store file batch.

        Args:
          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        if not batch_id:
            raise ValueError(f"Expected a non-empty value for `batch_id` but received {batch_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return await self._get(
            f"/vector_stores/{vector_store_id}/file_batches/{batch_id}",
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=VectorStoreFileBatch,
        )

    async def cancel(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """Cancel a vector store file batch.

        This attempts to cancel the processing of
        files in this batch as soon as possible.

        Args:
          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        if not batch_id:
            raise ValueError(f"Expected a non-empty value for `batch_id` but received {batch_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return await self._post(
            f"/vector_stores/{vector_store_id}/file_batches/{batch_id}/cancel",
            options=make_request_options(
                extra_headers=extra_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
            ),
            cast_to=VectorStoreFileBatch,
        )

    async def create_and_poll(
        self,
        vector_store_id: str,
        *,
        file_ids: List[str],
        poll_interval_ms: int | NotGiven = NOT_GIVEN,
        chunking_strategy: FileChunkingStrategyParam | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """Create a vector store batch and poll until all files have been processed."""
        batch = await self.create(
            vector_store_id=vector_store_id,
            file_ids=file_ids,
            chunking_strategy=chunking_strategy,
        )
        # TODO: don't poll unless necessary??
        return await self.poll(
            batch.id,
            vector_store_id=vector_store_id,
            poll_interval_ms=poll_interval_ms,
        )

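    # Sketch: the async variant only differs in being awaited, here under an
    # assumed `AsyncOpenAI` client:
    #
    #   client = AsyncOpenAI()
    #   batch = await client.beta.vector_stores.file_batches.create_and_poll(
    #       "vs_abc123",
    #       file_ids=["file-abc123"],
    #   )
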
    def list_files(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        after: str | NotGiven = NOT_GIVEN,
        before: str | NotGiven = NOT_GIVEN,
        filter: Literal["in_progress", "completed", "failed", "cancelled"] | NotGiven = NOT_GIVEN,
        limit: int | NotGiven = NOT_GIVEN,
        order: Literal["asc", "desc"] | NotGiven = NOT_GIVEN,
        # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
        # The extra values given here take precedence over values defined on the client or passed to this method.
        extra_headers: Headers | None = None,
        extra_query: Query | None = None,
        extra_body: Body | None = None,
        timeout: float | httpx.Timeout | None | NotGiven = NOT_GIVEN,
    ) -> AsyncPaginator[VectorStoreFile, AsyncCursorPage[VectorStoreFile]]:
        """
        Returns a list of vector store files in a batch.

        Args:
          after: A cursor for use in pagination. `after` is an object ID that defines your place
              in the list. For instance, if you make a list request and receive 100 objects,
              ending with obj_foo, your subsequent call can include after=obj_foo in order to
              fetch the next page of the list.

          before: A cursor for use in pagination. `before` is an object ID that defines your place
              in the list. For instance, if you make a list request and receive 100 objects,
              starting with obj_foo, your subsequent call can include before=obj_foo in order
              to fetch the previous page of the list.

          filter: Filter by file status. One of `in_progress`, `completed`, `failed`, `cancelled`.

          limit: A limit on the number of objects to be returned. Limit can range between 1 and
              100, and the default is 20.

          order: Sort order by the `created_at` timestamp of the objects. `asc` for ascending
              order and `desc` for descending order.

          extra_headers: Send extra headers

          extra_query: Add additional query parameters to the request

          extra_body: Add additional JSON properties to the request

          timeout: Override the client-level default timeout for this request, in seconds
        """
        if not vector_store_id:
            raise ValueError(f"Expected a non-empty value for `vector_store_id` but received {vector_store_id!r}")
        if not batch_id:
            raise ValueError(f"Expected a non-empty value for `batch_id` but received {batch_id!r}")
        extra_headers = {"OpenAI-Beta": "assistants=v2", **(extra_headers or {})}
        return self._get_api_list(
            f"/vector_stores/{vector_store_id}/file_batches/{batch_id}/files",
            page=AsyncCursorPage[VectorStoreFile],
            options=make_request_options(
                extra_headers=extra_headers,
                extra_query=extra_query,
                extra_body=extra_body,
                timeout=timeout,
                query=maybe_transform(
                    {
                        "after": after,
                        "before": before,
                        "filter": filter,
                        "limit": limit,
                        "order": order,
                    },
                    file_batch_list_files_params.FileBatchListFilesParams,
                ),
            ),
            model=VectorStoreFile,
        )

    async def poll(
        self,
        batch_id: str,
        *,
        vector_store_id: str,
        poll_interval_ms: int | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """Wait for the given file batch to be processed.

        Note: this will return even if one of the files failed to process; you need to
        check `batch.file_counts.failed` to handle that case.
        """
        headers: dict[str, str] = {"X-Stainless-Poll-Helper": "true"}
        if is_given(poll_interval_ms):
            headers["X-Stainless-Custom-Poll-Interval"] = str(poll_interval_ms)

        while True:
            response = await self.with_raw_response.retrieve(
                batch_id,
                vector_store_id=vector_store_id,
                extra_headers=headers,
            )

            batch = response.parse()
            if batch.file_counts.in_progress > 0:
                if not is_given(poll_interval_ms):
                    from_header = response.headers.get("openai-poll-after-ms")
                    if from_header is not None:
                        poll_interval_ms = int(from_header)
                    else:
                        poll_interval_ms = 1000

                await self._sleep(poll_interval_ms / 1000)
                continue

            return batch

    async def upload_and_poll(
        self,
        vector_store_id: str,
        *,
        files: Iterable[FileTypes],
        max_concurrency: int = 5,
        file_ids: List[str] = [],
        poll_interval_ms: int | NotGiven = NOT_GIVEN,
        chunking_strategy: FileChunkingStrategyParam | NotGiven = NOT_GIVEN,
    ) -> VectorStoreFileBatch:
        """Uploads the given files concurrently and then creates a vector store file batch.

        If you've already uploaded certain files that you want to include in this batch
        then you can pass their IDs through the `file_ids` argument.

        By default, if any file upload fails then an exception will be eagerly raised.

        The number of concurrent uploads is configurable using the `max_concurrency`
        parameter.

        Note: this method only supports `asyncio` or `trio` as the backing async
        runtime.
        """
        uploaded_files: list[FileObject] = []

        async_library = sniffio.current_async_library()

        if async_library == "asyncio":

            async def asyncio_upload_file(semaphore: asyncio.Semaphore, file: FileTypes) -> None:
                async with semaphore:
                    file_obj = await self._client.files.create(
                        file=file,
                        purpose="assistants",
                    )
                    uploaded_files.append(file_obj)

            semaphore = asyncio.Semaphore(max_concurrency)

            tasks = [asyncio_upload_file(semaphore, file) for file in files]

            await asyncio.gather(*tasks)
        elif async_library == "trio":
            # We only import if the library is being used.
            # We support Python 3.7 so are using an older version of trio that does not have type information
            import trio  # type: ignore # pyright: ignore[reportMissingTypeStubs]

            async def trio_upload_file(limiter: trio.CapacityLimiter, file: FileTypes) -> None:
                async with limiter:
                    file_obj = await self._client.files.create(
                        file=file,
                        purpose="assistants",
                    )
                    uploaded_files.append(file_obj)

            limiter = trio.CapacityLimiter(max_concurrency)

            async with trio.open_nursery() as nursery:
                for file in files:
                    nursery.start_soon(trio_upload_file, limiter, file)  # pyright: ignore [reportUnknownMemberType]
        else:
            raise RuntimeError(
                f"Async runtime {async_library} is not supported yet. Only asyncio or trio is supported",
            )

        batch = await self.create_and_poll(
            vector_store_id=vector_store_id,
            file_ids=[*file_ids, *(f.id for f in uploaded_files)],
            poll_interval_ms=poll_interval_ms,
            chunking_strategy=chunking_strategy,
        )
        return batch

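    # Sketch: drive the async helper from a plain script with `asyncio.run` (file
    # paths are assumptions); under trio, call it from `trio.run` instead:
    #
    #   async def main() -> None:
    #       client = AsyncOpenAI()
    #       batch = await client.beta.vector_stores.file_batches.upload_and_poll(
    #           "vs_abc123",
    #           files=[open("a.pdf", "rb")],
    #       )
    #       print(batch.file_counts.completed)
    #
    #   asyncio.run(main())
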

class FileBatchesWithRawResponse:
    def __init__(self, file_batches: FileBatches) -> None:
        self._file_batches = file_batches

        self.create = _legacy_response.to_raw_response_wrapper(
            file_batches.create,
        )
        self.retrieve = _legacy_response.to_raw_response_wrapper(
            file_batches.retrieve,
        )
        self.cancel = _legacy_response.to_raw_response_wrapper(
            file_batches.cancel,
        )
        self.list_files = _legacy_response.to_raw_response_wrapper(
            file_batches.list_files,
        )


class AsyncFileBatchesWithRawResponse:
    def __init__(self, file_batches: AsyncFileBatches) -> None:
        self._file_batches = file_batches

        self.create = _legacy_response.async_to_raw_response_wrapper(
            file_batches.create,
        )
        self.retrieve = _legacy_response.async_to_raw_response_wrapper(
            file_batches.retrieve,
        )
        self.cancel = _legacy_response.async_to_raw_response_wrapper(
            file_batches.cancel,
        )
        self.list_files = _legacy_response.async_to_raw_response_wrapper(
            file_batches.list_files,
        )


class FileBatchesWithStreamingResponse:
    def __init__(self, file_batches: FileBatches) -> None:
        self._file_batches = file_batches

        self.create = to_streamed_response_wrapper(
            file_batches.create,
        )
        self.retrieve = to_streamed_response_wrapper(
            file_batches.retrieve,
        )
        self.cancel = to_streamed_response_wrapper(
            file_batches.cancel,
        )
        self.list_files = to_streamed_response_wrapper(
            file_batches.list_files,
        )


class AsyncFileBatchesWithStreamingResponse:
    def __init__(self, file_batches: AsyncFileBatches) -> None:
        self._file_batches = file_batches

        self.create = async_to_streamed_response_wrapper(
            file_batches.create,
        )
        self.retrieve = async_to_streamed_response_wrapper(
            file_batches.retrieve,
        )
        self.cancel = async_to_streamed_response_wrapper(
            file_batches.cancel,
        )
        self.list_files = async_to_streamed_response_wrapper(
            file_batches.list_files,
        )
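
# Sketch: `.with_streaming_response` defers reading the body until it is consumed
# inside the context manager (illustrative IDs; assumes a configured client):
#
#   with client.beta.vector_stores.file_batches.with_streaming_response.retrieve(
#       "vsfb_abc123",
#       vector_store_id="vs_abc123",
#   ) as response:
#       print(response.headers.get("content-type"))
#       batch = response.parse()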