# chunks.py
  1. import copy
  2. import json
  3. import logging
  4. import math
  5. import time
  6. import uuid
  7. from typing import Any, Optional, TypedDict
  8. from uuid import UUID
  9. import numpy as np
  10. from core.base import (
  11. ChunkSearchResult,
  12. Handler,
  13. IndexArgsHNSW,
  14. IndexArgsIVFFlat,
  15. IndexMeasure,
  16. IndexMethod,
  17. R2RException,
  18. SearchSettings,
  19. VectorEntry,
  20. VectorQuantizationType,
  21. VectorTableName,
  22. )
  23. from core.base.utils import _decorate_vector_type
  24. from .base import PostgresConnectionManager
  25. from .filters import apply_filters
  26. from .utils import psql_quote_literal
  27. logger = logging.getLogger()
def index_measure_to_ops(
    measure: IndexMeasure,
    quantization_type: VectorQuantizationType = VectorQuantizationType.FP32,
):
    """Resolve the pgvector operator-class string for *measure*.

    Delegates to ``_decorate_vector_type`` so the ops name is decorated
    according to the quantization type (FP32 by default).
    """
    return _decorate_vector_type(measure.ops, quantization_type)
  33. def quantize_vector_to_binary(
  34. vector: list[float] | np.ndarray,
  35. threshold: float = 0.0,
  36. ) -> bytes:
  37. """Quantizes a float vector to a binary vector string for PostgreSQL bit
  38. type. Used when quantization_type is INT1.
  39. Args:
  40. vector (List[float] | np.ndarray): Input vector of floats
  41. threshold (float, optional): Threshold for binarization. Defaults to 0.0.
  42. Returns:
  43. str: Binary string representation for PostgreSQL bit type
  44. """
  45. # Convert input to numpy array if it isn't already
  46. if not isinstance(vector, np.ndarray):
  47. vector = np.array(vector)
  48. # Convert to binary (1 where value > threshold, 0 otherwise)
  49. binary_vector = (vector > threshold).astype(int)
  50. # Convert to string of 1s and 0s
  51. # Convert to string of 1s and 0s, then to bytes
  52. binary_string = "".join(map(str, binary_vector))
  53. return binary_string.encode("ascii")
class HybridSearchIntermediateResult(TypedDict):
    """Per-chunk bookkeeping used while fusing semantic and full-text hits."""

    # 1-based rank of this chunk in the semantic result list.
    semantic_rank: int
    # 1-based rank of this chunk in the full-text result list.
    full_text_rank: int
    # The underlying search result being ranked.
    data: ChunkSearchResult
    # Reciprocal-rank-fusion score; initialized to 0.0 and filled in later.
    rrf_score: float
