message.py 10 KB


  1. from typing import List, Optional
  2. from fastapi import HTTPException
  3. from sqlalchemy.ext.asyncio import AsyncSession
  4. from sqlalchemy.orm import Session
  5. from sqlmodel import select
  6. from app.exceptions.exception import ResourceNotFoundError
  7. from app.models import MessageFile
  8. from app.models.message import Message, MessageCreate, MessageUpdate
  9. from app.services.thread.thread import ThreadService
  10. from app.models.token_relation import RelationType
  11. from app.providers.auth_provider import auth_policy
  12. from app.schemas.common import DeleteResponse
  13. from sqlalchemy.orm.attributes import flag_modified
  14. class MessageService:
  15. @staticmethod
  16. def format_message_content(message_create: MessageCreate) -> List:
  17. content = []
  18. if isinstance(message_create.content, str):
  19. content = message_create.content
  20. '''
  21. content.append(
  22. {
  23. "type": "text",
  24. "text": {"value": message_create.content, "annotations": []},
  25. }
  26. )
  27. '''
  28. elif isinstance(message_create.content, list):
  29. for msg in message_create.content:
  30. if msg.get("type") == "text":
  31. msg_value = msg.get("text")
  32. content = msg_value
  33. '''
  34. content.append(
  35. {
  36. "type": "text",
  37. "text": {"value": msg_value, "annotations": []},
  38. }
  39. )
  40. '''
  41. elif msg.get("type") == "image_file" or msg.get("type") == "image_url":
  42. content.append(msg)
  43. elif msg.get("type") == "input_audio":
  44. content.append(msg)
  45. return content
  46. @staticmethod
  47. def new_message(
  48. *, session: Session, content, role, assistant_id, thread_id, run_id
  49. ) -> Message:
  50. message = Message(
  51. content=[{"type": "text", "text": {"value": content, "annotations": []}}],
  52. role=role,
  53. assistant_id=assistant_id,
  54. thread_id=thread_id,
  55. run_id=run_id,
  56. )
  57. session.add(message)
  58. session.commit()
  59. session.refresh(message)
  60. return message
  61. @staticmethod
  62. def get_message_list(*, session: Session, thread_id) -> List[Message]:
  63. statement = (
  64. select(Message)
  65. .where(Message.thread_id == thread_id)
  66. .order_by(Message.created_at.desc())
  67. )
  68. return session.execute(statement).scalars().all()
  69. @staticmethod
  70. async def create_message(
  71. *, session: AsyncSession, body: MessageCreate, thread_id: str
  72. ) -> Message:
  73. # get thread
  74. thread = await ThreadService.get_thread(session=session, thread_id=thread_id)
  75. print(
  76. "create_messagecreate_messagecreate_messagecreate_messagecreate_messagecreate_message"
  77. )
  78. # print(thread)
  79. # print(body)
  80. # TODO message annotations
  81. body_file_ids = body.file_ids
  82. if body.attachments:
  83. body_file_ids = [a.get("file_id") for a in body.attachments]
  84. # print(body_file_ids)
  85. if body_file_ids:
  86. thread_file_ids = []
  87. if thread.tool_resources and "file_search" in thread.tool_resources:
  88. thread_file_ids = (
  89. thread.tool_resources.get("file_search")
  90. .get("vector_stores")[0]
  91. .get("file_ids")
  92. )
  93. for file_id in body_file_ids:
  94. if file_id not in thread_file_ids:
  95. thread_file_ids.append(file_id)
  96. print(thread_file_ids)
  97. # if thread_file_ids:
  98. if not thread.tool_resources:
  99. thread.tool_resources = {}
  100. if "file_search" not in thread.tool_resources:
  101. thread.tool_resources["file_search"] = {
  102. "vector_stores": [{"file_ids": []}]
  103. }
  104. thread.tool_resources.get("file_search").get("vector_stores")[0][
  105. "file_ids"
  106. ] = thread_file_ids
  107. setattr(thread, "tool_resources", thread.tool_resources)
  108. flag_modified(thread, "tool_resources")
  109. # thread.tool_resources = thread.tool_resources
  110. print(thread)
  111. session.add(thread)
  112. await session.commit()
  113. await session.refresh(thread)
  114. # session.add(thread)
  115. # await session.commit()
  116. # await session.refresh(thread)
  117. content = MessageService.format_message_content(body)
  118. db_message = Message.model_validate(
  119. body.model_dump(by_alias=True),
  120. update={"thread_id": thread_id, "content": content},
  121. from_attributes=True,
  122. )
  123. session.add(db_message)
  124. await session.commit()
  125. await session.refresh(db_message)
  126. return db_message
  127. @staticmethod
  128. def get_message_sync(
  129. *, session: Session, thread_id: str, message_id: str
  130. ) -> Message:
  131. statement = (
  132. select(Message)
  133. .where(Message.thread_id == thread_id)
  134. .where(Message.id == message_id)
  135. )
  136. result = session.execute(statement)
  137. message = result.scalars().one_or_none()
  138. if message is None:
  139. raise HTTPException(status_code=404, detail="Message not found")
  140. return message
  141. @staticmethod
  142. def modify_message_sync(
  143. *, session: Session, thread_id: str, message_id: str, body: MessageUpdate
  144. ) -> Message:
  145. if body.content:
  146. body.content = [
  147. {"type": "text", "text": {"value": body.content, "annotations": []}}
  148. ]
  149. # get thread
  150. ThreadService.get_thread_sync(thread_id=thread_id, session=session)
  151. # get message
  152. db_message = MessageService.get_message_sync(
  153. session=session, thread_id=thread_id, message_id=message_id
  154. )
  155. update_data = body.dict(exclude_unset=True)
  156. for key, value in update_data.items():
  157. setattr(db_message, key, value)
  158. session.add(db_message)
  159. session.commit()
  160. session.refresh(db_message)
  161. return db_message
  162. @staticmethod
  163. async def modify_message(
  164. *, session: AsyncSession, thread_id: str, message_id: str, body: MessageUpdate
  165. ) -> Message:
  166. if body.content:
  167. body.content = [
  168. {"type": "text", "text": {"value": body.content, "annotations": []}}
  169. ]
  170. # get thread
  171. await ThreadService.get_thread(thread_id=thread_id, session=session)
  172. # get message
  173. db_message = await MessageService.get_message(
  174. session=session, thread_id=thread_id, message_id=message_id
  175. )
  176. update_data = body.dict(exclude_unset=True)
  177. for key, value in update_data.items():
  178. setattr(db_message, key, value)
  179. session.add(db_message)
  180. await session.commit()
  181. await session.refresh(db_message)
  182. return db_message
  183. @staticmethod
  184. async def delete_message(
  185. *, session: AsyncSession, thread_id: str, message_id: str
  186. ) -> Message:
  187. message = await MessageService.get_message(
  188. session=session, thread_id=thread_id, message_id=message_id
  189. )
  190. await session.delete(message)
  191. await auth_policy.delete_token_rel(
  192. session=session, relation_type=RelationType.Message, relation_id=message_id
  193. )
  194. await session.commit()
  195. return DeleteResponse(id=message_id, object="message.deleted", deleted=True)
  196. @staticmethod
  197. async def get_message(
  198. *, session: AsyncSession, thread_id: str, message_id: str
  199. ) -> Message:
  200. statement = (
  201. select(Message)
  202. .where(Message.thread_id == thread_id)
  203. .where(Message.id == message_id)
  204. )
  205. result = await session.execute(statement)
  206. message = result.scalars().one_or_none()
  207. if message is None:
  208. raise HTTPException(status_code=404, detail="Message not found")
  209. return message
  210. @staticmethod
  211. async def get_message_file(
  212. *, session: AsyncSession, thread_id: str, message_id: str, file_id: str
  213. ) -> MessageFile:
  214. await MessageService.get_message(
  215. session=session, thread_id=thread_id, message_id=message_id
  216. )
  217. # get message files
  218. statement = (
  219. select(MessageFile)
  220. .where(MessageFile.id == file_id)
  221. .where(MessageFile.message_id == message_id)
  222. )
  223. result = await session.execute(statement)
  224. msg_file = result.scalars().one_or_none()
  225. if msg_file is None:
  226. raise ResourceNotFoundError(message="Message file not found")
  227. return msg_file
  228. @staticmethod
  229. async def copy_messages(
  230. *,
  231. session: AsyncSession,
  232. from_thread_id: str,
  233. to_thread_id: str,
  234. end_message_id: str
  235. ):
  236. """
  237. copy thread messages to another thread
  238. """
  239. statement = select(Message).where(Message.thread_id == from_thread_id)
  240. if end_message_id:
  241. statement = statement.where(Message.id <= end_message_id)
  242. result = await session.execute(statement.order_by(Message.id))
  243. original_messages = result.scalars().all()
  244. for original_message in original_messages:
  245. new_message = Message(
  246. thread_id=to_thread_id,
  247. **original_message.model_dump(
  248. exclude={"id", "created_at", "updated_at", "thread_id"}
  249. ),
  250. )
  251. session.add(new_message)
  252. await session.commit()
  253. @staticmethod
  254. async def create_messages(
  255. *,
  256. session: AsyncSession,
  257. thread_id: str,
  258. run_id: str,
  259. assistant_id: str,
  260. messages: list
  261. ):
  262. for original_message in messages:
  263. content = MessageService.format_message_content(original_message)
  264. new_message = Message.model_validate(
  265. original_message.model_dump(by_alias=True),
  266. update={
  267. "thread_id": thread_id,
  268. "run_id": run_id,
  269. "assistant_id": assistant_id,
  270. "content": content,
  271. "role": original_message.role,
  272. },
  273. )
  274. session.add(new_message)