# chunks.py

import copy
import json
import logging
import math
import time
import uuid
from typing import Any, Optional, TypedDict
from uuid import UUID

import numpy as np

from core.base import (
    ChunkSearchResult,
    Handler,
    IndexArgsHNSW,
    IndexArgsIVFFlat,
    IndexMeasure,
    IndexMethod,
    R2RException,
    SearchSettings,
    VectorEntry,
    VectorQuantizationType,
    VectorTableName,
)
from core.base.utils import _decorate_vector_type

from .base import PostgresConnectionManager
from .filters import apply_filters
from .utils import psql_quote_literal

logger = logging.getLogger()


def index_measure_to_ops(
    measure: IndexMeasure,
    quantization_type: VectorQuantizationType = VectorQuantizationType.FP32,
):
    return _decorate_vector_type(measure.ops, quantization_type)


def quantize_vector_to_binary(
    vector: list[float] | np.ndarray,
    threshold: float = 0.0,
) -> bytes:
    """Quantizes a float vector to a binary vector for the PostgreSQL bit
    type. Used when quantization_type is INT1.

    Args:
        vector (list[float] | np.ndarray): Input vector of floats
        threshold (float, optional): Threshold for binarization. Defaults to 0.0.

    Returns:
        bytes: ASCII-encoded string of 1s and 0s for the PostgreSQL bit type
    """
    # Convert input to numpy array if it isn't already
    if not isinstance(vector, np.ndarray):
        vector = np.array(vector)

    # Convert to binary (1 where value > threshold, 0 otherwise)
    binary_vector = (vector > threshold).astype(int)

    # Convert to string of 1s and 0s, then to bytes
    binary_string = "".join(map(str, binary_vector))
    return binary_string.encode("ascii")
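
# Illustrative example (hypothetical input, not taken from the codebase):
#   quantize_vector_to_binary([0.5, -0.2, 0.0, 1.3]) == b"1001"
# Values strictly greater than the threshold map to "1"; everything else to "0".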


class HybridSearchIntermediateResult(TypedDict):
    semantic_rank: int
    full_text_rank: int
    data: ChunkSearchResult
    rrf_score: float


class PostgresChunksHandler(Handler):
    TABLE_NAME = VectorTableName.CHUNKS

    def __init__(
        self,
        project_name: str,
        connection_manager: PostgresConnectionManager,
        dimension: int | float,
        quantization_type: VectorQuantizationType,
    ):
        super().__init__(project_name, connection_manager)
        self.dimension = dimension
        self.quantization_type = quantization_type

    async def create_tables(self):
        # First check if the table already exists and validate dimensions
        table_exists_query = """
        SELECT EXISTS (
            SELECT FROM pg_tables
            WHERE schemaname = $1
            AND tablename = $2
        );
        """
        table_name = VectorTableName.CHUNKS
        table_exists = await self.connection_manager.fetch_query(
            table_exists_query, (self.project_name, table_name)
        )

        if len(table_exists) > 0 and table_exists[0]["exists"]:
            # Table exists, check vector dimension
            vector_dim_query = """
            SELECT a.atttypmod as dimension
            FROM pg_attribute a
            JOIN pg_class c ON a.attrelid = c.oid
            JOIN pg_namespace n ON c.relnamespace = n.oid
            WHERE n.nspname = $1
            AND c.relname = $2
            AND a.attname = 'vec';
            """
            vector_dim_result = await self.connection_manager.fetch_query(
                vector_dim_query, (self.project_name, table_name)
            )

            if vector_dim_result and len(vector_dim_result) > 0:
                existing_dimension = vector_dim_result[0]["dimension"]

                # In pgvector, dimension is stored as atttypmod - 4
                if existing_dimension > 0:  # If it has a specific dimension
                    # Compare with provided dimension
                    if (
                        self.dimension > 0
                        and existing_dimension != self.dimension
                    ):
                        raise ValueError(
                            f"Dimension mismatch: Table '{self.project_name}.{table_name}' was created with "
                            f"dimension {existing_dimension}, but {self.dimension} was provided. "
                            f"You must use the same dimension for existing tables."
                        )

        # Check for old table name
        check_query = """
        SELECT EXISTS (
            SELECT FROM pg_tables
            WHERE schemaname = $1
            AND tablename = $2
        );
        """
        old_table_exists = await self.connection_manager.fetch_query(
            check_query, (self.project_name, self.project_name)
        )

        if len(old_table_exists) > 0 and old_table_exists[0]["exists"]:
            raise ValueError(
                f"Found old vector table '{self.project_name}.{self.project_name}'. "
                "Please run `r2r db upgrade` with the CLI, or to run manually, "
                "run in R2R/py/migrations with 'alembic upgrade head' to update "
                "your database schema to the new version."
            )

        binary_col = (
            ""
            if self.quantization_type != VectorQuantizationType.INT1
            else f"vec_binary bit({self.dimension}),"
        )

        if self.dimension > 0:
            vector_col = f"vec vector({self.dimension})"
        else:
            vector_col = "vec vector"

        query = f"""
        CREATE TABLE IF NOT EXISTS {self._get_table_name(PostgresChunksHandler.TABLE_NAME)} (
            id UUID PRIMARY KEY,
            document_id UUID,
            owner_id UUID,
            collection_ids UUID[],
            {vector_col},
            {binary_col}
            text TEXT,
            metadata JSONB,
            fts tsvector GENERATED ALWAYS AS (to_tsvector('english', text)) STORED
        );
        CREATE INDEX IF NOT EXISTS idx_vectors_document_id ON {self._get_table_name(PostgresChunksHandler.TABLE_NAME)} (document_id);
        CREATE INDEX IF NOT EXISTS idx_vectors_owner_id ON {self._get_table_name(PostgresChunksHandler.TABLE_NAME)} (owner_id);
        CREATE INDEX IF NOT EXISTS idx_vectors_collection_ids ON {self._get_table_name(PostgresChunksHandler.TABLE_NAME)} USING GIN (collection_ids);
        CREATE INDEX IF NOT EXISTS idx_vectors_text ON {self._get_table_name(PostgresChunksHandler.TABLE_NAME)} USING GIN (to_tsvector('english', text));
        """

        await self.connection_manager.execute_query(query)
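
    # Note: with INT1 quantization the table carries both `vec` (the full-precision
    # vector, kept for re-ranking) and `vec_binary` (a bit column used for the
    # coarse first-stage search in `semantic_search` below).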

    async def upsert(self, entry: VectorEntry) -> None:
        """Upsert function that handles vector quantization only when
        quantization_type is INT1.

        Matches the table schema where the vec_binary column only exists for
        INT1 quantization.
        """
        # Check the quantization type to determine which columns to use
        if self.quantization_type == VectorQuantizationType.INT1:
            bit_dim = (
                "" if math.isnan(self.dimension) else f"({self.dimension})"
            )

            # For quantized vectors, use vec_binary column
            query = f"""
            INSERT INTO {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
            (id, document_id, owner_id, collection_ids, vec, vec_binary, text, metadata)
            VALUES ($1, $2, $3, $4, $5, $6::bit{bit_dim}, $7, $8)
            ON CONFLICT (id) DO UPDATE SET
            document_id = EXCLUDED.document_id,
            owner_id = EXCLUDED.owner_id,
            collection_ids = EXCLUDED.collection_ids,
            vec = EXCLUDED.vec,
            vec_binary = EXCLUDED.vec_binary,
            text = EXCLUDED.text,
            metadata = EXCLUDED.metadata;
            """
            await self.connection_manager.execute_query(
                query,
                (
                    entry.id,
                    entry.document_id,
                    entry.owner_id,
                    entry.collection_ids,
                    str(entry.vector.data),
                    quantize_vector_to_binary(
                        entry.vector.data
                    ),  # Convert to binary
                    entry.text,
                    json.dumps(entry.metadata),
                ),
            )
        else:
            # For regular vectors, use vec column only
            query = f"""
            INSERT INTO {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
            (id, document_id, owner_id, collection_ids, vec, text, metadata)
            VALUES ($1, $2, $3, $4, $5, $6, $7)
            ON CONFLICT (id) DO UPDATE SET
            document_id = EXCLUDED.document_id,
            owner_id = EXCLUDED.owner_id,
            collection_ids = EXCLUDED.collection_ids,
            vec = EXCLUDED.vec,
            text = EXCLUDED.text,
            metadata = EXCLUDED.metadata;
            """
            await self.connection_manager.execute_query(
                query,
                (
                    entry.id,
                    entry.document_id,
                    entry.owner_id,
                    entry.collection_ids,
                    str(entry.vector.data),
                    entry.text,
                    json.dumps(entry.metadata),
                ),
            )
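
    # Illustrative usage sketch (field values are placeholders, not real data):
    #   entry = VectorEntry(id=uuid.uuid4(), document_id=doc_id, owner_id=user_id,
    #                       collection_ids=[collection_id], vector=vector,
    #                       text="chunk text", metadata={"chunk_order": 0})
    #   await chunks_handler.upsert(entry)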

    async def upsert_entries(self, entries: list[VectorEntry]) -> None:
        """Batch upsert function that handles vector quantization only when
        quantization_type is INT1.

        Matches the table schema where the vec_binary column only exists for
        INT1 quantization.
        """
        if self.quantization_type == VectorQuantizationType.INT1:
            bit_dim = (
                "" if math.isnan(self.dimension) else f"({self.dimension})"
            )

            # For quantized vectors, use vec_binary column
            query = f"""
            INSERT INTO {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
            (id, document_id, owner_id, collection_ids, vec, vec_binary, text, metadata)
            VALUES ($1, $2, $3, $4, $5, $6::bit{bit_dim}, $7, $8)
            ON CONFLICT (id) DO UPDATE SET
            document_id = EXCLUDED.document_id,
            owner_id = EXCLUDED.owner_id,
            collection_ids = EXCLUDED.collection_ids,
            vec = EXCLUDED.vec,
            vec_binary = EXCLUDED.vec_binary,
            text = EXCLUDED.text,
            metadata = EXCLUDED.metadata;
            """
            bin_params = [
                (
                    entry.id,
                    entry.document_id,
                    entry.owner_id,
                    entry.collection_ids,
                    str(entry.vector.data),
                    quantize_vector_to_binary(
                        entry.vector.data
                    ),  # Convert to binary
                    entry.text,
                    json.dumps(entry.metadata),
                )
                for entry in entries
            ]
            await self.connection_manager.execute_many(query, bin_params)
        else:
            # For regular vectors, use vec column only
            query = f"""
            INSERT INTO {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
            (id, document_id, owner_id, collection_ids, vec, text, metadata)
            VALUES ($1, $2, $3, $4, $5, $6, $7)
            ON CONFLICT (id) DO UPDATE SET
            document_id = EXCLUDED.document_id,
            owner_id = EXCLUDED.owner_id,
            collection_ids = EXCLUDED.collection_ids,
            vec = EXCLUDED.vec,
            text = EXCLUDED.text,
            metadata = EXCLUDED.metadata;
            """
            params = [
                (
                    entry.id,
                    entry.document_id,
                    entry.owner_id,
                    entry.collection_ids,
                    str(entry.vector.data),
                    entry.text,
                    json.dumps(entry.metadata),
                )
                for entry in entries
            ]
            await self.connection_manager.execute_many(query, params)
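
    # Note: execute_many sends one parameter tuple per entry; callers ingesting very
    # large batches may want to split `entries` into smaller slices first (an
    # operational suggestion, not something this handler enforces).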

    async def semantic_search(
        self, query_vector: list[float], search_settings: SearchSettings
    ) -> list[ChunkSearchResult]:
        try:
            imeasure_obj = IndexMeasure(
                search_settings.chunk_settings.index_measure
            )
        except ValueError:
            raise ValueError("Invalid index measure") from None

        table_name = self._get_table_name(PostgresChunksHandler.TABLE_NAME)
        cols = [
            f"{table_name}.id",
            f"{table_name}.document_id",
            f"{table_name}.owner_id",
            f"{table_name}.collection_ids",
            f"{table_name}.text",
        ]
        params: list[str | int | bytes] = []

        # For binary vectors (INT1), implement two-stage search
        if self.quantization_type == VectorQuantizationType.INT1:
            # Convert query vector to binary format
            binary_query = quantize_vector_to_binary(query_vector)

            # TODO - Put depth multiplier in config / settings
            extended_limit = (
                search_settings.limit * 20
            )  # Get 20x candidates for re-ranking

            if (
                imeasure_obj == IndexMeasure.hamming_distance
                or imeasure_obj == IndexMeasure.jaccard_distance
            ):
                binary_search_measure_repr = imeasure_obj.pgvector_repr
            else:
                binary_search_measure_repr = (
                    IndexMeasure.hamming_distance.pgvector_repr
                )

            # Use binary column and binary-specific distance measures for first stage
            bit_dim = (
                "" if math.isnan(self.dimension) else f"({self.dimension})"
            )
            stage1_distance = f"{table_name}.vec_binary {binary_search_measure_repr} $1::bit{bit_dim}"
            stage1_param = binary_query

            cols.append(
                f"{table_name}.vec"
            )  # Need original vector for re-ranking
            if search_settings.include_metadatas:
                cols.append(f"{table_name}.metadata")

            select_clause = ", ".join(cols)
            where_clause = ""
            params.append(stage1_param)
            if search_settings.filters:
                where_clause, params = apply_filters(
                    search_settings.filters, params, mode="where_clause"
                )

            vector_dim = (
                "" if math.isnan(self.dimension) else f"({self.dimension})"
            )

            # First stage: Get candidates using binary search
            query = f"""
            WITH candidates AS (
                SELECT {select_clause},
                       ({stage1_distance}) as binary_distance
                FROM {table_name}
                {where_clause}
                ORDER BY {stage1_distance}
                LIMIT ${len(params) + 1}
                OFFSET ${len(params) + 2}
            )
            -- Second stage: Re-rank using original vectors
            SELECT
                id,
                document_id,
                owner_id,
                collection_ids,
                text,
                {"metadata," if search_settings.include_metadatas else ""}
                (vec <=> ${len(params) + 4}::vector{vector_dim}) as distance
            FROM candidates
            ORDER BY distance
            LIMIT ${len(params) + 3}
            """

            params.extend(
                [
                    extended_limit,  # First stage limit
                    search_settings.offset,
                    search_settings.limit,  # Final limit
                    str(query_vector),  # For re-ranking
                ]
            )
        else:
            # Standard float vector handling
            vector_dim = (
                "" if math.isnan(self.dimension) else f"({self.dimension})"
            )
            distance_calc = f"{table_name}.vec {search_settings.chunk_settings.index_measure.pgvector_repr} $1::vector{vector_dim}"
            query_param = str(query_vector)

            if search_settings.include_scores:
                cols.append(f"({distance_calc}) AS distance")
            if search_settings.include_metadatas:
                cols.append(f"{table_name}.metadata")

            select_clause = ", ".join(cols)
            where_clause = ""
            params.append(query_param)
            if search_settings.filters:
                where_clause, new_params = apply_filters(
                    search_settings.filters,
                    params,
                    mode="where_clause",  # Get just conditions without WHERE
                )
                params = new_params

            query = f"""
            SELECT {select_clause}
            FROM {table_name}
            {where_clause}
            ORDER BY {distance_calc}
            LIMIT ${len(params) + 1}
            OFFSET ${len(params) + 2}
            """
            params.extend([search_settings.limit, search_settings.offset])

        results = await self.connection_manager.fetch_query(query, params)

        return [
            ChunkSearchResult(
                id=UUID(str(result["id"])),
                document_id=UUID(str(result["document_id"])),
                owner_id=UUID(str(result["owner_id"])),
                collection_ids=result["collection_ids"],
                text=result["text"],
                score=(
                    (1 - float(result["distance"]))
                    if "distance" in result
                    else -1
                ),
                metadata=(
                    json.loads(result["metadata"])
                    if search_settings.include_metadatas
                    else {}
                ),
            )
            for result in results
        ]
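
    # Scores returned above are computed as 1 - distance, which for cosine distance
    # behaves like a similarity score; rows fetched without a distance column fall
    # back to -1.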

    async def full_text_search(
        self, query_text: str, search_settings: SearchSettings
    ) -> list[ChunkSearchResult]:
        conditions = []
        params: list[str | int | bytes] = [query_text]

        conditions.append("fts @@ websearch_to_tsquery('english', $1)")

        if search_settings.filters:
            filter_condition, params = apply_filters(
                search_settings.filters, params, mode="condition_only"
            )
            if filter_condition:
                conditions.append(filter_condition)

        where_clause = "WHERE " + " AND ".join(conditions)

        query = f"""
        SELECT
            id,
            document_id,
            owner_id,
            collection_ids,
            text,
            metadata,
            ts_rank(fts, websearch_to_tsquery('english', $1), 32) as rank
        FROM {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
        {where_clause}
        ORDER BY rank DESC
        OFFSET ${len(params) + 1}
        LIMIT ${len(params) + 2}
        """
        params.extend(
            [
                search_settings.offset,
                search_settings.hybrid_settings.full_text_limit,
            ]
        )

        results = await self.connection_manager.fetch_query(query, params)
        return [
            ChunkSearchResult(
                id=UUID(str(r["id"])),
                document_id=UUID(str(r["document_id"])),
                owner_id=UUID(str(r["owner_id"])),
                collection_ids=r["collection_ids"],
                text=r["text"],
                score=float(r["rank"]),
                metadata=json.loads(r["metadata"]),
            )
            for r in results
        ]
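
    # Note: the `32` passed to ts_rank is a PostgreSQL normalization flag that divides
    # the rank by (rank + 1), scaling full-text scores into the [0, 1) range.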

    async def hybrid_search(
        self,
        query_text: str,
        query_vector: list[float],
        search_settings: SearchSettings,
        *args,
        **kwargs,
    ) -> list[ChunkSearchResult]:
        if search_settings.hybrid_settings is None:
            raise ValueError(
                "Please provide a valid `hybrid_settings` in the `search_settings`."
            )
        if (
            search_settings.hybrid_settings.full_text_limit
            < search_settings.limit
        ):
            raise ValueError(
                "The `full_text_limit` must be greater than or equal to the `limit`."
            )

        semantic_settings = copy.deepcopy(search_settings)
        semantic_settings.limit += search_settings.offset

        full_text_settings = copy.deepcopy(search_settings)
        full_text_settings.hybrid_settings.full_text_limit += (
            search_settings.offset
        )

        semantic_results: list[ChunkSearchResult] = await self.semantic_search(
            query_vector, semantic_settings
        )
        full_text_results: list[
            ChunkSearchResult
        ] = await self.full_text_search(query_text, full_text_settings)

        semantic_limit = search_settings.limit
        full_text_limit = search_settings.hybrid_settings.full_text_limit
        semantic_weight = search_settings.hybrid_settings.semantic_weight
        full_text_weight = search_settings.hybrid_settings.full_text_weight
        rrf_k = search_settings.hybrid_settings.rrf_k

        combined_results: dict[uuid.UUID, HybridSearchIntermediateResult] = {}

        for rank, result in enumerate(semantic_results, 1):
            combined_results[result.id] = {
                "semantic_rank": rank,
                "full_text_rank": full_text_limit,
                "data": result,
                "rrf_score": 0.0,  # Initialize with 0, will be calculated later
            }

        for rank, result in enumerate(full_text_results, 1):
            if result.id in combined_results:
                combined_results[result.id]["full_text_rank"] = rank
            else:
                combined_results[result.id] = {
                    "semantic_rank": semantic_limit,
                    "full_text_rank": rank,
                    "data": result,
                    "rrf_score": 0.0,  # Initialize with 0, will be calculated later
                }

        combined_results = {
            k: v
            for k, v in combined_results.items()
            if v["semantic_rank"] <= semantic_limit * 2
            and v["full_text_rank"] <= full_text_limit * 2
        }

        for hyb_result in combined_results.values():
            semantic_score = 1 / (rrf_k + hyb_result["semantic_rank"])
            full_text_score = 1 / (rrf_k + hyb_result["full_text_rank"])
            hyb_result["rrf_score"] = (
                semantic_score * semantic_weight
                + full_text_score * full_text_weight
            ) / (semantic_weight + full_text_weight)

        sorted_results = sorted(
            combined_results.values(),
            key=lambda x: x["rrf_score"],
            reverse=True,
        )
        offset_results = sorted_results[
            search_settings.offset : search_settings.offset
            + search_settings.limit
        ]

        return [
            ChunkSearchResult(
                id=result["data"].id,
                document_id=result["data"].document_id,
                owner_id=result["data"].owner_id,
                collection_ids=result["data"].collection_ids,
                text=result["data"].text,
                score=result["rrf_score"],
                metadata={
                    **result["data"].metadata,
                    "semantic_rank": result["semantic_rank"],
                    "full_text_rank": result["full_text_rank"],
                },
            )
            for result in offset_results
        ]
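
    # Worked example of the weighted RRF above (illustrative numbers only): with
    # rrf_k=50, semantic_weight=5.0, full_text_weight=1.0, a chunk ranked 1st
    # semantically and 3rd in full text scores
    #   (5.0 * 1/51 + 1.0 * 1/53) / 6.0 ≈ 0.0195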

    async def delete(
        self, filters: dict[str, Any]
    ) -> dict[str, dict[str, str]]:
        params: list[str | int | bytes] = []
        where_clause, params = apply_filters(
            filters, params, mode="condition_only"
        )

        query = f"""
        DELETE FROM {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
        WHERE {where_clause}
        RETURNING id, document_id, text;
        """

        results = await self.connection_manager.fetch_query(query, params)

        return {
            str(result["id"]): {
                "status": "deleted",
                "id": str(result["id"]),
                "document_id": str(result["document_id"]),
                "text": result["text"],
            }
            for result in results
        }

    async def assign_document_chunks_to_collection(
        self, document_id: UUID, collection_id: UUID
    ) -> None:
        query = f"""
        UPDATE {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
        SET collection_ids = array_append(collection_ids, $1)
        WHERE document_id = $2 AND NOT ($1 = ANY(collection_ids));
        """
        return await self.connection_manager.execute_query(
            query, (str(collection_id), str(document_id))
        )

    async def remove_document_from_collection_vector(
        self, document_id: UUID, collection_id: UUID
    ) -> None:
        query = f"""
        UPDATE {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
        SET collection_ids = array_remove(collection_ids, $1)
        WHERE document_id = $2;
        """
        await self.connection_manager.execute_query(
            query, (collection_id, document_id)
        )

    async def delete_user_vector(self, owner_id: UUID) -> None:
        query = f"""
        DELETE FROM {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
        WHERE owner_id = $1;
        """
        await self.connection_manager.execute_query(query, (owner_id,))

    async def delete_collection_vector(self, collection_id: UUID) -> None:
        query = f"""
        DELETE FROM {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
        WHERE $1 = ANY(collection_ids)
        RETURNING collection_ids
        """
        await self.connection_manager.fetchrow_query(query, (collection_id,))
        return None

    async def list_document_chunks(
        self,
        document_id: UUID,
        offset: int,
        limit: int,
        include_vectors: bool = False,
    ) -> dict[str, Any]:
        vector_select = ", vec" if include_vectors else ""
        limit_clause = f"LIMIT {limit}" if limit > -1 else ""

        query = f"""
        SELECT id, document_id, owner_id, collection_ids, text, metadata{vector_select}, COUNT(*) OVER() AS total
        FROM {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
        WHERE document_id = $1
        ORDER BY (metadata->>'chunk_order')::integer
        OFFSET $2
        {limit_clause};
        """
        params = [document_id, offset]

        results = await self.connection_manager.fetch_query(query, params)

        chunks = []
        total = 0
        if results:
            total = results[0].get("total", 0)
            chunks = [
                {
                    "id": result["id"],
                    "document_id": result["document_id"],
                    "owner_id": result["owner_id"],
                    "collection_ids": result["collection_ids"],
                    "text": result["text"],
                    "metadata": json.loads(result["metadata"]),
                    "vector": (
                        json.loads(result["vec"]) if include_vectors else None
                    ),
                }
                for result in results
            ]

        return {"results": chunks, "total_entries": total}
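
    # Note: the ordering above relies on each chunk's metadata carrying a numeric
    # `chunk_order` key; rows missing it sort last (NULLs last), and non-numeric
    # values would make the integer cast fail.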

    async def get_chunk(self, id: UUID) -> dict:
        query = f"""
        SELECT id, document_id, owner_id, collection_ids, text, metadata
        FROM {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
        WHERE id = $1;
        """

        result = await self.connection_manager.fetchrow_query(query, (id,))

        if result:
            return {
                "id": result["id"],
                "document_id": result["document_id"],
                "owner_id": result["owner_id"],
                "collection_ids": result["collection_ids"],
                "text": result["text"],
                "metadata": json.loads(result["metadata"]),
            }
        raise R2RException(
            message=f"Chunk with ID {id} not found", status_code=404
        )

    async def create_index(
        self,
        table_name: Optional[VectorTableName] = None,
        index_measure: IndexMeasure = IndexMeasure.cosine_distance,
        index_method: IndexMethod = IndexMethod.auto,
        index_arguments: Optional[IndexArgsIVFFlat | IndexArgsHNSW] = None,
        index_name: Optional[str] = None,
        index_column: Optional[str] = None,
        concurrently: bool = True,
    ) -> None:
        """Creates an index for the collection.

        Note:
            When `vecs` creates an index on a pgvector column in PostgreSQL, it uses a
            multi-step process that enables performant indexes to be built for large
            collections with low-end database hardware.

            Those steps are:

            - Creates a new table with a different name
            - Randomly selects records from the existing table
            - Inserts the random records from the existing table into the new table
            - Creates the requested vector index on the new table
            - Upserts all data from the existing table into the new table
            - Drops the existing table
            - Renames the new table to the existing table's name

            If you create dependencies (like views) on the table that underpins a
            `vecs.Collection`, the `create_index` step may require you to drop those
            dependencies before it will succeed.

        Args:
            index_measure (IndexMeasure, optional): The measure to index for. Defaults to 'cosine_distance'.
            index_method (IndexMethod, optional): The indexing method to use. Defaults to 'auto'.
            index_arguments (IndexArgsIVFFlat | IndexArgsHNSW, optional): Index type specific arguments.
            index_name (str, optional): The name of the index to create. Defaults to None.
            concurrently (bool, optional): Whether to create the index concurrently. Defaults to True.

        Raises:
            ValueError: If an invalid index method is used, or if the supplied index
                arguments do not match the chosen index method.
        """
        if table_name == VectorTableName.CHUNKS:
            table_name_str = f"{self.project_name}.{VectorTableName.CHUNKS}"  # TODO - Fix bug in vector table naming convention
            if index_column:
                col_name = index_column
            else:
                col_name = (
                    "vec"
                    if (
                        index_measure != IndexMeasure.hamming_distance
                        and index_measure != IndexMeasure.jaccard_distance
                    )
                    else "vec_binary"
                )
        elif table_name == VectorTableName.ENTITIES_DOCUMENT:
            table_name_str = (
                f"{self.project_name}.{VectorTableName.ENTITIES_DOCUMENT}"
            )
            col_name = "description_embedding"
        elif table_name == VectorTableName.GRAPHS_ENTITIES:
            table_name_str = (
                f"{self.project_name}.{VectorTableName.GRAPHS_ENTITIES}"
            )
            col_name = "description_embedding"
        elif table_name == VectorTableName.COMMUNITIES:
            table_name_str = (
                f"{self.project_name}.{VectorTableName.COMMUNITIES}"
            )
            col_name = "embedding"
        else:
            raise ValueError("invalid table name")

        if index_method not in (
            IndexMethod.ivfflat,
            IndexMethod.hnsw,
            IndexMethod.auto,
        ):
            raise ValueError("invalid index method")

        if index_arguments:
            # Disallow case where user submits index arguments but uses the
            # IndexMethod.auto index (index build arguments should only be
            # used with a specific index)
            if index_method == IndexMethod.auto:
                raise ValueError(
                    "Index build parameters are not allowed when using the IndexMethod.auto index."
                )
            # Disallow case where user specifies one index type but submits
            # index build arguments for the other index type
            if (
                isinstance(index_arguments, IndexArgsHNSW)
                and index_method != IndexMethod.hnsw
            ) or (
                isinstance(index_arguments, IndexArgsIVFFlat)
                and index_method != IndexMethod.ivfflat
            ):
                raise ValueError(
                    f"{index_arguments.__class__.__name__} build parameters were supplied but {index_method} index was specified."
                )

        if index_method == IndexMethod.auto:
            index_method = IndexMethod.hnsw

        ops = index_measure_to_ops(
            index_measure  # , quantization_type=self.quantization_type
        )
        if ops is None:
            raise ValueError("Unknown index measure")

        concurrently_sql = "CONCURRENTLY" if concurrently else ""

        index_name = (
            index_name
            or f"ix_{ops}_{index_method}__{col_name}_{time.strftime('%Y%m%d%H%M%S')}"
        )

        create_index_sql = f"""
        CREATE INDEX {concurrently_sql} {index_name}
        ON {table_name_str}
        USING {index_method} ({col_name} {ops}) {self._get_index_options(index_method, index_arguments)};
        """

        try:
            if concurrently:
                async with (
                    self.connection_manager.pool.get_connection() as conn  # type: ignore
                ):
                    # Disable automatic transaction management
                    await conn.execute(
                        "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED"
                    )
                    await conn.execute(create_index_sql)
            else:
                # Non-concurrent index creation can use normal query execution
                await self.connection_manager.execute_query(create_index_sql)
        except Exception as e:
            raise Exception(f"Failed to create index: {e}") from e

        return None
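
    # Illustrative call (argument values and the IndexArgsHNSW keyword names are
    # assumptions for the sketch, not project defaults):
    #   await chunks_handler.create_index(
    #       table_name=VectorTableName.CHUNKS,
    #       index_method=IndexMethod.hnsw,
    #       index_arguments=IndexArgsHNSW(m=16, ef_construction=64),
    #       concurrently=True,
    #   )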

    async def list_indices(
        self,
        offset: int,
        limit: int,
        filters: Optional[dict[str, Any]] = None,
    ) -> dict:
        where_clauses = []
        params: list[Any] = [self.project_name]  # Start with schema name
        param_count = 1

        # Handle filtering
        if filters:
            if "table_name" in filters:
                where_clauses.append(f"i.tablename = ${param_count + 1}")
                params.append(filters["table_name"])
                param_count += 1
            if "index_method" in filters:
                where_clauses.append(f"am.amname = ${param_count + 1}")
                params.append(filters["index_method"])
                param_count += 1
            if "index_name" in filters:
                where_clauses.append(
                    f"LOWER(i.indexname) LIKE LOWER(${param_count + 1})"
                )
                params.append(f"%{filters['index_name']}%")
                param_count += 1

        where_clause = " AND ".join(where_clauses) if where_clauses else ""
        if where_clause:
            where_clause = f"AND {where_clause}"

        query = f"""
        WITH index_info AS (
            SELECT
                i.indexname as name,
                i.tablename as table_name,
                i.indexdef as definition,
                am.amname as method,
                pg_relation_size(c.oid) as size_in_bytes,
                c.reltuples::bigint as row_estimate,
                COALESCE(psat.idx_scan, 0) as number_of_scans,
                COALESCE(psat.idx_tup_read, 0) as tuples_read,
                COALESCE(psat.idx_tup_fetch, 0) as tuples_fetched,
                COUNT(*) OVER() as total_count
            FROM pg_indexes i
            JOIN pg_class c ON c.relname = i.indexname
            JOIN pg_am am ON c.relam = am.oid
            LEFT JOIN pg_stat_user_indexes psat ON psat.indexrelname = i.indexname
                AND psat.schemaname = i.schemaname
            WHERE i.schemaname = $1
            AND i.indexdef LIKE '%vector%'
            {where_clause}
        )
        SELECT *
        FROM index_info
        ORDER BY name
        LIMIT ${param_count + 1}
        OFFSET ${param_count + 2}
        """

        # Add limit and offset to params
        params.extend([limit, offset])

        results = await self.connection_manager.fetch_query(query, params)

        indices = []
        total_entries = 0
        if results:
            total_entries = results[0]["total_count"]
            for result in results:
                index_info = {
                    "name": result["name"],
                    "table_name": result["table_name"],
                    "definition": result["definition"],
                    "size_in_bytes": result["size_in_bytes"],
                    "row_estimate": result["row_estimate"],
                    "number_of_scans": result["number_of_scans"],
                    "tuples_read": result["tuples_read"],
                    "tuples_fetched": result["tuples_fetched"],
                }
                indices.append(index_info)

        return {"indices": indices, "total_entries": total_entries}

    async def delete_index(
        self,
        index_name: str,
        table_name: Optional[VectorTableName] = None,
        concurrently: bool = True,
    ) -> None:
        """Deletes a vector index.

        Args:
            index_name (str): Name of the index to delete
            table_name (VectorTableName, optional): Table the index belongs to
            concurrently (bool): Whether to drop the index concurrently

        Raises:
            ValueError: If table name is invalid or index doesn't exist
            Exception: If index deletion fails
        """
        # Validate table name and get column name
        if table_name == VectorTableName.CHUNKS:
            table_name_str = f"{self.project_name}.{VectorTableName.CHUNKS}"
            col_name = "vec"
        elif table_name == VectorTableName.ENTITIES_DOCUMENT:
            table_name_str = (
                f"{self.project_name}.{VectorTableName.ENTITIES_DOCUMENT}"
            )
            col_name = "description_embedding"
        elif table_name == VectorTableName.GRAPHS_ENTITIES:
            table_name_str = (
                f"{self.project_name}.{VectorTableName.GRAPHS_ENTITIES}"
            )
            col_name = "description_embedding"
        elif table_name == VectorTableName.COMMUNITIES:
            table_name_str = (
                f"{self.project_name}.{VectorTableName.COMMUNITIES}"
            )
            col_name = "description_embedding"
        else:
            raise ValueError("invalid table name")

        # Extract schema and base table name
        schema_name, base_table_name = table_name_str.split(".")

        # Verify index exists and is a vector index
        query = """
        SELECT indexdef
        FROM pg_indexes
        WHERE indexname = $1
        AND schemaname = $2
        AND tablename = $3
        AND indexdef LIKE $4
        """
        result = await self.connection_manager.fetchrow_query(
            query, (index_name, schema_name, base_table_name, f"%({col_name}%")
        )

        if not result:
            raise ValueError(
                f"Vector index '{index_name}' does not exist on table {table_name_str}"
            )

        # Drop the index
        concurrently_sql = "CONCURRENTLY" if concurrently else ""
        drop_query = (
            f"DROP INDEX {concurrently_sql} {schema_name}.{index_name}"
        )

        try:
            if concurrently:
                async with (
                    self.connection_manager.pool.get_connection() as conn  # type: ignore
                ):
                    # Disable automatic transaction management
                    await conn.execute(
                        "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL READ COMMITTED"
                    )
                    await conn.execute(drop_query)
            else:
                await self.connection_manager.execute_query(drop_query)
        except Exception as e:
            raise Exception(f"Failed to delete index: {e}") from e

    async def list_chunks(
        self,
        offset: int,
        limit: int,
        filters: Optional[dict[str, Any]] = None,
        include_vectors: bool = False,
    ) -> dict[str, Any]:
        """List chunks with pagination support.

        Args:
            offset (int, optional): Number of records to skip. Defaults to 0.
            limit (int, optional): Maximum number of records to return. Defaults to 10.
            filters (dict, optional): Dictionary of filters to apply. Defaults to None.
            include_vectors (bool, optional): Whether to include vector data. Defaults to False.

        Returns:
            dict: Dictionary containing:
                - results: List of chunk records
                - total_entries: Total number of chunks matching the filters
        """
        vector_select = ", vec" if include_vectors else ""
        select_clause = f"""
            id, document_id, owner_id, collection_ids,
            text, metadata{vector_select}, COUNT(*) OVER() AS total_entries
        """

        params: list[str | int | bytes] = []
        where_clause = ""
        if filters:
            where_clause, params = apply_filters(
                filters, params, mode="where_clause"
            )

        query = f"""
        SELECT {select_clause}
        FROM {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
        {where_clause}
        LIMIT ${len(params) + 1}
        OFFSET ${len(params) + 2}
        """
        params.extend([limit, offset])

        # Execute the query
        results = await self.connection_manager.fetch_query(query, params)

        # Process results
        chunks = []
        total_entries = 0
        if results:
            total_entries = results[0].get("total_entries", 0)
            chunks = [
                {
                    "id": str(result["id"]),
                    "document_id": str(result["document_id"]),
                    "owner_id": str(result["owner_id"]),
                    "collection_ids": result["collection_ids"],
                    "text": result["text"],
                    "metadata": json.loads(result["metadata"]),
                    "vector": (
                        json.loads(result["vec"]) if include_vectors else None
                    ),
                }
                for result in results
            ]

        return {"results": chunks, "total_entries": total_entries}

    async def search_documents(
        self,
        query_text: str,
        settings: SearchSettings,
    ) -> list[dict[str, Any]]:
        """Search for documents based on their metadata fields and/or body
        text. Joins with the documents table to get complete document metadata.

        Args:
            query_text (str): The search query text
            settings (SearchSettings): Search settings including search preferences and filters

        Returns:
            list[dict[str, Any]]: List of documents with their search scores and complete metadata
        """
        where_clauses = []
        params: list[str | int | bytes] = [query_text]

        search_over_body = getattr(settings, "search_over_body", True)
        search_over_metadata = getattr(settings, "search_over_metadata", True)
        metadata_weight = getattr(settings, "metadata_weight", 3.0)
        title_weight = getattr(settings, "title_weight", 1.0)
        metadata_keys = getattr(
            settings, "metadata_keys", ["title", "description"]
        )

        # Build the dynamic metadata field search expression
        metadata_fields_expr = " || ' ' || ".join(
            [
                f"COALESCE(v.metadata->>{psql_quote_literal(key)}, '')"
                for key in metadata_keys  # type: ignore
            ]
        )

        query = f"""
        WITH
        -- Metadata search scores
        metadata_scores AS (
            SELECT DISTINCT ON (v.document_id)
                v.document_id,
                d.metadata as doc_metadata,
                CASE WHEN $1 = '' THEN 0.0
                ELSE
                    ts_rank_cd(
                        setweight(to_tsvector('english', {metadata_fields_expr}), 'A'),
                        websearch_to_tsquery('english', $1),
                        32
                    )
                END as metadata_rank
            FROM {self._get_table_name(PostgresChunksHandler.TABLE_NAME)} v
            LEFT JOIN {self._get_table_name("documents")} d ON v.document_id = d.id
            WHERE v.metadata IS NOT NULL
        ),
        -- Body search scores
        body_scores AS (
            SELECT
                document_id,
                AVG(
                    ts_rank_cd(
                        setweight(to_tsvector('english', COALESCE(text, '')), 'B'),
                        websearch_to_tsquery('english', $1),
                        32
                    )
                ) as body_rank
            FROM {self._get_table_name(PostgresChunksHandler.TABLE_NAME)}
            WHERE $1 != ''
            {"AND to_tsvector('english', text) @@ websearch_to_tsquery('english', $1)" if search_over_body else ""}
            GROUP BY document_id
        ),
        -- Combined scores with document metadata
        combined_scores AS (
            SELECT
                COALESCE(m.document_id, b.document_id) as document_id,
                m.doc_metadata as metadata,
                COALESCE(m.metadata_rank, 0) as debug_metadata_rank,
                COALESCE(b.body_rank, 0) as debug_body_rank,
                CASE
                    WHEN {str(search_over_metadata).lower()} AND {str(search_over_body).lower()} THEN
                        COALESCE(m.metadata_rank, 0) * {metadata_weight} + COALESCE(b.body_rank, 0) * {title_weight}
                    WHEN {str(search_over_metadata).lower()} THEN
                        COALESCE(m.metadata_rank, 0)
                    WHEN {str(search_over_body).lower()} THEN
                        COALESCE(b.body_rank, 0)
                    ELSE 0
                END as rank
            FROM metadata_scores m
            FULL OUTER JOIN body_scores b ON m.document_id = b.document_id
            WHERE (
                ($1 = '') OR
                ({str(search_over_metadata).lower()} AND m.metadata_rank > 0) OR
                ({str(search_over_body).lower()} AND b.body_rank > 0)
            )
        """

        # Add any additional filters
        if settings.filters:
            filter_clause, params = apply_filters(settings.filters, params)
            where_clauses.append(filter_clause)

        if where_clauses:
            query += f" AND {' AND '.join(where_clauses)}"

        query += """
        )
        SELECT
            document_id,
            metadata,
            rank as score,
            debug_metadata_rank,
            debug_body_rank
        FROM combined_scores
        WHERE rank > 0
        ORDER BY rank DESC
        OFFSET ${offset_param} LIMIT ${limit_param}
        """.format(
            offset_param=len(params) + 1,
            limit_param=len(params) + 2,
        )

        # Add offset and limit to params
        params.extend([settings.offset, settings.limit])

        # Execute query
        results = await self.connection_manager.fetch_query(query, params)

        # Format results with complete document metadata
        return [
            {
                "document_id": str(r["document_id"]),
                "metadata": (
                    json.loads(r["metadata"])
                    if isinstance(r["metadata"], str)
                    else r["metadata"]
                ),
                "score": float(r["score"]),
                "debug_metadata_rank": float(r["debug_metadata_rank"]),
                "debug_body_rank": float(r["debug_body_rank"]),
            }
            for r in results
        ]

    def _get_index_options(
        self,
        method: IndexMethod,
        index_arguments: Optional[IndexArgsIVFFlat | IndexArgsHNSW],
    ) -> str:
        if method == IndexMethod.ivfflat:
            if isinstance(index_arguments, IndexArgsIVFFlat):
                return f"WITH (lists={index_arguments.n_lists})"
            else:
                # Default value if no arguments provided
                return "WITH (lists=100)"
        elif method == IndexMethod.hnsw:
            if isinstance(index_arguments, IndexArgsHNSW):
                return f"WITH (m={index_arguments.m}, ef_construction={index_arguments.ef_construction})"
            else:
                # Default values if no arguments provided
                return "WITH (m=16, ef_construction=64)"
        else:
            return ""  # No options for other methods