@@ -37,9 +37,7 @@ from typing import (
     Any,
     Callable,
     Collection,
-    Dict,
     Iterable,
-    List,
     Literal,
     Optional,
     Sequence,
@@ -47,7 +45,6 @@ from typing import (
     Type,
     TypedDict,
     TypeVar,
-    Union,
     cast,
 )
 
@@ -64,16 +61,16 @@ class BaseSerialized(TypedDict):
     """Base class for serialized objects."""
 
     lc: int
-    id: List[str]
+    id: list[str]
     name: NotRequired[str]
-    graph: NotRequired[Dict[str, Any]]
+    graph: NotRequired[dict[str, Any]]
 
 
 class SerializedConstructor(BaseSerialized):
     """Serialized constructor."""
 
     type: Literal["constructor"]
-    kwargs: Dict[str, Any]
+    kwargs: dict[str, Any]
 
 
 class SerializedSecret(BaseSerialized):
@@ -115,7 +112,7 @@ class Serializable(BaseModel, ABC):
         return False
 
     @classmethod
-    def get_lc_namespace(cls) -> List[str]:
+    def get_lc_namespace(cls) -> list[str]:
         """Get the namespace of the langchain object.
 
         For example, if the class is `langchain.llms.openai.OpenAI`, then the
@@ -124,16 +121,16 @@ class Serializable(BaseModel, ABC):
         return cls.__module__.split(".")
 
     @property
-    def lc_secrets(self) -> Dict[str, str]:
+    def lc_secrets(self) -> dict[str, str]:
         """A map of constructor argument names to secret ids.
 
         For example,
             {"openai_api_key": "OPENAI_API_KEY"}
         """
-        return dict()
+        return {}
 
     @property
-    def lc_attributes(self) -> Dict:
+    def lc_attributes(self) -> dict:
         """List of attribute names that should be included in the serialized kwargs.
 
         These attributes must be accepted by the constructor.
@@ -141,7 +138,7 @@ class Serializable(BaseModel, ABC):
         return {}
 
     @classmethod
-    def lc_id(cls) -> List[str]:
+    def lc_id(cls) -> list[str]:
         """A unique identifier for this class for serialization purposes.
 
         The unique identifier is a list of strings that describes the path
@@ -159,7 +156,7 @@ class Serializable(BaseModel, ABC):
             if (k not in self.__fields__ or try_neq_default(v, k, self))
         ]
 
-    _lc_kwargs = PrivateAttr(default_factory=dict)
+    _lc_kwargs: dict[str, Any] = PrivateAttr(default_factory=dict)
 
     def __init__(self, **kwargs: Any) -> None:
         super().__init__(**kwargs)
@@ -167,7 +164,7 @@ class Serializable(BaseModel, ABC):
 
     def to_json(
         self,
-    ) -> Union[SerializedConstructor, SerializedNotImplemented]:
+    ) -> SerializedConstructor | SerializedNotImplemented:
         if not self.is_lc_serializable():
             return self.to_json_not_implemented()
 
@@ -238,8 +235,8 @@ class Serializable(BaseModel, ABC):
 
 
 def _replace_secrets(
-    root: Dict[Any, Any], secrets_map: Dict[str, str]
-) -> Dict[Any, Any]:
+    root: dict[Any, Any], secrets_map: dict[str, str]
+) -> dict[Any, Any]:
     result = root.copy()
     for path, secret_id in secrets_map.items():
         [*parts, last] = path.split(".")
@@ -267,7 +264,7 @@ def to_json_not_implemented(obj: object) -> SerializedNotImplemented:
     Returns:
         SerializedNotImplemented
     """
-    _id: List[str] = []
+    _id: list[str] = []
     try:
         if hasattr(obj, "__name__"):
             _id = [*obj.__module__.split("."), obj.__name__]
@@ -313,7 +310,7 @@ class SplitterDocument(Serializable):
         return True
 
     @classmethod
-    def get_lc_namespace(cls) -> List[str]:
+    def get_lc_namespace(cls) -> list[str]:
         """Get the namespace of the langchain object."""
         return ["langchain", "schema", "document"]
 
@@ -406,7 +403,7 @@ def _make_spacy_pipe_for_splitting(
 
 def _split_text_with_regex(
     text: str, separator: str, keep_separator: bool
-) -> List[str]:
+) -> list[str]:
     # Now that we have the separator, split the text
     if separator:
         if keep_separator:
@@ -461,12 +458,12 @@ class TextSplitter(BaseDocumentTransformer, ABC):
         self._strip_whitespace = strip_whitespace
 
     @abstractmethod
-    def split_text(self, text: str) -> List[str]:
+    def split_text(self, text: str) -> list[str]:
         """Split text into multiple components."""
 
     def create_documents(
-        self, texts: List[str], metadatas: Optional[List[dict]] = None
-    ) -> List[SplitterDocument]:
+        self, texts: list[str], metadatas: Optional[list[dict]] = None
+    ) -> list[SplitterDocument]:
         """Create documents from a list of texts."""
         _metadatas = metadatas or [{}] * len(texts)
         documents = []
