openapi_utils.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. import json
  2. import re
  3. from typing import Dict, Tuple, Optional
  4. from app.exceptions.exception import ValidateFailedError
  5. from app.schemas.tool.action import ActionMethod, ActionParam, ActionBodyType, ChatCompletionFunction
# This function's code is taken from the open-source project TaskingAI.
  7. # The original code can be found at: https://github.com/TaskingAI/TaskingAI
  8. def validate_param_type(param_name: str, param_type: str):
  9. # check var type in [string, integer, number, boolean] but not object or array
  10. if param_type not in ["string", "integer", "number", "boolean", "object"]:
  11. raise ValidateFailedError(f"Param {param_name}'s type {param_type} is not supported.")
# This function's code is taken from the open-source project TaskingAI.
  13. # The original code can be found at: https://github.com/TaskingAI/TaskingAI
  14. def _to_snake_case(name):
  15. # Convert CamelCase to snake_case
  16. temp = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
  17. return re.sub("([a-z0-9])([A-Z])", r"\1_\2", temp).lower()
# This function's code is taken from the open-source project TaskingAI.
  19. # The original code can be found at: https://github.com/TaskingAI/TaskingAI
  20. def function_name(method, path, operation_id=None):
  21. if operation_id:
  22. # Use operationId and convert to snake_case
  23. return _to_snake_case(operation_id)
  24. # Remove leading and trailing slashes and split the path
  25. path_parts = path.strip("/").split("/")
  26. # Replace path parameters (such as {userId}) with 'by'
  27. path_parts = [re.sub(r"{\w+}", "by", part) for part in path_parts]
  28. # Combine the method and path parts into an underscore-separated string
  29. snake_case_name = "_".join([method.lower()] + path_parts)
  30. return snake_case_name
# This function's code is taken from the open-source project TaskingAI.
  32. # The original code can be found at: https://github.com/TaskingAI/TaskingAI
  33. def _resolve_ref(document, ref):
  34. parts = ref.split("/")
  35. result = document
  36. for part in parts[1:]:
  37. result = result[part]
  38. return result
# This function's code is taken from the open-source project TaskingAI.
  40. # The original code can be found at: https://github.com/TaskingAI/TaskingAI
  41. def _replace_refs(schema, document):
  42. if isinstance(schema, dict):
  43. if "$ref" in schema:
  44. ref_path = schema["$ref"]
  45. return _resolve_ref(document, ref_path)
  46. else:
  47. return {k: _replace_refs(v, document) for k, v in schema.items()}
  48. elif isinstance(schema, list):
  49. return [_replace_refs(item, document) for item in schema]
  50. else:
  51. return schema
# This function's code is taken from the open-source project TaskingAI.
  53. # The original code can be found at: https://github.com/TaskingAI/TaskingAI
  54. def replace_openapi_refs(openapi_dict) -> Dict:
  55. processed_dict = _replace_refs(openapi_dict, openapi_dict)
  56. if "components" in processed_dict:
  57. del processed_dict["components"]
  58. return processed_dict
# This function's code is taken from the open-source project TaskingAI.
  60. # The original code can be found at: https://github.com/TaskingAI/TaskingAI
  61. def split_openapi_schema(openapi_schema: Dict):
  62. # Check if the original JSON has 'paths' and 'servers' fields
  63. if "paths" not in openapi_schema or "servers" not in openapi_schema:
  64. return []
  65. base_json = {
  66. "openapi": openapi_schema.get("openapi", "3.0.0"),
  67. "info": openapi_schema.get("info", {}),
  68. "servers": openapi_schema.get("servers", []),
  69. "components": openapi_schema.get("components", {}),
  70. "security": openapi_schema.get("security", []),
  71. }
  72. split_jsons = []
  73. for path, methods in openapi_schema["paths"].items():
  74. for method, details in methods.items():
  75. # deep copy the base json
  76. new_json = json.loads(json.dumps(base_json))
  77. # only keep one path and method
  78. new_json["paths"] = {path: {method: details}}
  79. split_jsons.append(new_json)
  80. return split_jsons
