message.py 9.7 KB


  1. from typing import List, Optional
  2. from fastapi import HTTPException
  3. from sqlalchemy.ext.asyncio import AsyncSession
  4. from sqlalchemy.orm import Session
  5. from sqlmodel import select
  6. from app.exceptions.exception import ResourceNotFoundError
  7. from app.models import MessageFile
  8. from app.models.message import Message, MessageCreate, MessageUpdate
  9. from app.services.thread.thread import ThreadService
  10. from app.models.token_relation import RelationType
  11. from app.providers.auth_provider import auth_policy
  12. from app.schemas.common import DeleteResponse
  13. class MessageService:
  14. @staticmethod
  15. def format_message_content(message_create: MessageCreate) -> List:
  16. content = []
  17. if isinstance(message_create.content, str):
  18. content.append(
  19. {
  20. "type": "text",
  21. "text": {"value": message_create.content, "annotations": []},
  22. }
  23. )
  24. elif isinstance(message_create.content, list):
  25. for msg in message_create.content:
  26. if msg.get("type") == "text":
  27. msg_value = msg.get("text")
  28. content.append(
  29. {
  30. "type": "text",
  31. "text": {"value": msg_value, "annotations": []},
  32. }
  33. )
  34. elif msg.get("type") == "image_file" or msg.get("type") == "image_url":
  35. content.append(msg)
  36. elif (
  37. msg.get("type") == "input_audio" or msg.get("type") == "input_audio"
  38. ):
  39. content.append(msg)
  40. return content
  41. @staticmethod
  42. def new_message(
  43. *, session: Session, content, role, assistant_id, thread_id, run_id
  44. ) -> Message:
  45. message = Message(
  46. content=[{"type": "text", "text": {"value": content, "annotations": []}}],
  47. role=role,
  48. assistant_id=assistant_id,
  49. thread_id=thread_id,
  50. run_id=run_id,
  51. )
  52. session.add(message)
  53. session.commit()
  54. session.refresh(message)
  55. return message
  56. @staticmethod
  57. def get_message_list(*, session: Session, thread_id) -> List[Message]:
  58. statement = (
  59. select(Message)
  60. .where(Message.thread_id == thread_id)
  61. .order_by(Message.created_at)
  62. )
  63. return session.execute(statement).scalars().all()
  64. @staticmethod
  65. async def create_message(
  66. *, session: AsyncSession, body: MessageCreate, thread_id: str
  67. ) -> Message:
  68. # get thread
  69. thread = await ThreadService.get_thread(thread_id=thread_id, session=session)
  70. print(
  71. "create_messagecreate_messagecreate_messagecreate_messagecreate_messagecreate_message"
  72. )
  73. print(thread)
  74. # TODO message annotations
  75. body_file_ids = body.file_ids
  76. if body.attachments:
  77. body_file_ids = [a.get("file_id") for a in body.attachments]
  78. if body_file_ids:
  79. thread_file_ids = []
  80. if thread.tool_resources and "file_search" in thread.tool_resources:
  81. thread_file_ids = (
  82. thread.tool_resources.get("file_search")
  83. .get("vector_stores")[0]
  84. .get("file_ids")
  85. )
  86. for file_id in body_file_ids:
  87. if file_id not in thread_file_ids:
  88. thread_file_ids.append(file_id)
  89. # if thread_file_ids:
  90. if not thread.tool_resources:
  91. thread.tool_resources = {}
  92. if "file_search" not in thread.tool_resources:
  93. thread.tool_resources["file_search"] = {
  94. "vector_stores": [{"file_ids": []}]
  95. }
  96. thread.tool_resources.get("file_search").get("vector_stores")[0][
  97. "file_ids"
  98. ] = thread_file_ids
  99. session.add(thread)
  100. content = MessageService.format_message_content(body)
  101. db_message = Message.model_validate(
  102. body.model_dump(by_alias=True),
  103. update={"thread_id": thread_id, "content": content},
  104. from_attributes=True,
  105. )
  106. session.add(db_message)
  107. await session.commit()
  108. await session.refresh(db_message)
  109. return db_message
  110. @staticmethod
  111. def get_message_sync(
  112. *, session: Session, thread_id: str, message_id: str
  113. ) -> Message:
  114. statement = (
  115. select(Message)
  116. .where(Message.thread_id == thread_id)
  117. .where(Message.id == message_id)
  118. )
  119. result = session.execute(statement)
  120. message = result.scalars().one_or_none()
  121. if message is None:
  122. raise HTTPException(status_code=404, detail="Message not found")
  123. return message
  124. @staticmethod
  125. def modify_message_sync(
  126. *, session: Session, thread_id: str, message_id: str, body: MessageUpdate
  127. ) -> Message:
  128. if body.content:
  129. body.content = [
  130. {"type": "text", "text": {"value": body.content, "annotations": []}}
  131. ]
  132. # get thread
  133. ThreadService.get_thread_sync(thread_id=thread_id, session=session)
  134. # get message
  135. db_message = MessageService.get_message_sync(
  136. session=session, thread_id=thread_id, message_id=message_id
  137. )
  138. update_data = body.dict(exclude_unset=True)
  139. for key, value in update_data.items():
  140. setattr(db_message, key, value)
  141. session.add(db_message)
  142. session.commit()
  143. session.refresh(db_message)
  144. return db_message
  145. @staticmethod
  146. async def modify_message(
  147. *, session: AsyncSession, thread_id: str, message_id: str, body: MessageUpdate
  148. ) -> Message:
  149. if body.content:
  150. body.content = [
  151. {"type": "text", "text": {"value": body.content, "annotations": []}}
  152. ]
  153. # get thread
  154. await ThreadService.get_thread(thread_id=thread_id, session=session)
  155. # get message
  156. db_message = await MessageService.get_message(
  157. session=session, thread_id=thread_id, message_id=message_id
  158. )
  159. update_data = body.dict(exclude_unset=True)
  160. for key, value in update_data.items():
  161. setattr(db_message, key, value)
  162. session.add(db_message)
  163. await session.commit()
  164. await session.refresh(db_message)
  165. return db_message
  166. @staticmethod
  167. async def delete_message(
  168. *, session: AsyncSession, thread_id: str, message_id: str
  169. ) -> Message:
  170. message = await MessageService.get_message(
  171. session=session, thread_id=thread_id, message_id=message_id
  172. )
  173. await session.delete(message)
  174. await auth_policy.delete_token_rel(
  175. session=session, relation_type=RelationType.Message, relation_id=message_id
  176. )
  177. await session.commit()
  178. return DeleteResponse(id=message_id, object="message.deleted", deleted=True)
  179. @staticmethod
  180. async def get_message(
  181. *, session: AsyncSession, thread_id: str, message_id: str
  182. ) -> Message:
  183. statement = (
  184. select(Message)
  185. .where(Message.thread_id == thread_id)
  186. .where(Message.id == message_id)
  187. )
  188. result = await session.execute(statement)
  189. message = result.scalars().one_or_none()
  190. if message is None:
  191. raise HTTPException(status_code=404, detail="Message not found")
  192. return message
  193. @staticmethod
  194. async def get_message_file(
  195. *, session: AsyncSession, thread_id: str, message_id: str, file_id: str
  196. ) -> MessageFile:
  197. await MessageService.get_message(
  198. session=session, thread_id=thread_id, message_id=message_id
  199. )
  200. # get message files
  201. statement = (
  202. select(MessageFile)
  203. .where(MessageFile.id == file_id)
  204. .where(MessageFile.message_id == message_id)
  205. )
  206. result = await session.execute(statement)
  207. msg_file = result.scalars().one_or_none()
  208. if msg_file is None:
  209. raise ResourceNotFoundError(message="Message file not found")
  210. return msg_file
  211. @staticmethod
  212. async def copy_messages(
  213. *,
  214. session: AsyncSession,
  215. from_thread_id: str,
  216. to_thread_id: str,
  217. end_message_id: str
  218. ):
  219. """
  220. copy thread messages to another thread
  221. """
  222. statement = select(Message).where(Message.thread_id == from_thread_id)
  223. if end_message_id:
  224. statement = statement.where(Message.id <= end_message_id)
  225. result = await session.execute(statement.order_by(Message.id))
  226. original_messages = result.scalars().all()
  227. for original_message in original_messages:
  228. new_message = Message(
  229. thread_id=to_thread_id,
  230. **original_message.model_dump(
  231. exclude={"id", "created_at", "updated_at", "thread_id"}
  232. ),
  233. )
  234. session.add(new_message)
  235. await session.commit()
  236. @staticmethod
  237. async def create_messages(
  238. *,
  239. session: AsyncSession,
  240. thread_id: str,
  241. run_id: str,
  242. assistant_id: str,
  243. messages: list
  244. ):
  245. for original_message in messages:
  246. content = MessageService.format_message_content(original_message)
  247. new_message = Message.model_validate(
  248. original_message.model_dump(by_alias=True),
  249. update={
  250. "thread_id": thread_id,
  251. "run_id": run_id,
  252. "assistant_id": assistant_id,
  253. "content": content,
  254. "role": original_message.role,
  255. },
  256. )
  257. session.add(new_message)