123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259 |
- import json
- import re
- from typing import Dict, Tuple, Optional
- from app.exceptions.exception import ValidateFailedError
- from app.schemas.tool.action import ActionMethod, ActionParam, ActionBodyType, ChatCompletionFunction
- # This function code from the Open Source Project TaskingAI.
- # The original code can be found at: https://github.com/TaskingAI/TaskingAI
- def validate_param_type(param_name: str, param_type: str):
- # check var type in [string, integer, number, boolean] but not object or array
- if param_type not in ["string", "integer", "number", "boolean", "object"]:
- raise ValidateFailedError(f"Param {param_name}'s type {param_type} is not supported.")
- # This function code from the Open Source Project TaskingAI.
- # The original code can be found at: https://github.com/TaskingAI/TaskingAI
- def _to_snake_case(name):
- # Convert CamelCase to snake_case
- temp = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
- return re.sub("([a-z0-9])([A-Z])", r"\1_\2", temp).lower()
- # This function code from the Open Source Project TaskingAI.
- # The original code can be found at: https://github.com/TaskingAI/TaskingAI
- def function_name(method, path, operation_id=None):
- if operation_id:
- # Use operationId and convert to snake_case
- return _to_snake_case(operation_id)
- # Remove leading and trailing slashes and split the path
- path_parts = path.strip("/").split("/")
- # Replace path parameters (such as {userId}) with 'by'
- path_parts = [re.sub(r"{\w+}", "by", part) for part in path_parts]
- # Combine the method and path parts into an underscore-separated string
- snake_case_name = "_".join([method.lower()] + path_parts)
- return snake_case_name
- # This function code from the Open Source Project TaskingAI.
- # The original code can be found at: https://github.com/TaskingAI/TaskingAI
- def _resolve_ref(document, ref):
- parts = ref.split("/")
- result = document
- for part in parts[1:]:
- result = result[part]
- return result
- # This function code from the Open Source Project TaskingAI.
- # The original code can be found at: https://github.com/TaskingAI/TaskingAI
- def _replace_refs(schema, document):
- if isinstance(schema, dict):
- if "$ref" in schema:
- ref_path = schema["$ref"]
- return _resolve_ref(document, ref_path)
- else:
- return {k: _replace_refs(v, document) for k, v in schema.items()}
- elif isinstance(schema, list):
- return [_replace_refs(item, document) for item in schema]
- else:
- return schema
- # This function code from the Open Source Project TaskingAI.
- # The original code can be found at: https://github.com/TaskingAI/TaskingAI
- def replace_openapi_refs(openapi_dict) -> Dict:
- processed_dict = _replace_refs(openapi_dict, openapi_dict)
- if "components" in processed_dict:
- del processed_dict["components"]
- return processed_dict
- # This function code from the Open Source Project TaskingAI.
- # The original code can be found at: https://github.com/TaskingAI/TaskingAI
- def split_openapi_schema(openapi_schema: Dict):
- # Check if the original JSON has 'paths' and 'servers' fields
- if "paths" not in openapi_schema or "servers" not in openapi_schema:
- return []
- base_json = {
- "openapi": openapi_schema.get("openapi", "3.0.0"),
- "info": openapi_schema.get("info", {}),
- "servers": openapi_schema.get("servers", []),
- "components": openapi_schema.get("components", {}),
- "security": openapi_schema.get("security", []),
- }
- split_jsons = []
- for path, methods in openapi_schema["paths"].items():
- for method, details in methods.items():
- # deep copy the base json
- new_json = json.loads(json.dumps(base_json))
- # only keep one path and method
- new_json["paths"] = {path: {method: details}}
- split_jsons.append(new_json)
- return split_jsons
- # This function code from the Open Source Project TaskingAI.
- # The original code can be found at: https://github.com/TaskingAI/TaskingAI
- def extract_params(
- openapi_schema: Dict,
- method: ActionMethod,
- path: str,
- ) -> Tuple[
- str,
- Optional[Dict[str, ActionParam]],
- Optional[Dict[str, ActionParam]],
- ActionBodyType,
- Optional[Dict[str, ActionParam]],
- ]:
- """
- Extract parameter schemas for an API call based on OpenAPI schema definitions.
- :param openapi_schema: The OpenAPI specification as a dictionary.
- :param method: The HTTP method as an instance of ActionMethod.
- :param path: The API endpoint path.
- :return: A tuple with the final URL, path_param_schema, query_param_schema, body_type, and body_param_schema.
- """
- # Extract base URL from OpenAPI schema and construct final endpoint URL
- base_url = openapi_schema["servers"][0]["url"]
- final_url = f"{base_url}{path}"
- path_param_schema = None
- query_param_schema = None
- body_param_schema = None
- body_type = ActionBodyType.NONE
- # Verify if the provided path exists in the OpenAPI schema
- path_item = openapi_schema["paths"].get(path)
- if path_item is None:
- raise ValidateFailedError(f"No path item found for path: {path}")
- # Verify if the provided method is defined for the path in the OpenAPI schema
- operation = path_item.get(method.value.lower())
- if operation is None:
- raise ValidateFailedError(f"No operation found for method: {method} at path: {path}")
- # Extract schemas for path and query parameters
- if "parameters" in operation:
- for param in operation["parameters"]:
- param_name = param["name"]
- param_in = param["in"]
- param_type = param["schema"]["type"]
- validate_param_type(param_name, param_type)
- action_param = ActionParam(
- type=param_type,
- description=param.get("description", ""),
- required=param.get("required", False),
- enum=param["schema"].get("enum"),
- )
- if param_in == "query":
- if query_param_schema is None:
- query_param_schema = {}
- query_param_schema[param_name] = action_param
- elif param_in == "path":
- if path_param_schema is None:
- path_param_schema = {}
- path_param_schema[param_name] = action_param
- # Extract information about the requestBody
- if "requestBody" in operation:
- content_types = operation["requestBody"]["content"].keys()
- original_body_param_schema = None
- if "application/json" in content_types:
- body_type = ActionBodyType.JSON
- original_body_param_schema = operation["requestBody"]["content"]["application/json"].get("schema", {})
- elif "application/x-www-form-urlencoded" in content_types:
- body_type = ActionBodyType.FORM
- original_body_param_schema = operation["requestBody"]["content"]["application/x-www-form-urlencoded"].get(
- "schema", {}
- )
- if original_body_param_schema:
- body_param_schema = {}
- for prop_name, prop_info in original_body_param_schema.get("properties", {}).items():
- param_type = prop_info.get("type")
- validate_param_type(prop_name, param_type)
- body_param_schema[prop_name] = ActionParam(
- type=param_type,
- description=prop_info.get("description", ""),
- enum=prop_info.get("enum", None),
- required=prop_name in original_body_param_schema.get("required", []),
- properties=prop_info.get("properties", None),
- )
- return final_url, path_param_schema, query_param_schema, body_type, body_param_schema
- # This function code from the Open Source Project TaskingAI.
- # The original code can be found at: https://github.com/TaskingAI/TaskingAI
- def build_function_def(
- name: str,
- description: str,
- path_param_schema: Dict,
- query_param_schema: Dict,
- body_param_schema: Dict,
- ) -> ChatCompletionFunction:
- """
- Build a function definition from provided schemas and metadata.
- :param name: the name of the function
- :param description: the description of the function
- :param path_param_schema: the path parameters schema
- :param query_param_schema: the query parameters schema
- :param body_param_schema: the body parameters schema
- :return: a dict of function definition
- """
- parameters_schema = {"type": "object", "properties": {}, "required": []}
- # Process and add path and query params to the schema
- for param_schemas in [path_param_schema, query_param_schema, body_param_schema]:
- if not param_schemas:
- continue
- for param_name, action_param in param_schemas.items():
- if not action_param.is_single_value_enum():
- parameters_schema["properties"][param_name] = action_param.dict(exclude_none=True)
- if action_param.required:
- parameters_schema["required"].append(param_name)
- function_def = ChatCompletionFunction(
- name=name,
- description=description,
- parameters=parameters_schema,
- )
- return function_def
- def action_param_schema_to_dict(param_schema: Optional[Dict[str, ActionParam]]):
- if not param_schema:
- return None
- ret = {}
- for param_name, param in param_schema.items():
- ret[param_name] = param.dict(exclude_none=True)
- return ret
- def action_param_dict_to_schema(param_schema: Optional[Dict[str, Dict]]):
- if not param_schema:
- return None
- ret = {}
- for param_name, param_dict in param_schema.items():
- ret[param_name] = ActionParam(**param_dict)
- return ret
|