# This function's code is taken from the open-source project TaskingAI.
  82. # The original code can be found at: https://github.com/TaskingAI/TaskingAI
  83. def extract_params(
  84. openapi_schema: Dict,
  85. method: ActionMethod,
  86. path: str,
  87. ) -> Tuple[
  88. str,
  89. Optional[Dict[str, ActionParam]],
  90. Optional[Dict[str, ActionParam]],
  91. ActionBodyType,
  92. Optional[Dict[str, ActionParam]],
  93. ]:
  94. """
  95. Extract parameter schemas for an API call based on OpenAPI schema definitions.
  96. :param openapi_schema: The OpenAPI specification as a dictionary.
  97. :param method: The HTTP method as an instance of ActionMethod.
  98. :param path: The API endpoint path.
  99. :return: A tuple with the final URL, path_param_schema, query_param_schema, body_type, and body_param_schema.
  100. """
  101. # Extract base URL from OpenAPI schema and construct final endpoint URL
  102. base_url = openapi_schema["servers"][0]["url"]
  103. final_url = f"{base_url}{path}"
  104. path_param_schema = None
  105. query_param_schema = None
  106. body_param_schema = None
  107. body_type = ActionBodyType.NONE
  108. # Verify if the provided path exists in the OpenAPI schema
  109. path_item = openapi_schema["paths"].get(path)
  110. if path_item is None:
  111. raise ValidateFailedError(f"No path item found for path: {path}")
  112. # Verify if the provided method is defined for the path in the OpenAPI schema
  113. operation = path_item.get(method.value.lower())
  114. if operation is None:
  115. raise ValidateFailedError(f"No operation found for method: {method} at path: {path}")
  116. # Extract schemas for path and query parameters
  117. if "parameters" in operation:
  118. for param in operation["parameters"]:
  119. param_name = param["name"]
  120. param_in = param["in"]
  121. param_type = param["schema"]["type"]
  122. validate_param_type(param_name, param_type)
  123. action_param = ActionParam(
  124. type=param_type,
  125. description=param.get("description", ""),
  126. required=param.get("required", False),
  127. enum=param["schema"].get("enum"),
  128. )
  129. if param_in == "query":
  130. if query_param_schema is None:
  131. query_param_schema = {}
  132. query_param_schema[param_name] = action_param
  133. elif param_in == "path":
  134. if path_param_schema is None:
  135. path_param_schema = {}
  136. path_param_schema[param_name] = action_param
  137. # Extract information about the requestBody
  138. if "requestBody" in operation:
  139. content_types = operation["requestBody"]["content"].keys()
  140. original_body_param_schema = None
  141. if "application/json" in content_types:
  142. body_type = ActionBodyType.JSON
  143. original_body_param_schema = operation["requestBody"]["content"]["application/json"].get("schema", {})
  144. elif "application/x-www-form-urlencoded" in content_types:
  145. body_type = ActionBodyType.FORM
  146. original_body_param_schema = operation["requestBody"]["content"]["application/x-www-form-urlencoded"].get(
  147. "schema", {}
  148. )
  149. if original_body_param_schema:
  150. body_param_schema = {}
  151. for prop_name, prop_info in original_body_param_schema.get("properties", {}).items():
  152. param_type = prop_info.get("type")
  153. validate_param_type(prop_name, param_type)
  154. body_param_schema[prop_name] = ActionParam(
  155. type=param_type,
  156. description=prop_info.get("description", ""),
  157. enum=prop_info.get("enum", None),
  158. required=prop_name in original_body_param_schema.get("required", []),
  159. properties=prop_info.get("properties", None),
  160. )
  161. return final_url, path_param_schema, query_param_schema, body_type, body_param_schema
# This function's code is taken from the open-source project TaskingAI.
  163. # The original code can be found at: https://github.com/TaskingAI/TaskingAI
  164. def build_function_def(
  165. name: str,
  166. description: str,
  167. path_param_schema: Dict,
  168. query_param_schema: Dict,
  169. body_param_schema: Dict,
  170. ) -> ChatCompletionFunction:
  171. """
  172. Build a function definition from provided schemas and metadata.
  173. :param name: the name of the function
  174. :param description: the description of the function
  175. :param path_param_schema: the path parameters schema
  176. :param query_param_schema: the query parameters schema
  177. :param body_param_schema: the body parameters schema
  178. :return: a dict of function definition
  179. """
  180. parameters_schema = {"type": "object", "properties": {}, "required": []}
  181. # Process and add path and query params to the schema
  182. for param_schemas in [path_param_schema, query_param_schema, body_param_schema]:
  183. if not param_schemas:
  184. continue
  185. for param_name, action_param in param_schemas.items():
  186. if not action_param.is_single_value_enum():
  187. parameters_schema["properties"][param_name] = action_param.dict(exclude_none=True)
  188. if action_param.required:
  189. parameters_schema["required"].append(param_name)
  190. function_def = ChatCompletionFunction(
  191. name=name,
  192. description=description,
  193. parameters=parameters_schema,
  194. )
  195. return function_def
  196. def action_param_schema_to_dict(param_schema: Optional[Dict[str, ActionParam]]):
  197. if not param_schema:
  198. return None
  199. ret = {}
  200. for param_name, param in param_schema.items():
  201. ret[param_name] = param.dict(exclude_none=True)
  202. return ret
  203. def action_param_dict_to_schema(param_schema: Optional[Dict[str, Dict]]):
  204. if not param_schema:
  205. return None
  206. ret = {}
  207. for param_name, param_dict in param_schema.items():
  208. ret[param_name] = ActionParam(**param_dict)
  209. return ret