class PostgresChunksHandler(Handler):
    # Logical table this handler manages within the project schema.
    TABLE_NAME = VectorTableName.CHUNKS

    def __init__(
        self,
        project_name: str,
        connection_manager: PostgresConnectionManager,
        dimension: int | float,
        quantization_type: VectorQuantizationType,
    ):
        """Initialize the chunks handler.

        Args:
            project_name: Postgres schema name used to namespace the tables.
            connection_manager: Async connection manager for all queries.
            dimension: Vector dimension; NaN or non-positive values are
                treated elsewhere as "dimension unspecified".
            quantization_type: Storage quantization; INT1 adds a bit column.
        """
        super().__init__(project_name, connection_manager)
        self.dimension = dimension
        self.quantization_type = quantization_type
    async def create_tables(self):
        """Create the chunks table and its supporting indexes (idempotent).

        Before creating anything, validates that an already-existing table's
        vector dimension matches ``self.dimension``, and refuses to proceed
        if a legacy table (named after the project itself) is still present.

        Raises:
            ValueError: On dimension mismatch with an existing table, or when
                the old pre-migration table is detected.
        """
        # First check if table already exists and validate dimensions
        table_exists_query = """
        SELECT EXISTS (
            SELECT FROM pg_tables
            WHERE schemaname = $1
            AND tablename = $2
        );
        """
        table_name = VectorTableName.CHUNKS
        table_exists = await self.connection_manager.fetch_query(
            table_exists_query, (self.project_name, table_name)
        )
        if len(table_exists) > 0 and table_exists[0]["exists"]:
            # Table exists, check vector dimension
            vector_dim_query = """
            SELECT a.atttypmod as dimension
            FROM pg_attribute a
            JOIN pg_class c ON a.attrelid = c.oid
            JOIN pg_namespace n ON c.relnamespace = n.oid
            WHERE n.nspname = $1
            AND c.relname = $2
            AND a.attname = 'vec';
            """
            vector_dim_result = await self.connection_manager.fetch_query(
                vector_dim_query, (self.project_name, table_name)
            )
            if vector_dim_result and len(vector_dim_result) > 0:
                existing_dimension = vector_dim_result[0]["dimension"]
                # In pgvector, dimension is stored as atttypmod - 4
                if existing_dimension > 0:  # If it has a specific dimension
                    # Compare with provided dimension
                    if (
                        self.dimension > 0
                        and existing_dimension != self.dimension
                    ):
                        raise ValueError(
                            f"Dimension mismatch: Table '{self.project_name}.{table_name}' was created with "
                            f"dimension {existing_dimension}, but {self.dimension} was provided. "
                            f"You must use the same dimension for existing tables."
                        )
        # Check for old table name
        check_query = """
        SELECT EXISTS (
            SELECT FROM pg_tables
            WHERE schemaname = $1
            AND tablename = $2
        );
        """
        old_table_exists = await self.connection_manager.fetch_query(
            check_query, (self.project_name, self.project_name)
        )
        if len(old_table_exists) > 0 and old_table_exists[0]["exists"]:
            raise ValueError(
                f"Found old vector table '{self.project_name}.{self.project_name}'. "
                "Please run `r2r db upgrade` with the CLI, or to run manually, "
                "run in R2R/py/migrations with 'alembic upgrade head' to update "
                "your database schema to the new version."
            )
        # INT1 quantization stores an extra bit(n) column next to the float vector.
        binary_col = (
            ""
            if self.quantization_type != VectorQuantizationType.INT1
            else f"vec_binary bit({self.dimension}),"
        )
        if self.dimension > 0:
            vector_col = f"vec vector({self.dimension})"
        else:
            # Dimension unknown: create an untyped vector column.
            vector_col = "vec vector"
        query = f"""
        CREATE TABLE IF NOT EXISTS {self._get_table_name(PostgresChunksHandler.TABLE_NAME)} (
            id UUID PRIMARY KEY,
            document_id UUID,
            owner_id UUID,
            collection_ids UUID[],
            {vector_col},
            {binary_col}
            text TEXT,
            metadata JSONB,
            fts tsvector GENERATED ALWAYS AS (to_tsvector('english', text)) STORED
        );
        CREATE INDEX IF NOT EXISTS idx_vectors_document_id ON {self._get_table_name(PostgresChunksHandler.TABLE_NAME)} (document_id);
        CREATE INDEX IF NOT EXISTS idx_vectors_owner_id ON {self._get_table_name(PostgresChunksHandler.TABLE_NAME)} (owner_id);
        CREATE INDEX IF NOT EXISTS idx_vectors_collection_ids ON {self._get_table_name(PostgresChunksHandler.TABLE_NAME)} USING GIN (collection_ids);
        CREATE INDEX IF NOT EXISTS idx_vectors_text ON {self._get_table_name(PostgresChunksHandler.TABLE_NAME)} USING GIN (to_tsvector('english', text));
        """
        await self.connection_manager.execute_query(query)
  157. async def upsert(self, entry: VectorEntry) -> None:
  158. """Upsert function that handles vector quantization only when
  159. quantization_type is INT1.
  160. Matches the table schema where vec_binary column only exists for INT1
  161. quantization.
  162. """
  163. # Check the quantization type to determine which columns to use
  164. if self.quantization_type == VectorQuantizationType.INT1:
  165. bit_dim = (
  166. "" if math.isnan(self.dimension) else f"({self.dimension})"
  167. )
  168. # For quantized vectors, use vec_binary column
  169. query = f"""
  170. INSERT INTO {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
  171. (id, document_id, owner_id, collection_ids, vec, vec_binary, text, metadata)
  172. VALUES ($1, $2, $3, $4, $5, $6::bit({bit_dim}), $7, $8)
  173. ON CONFLICT (id) DO UPDATE SET
  174. document_id = EXCLUDED.document_id,
  175. owner_id = EXCLUDED.owner_id,
  176. collection_ids = EXCLUDED.collection_ids,
  177. vec = EXCLUDED.vec,
  178. vec_binary = EXCLUDED.vec_binary,
  179. text = EXCLUDED.text,
  180. metadata = EXCLUDED.metadata;
  181. """
  182. await self.connection_manager.execute_query(
  183. query,
  184. (
  185. entry.id,
  186. entry.document_id,
  187. entry.owner_id,
  188. entry.collection_ids,
  189. str(entry.vector.data),
  190. quantize_vector_to_binary(
  191. entry.vector.data
  192. ), # Convert to binary
  193. entry.text,
  194. json.dumps(entry.metadata),
  195. ),
  196. )
  197. else:
  198. # For regular vectors, use vec column only
  199. query = f"""
  200. INSERT INTO {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
  201. (id, document_id, owner_id, collection_ids, vec, text, metadata)
  202. VALUES ($1, $2, $3, $4, $5, $6, $7)
  203. ON CONFLICT (id) DO UPDATE SET
  204. document_id = EXCLUDED.document_id,
  205. owner_id = EXCLUDED.owner_id,
  206. collection_ids = EXCLUDED.collection_ids,
  207. vec = EXCLUDED.vec,
  208. text = EXCLUDED.text,
  209. metadata = EXCLUDED.metadata;
  210. """
  211. await self.connection_manager.execute_query(
  212. query,
  213. (
  214. entry.id,
  215. entry.document_id,
  216. entry.owner_id,
  217. entry.collection_ids,
  218. str(entry.vector.data),
  219. entry.text,
  220. json.dumps(entry.metadata),
  221. ),
  222. )
  223. async def upsert_entries(self, entries: list[VectorEntry]) -> None:
  224. """Batch upsert function that handles vector quantization only when
  225. quantization_type is INT1.
  226. Matches the table schema where vec_binary column only exists for INT1
  227. quantization.
  228. """
  229. if self.quantization_type == VectorQuantizationType.INT1:
  230. bit_dim = (
  231. "" if math.isnan(self.dimension) else f"({self.dimension})"
  232. )
  233. # For quantized vectors, use vec_binary column
  234. query = f"""
  235. INSERT INTO {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
  236. (id, document_id, owner_id, collection_ids, vec, vec_binary, text, metadata)
  237. VALUES ($1, $2, $3, $4, $5, $6::bit({bit_dim}), $7, $8)
  238. ON CONFLICT (id) DO UPDATE SET
  239. document_id = EXCLUDED.document_id,
  240. owner_id = EXCLUDED.owner_id,
  241. collection_ids = EXCLUDED.collection_ids,
  242. vec = EXCLUDED.vec,
  243. vec_binary = EXCLUDED.vec_binary,
  244. text = EXCLUDED.text,
  245. metadata = EXCLUDED.metadata;
  246. """
  247. bin_params = [
  248. (
  249. entry.id,
  250. entry.document_id,
  251. entry.owner_id,
  252. entry.collection_ids,
  253. str(entry.vector.data),
  254. quantize_vector_to_binary(
  255. entry.vector.data
  256. ), # Convert to binary
  257. entry.text,
  258. json.dumps(entry.metadata),
  259. )
  260. for entry in entries
  261. ]
  262. await self.connection_manager.execute_many(query, bin_params)
  263. else:
  264. # For regular vectors, use vec column only
  265. query = f"""
  266. INSERT INTO {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
  267. (id, document_id, owner_id, collection_ids, vec, text, metadata)
  268. VALUES ($1, $2, $3, $4, $5, $6, $7)
  269. ON CONFLICT (id) DO UPDATE SET
  270. document_id = EXCLUDED.document_id,
  271. owner_id = EXCLUDED.owner_id,
  272. collection_ids = EXCLUDED.collection_ids,
  273. vec = EXCLUDED.vec,
  274. text = EXCLUDED.text,
  275. metadata = EXCLUDED.metadata;
  276. """
  277. params = [
  278. (
  279. entry.id,
  280. entry.document_id,
  281. entry.owner_id,
  282. entry.collection_ids,
  283. str(entry.vector.data),
  284. entry.text,
  285. json.dumps(entry.metadata),
  286. )
  287. for entry in entries
  288. ]
  289. await self.connection_manager.execute_many(query, params)
    async def semantic_search(
        self, query_vector: list[float], search_settings: SearchSettings
    ) -> list[ChunkSearchResult]:
        """Vector-similarity search over chunks.

        For INT1 (binary) quantization a two-stage search runs: a cheap
        hamming/jaccard pass over ``vec_binary`` gathers 20x candidates,
        which are then re-ranked by ``<=>`` distance on the original float
        vectors. For all other quantizations a single pass over ``vec`` runs
        with the configured measure.

        Args:
            query_vector: Query embedding as a list of floats.
            search_settings: Limit, offset, filters, measure, and inclusion
                flags for scores/metadata.

        Returns:
            Results ordered by ascending distance. ``score`` is
            ``1 - distance`` when a distance column was selected, else -1.

        Raises:
            ValueError: If the configured index measure is invalid.
        """
        try:
            imeasure_obj = IndexMeasure(
                search_settings.chunk_settings.index_measure
            )
        except ValueError:
            raise ValueError("Invalid index measure") from None
        table_name = self._get_table_name(PostgresChunksHandler.TABLE_NAME)
        cols = [
            f"{table_name}.id",
            f"{table_name}.document_id",
            f"{table_name}.owner_id",
            f"{table_name}.collection_ids",
            f"{table_name}.text",
        ]
        params: list[str | int | bytes] = []
        # For binary vectors (INT1), implement two-stage search
        if self.quantization_type == VectorQuantizationType.INT1:
            # Convert query vector to binary format
            binary_query = quantize_vector_to_binary(query_vector)
            # TODO - Put depth multiplier in config / settings
            extended_limit = (
                search_settings.limit * 20
            )  # Get 20x candidates for re-ranking
            # Stage 1 must use a bit-string measure; anything else falls
            # back to hamming distance.
            if (
                imeasure_obj == IndexMeasure.hamming_distance
                or imeasure_obj == IndexMeasure.jaccard_distance
            ):
                binary_search_measure_repr = imeasure_obj.pgvector_repr
            else:
                binary_search_measure_repr = (
                    IndexMeasure.hamming_distance.pgvector_repr
                )
            # Use binary column and binary-specific distance measures for first stage
            bit_dim = (
                "" if math.isnan(self.dimension) else f"({self.dimension})"
            )
            stage1_distance = f"{table_name}.vec_binary {binary_search_measure_repr} $1::bit{bit_dim}"
            stage1_param = binary_query
            cols.append(
                f"{table_name}.vec"
            )  # Need original vector for re-ranking
            if search_settings.include_metadatas:
                cols.append(f"{table_name}.metadata")
            select_clause = ", ".join(cols)
            where_clause = ""
            params.append(stage1_param)
            if search_settings.filters:
                where_clause, params = apply_filters(
                    search_settings.filters, params, mode="where_clause"
                )
            vector_dim = (
                "" if math.isnan(self.dimension) else f"({self.dimension})"
            )
            # First stage: Get candidates using binary search.
            # Placeholder layout (relative to filter params): +1 = extended
            # candidate limit, +2 = offset, +3 = final limit, +4 = float
            # query vector — appended in that order below.
            query = f"""
            WITH candidates AS (
                SELECT {select_clause},
                ({stage1_distance}) as binary_distance
                FROM {table_name}
                {where_clause}
                ORDER BY {stage1_distance}
                LIMIT ${len(params) + 1}
                OFFSET ${len(params) + 2}
            )
            -- Second stage: Re-rank using original vectors
            SELECT
                id,
                document_id,
                owner_id,
                collection_ids,
                text,
                {"metadata," if search_settings.include_metadatas else ""}
                (vec <=> ${len(params) + 4}::vector{vector_dim}) as distance
            FROM candidates
            ORDER BY distance
            LIMIT ${len(params) + 3}
            """
            params.extend(
                [
                    extended_limit,  # First stage limit
                    search_settings.offset,
                    search_settings.limit,  # Final limit
                    str(query_vector),  # For re-ranking
                ]
            )
        else:
            # Standard float vector handling
            vector_dim = (
                "" if math.isnan(self.dimension) else f"({self.dimension})"
            )
            distance_calc = f"{table_name}.vec {search_settings.chunk_settings.index_measure.pgvector_repr} $1::vector{vector_dim}"
            query_param = str(query_vector)
            if search_settings.include_scores:
                cols.append(f"({distance_calc}) AS distance")
            if search_settings.include_metadatas:
                cols.append(f"{table_name}.metadata")
            select_clause = ", ".join(cols)
            where_clause = ""
            params.append(query_param)
            if search_settings.filters:
                where_clause, new_params = apply_filters(
                    search_settings.filters,
                    params,
                    mode="where_clause",  # Get just conditions without WHERE
                )
                params = new_params
            query = f"""
            SELECT {select_clause}
            FROM {table_name}
            {where_clause}
            ORDER BY {distance_calc}
            LIMIT ${len(params) + 1}
            OFFSET ${len(params) + 2}
            """
            params.extend([search_settings.limit, search_settings.offset])
        results = await self.connection_manager.fetch_query(query, params)
        return [
            ChunkSearchResult(
                id=UUID(str(result["id"])),
                document_id=UUID(str(result["document_id"])),
                owner_id=UUID(str(result["owner_id"])),
                collection_ids=result["collection_ids"],
                text=result["text"],
                # Distance -> similarity; -1 sentinel when no distance column.
                score=(
                    (1 - float(result["distance"]))
                    if "distance" in result
                    else -1
                ),
                metadata=(
                    json.loads(result["metadata"])
                    if search_settings.include_metadatas
                    else {}
                ),
            )
            for result in results
        ]
  429. async def full_text_search(
  430. self, query_text: str, search_settings: SearchSettings
  431. ) -> list[ChunkSearchResult]:
  432. conditions = []
  433. params: list[str | int | bytes] = [query_text]
  434. conditions.append("fts @@ websearch_to_tsquery('english', $1)")
  435. if search_settings.filters:
  436. filter_condition, params = apply_filters(
  437. search_settings.filters, params, mode="condition_only"
  438. )
  439. if filter_condition:
  440. conditions.append(filter_condition)
  441. where_clause = "WHERE " + " AND ".join(conditions)
  442. query = f"""
  443. SELECT
  444. id,
  445. document_id,
  446. owner_id,
  447. collection_ids,
  448. text,
  449. metadata,
  450. ts_rank(fts, websearch_to_tsquery('english', $1), 32) as rank
  451. FROM {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
  452. {where_clause}
  453. ORDER BY rank DESC
  454. OFFSET ${len(params) + 1}
  455. LIMIT ${len(params) + 2}
  456. """
  457. params.extend(
  458. [
  459. search_settings.offset,
  460. search_settings.hybrid_settings.full_text_limit,
  461. ]
  462. )
  463. results = await self.connection_manager.fetch_query(query, params)
  464. return [
  465. ChunkSearchResult(
  466. id=UUID(str(r["id"])),
  467. document_id=UUID(str(r["document_id"])),
  468. owner_id=UUID(str(r["owner_id"])),
  469. collection_ids=r["collection_ids"],
  470. text=r["text"],
  471. score=float(r["rank"]),
  472. metadata=json.loads(r["metadata"]),
  473. )
  474. for r in results
  475. ]
  476. async def hybrid_search(
  477. self,
  478. query_text: str,
  479. query_vector: list[float],
  480. search_settings: SearchSettings,
  481. *args,
  482. **kwargs,
  483. ) -> list[ChunkSearchResult]:
  484. if search_settings.hybrid_settings is None:
  485. raise ValueError(
  486. "Please provide a valid `hybrid_settings` in the `search_settings`."
  487. )
  488. if (
  489. search_settings.hybrid_settings.full_text_limit
  490. < search_settings.limit
  491. ):
  492. raise ValueError(
  493. "The `full_text_limit` must be greater than or equal to the `limit`."
  494. )
  495. semantic_settings = copy.deepcopy(search_settings)
  496. semantic_settings.limit += search_settings.offset
  497. full_text_settings = copy.deepcopy(search_settings)
  498. full_text_settings.hybrid_settings.full_text_limit += (
  499. search_settings.offset
  500. )
  501. semantic_results: list[ChunkSearchResult] = await self.semantic_search(
  502. query_vector, semantic_settings
  503. )
  504. full_text_results: list[
  505. ChunkSearchResult
  506. ] = await self.full_text_search(query_text, full_text_settings)
  507. semantic_limit = search_settings.limit
  508. full_text_limit = search_settings.hybrid_settings.full_text_limit
  509. semantic_weight = search_settings.hybrid_settings.semantic_weight
  510. full_text_weight = search_settings.hybrid_settings.full_text_weight
  511. rrf_k = search_settings.hybrid_settings.rrf_k
  512. combined_results: dict[uuid.UUID, HybridSearchIntermediateResult] = {}
  513. for rank, result in enumerate(semantic_results, 1):
  514. combined_results[result.id] = {
  515. "semantic_rank": rank,
  516. "full_text_rank": full_text_limit,
  517. "data": result,
  518. "rrf_score": 0.0, # Initialize with 0, will be calculated later
  519. }
  520. for rank, result in enumerate(full_text_results, 1):
  521. if result.id in combined_results:
  522. combined_results[result.id]["full_text_rank"] = rank
  523. else:
  524. combined_results[result.id] = {
  525. "semantic_rank": semantic_limit,
  526. "full_text_rank": rank,
  527. "data": result,
  528. "rrf_score": 0.0, # Initialize with 0, will be calculated later
  529. }
  530. combined_results = {
  531. k: v
  532. for k, v in combined_results.items()
  533. if v["semantic_rank"] <= semantic_limit * 2
  534. and v["full_text_rank"] <= full_text_limit * 2
  535. }
  536. for hyb_result in combined_results.values():
  537. semantic_score = 1 / (rrf_k + hyb_result["semantic_rank"])
  538. full_text_score = 1 / (rrf_k + hyb_result["full_text_rank"])
  539. hyb_result["rrf_score"] = (
  540. semantic_score * semantic_weight
  541. + full_text_score * full_text_weight
  542. ) / (semantic_weight + full_text_weight)
  543. sorted_results = sorted(
  544. combined_results.values(),
  545. key=lambda x: x["rrf_score"],
  546. reverse=True,
  547. )
  548. offset_results = sorted_results[
  549. search_settings.offset : search_settings.offset
  550. + search_settings.limit
  551. ]
  552. return [
  553. ChunkSearchResult(
  554. id=result["data"].id,
  555. document_id=result["data"].document_id,
  556. owner_id=result["data"].owner_id,
  557. collection_ids=result["data"].collection_ids,
  558. text=result["data"].text,
  559. score=result["rrf_score"],
  560. metadata={
  561. **result["data"].metadata,
  562. "semantic_rank": result["semantic_rank"],
  563. "full_text_rank": result["full_text_rank"],
  564. },
  565. )
  566. for result in offset_results
  567. ]
  568. async def delete(
  569. self, filters: dict[str, Any]
  570. ) -> dict[str, dict[str, str]]:
  571. params: list[str | int | bytes] = []
  572. where_clause, params = apply_filters(
  573. filters, params, mode="condition_only"
  574. )
  575. query = f"""
  576. DELETE FROM {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
  577. WHERE {where_clause}
  578. RETURNING id, document_id, text;
  579. """
  580. results = await self.connection_manager.fetch_query(query, params)
  581. return {
  582. str(result["id"]): {
  583. "status": "deleted",
  584. "id": str(result["id"]),
  585. "document_id": str(result["document_id"]),
  586. "text": result["text"],
  587. }
  588. for result in results
  589. }
    async def assign_document_chunks_to_collection(
        self, document_id: UUID, collection_id: UUID
    ) -> None:
        """Append *collection_id* to every chunk of *document_id*.

        The WHERE clause skips rows that already contain the collection id,
        so the operation is idempotent.
        """
        query = f"""
        UPDATE {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
        SET collection_ids = array_append(collection_ids, $1)
        WHERE document_id = $2 AND NOT ($1 = ANY(collection_ids));
        """
        logger.info(query)
        # UUIDs are stringified here, unlike the remove counterpart which
        # passes UUID objects directly.
        return await self.connection_manager.execute_query(
            query, (str(collection_id), str(document_id))
        )
    async def remove_document_from_collection_vector(
        self, document_id: UUID, collection_id: UUID
    ) -> None:
        """Remove *collection_id* from every chunk of *document_id*."""
        query = f"""
        UPDATE {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
        SET collection_ids = array_remove(collection_ids, $1)
        WHERE document_id = $2;
        """
        # NOTE(review): passes UUID objects directly, while the assign
        # counterpart stringifies them — presumably both are accepted by the
        # driver; confirm.
        await self.connection_manager.execute_query(
            query, (collection_id, document_id)
        )
    async def delete_user_vector(self, owner_id: UUID) -> None:
        """Delete all chunks owned by *owner_id*."""
        query = f"""
        DELETE FROM {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
        WHERE owner_id = $1;
        """
        await self.connection_manager.execute_query(query, (owner_id,))
    async def delete_collection_vector(self, collection_id: UUID) -> None:
        """Delete all chunks that belong to *collection_id*."""
        query = f"""
        DELETE FROM {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
        WHERE $1 = ANY(collection_ids)
        RETURNING collection_ids
        """
        # fetchrow retrieves at most one RETURNING row, and the result is
        # discarded — the call is effectively fire-and-forget.
        await self.connection_manager.fetchrow_query(query, (collection_id,))
        return None
  627. async def list_document_chunks(
  628. self,
  629. document_id: UUID,
  630. offset: int,
  631. limit: int,
  632. include_vectors: bool = False,
  633. ) -> dict[str, Any]:
  634. vector_select = ", vec" if include_vectors else ""
  635. limit_clause = f"LIMIT {limit}" if limit > -1 else ""
  636. query = f"""
  637. SELECT id, document_id, owner_id, collection_ids, text, metadata{vector_select}, COUNT(*) OVER() AS total
  638. FROM {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
  639. WHERE document_id = $1
  640. ORDER BY (metadata->>'chunk_order')::integer
  641. OFFSET $2
  642. {limit_clause};
  643. """
  644. params = [document_id, offset]
  645. results = await self.connection_manager.fetch_query(query, params)
  646. chunks = []
  647. total = 0
  648. if results:
  649. total = results[0].get("total", 0)
  650. chunks = [
  651. {
  652. "id": result["id"],
  653. "document_id": result["document_id"],
  654. "owner_id": result["owner_id"],
  655. "collection_ids": result["collection_ids"],
  656. "text": result["text"],
  657. "metadata": json.loads(result["metadata"]),
  658. "vector": (
  659. json.loads(result["vec"]) if include_vectors else None
  660. ),
  661. }
  662. for result in results
  663. ]
  664. return {"results": chunks, "total_entries": total}
  665. async def get_chunk(self, id: UUID) -> dict:
  666. query = f"""
  667. SELECT id, document_id, owner_id, collection_ids, text, metadata
  668. FROM {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
  669. WHERE id = $1;
  670. """
  671. result = await self.connection_manager.fetchrow_query(query, (id,))
  672. if result:
  673. return {
  674. "id": result["id"],
  675. "document_id": result["document_id"],
  676. "owner_id": result["owner_id"],
  677. "collection_ids": result["collection_ids"],
  678. "text": result["text"],
  679. "metadata": json.loads(result["metadata"]),
  680. }
  681. raise R2RException(
  682. message=f"Chunk with ID {id} not found", status_code=404
  683. )
  684. async def create_index(
  685. self,
  686. table_name: Optional[VectorTableName] = None,
  687. index_measure: IndexMeasure = IndexMeasure.cosine_distance,
  688. index_method: IndexMethod = IndexMethod.auto,
  689. index_arguments: Optional[IndexArgsIVFFlat | IndexArgsHNSW] = None,
  690. index_name: Optional[str] = None,
  691. index_column: Optional[str] = None,
  692. concurrently: bool = True,
  693. ) -> None:
  694. """Creates an index for the collection.
  695. Note:
  696. When `vecs` creates an index on a pgvector column in PostgreSQL, it uses a multi-step
  697. process that enables performant indexes to be built for large collections with low end
  698. database hardware.
  699. Those steps are:
  700. - Creates a new table with a different name
  701. - Randomly selects records from the existing table
  702. - Inserts the random records from the existing table into the new table
  703. - Creates the requested vector index on the new table
  704. - Upserts all data from the existing table into the new table
  705. - Drops the existing table
  706. - Renames the new table to the existing tables name
  707. If you create dependencies (like views) on the table that underpins
  708. a `vecs.Collection` the `create_index` step may require you to drop those dependencies before
  709. it will succeed.
  710. Args:
  711. index_measure (IndexMeasure, optional): The measure to index for. Defaults to 'cosine_distance'.
  712. index_method (IndexMethod, optional): The indexing method to use. Defaults to 'auto'.
  713. index_arguments: (IndexArgsIVFFlat | IndexArgsHNSW, optional): Index type specific arguments
  714. index_name (str, optional): The name of the index to create. Defaults to None.
  715. concurrently (bool, optional): Whether to create the index concurrently. Defaults to True.
  716. Raises:
  717. ValueError: If an invalid index method is used, or if *replace* is False and an index already exists.
  718. """
  719. if table_name == VectorTableName.CHUNKS:
  720. table_name_str = f"{self.project_name}.{VectorTableName.CHUNKS}" # TODO - Fix bug in vector table naming convention
  721. if index_column:
  722. col_name = index_column
  723. else:
  724. col_name = (
  725. "vec"
  726. if (
  727. index_measure != IndexMeasure.hamming_distance
  728. and index_measure != IndexMeasure.jaccard_distance
  729. )
  730. else "vec_binary"
  731. )
  732. elif table_name == VectorTableName.ENTITIES_DOCUMENT:
  733. table_name_str = (
  734. f"{self.project_name}.{VectorTableName.ENTITIES_DOCUMENT}"
  735. )
  736. col_name = "description_embedding"
  737. elif table_name == VectorTableName.GRAPHS_ENTITIES:
  738. table_name_str = (
  739. f"{self.project_name}.{VectorTableName.GRAPHS_ENTITIES}"
  740. )
  741. col_name = "description_embedding"
  742. elif table_name == VectorTableName.COMMUNITIES:
  743. table_name_str = (
  744. f"{self.project_name}.{VectorTableName.COMMUNITIES}"
  745. )
  746. col_name = "embedding"
  747. else:
  748. raise ValueError("invalid table name")
  749. if index_method not in (
  750. IndexMethod.ivfflat,
  751. IndexMethod.hnsw,
  752. IndexMethod.auto,
  753. ):
  754. raise ValueError("invalid index method")
  755. if index_arguments:
  756. # Disallow case where user submits index arguments but uses the
  757. # IndexMethod.auto index (index build arguments should only be
  758. # used with a specific index)
  759. if index_method == IndexMethod.auto:
  760. raise ValueError(
  761. "Index build parameters are not allowed when using the IndexMethod.auto index."
  762. )
  763. # Disallow case where user specifies one index type but submits
  764. # index build arguments for the other index type
  765. if (
  766. isinstance(index_arguments, IndexArgsHNSW)
  767. and index_method != IndexMethod.hnsw
  768. ) or (
  769. isinstance(index_arguments, IndexArgsIVFFlat)
  770. and index_method != IndexMethod.ivfflat
  771. ):
  772. raise ValueError(
  773. f"{index_arguments.__class__.__name__} build parameters were supplied but {index_method} index was specified."
  774. )
  775. if index_method == IndexMethod.auto:
  776. index_method = IndexMethod.hnsw
  777. ops = index_measure_to_ops(
  778. index_measure # , quantization_type=self.quantization_type
  779. )
  780. if ops is None:
  781. raise ValueError("Unknown index measure")
  782. concurrently_sql = "CONCURRENTLY" if concurrently else ""
  783. index_name = (
  784. index_name
  785. or f"ix_{ops}_{index_method}__{col_name}_{time.strftime('%Y%m%d%H%M%S')}"
  786. )
  787. create_index_sql = f"""
  788. CREATE INDEX {concurrently_sql} {index_name}
  789. ON {table_name_str}
  790. USING {index_method} ({col_name} {ops}) {self._get_index_options(index_method, index_arguments)};
  791. """
  792. try:
  793. if concurrently:
  794. async with (
  795. self.connection_manager.pool.get_connection() as conn # type: ignore
  796. ):
  797. # Disable automatic transaction management
  798. await conn.execute(
  799. "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED"
  800. )
  801. await conn.execute(create_index_sql)
  802. else:
  803. # Non-concurrent index creation can use normal query execution
  804. await self.connection_manager.execute_query(create_index_sql)
  805. except Exception as e:
  806. raise Exception(f"Failed to create index: {e}") from e
  807. return None
  808. async def list_indices(
  809. self,
  810. offset: int,
  811. limit: int,
  812. filters: Optional[dict[str, Any]] = None,
  813. ) -> dict:
  814. where_clauses = []
  815. params: list[Any] = [self.project_name] # Start with schema name
  816. param_count = 1
  817. # Handle filtering
  818. if filters:
  819. if "table_name" in filters:
  820. where_clauses.append(f"i.tablename = ${param_count + 1}")
  821. params.append(filters["table_name"])
  822. param_count += 1
  823. if "index_method" in filters:
  824. where_clauses.append(f"am.amname = ${param_count + 1}")
  825. params.append(filters["index_method"])
  826. param_count += 1
  827. if "index_name" in filters:
  828. where_clauses.append(
  829. f"LOWER(i.indexname) LIKE LOWER(${param_count + 1})"
  830. )
  831. params.append(f"%{filters['index_name']}%")
  832. param_count += 1
  833. where_clause = " AND ".join(where_clauses) if where_clauses else ""
  834. if where_clause:
  835. where_clause = f"AND {where_clause}"
  836. query = f"""
  837. WITH index_info AS (
  838. SELECT
  839. i.indexname as name,
  840. i.tablename as table_name,
  841. i.indexdef as definition,
  842. am.amname as method,
  843. pg_relation_size(c.oid) as size_in_bytes,
  844. c.reltuples::bigint as row_estimate,
  845. COALESCE(psat.idx_scan, 0) as number_of_scans,
  846. COALESCE(psat.idx_tup_read, 0) as tuples_read,
  847. COALESCE(psat.idx_tup_fetch, 0) as tuples_fetched,
  848. COUNT(*) OVER() as total_count
  849. FROM pg_indexes i
  850. JOIN pg_class c ON c.relname = i.indexname
  851. JOIN pg_am am ON c.relam = am.oid
  852. LEFT JOIN pg_stat_user_indexes psat ON psat.indexrelname = i.indexname
  853. AND psat.schemaname = i.schemaname
  854. WHERE i.schemaname = $1
  855. AND i.indexdef LIKE '%vector%'
  856. {where_clause}
  857. )
  858. SELECT *
  859. FROM index_info
  860. ORDER BY name
  861. LIMIT ${param_count + 1}
  862. OFFSET ${param_count + 2}
  863. """
  864. # Add limit and offset to params
  865. params.extend([limit, offset])
  866. results = await self.connection_manager.fetch_query(query, params)
  867. indices = []
  868. total_entries = 0
  869. if results:
  870. total_entries = results[0]["total_count"]
  871. for result in results:
  872. index_info = {
  873. "name": result["name"],
  874. "table_name": result["table_name"],
  875. "definition": result["definition"],
  876. "size_in_bytes": result["size_in_bytes"],
  877. "row_estimate": result["row_estimate"],
  878. "number_of_scans": result["number_of_scans"],
  879. "tuples_read": result["tuples_read"],
  880. "tuples_fetched": result["tuples_fetched"],
  881. }
  882. indices.append(index_info)
  883. return {"indices": indices, "total_entries": total_entries}
  884. async def delete_index(
  885. self,
  886. index_name: str,
  887. table_name: Optional[VectorTableName] = None,
  888. concurrently: bool = True,
  889. ) -> None:
  890. """Deletes a vector index.
  891. Args:
  892. index_name (str): Name of the index to delete
  893. table_name (VectorTableName, optional): Table the index belongs to
  894. concurrently (bool): Whether to drop the index concurrently
  895. Raises:
  896. ValueError: If table name is invalid or index doesn't exist
  897. Exception: If index deletion fails
  898. """
  899. # Validate table name and get column name
  900. if table_name == VectorTableName.CHUNKS:
  901. table_name_str = f"{self.project_name}.{VectorTableName.CHUNKS}"
  902. col_name = "vec"
  903. elif table_name == VectorTableName.ENTITIES_DOCUMENT:
  904. table_name_str = (
  905. f"{self.project_name}.{VectorTableName.ENTITIES_DOCUMENT}"
  906. )
  907. col_name = "description_embedding"
  908. elif table_name == VectorTableName.GRAPHS_ENTITIES:
  909. table_name_str = (
  910. f"{self.project_name}.{VectorTableName.GRAPHS_ENTITIES}"
  911. )
  912. col_name = "description_embedding"
  913. elif table_name == VectorTableName.COMMUNITIES:
  914. table_name_str = (
  915. f"{self.project_name}.{VectorTableName.COMMUNITIES}"
  916. )
  917. col_name = "description_embedding"
  918. else:
  919. raise ValueError("invalid table name")
  920. # Extract schema and base table name
  921. schema_name, base_table_name = table_name_str.split(".")
  922. # Verify index exists and is a vector index
  923. query = """
  924. SELECT indexdef
  925. FROM pg_indexes
  926. WHERE indexname = $1
  927. AND schemaname = $2
  928. AND tablename = $3
  929. AND indexdef LIKE $4
  930. """
  931. result = await self.connection_manager.fetchrow_query(
  932. query, (index_name, schema_name, base_table_name, f"%({col_name}%")
  933. )
  934. if not result:
  935. raise ValueError(
  936. f"Vector index '{index_name}' does not exist on table {table_name_str}"
  937. )
  938. # Drop the index
  939. concurrently_sql = "CONCURRENTLY" if concurrently else ""
  940. drop_query = (
  941. f"DROP INDEX {concurrently_sql} {schema_name}.{index_name}"
  942. )
  943. try:
  944. if concurrently:
  945. async with (
  946. self.connection_manager.pool.get_connection() as conn # type: ignore
  947. ):
  948. # Disable automatic transaction management
  949. await conn.execute(
  950. "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED"
  951. )
  952. await conn.execute(drop_query)
  953. else:
  954. await self.connection_manager.execute_query(drop_query)
  955. except Exception as e:
  956. raise Exception(f"Failed to delete index: {e}") from e
  957. async def list_chunks(
  958. self,
  959. offset: int,
  960. limit: int,
  961. filters: Optional[dict[str, Any]] = None,
  962. include_vectors: bool = False,
  963. ) -> dict[str, Any]:
  964. """List chunks with pagination support.
  965. Args:
  966. offset (int, optional): Number of records to skip. Defaults to 0.
  967. limit (int, optional): Maximum number of records to return. Defaults to 10.
  968. filters (dict, optional): Dictionary of filters to apply. Defaults to None.
  969. include_vectors (bool, optional): Whether to include vector data. Defaults to False.
  970. Returns:
  971. dict: Dictionary containing:
  972. - results: List of chunk records
  973. - total_entries: Total number of chunks matching the filters
  974. """
  975. vector_select = ", vec" if include_vectors else ""
  976. select_clause = f"""
  977. id, document_id, owner_id, collection_ids,
  978. text, metadata{vector_select}, COUNT(*) OVER() AS total_entries
  979. """
  980. params: list[str | int | bytes] = []
  981. where_clause = ""
  982. if filters:
  983. where_clause, params = apply_filters(
  984. filters, params, mode="where_clause"
  985. )
  986. query = f"""
  987. SELECT {select_clause}
  988. FROM {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
  989. {where_clause}
  990. LIMIT ${len(params) + 1}
  991. OFFSET ${len(params) + 2}
  992. """
  993. params.extend([limit, offset])
  994. logger.info(query)
  995. logger.info(params)
  996. # Execute the query
  997. results = await self.connection_manager.fetch_query(query, params)
  998. # Process results
  999. chunks = []
  1000. total_entries = 0
  1001. if results:
  1002. total_entries = results[0].get("total_entries", 0)
  1003. chunks = [
  1004. {
  1005. "id": str(result["id"]),
  1006. "document_id": str(result["document_id"]),
  1007. "owner_id": str(result["owner_id"]),
  1008. "collection_ids": result["collection_ids"],
  1009. "text": result["text"],
  1010. "metadata": json.loads(result["metadata"]),
  1011. "vector": (
  1012. json.loads(result["vec"]) if include_vectors else None
  1013. ),
  1014. }
  1015. for result in results
  1016. ]
  1017. return {"results": chunks, "total_entries": total_entries}
    async def search_documents(
        self,
        query_text: str,
        settings: SearchSettings,
    ) -> list[dict[str, Any]]:
        """Search for documents based on their metadata fields and/or body
        text. Joins with documents table to get complete document metadata.

        Args:
            query_text (str): The search query text
            settings (SearchSettings): Search settings including search preferences and filters

        Returns:
            list[dict[str, Any]]: List of documents with their search scores and complete metadata
        """
        where_clauses = []
        # $1 is always the query text; later placeholders are added by
        # apply_filters and the final OFFSET/LIMIT.
        params: list[str | int | bytes] = [query_text]

        # Behaviour toggles with defaults used when the settings object does
        # not define the attribute.
        search_over_body = getattr(settings, "search_over_body", True)
        search_over_metadata = getattr(settings, "search_over_metadata", True)
        metadata_weight = getattr(settings, "metadata_weight", 3.0)
        title_weight = getattr(settings, "title_weight", 1.0)
        metadata_keys = getattr(
            settings, "metadata_keys", ["title", "description"]
        )

        # Build the dynamic metadata field search expression
        # (keys are quoted via psql_quote_literal before interpolation)
        metadata_fields_expr = " || ' ' || ".join(
            [
                f"COALESCE(v.metadata->>{psql_quote_literal(key)}, '')"
                for key in metadata_keys  # type: ignore
            ]
        )

        # NOTE: boolean toggles and weights are interpolated directly into
        # the SQL (they are not user strings); only query text and filter
        # values travel as bind parameters.
        query = f"""
        WITH
        -- Metadata search scores
        metadata_scores AS (
            SELECT DISTINCT ON (v.document_id)
                v.document_id,
                d.metadata as doc_metadata,
                CASE WHEN $1 = '' THEN 0.0
                ELSE
                    ts_rank_cd(
                        setweight(to_tsvector('english', {metadata_fields_expr}), 'A'),
                        websearch_to_tsquery('english', $1),
                        32
                    )
                END as metadata_rank
            FROM {self._get_table_name(PostgresChunksHandler.TABLE_NAME)} v
            LEFT JOIN {self._get_table_name("documents")} d ON v.document_id = d.id
            WHERE v.metadata IS NOT NULL
        ),
        -- Body search scores
        body_scores AS (
            SELECT
                document_id,
                AVG(
                    ts_rank_cd(
                        setweight(to_tsvector('english', COALESCE(text, '')), 'B'),
                        websearch_to_tsquery('english', $1),
                        32
                    )
                ) as body_rank
            FROM {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
            WHERE $1 != ''
            {"AND to_tsvector('english', text) @@ websearch_to_tsquery('english', $1)" if search_over_body else ""}
            GROUP BY document_id
        ),
        -- Combined scores with document metadata
        combined_scores AS (
            SELECT
                COALESCE(m.document_id, b.document_id) as document_id,
                m.doc_metadata as metadata,
                COALESCE(m.metadata_rank, 0) as debug_metadata_rank,
                COALESCE(b.body_rank, 0) as debug_body_rank,
                CASE
                    WHEN {str(search_over_metadata).lower()} AND {str(search_over_body).lower()} THEN
                        COALESCE(m.metadata_rank, 0) * {metadata_weight} + COALESCE(b.body_rank, 0) * {title_weight}
                    WHEN {str(search_over_metadata).lower()} THEN
                        COALESCE(m.metadata_rank, 0)
                    WHEN {str(search_over_body).lower()} THEN
                        COALESCE(b.body_rank, 0)
                    ELSE 0
                END as rank
            FROM metadata_scores m
            FULL OUTER JOIN body_scores b ON m.document_id = b.document_id
            WHERE (
                ($1 = '') OR
                ({str(search_over_metadata).lower()} AND m.metadata_rank > 0) OR
                ({str(search_over_body).lower()} AND b.body_rank > 0)
            )
        """

        # Add any additional filters
        if settings.filters:
            filter_clause, params = apply_filters(settings.filters, params)
            where_clauses.append(filter_clause)

        if where_clauses:
            # Appended while the combined_scores CTE is still open, so the
            # extra conditions AND onto its WHERE (...) group above.
            query += f" AND {' AND '.join(where_clauses)}"

        # Placeholder indices are computed AFTER apply_filters may have
        # grown params, hence .format here instead of the earlier f-string.
        query += """
        )
        SELECT
            document_id,
            metadata,
            rank as score,
            debug_metadata_rank,
            debug_body_rank
        FROM combined_scores
        WHERE rank > 0
        ORDER BY rank DESC
        OFFSET ${offset_param} LIMIT ${limit_param}
        """.format(
            offset_param=len(params) + 1,
            limit_param=len(params) + 2,
        )

        # Add offset and limit to params
        params.extend([settings.offset, settings.limit])

        # Execute query
        results = await self.connection_manager.fetch_query(query, params)

        # Format results with complete document metadata
        return [
            {
                "document_id": str(r["document_id"]),
                # metadata may arrive as JSON text or as an already-decoded dict
                "metadata": (
                    json.loads(r["metadata"])
                    if isinstance(r["metadata"], str)
                    else r["metadata"]
                ),
                "score": float(r["score"]),
                "debug_metadata_rank": float(r["debug_metadata_rank"]),
                "debug_body_rank": float(r["debug_body_rank"]),
            }
            for r in results
        ]
  1147. def _get_index_options(
  1148. self,
  1149. method: IndexMethod,
  1150. index_arguments: Optional[IndexArgsIVFFlat | IndexArgsHNSW],
  1151. ) -> str:
  1152. if method == IndexMethod.ivfflat:
  1153. if isinstance(index_arguments, IndexArgsIVFFlat):
  1154. return f"WITH (lists={index_arguments.n_lists})"
  1155. else:
  1156. # Default value if no arguments provided
  1157. return "WITH (lists=100)"
  1158. elif method == IndexMethod.hnsw:
  1159. if isinstance(index_arguments, IndexArgsHNSW):
  1160. return f"WITH (m={index_arguments.m}, ef_construction={index_arguments.ef_construction})"
  1161. else:
  1162. # Default values if no arguments provided
  1163. return "WITH (m=16, ef_construction=64)"
  1164. else:
  1165. return "" # No options for other methods