from typing import Any

from app.models.action import Action
from app.core.tools.base_tool import BaseTool
from app.exceptions.exception import ResourceNotFoundError
from app.services.tool.openapi_call import call_action_api
from app.schemas.tool.action import ActionMethod, ActionBodyType
from app.services.tool.openapi_utils import action_param_dict_to_schema
from app.schemas.tool.authentication import Authentication


class OpenapiFunctionTool(BaseTool):
    """
    Tool that exposes a stored ``Action`` as a callable function.

    The tool definition is expected to look like:
    {'id': '65d6c295a09d481250cc8ed1', 'type': 'action'}

    After construction the instance carries:
      * ``action``          - the ``Action`` passed to the constructor
      * ``openai_function`` - ``{"type": "function", "function": action.function_def}``
      * ``name``            - ``action.function_def["name"]``
    """

    name = ""
    description = ""
    args_schema = None
    action: Action = None

    def __init__(self, definition: dict, extra_body: dict, action: Action) -> None:
        """
        Validate the definition and bind the tool to ``action``.

        :param definition: tool definition; must have ``"type" == "action"`` and an ``"id"`` key.
        :param extra_body: request extras; may carry an ``"action_authentications"`` mapping
            keyed by action id. ``None`` is treated as an empty mapping.
        :param action: the action this tool wraps.
        :raises ValueError: if ``definition`` is not a well-formed action definition.
        :raises ResourceNotFoundError: if ``action`` is ``None``.
        """
        # Use .get() so a definition without a "type" key is reported as a
        # format error instead of leaking a bare KeyError.
        if definition.get("type") != "action" or "id" not in definition:
            raise ValueError(f"definition format error: {definition}")

        # an exception is thrown if no action is found
        if action is None:
            raise ResourceNotFoundError(message="action not found")

        if not action.use_for_everyone:
            action_authentications = (extra_body or {}).get("action_authentications")
            if action_authentications:
                # Per-request authentication replaces the one on the action; when the
                # mapping has no (truthy) entry for this action, fall back to "none".
                # NOTE(review): this mutates the caller's ``action`` object in place.
                action.authentication = action_authentications.get(action.id) or {"type": "none"}

        self.action = action
        self.openai_function = {"type": "function", "function": action.function_def}
        self.name = action.function_def["name"]

    def run(self, **arguments: Any) -> dict:
        """
        Invoke the wrapped action with the given keyword arguments.

        The keyword arguments are forwarded unchanged as ``parameters`` to
        ``call_action_api`` together with the action's url, method, parameter
        schemas, body type and authentication; an empty header dict is passed.

        :param arguments: the function-call arguments.
        :return: whatever ``call_action_api`` returns.
        """
        action = self.action
        response = call_action_api(
            url=action.url,
            method=ActionMethod(action.method),
            path_param_schema=action_param_dict_to_schema(action.path_param_schema),
            query_param_schema=action_param_dict_to_schema(action.query_param_schema),
            body_param_schema=action_param_dict_to_schema(action.body_param_schema),
            body_type=ActionBodyType(action.body_type),
            parameters=arguments,
            headers={},
            authentication=Authentication(**action.authentication),
        )
        return response