main.py 1.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. import asyncio
  2. import base64
  3. import concurrent.futures
  4. import logging
  5. import os
  6. from io import BytesIO
  7. from typing import Optional
  8. from fastapi import FastAPI, HTTPException
  9. from pydantic import BaseModel
  10. from unstructured.partition.auto import partition
  11. logger = logging.getLogger()
# FastAPI application instance; the route decorators in this module register
# their handlers on it.
app = FastAPI()
# Request body accepted by POST /partition.
class PartitionRequestModel(BaseModel):
    # Document payload; run_partition base64-decodes this value before parsing.
    file_content: bytes
    # Extra keyword arguments forwarded verbatim to unstructured's partition().
    ingestion_config: dict
    # Optional file name, passed to partition() as ``file_filename``.
    filename: Optional[str] = None
# Response body returned by POST /partition.
class PartitionResponseModel(BaseModel):
    # One dict per partitioned element, as produced by ``element.to_dict()``.
    elements: list[dict]
  19. executor = concurrent.futures.ThreadPoolExecutor(
  20. max_workers=int(os.environ.get("MAX_INGESTION_WORKERS", 10))
  21. )
  22. def run_partition(file_content: str, filename: str, ingestion_config: dict) -> list[dict]:
  23. file_content_bytes = base64.b64decode(file_content)
  24. file_io = BytesIO(file_content_bytes)
  25. elements = partition(file=file_io, file_filename=filename, **ingestion_config)
  26. return [element.to_dict() for element in elements]
  27. @app.get("/health")
  28. async def health_endpoint():
  29. return {"status": "ok"}
  30. @app.post("/partition", response_model=PartitionResponseModel)
  31. async def partition_endpoint(request: PartitionRequestModel):
  32. try:
  33. logger.info(f"Partitioning request received: {request}")
  34. loop = asyncio.get_event_loop()
  35. elements = await loop.run_in_executor(
  36. executor,
  37. run_partition,
  38. request.file_content,
  39. request.filename,
  40. request.ingestion_config,
  41. )
  42. logger.info("Partitioning completed")
  43. return PartitionResponseModel(elements=elements)
  44. except Exception as e:
  45. logger.error(f"Error partitioning file: {str(e)}")
  46. raise HTTPException(status_code=500, detail=str(e))