@@ -488,7 +485,7 @@ class TextSplitter(BaseDocumentTransformer, ABC):
 
     def split_documents(
         self, documents: Iterable[SplitterDocument]
-    ) -> List[SplitterDocument]:
+    ) -> list[SplitterDocument]:
         """Split documents."""
         texts, metadatas = [], []
         for doc in documents:
@@ -496,7 +493,7 @@ class TextSplitter(BaseDocumentTransformer, ABC):
             metadatas.append(doc.metadata)
         return self.create_documents(texts, metadatas=metadatas)
 
-    def _join_docs(self, docs: List[str], separator: str) -> Optional[str]:
+    def _join_docs(self, docs: list[str], separator: str) -> Optional[str]:
         text = separator.join(docs)
         if self._strip_whitespace:
             text = text.strip()
@@ -507,13 +504,13 @@ class TextSplitter(BaseDocumentTransformer, ABC):
 
     def _merge_splits(
         self, splits: Iterable[str], separator: str
-    ) -> List[str]:
+    ) -> list[str]:
         # We now want to combine these smaller pieces into medium size
         # chunks to send to the LLM.
         separator_len = self._length_function(separator)
 
         docs = []
-        current_doc: List[str] = []
+        current_doc: list[str] = []
         total = 0
         for d in splits:
             _len = self._length_function(d)
@@ -579,8 +576,8 @@ class TextSplitter(BaseDocumentTransformer, ABC):
         cls: Type[TS],
         encoding_name: str = "gpt2",
         model: Optional[str] = None,
-        allowed_special: Union[Literal["all"], AbstractSet[str]] = set(),
-        disallowed_special: Union[Literal["all"], Collection[str]] = "all",
+        allowed_special: Literal["all"] | AbstractSet[str] = set(),
+        disallowed_special: Literal["all"] | Collection[str] = "all",
         **kwargs: Any,
     ) -> TS:
         """Text splitter that uses tiktoken encoder to count length."""
@@ -641,7 +638,7 @@ class CharacterTextSplitter(TextSplitter):
         self._separator = separator
         self._is_separator_regex = is_separator_regex
 
-    def split_text(self, text: str) -> List[str]:
+    def split_text(self, text: str) -> list[str]:
         """Split incoming text and return chunks."""
         # First we naively split the large input into a bunch of smaller ones.
         separator = (
@@ -657,7 +654,7 @@ class CharacterTextSplitter(TextSplitter):
 class LineType(TypedDict):
     """Line type as typed dict."""
 
-    metadata: Dict[str, str]
+    metadata: dict[str, str]
     content: str
 
 
@@ -674,7 +671,7 @@ class MarkdownHeaderTextSplitter:
 
     def __init__(
         self,
-        headers_to_split_on: List[Tuple[str, str]],
+        headers_to_split_on: list[Tuple[str, str]],
         return_each_line: bool = False,
         strip_headers: bool = True,
     ):
@@ -696,13 +693,13 @@ class MarkdownHeaderTextSplitter:
         self.strip_headers = strip_headers
 
     def aggregate_lines_to_chunks(
-        self, lines: List[LineType]
-    ) -> List[SplitterDocument]:
+        self, lines: list[LineType]
+    ) -> list[SplitterDocument]:
         """Combine lines with common metadata into chunks
         Args:
             lines: Line of text / associated header metadata
         """
-        aggregated_chunks: List[LineType] = []
+        aggregated_chunks: list[LineType] = []
 
         for line in lines:
             if (
@@ -742,7 +739,7 @@ class MarkdownHeaderTextSplitter:
             for chunk in aggregated_chunks
         ]
 
-    def split_text(self, text: str) -> List[SplitterDocument]:
+    def split_text(self, text: str) -> list[SplitterDocument]:
         """Split markdown file
         Args:
             text: Markdown file"""
@@ -750,14 +747,14 @@ class MarkdownHeaderTextSplitter:
         # Split the input text by newline character ("\n").
         lines = text.split("\n")
         # Final output
-        lines_with_metadata: List[LineType] = []
+        lines_with_metadata: list[LineType] = []
         # Content and metadata of the chunk currently being processed
-        current_content: List[str] = []
-        current_metadata: Dict[str, str] = {}
+        current_content: list[str] = []
+        current_metadata: dict[str, str] = {}
         # Keep track of the nested header structure
-        # header_stack: List[Dict[str, Union[int, str]]] = []
-        header_stack: List[HeaderType] = []
-        initial_metadata: Dict[str, str] = {}
+        # header_stack: list[dict[str, int | str]] = []
+        header_stack: list[HeaderType] = []
+        initial_metadata: dict[str, str] = {}
 
         in_code_block = False
         opening_fence = ""
@@ -879,7 +876,7 @@ class ElementType(TypedDict):
     url: str
     xpath: str
     content: str
-    metadata: Dict[str, str]
+    metadata: dict[str, str]
 
 
 class HTMLHeaderTextSplitter:
@@ -890,7 +887,7 @@ class HTMLHeaderTextSplitter:
 
     def __init__(
         self,
-        headers_to_split_on: List[Tuple[str, str]],
+        headers_to_split_on: list[Tuple[str, str]],
         return_each_element: bool = False,
     ):
         """Create a new HTMLHeaderTextSplitter.
@@ -906,14 +903,14 @@ class HTMLHeaderTextSplitter:
         self.headers_to_split_on = sorted(headers_to_split_on)
 
     def aggregate_elements_to_chunks(
-        self, elements: List[ElementType]
-    ) -> List[SplitterDocument]:
+        self, elements: list[ElementType]
+    ) -> list[SplitterDocument]:
         """Combine elements with common metadata into chunks
 
         Args:
             elements: HTML element content with associated identifying info and metadata
         """
-        aggregated_chunks: List[ElementType] = []
+        aggregated_chunks: list[ElementType] = []
 
         for element in elements:
             if (
@@ -935,7 +932,7 @@ class HTMLHeaderTextSplitter:
             for chunk in aggregated_chunks
         ]
 
-    def split_text_from_url(self, url: str) -> List[SplitterDocument]:
+    def split_text_from_url(self, url: str) -> list[SplitterDocument]:
         """Split HTML from web URL
 
         Args:
@@ -944,7 +941,7 @@ class HTMLHeaderTextSplitter:
         r = requests.get(url)
         return self.split_text_from_file(BytesIO(r.content))
 
-    def split_text(self, text: str) -> List[SplitterDocument]:
+    def split_text(self, text: str) -> list[SplitterDocument]:
         """Split HTML text string
 
         Args:
@@ -952,7 +949,7 @@ class HTMLHeaderTextSplitter:
         """
         return self.split_text_from_file(StringIO(text))
 
-    def split_text_from_file(self, file: Any) -> List[SplitterDocument]:
+    def split_text_from_file(self, file: Any) -> list[SplitterDocument]:
         """Split HTML file
 
         Args:
@@ -1048,15 +1045,15 @@ class Tokenizer:
     """Overlap in tokens between chunks"""
     tokens_per_chunk: int
     """Maximum number of tokens per chunk"""
-    decode: Callable[[List[int]], str]
+    decode: Callable[[list[int]], str]
     """ Function to decode a list of token ids to a string"""
-    encode: Callable[[str], List[int]]
+    encode: Callable[[str], list[int]]
     """ Function to encode a string to a list of token ids"""
 
 
-def split_text_on_tokens(*, text: str, tokenizer: Tokenizer) -> List[str]:
+def split_text_on_tokens(*, text: str, tokenizer: Tokenizer) -> list[str]:
     """Split incoming text and return chunks using tokenizer."""
-    splits: List[str] = []
+    splits: list[str] = []
     input_ids = tokenizer.encode(text)
     start_idx = 0
     cur_idx = min(start_idx + tokenizer.tokens_per_chunk, len(input_ids))
@@ -1078,8 +1075,8 @@ class TokenTextSplitter(TextSplitter):
         self,
         encoding_name: str = "gpt2",
         model: Optional[str] = None,
-        allowed_special: Union[Literal["all"], AbstractSet[str]] = set(),
-        disallowed_special: Union[Literal["all"], Collection[str]] = "all",
+        allowed_special: Literal["all"] | AbstractSet[str] = set(),
+        disallowed_special: Literal["all"] | Collection[str] = "all",
         **kwargs: Any,
     ) -> None:
         """Create a new TextSplitter."""
@@ -1101,8 +1098,8 @@ class TokenTextSplitter(TextSplitter):
         self._allowed_special = allowed_special
         self._disallowed_special = disallowed_special
 
-    def split_text(self, text: str) -> List[str]:
-        def _encode(_text: str) -> List[int]:
+    def split_text(self, text: str) -> list[str]:
+        def _encode(_text: str) -> list[int]:
             return self._tokenizer.encode(
                 _text,
                 allowed_special=self._allowed_special,
@@ -1164,8 +1161,8 @@ class SentenceTransformersTokenTextSplitter(TextSplitter):
             f" > maximum token limit."
         )
 
-    def split_text(self, text: str) -> List[str]:
-        def encode_strip_start_and_stop_token_ids(text: str) -> List[int]:
+    def split_text(self, text: str) -> list[str]:
+        def encode_strip_start_and_stop_token_ids(text: str) -> list[int]:
             return self._encode(text)[1:-1]
 
         tokenizer = Tokenizer(
@@ -1182,7 +1179,7 @@ class SentenceTransformersTokenTextSplitter(TextSplitter):
 
     _max_length_equal_32_bit_integer: int = 2**32
 
-    def _encode(self, text: str) -> List[int]:
+    def _encode(self, text: str) -> list[int]:
         token_ids_with_start_and_end_token_ids = self.tokenizer.encode(
             text,
             max_length=self._max_length_equal_32_bit_integer,
@@ -1228,7 +1225,7 @@ class RecursiveCharacterTextSplitter(TextSplitter):
 
     def __init__(
         self,
-        separators: Optional[List[str]] = None,
+        separators: Optional[list[str]] = None,
         keep_separator: bool = True,
         is_separator_regex: bool = False,
         chunk_size: int = 4000,
@@ -1247,7 +1244,7 @@ class RecursiveCharacterTextSplitter(TextSplitter):
         self.chunk_size = chunk_size
         self.chunk_overlap = chunk_overlap
 
-    def _split_text(self, text: str, separators: List[str]) -> List[str]:
+    def _split_text(self, text: str, separators: list[str]) -> list[str]:
         """Split incoming text and return chunks."""
         final_chunks = []
         # Get appropriate separator to use
@@ -1289,7 +1286,7 @@ class RecursiveCharacterTextSplitter(TextSplitter):
             final_chunks.extend(merged_text)
         return final_chunks
 
-    def split_text(self, text: str) -> List[str]:
+    def split_text(self, text: str) -> list[str]:
         return self._split_text(text, self._separators)
 
     @classmethod
@@ -1300,7 +1297,7 @@ class RecursiveCharacterTextSplitter(TextSplitter):
         return cls(separators=separators, is_separator_regex=True, **kwargs)
 
     @staticmethod
-    def get_separators_for_language(language: Language) -> List[str]:
+    def get_separators_for_language(language: Language) -> list[str]:
         if language == Language.CPP:
             return [
                 # Split along class definitions
@@ -1781,7 +1778,7 @@ class NLTKTextSplitter(TextSplitter):
         self._separator = separator
         self._language = language
 
-    def split_text(self, text: str) -> List[str]:
+    def split_text(self, text: str) -> list[str]:
         """Split incoming text and return chunks."""
         # First we naively split the large input into a bunch of smaller ones.
         splits = self._tokenizer(text, language=self._language)
@@ -1812,7 +1809,7 @@ class SpacyTextSplitter(TextSplitter):
         )
         self._separator = separator
 
-    def split_text(self, text: str) -> List[str]:
+    def split_text(self, text: str) -> list[str]:
         """Split incoming text and return chunks."""
         splits = (s.text for s in self._tokenizer(text).sents)
         return self._merge_splits(splits, self._separator)
@@ -1843,7 +1840,7 @@ class KonlpyTextSplitter(TextSplitter):
         )
         self.kkma = Kkma()
 
-    def split_text(self, text: str) -> List[str]:
+    def split_text(self, text: str) -> list[str]:
         """Split incoming text and return chunks."""
         splits = self.kkma.sentences(text)
         return self._merge_splits(splits, self._separator)
@@ -1890,12 +1887,12 @@ class RecursiveJsonSplitter:
         )
 
     @staticmethod
-    def _json_size(data: Dict) -> int:
+    def _json_size(data: dict) -> int:
         """Calculate the size of the serialized JSON object."""
         return len(json.dumps(data))
 
     @staticmethod
-    def _set_nested_dict(d: Dict, path: List[str], value: Any) -> None:
+    def _set_nested_dict(d: dict, path: list[str], value: Any) -> None:
         """Set a value in a nested dictionary based on the given path."""
         for key in path[:-1]:
             d = d.setdefault(key, {})
@@ -1919,10 +1916,10 @@ class RecursiveJsonSplitter:
 
     def _json_split(
         self,
-        data: Dict[str, Any],
-        current_path: List[str] = [],
-        chunks: List[Dict] = [{}],
-    ) -> List[Dict]:
+        data: dict[str, Any],
+        current_path: list[str] = [],
+        chunks: list[dict] = [{}],
+    ) -> list[dict]:
         """
         Split json into maximum size dictionaries while preserving structure.
         """
@@ -1950,9 +1947,9 @@ class RecursiveJsonSplitter:
 
     def split_json(
         self,
-        json_data: Dict[str, Any],
+        json_data: dict[str, Any],
         convert_lists: bool = False,
-    ) -> List[Dict]:
+    ) -> list[dict]:
         """Splits JSON into a list of JSON chunks"""
 
         if convert_lists:
@@ -1968,8 +1965,8 @@ class RecursiveJsonSplitter:
         return chunks
 
     def split_text(
-        self, json_data: Dict[str, Any], convert_lists: bool = False
-    ) -> List[str]:
+        self, json_data: dict[str, Any], convert_lists: bool = False
+    ) -> list[str]:
         """Splits JSON into a list of JSON formatted strings"""
 
         chunks = self.split_json(
@@ -1981,11 +1978,11 @@ class RecursiveJsonSplitter:
 
     def create_documents(
         self,
-        texts: List[Dict],
+        texts: list[dict],
         convert_lists: bool = False,
-        metadatas: Optional[List[dict]] = None,
-    ) -> List[SplitterDocument]:
-        """Create documents from a list of json objects (Dict)."""
+        metadatas: Optional[list[dict]] = None,
+    ) -> list[SplitterDocument]:
+        """Create documents from a list of json objects (dict)."""
         _metadatas = metadatas or [{}] * len(texts)
         documents = []
         for i, text in enumerate(texts):