users.py 35 KB


  1. import csv
  2. import json
  3. import tempfile
  4. from datetime import datetime
  5. from typing import IO, Optional
  6. from uuid import UUID
  7. from fastapi import HTTPException
  8. from core.base import CryptoProvider, Handler
  9. from core.base.abstractions import R2RException
  10. from core.utils import generate_user_id
  11. from shared.abstractions import User
  12. from .base import PostgresConnectionManager, QueryBuilder
  13. from .collections import PostgresCollectionsHandler
  14. class PostgresUserHandler(Handler):
  15. TABLE_NAME = "users"
  16. API_KEYS_TABLE_NAME = "users_api_keys"
  17. def __init__(
  18. self,
  19. project_name: str,
  20. connection_manager: PostgresConnectionManager,
  21. crypto_provider: CryptoProvider,
  22. ):
  23. super().__init__(project_name, connection_manager)
  24. self.crypto_provider = crypto_provider
  25. async def create_tables(self):
  26. user_table_query = f"""
  27. CREATE TABLE IF NOT EXISTS {self._get_table_name(PostgresUserHandler.TABLE_NAME)} (
  28. id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  29. email TEXT UNIQUE NOT NULL,
  30. hashed_password TEXT NOT NULL,
  31. is_superuser BOOLEAN DEFAULT FALSE,
  32. is_active BOOLEAN DEFAULT TRUE,
  33. is_verified BOOLEAN DEFAULT FALSE,
  34. verification_code TEXT,
  35. verification_code_expiry TIMESTAMPTZ,
  36. name TEXT,
  37. bio TEXT,
  38. profile_picture TEXT,
  39. reset_token TEXT,
  40. reset_token_expiry TIMESTAMPTZ,
  41. collection_ids UUID[] NULL,
  42. limits_overrides JSONB,
  43. created_at TIMESTAMPTZ DEFAULT NOW(),
  44. updated_at TIMESTAMPTZ DEFAULT NOW()
  45. );
  46. """
  47. # API keys table with updated_at instead of last_used_at
  48. api_keys_table_query = f"""
  49. CREATE TABLE IF NOT EXISTS {self._get_table_name(PostgresUserHandler.API_KEYS_TABLE_NAME)} (
  50. id UUID PRIMARY KEY DEFAULT uuid_generate_v4(),
  51. user_id UUID NOT NULL REFERENCES {self._get_table_name(PostgresUserHandler.TABLE_NAME)}(id) ON DELETE CASCADE,
  52. public_key TEXT UNIQUE NOT NULL,
  53. hashed_key TEXT NOT NULL,
  54. name TEXT,
  55. created_at TIMESTAMPTZ DEFAULT NOW(),
  56. updated_at TIMESTAMPTZ DEFAULT NOW()
  57. );
  58. CREATE INDEX IF NOT EXISTS idx_api_keys_user_id
  59. ON {self._get_table_name(PostgresUserHandler.API_KEYS_TABLE_NAME)}(user_id);
  60. CREATE INDEX IF NOT EXISTS idx_api_keys_public_key
  61. ON {self._get_table_name(PostgresUserHandler.API_KEYS_TABLE_NAME)}(public_key);
  62. """
  63. await self.connection_manager.execute_query(user_table_query)
  64. await self.connection_manager.execute_query(api_keys_table_query)
  65. async def get_user_by_id(self, id: UUID) -> User:
  66. query, _ = (
  67. QueryBuilder(self._get_table_name("users"))
  68. .select(
  69. [
  70. "id",
  71. "email",
  72. "hashed_password",
  73. "is_superuser",
  74. "is_active",
  75. "is_verified",
  76. "created_at",
  77. "updated_at",
  78. "name",
  79. "profile_picture",
  80. "bio",
  81. "collection_ids",
  82. "limits_overrides", # Fetch JSONB column
  83. ]
  84. )
  85. .where("id = $1")
  86. .build()
  87. )
  88. result = await self.connection_manager.fetchrow_query(query, [id])
  89. if not result:
  90. raise R2RException(status_code=404, message="User not found")
  91. return User(
  92. id=result["id"],
  93. email=result["email"],
  94. hashed_password=result["hashed_password"],
  95. is_superuser=result["is_superuser"],
  96. is_active=result["is_active"],
  97. is_verified=result["is_verified"],
  98. created_at=result["created_at"],
  99. updated_at=result["updated_at"],
  100. name=result["name"],
  101. profile_picture=result["profile_picture"],
  102. bio=result["bio"],
  103. collection_ids=result["collection_ids"],
  104. # Add the new field
  105. limits_overrides=json.loads(result["limits_overrides"] or "{}"),
  106. )
  107. async def get_user_by_email(self, email: str) -> User:
  108. query, params = (
  109. QueryBuilder(self._get_table_name("users"))
  110. .select(
  111. [
  112. "id",
  113. "email",
  114. "hashed_password",
  115. "is_superuser",
  116. "is_active",
  117. "is_verified",
  118. "created_at",
  119. "updated_at",
  120. "name",
  121. "profile_picture",
  122. "bio",
  123. "collection_ids",
  124. "limits_overrides",
  125. ]
  126. )
  127. .where("email = $1")
  128. .build()
  129. )
  130. result = await self.connection_manager.fetchrow_query(query, [email])
  131. if not result:
  132. raise R2RException(status_code=404, message="User not found")
  133. return User(
  134. id=result["id"],
  135. email=result["email"],
  136. hashed_password=result["hashed_password"],
  137. is_superuser=result["is_superuser"],
  138. is_active=result["is_active"],
  139. is_verified=result["is_verified"],
  140. created_at=result["created_at"],
  141. updated_at=result["updated_at"],
  142. name=result["name"],
  143. profile_picture=result["profile_picture"],
  144. bio=result["bio"],
  145. collection_ids=result["collection_ids"],
  146. limits_overrides=json.loads(result["limits_overrides"] or "{}"),
  147. )
  148. async def create_user(
  149. self, email: str, password: str, is_superuser: bool = False
  150. ) -> User:
  151. """Create a new user."""
  152. try:
  153. existing = await self.get_user_by_email(email)
  154. if existing:
  155. raise R2RException(
  156. status_code=400,
  157. message="User with this email already exists",
  158. )
  159. except R2RException as e:
  160. if e.status_code != 404:
  161. raise e
  162. hashed_password = self.crypto_provider.get_password_hash(password) # type: ignore
  163. query, params = (
  164. QueryBuilder(self._get_table_name(self.TABLE_NAME))
  165. .insert(
  166. {
  167. "email": email,
  168. "id": generate_user_id(email),
  169. "is_superuser": is_superuser,
  170. "hashed_password": hashed_password,
  171. "collection_ids": [],
  172. "limits_overrides": None,
  173. }
  174. )
  175. .returning(
  176. [
  177. "id",
  178. "email",
  179. "is_superuser",
  180. "is_active",
  181. "is_verified",
  182. "created_at",
  183. "updated_at",
  184. "collection_ids",
  185. "limits_overrides",
  186. ]
  187. )
  188. .build()
  189. )
  190. result = await self.connection_manager.fetchrow_query(query, params)
  191. if not result:
  192. raise R2RException(
  193. status_code=500,
  194. message="Failed to create user",
  195. )
  196. return User(
  197. id=result["id"],
  198. email=result["email"],
  199. is_superuser=result["is_superuser"],
  200. is_active=result["is_active"],
  201. is_verified=result["is_verified"],
  202. created_at=result["created_at"],
  203. updated_at=result["updated_at"],
  204. collection_ids=result["collection_ids"] or [],
  205. hashed_password=hashed_password,
  206. limits_overrides=json.loads(result["limits_overrides"] or "{}"),
  207. name=None,
  208. bio=None,
  209. profile_picture=None,
  210. )
  211. async def update_user(
  212. self, user: User, merge_limits: bool = False
  213. ) -> User:
  214. """
  215. Update user information including limits_overrides.
  216. Args:
  217. user: User object containing updated information
  218. merge_limits: If True, will merge existing limits_overrides with new ones.
  219. If False, will overwrite existing limits_overrides.
  220. Returns:
  221. Updated User object
  222. """
  223. # Get current user if we need to merge limits or get hashed password
  224. current_user = None
  225. try:
  226. current_user = await self.get_user_by_id(user.id)
  227. except R2RException:
  228. raise R2RException(status_code=404, message="User not found")
  229. # Merge or replace limits_overrides
  230. final_limits = user.limits_overrides
  231. if (
  232. merge_limits
  233. and current_user.limits_overrides
  234. and user.limits_overrides
  235. ):
  236. final_limits = {
  237. **current_user.limits_overrides,
  238. **user.limits_overrides,
  239. }
  240. query = f"""
  241. UPDATE {self._get_table_name(PostgresUserHandler.TABLE_NAME)}
  242. SET email = $1,
  243. is_superuser = $2,
  244. is_active = $3,
  245. is_verified = $4,
  246. updated_at = NOW(),
  247. name = $5,
  248. profile_picture = $6,
  249. bio = $7,
  250. collection_ids = $8,
  251. limits_overrides = $9::jsonb
  252. WHERE id = $10
  253. RETURNING id, email, is_superuser, is_active, is_verified,
  254. created_at, updated_at, name, profile_picture, bio,
  255. collection_ids, limits_overrides, hashed_password
  256. """
  257. result = await self.connection_manager.fetchrow_query(
  258. query,
  259. [
  260. user.email,
  261. user.is_superuser,
  262. user.is_active,
  263. user.is_verified,
  264. user.name,
  265. user.profile_picture,
  266. user.bio,
  267. user.collection_ids or [], # Ensure null becomes empty array
  268. json.dumps(final_limits), # Already handled null case
  269. user.id,
  270. ],
  271. )
  272. if not result:
  273. raise HTTPException(
  274. status_code=500,
  275. detail="Failed to update user",
  276. )
  277. return User(
  278. id=result["id"],
  279. email=result["email"],
  280. hashed_password=result[
  281. "hashed_password"
  282. ], # Include hashed_password
  283. is_superuser=result["is_superuser"],
  284. is_active=result["is_active"],
  285. is_verified=result["is_verified"],
  286. created_at=result["created_at"],
  287. updated_at=result["updated_at"],
  288. name=result["name"],
  289. profile_picture=result["profile_picture"],
  290. bio=result["bio"],
  291. collection_ids=result["collection_ids"]
  292. or [], # Ensure null becomes empty array
  293. limits_overrides=json.loads(
  294. result["limits_overrides"] or "{}"
  295. ), # Can be null
  296. )
  297. async def delete_user_relational(self, id: UUID) -> None:
  298. """Delete a user and update related records."""
  299. # Get the collections the user belongs to
  300. collection_query, params = (
  301. QueryBuilder(self._get_table_name(self.TABLE_NAME))
  302. .select(["collection_ids"])
  303. .where("id = $1")
  304. .build()
  305. )
  306. collection_result = await self.connection_manager.fetchrow_query(
  307. collection_query, [id]
  308. )
  309. if not collection_result:
  310. raise R2RException(status_code=404, message="User not found")
  311. # Update documents query
  312. doc_update_query, doc_params = (
  313. QueryBuilder(self._get_table_name("documents"))
  314. .update({"id": None})
  315. .where("id = $1")
  316. .build()
  317. )
  318. await self.connection_manager.execute_query(doc_update_query, [id])
  319. # Delete user query
  320. delete_query, del_params = (
  321. QueryBuilder(self._get_table_name(self.TABLE_NAME))
  322. .delete()
  323. .where("id = $1")
  324. .returning(["id"])
  325. .build()
  326. )
  327. result = await self.connection_manager.fetchrow_query(
  328. delete_query, [id]
  329. )
  330. if not result:
  331. raise R2RException(status_code=404, message="User not found")
  332. async def update_user_password(self, id: UUID, new_hashed_password: str):
  333. query = f"""
  334. UPDATE {self._get_table_name(PostgresUserHandler.TABLE_NAME)}
  335. SET hashed_password = $1, updated_at = NOW()
  336. WHERE id = $2
  337. """
  338. await self.connection_manager.execute_query(
  339. query, [new_hashed_password, id]
  340. )
  341. async def get_all_users(self) -> list[User]:
  342. """Get all users with minimal information."""
  343. query, params = (
  344. QueryBuilder(self._get_table_name(self.TABLE_NAME))
  345. .select(
  346. [
  347. "id",
  348. "email",
  349. "is_superuser",
  350. "is_active",
  351. "is_verified",
  352. "created_at",
  353. "updated_at",
  354. "collection_ids",
  355. "hashed_password",
  356. "limits_overrides",
  357. "name",
  358. "bio",
  359. "profile_picture",
  360. ]
  361. )
  362. .build()
  363. )
  364. results = await self.connection_manager.fetch_query(query, params)
  365. return [
  366. User(
  367. id=result["id"],
  368. email=result["email"],
  369. hashed_password=result["hashed_password"],
  370. is_superuser=result["is_superuser"],
  371. is_active=result["is_active"],
  372. is_verified=result["is_verified"],
  373. created_at=result["created_at"],
  374. updated_at=result["updated_at"],
  375. collection_ids=result["collection_ids"] or [],
  376. limits_overrides=json.loads(
  377. result["limits_overrides"] or "{}"
  378. ),
  379. name=result["name"],
  380. bio=result["bio"],
  381. profile_picture=result["profile_picture"],
  382. )
  383. for result in results
  384. ]
  385. async def store_verification_code(
  386. self, id: UUID, verification_code: str, expiry: datetime
  387. ):
  388. query = f"""
  389. UPDATE {self._get_table_name(PostgresUserHandler.TABLE_NAME)}
  390. SET verification_code = $1, verification_code_expiry = $2
  391. WHERE id = $3
  392. """
  393. await self.connection_manager.execute_query(
  394. query, [verification_code, expiry, id]
  395. )
  396. async def verify_user(self, verification_code: str) -> None:
  397. query = f"""
  398. UPDATE {self._get_table_name(PostgresUserHandler.TABLE_NAME)}
  399. SET is_verified = TRUE, verification_code = NULL, verification_code_expiry = NULL
  400. WHERE verification_code = $1 AND verification_code_expiry > NOW()
  401. RETURNING id
  402. """
  403. result = await self.connection_manager.fetchrow_query(
  404. query, [verification_code]
  405. )
  406. if not result:
  407. raise R2RException(
  408. status_code=400, message="Invalid or expired verification code"
  409. )
  410. async def remove_verification_code(self, verification_code: str):
  411. query = f"""
  412. UPDATE {self._get_table_name(PostgresUserHandler.TABLE_NAME)}
  413. SET verification_code = NULL, verification_code_expiry = NULL
  414. WHERE verification_code = $1
  415. """
  416. await self.connection_manager.execute_query(query, [verification_code])
  417. async def expire_verification_code(self, id: UUID):
  418. query = f"""
  419. UPDATE {self._get_table_name(PostgresUserHandler.TABLE_NAME)}
  420. SET verification_code_expiry = NOW() - INTERVAL '1 day'
  421. WHERE id = $1
  422. """
  423. await self.connection_manager.execute_query(query, [id])
  424. async def store_reset_token(
  425. self, id: UUID, reset_token: str, expiry: datetime
  426. ):
  427. query = f"""
  428. UPDATE {self._get_table_name(PostgresUserHandler.TABLE_NAME)}
  429. SET reset_token = $1, reset_token_expiry = $2
  430. WHERE id = $3
  431. """
  432. await self.connection_manager.execute_query(
  433. query, [reset_token, expiry, id]
  434. )
  435. async def get_user_id_by_reset_token(
  436. self, reset_token: str
  437. ) -> Optional[UUID]:
  438. query = f"""
  439. SELECT id FROM {self._get_table_name(PostgresUserHandler.TABLE_NAME)}
  440. WHERE reset_token = $1 AND reset_token_expiry > NOW()
  441. """
  442. result = await self.connection_manager.fetchrow_query(
  443. query, [reset_token]
  444. )
  445. return result["id"] if result else None
  446. async def remove_reset_token(self, id: UUID):
  447. query = f"""
  448. UPDATE {self._get_table_name(PostgresUserHandler.TABLE_NAME)}
  449. SET reset_token = NULL, reset_token_expiry = NULL
  450. WHERE id = $1
  451. """
  452. await self.connection_manager.execute_query(query, [id])
  453. async def remove_user_from_all_collections(self, id: UUID):
  454. query = f"""
  455. UPDATE {self._get_table_name(PostgresUserHandler.TABLE_NAME)}
  456. SET collection_ids = ARRAY[]::UUID[]
  457. WHERE id = $1
  458. """
  459. await self.connection_manager.execute_query(query, [id])
  460. async def add_user_to_collection(
  461. self, id: UUID, collection_id: UUID
  462. ) -> bool:
  463. # Check if the user exists
  464. if not await self.get_user_by_id(id):
  465. raise R2RException(status_code=404, message="User not found")
  466. # Check if the collection exists
  467. if not await self._collection_exists(collection_id):
  468. raise R2RException(status_code=404, message="Collection not found")
  469. query = f"""
  470. UPDATE {self._get_table_name(PostgresUserHandler.TABLE_NAME)}
  471. SET collection_ids = array_append(collection_ids, $1)
  472. WHERE id = $2 AND NOT ($1 = ANY(collection_ids))
  473. RETURNING id
  474. """
  475. result = await self.connection_manager.fetchrow_query(
  476. query, [collection_id, id]
  477. )
  478. if not result:
  479. raise R2RException(
  480. status_code=400, message="User already in collection"
  481. )
  482. update_collection_query = f"""
  483. UPDATE {self._get_table_name('collections')}
  484. SET user_count = user_count + 1
  485. WHERE id = $1
  486. """
  487. await self.connection_manager.execute_query(
  488. query=update_collection_query,
  489. params=[collection_id],
  490. )
  491. return True
  492. async def remove_user_from_collection(
  493. self, id: UUID, collection_id: UUID
  494. ) -> bool:
  495. if not await self.get_user_by_id(id):
  496. raise R2RException(status_code=404, message="User not found")
  497. query = f"""
  498. UPDATE {self._get_table_name(PostgresUserHandler.TABLE_NAME)}
  499. SET collection_ids = array_remove(collection_ids, $1)
  500. WHERE id = $2 AND $1 = ANY(collection_ids)
  501. RETURNING id
  502. """
  503. result = await self.connection_manager.fetchrow_query(
  504. query, [collection_id, id]
  505. )
  506. if not result:
  507. raise R2RException(
  508. status_code=400,
  509. message="User is not a member of the specified collection",
  510. )
  511. return True
  512. async def get_users_in_collection(
  513. self, collection_id: UUID, offset: int, limit: int
  514. ) -> dict[str, list[User] | int]:
  515. """Get all users in a specific collection with pagination."""
  516. if not await self._collection_exists(collection_id):
  517. raise R2RException(status_code=404, message="Collection not found")
  518. query, params = (
  519. QueryBuilder(self._get_table_name(self.TABLE_NAME))
  520. .select(
  521. [
  522. "id",
  523. "email",
  524. "is_active",
  525. "is_superuser",
  526. "created_at",
  527. "updated_at",
  528. "is_verified",
  529. "collection_ids",
  530. "name",
  531. "bio",
  532. "profile_picture",
  533. "hashed_password",
  534. "limits_overrides",
  535. "COUNT(*) OVER() AS total_entries",
  536. ]
  537. )
  538. .where("$1 = ANY(collection_ids)")
  539. .order_by("name")
  540. .offset("$2")
  541. .limit("$3" if limit != -1 else None)
  542. .build()
  543. )
  544. conditions = [collection_id, offset]
  545. if limit != -1:
  546. conditions.append(limit)
  547. results = await self.connection_manager.fetch_query(query, conditions)
  548. users_list = [
  549. User(
  550. id=row["id"],
  551. email=row["email"],
  552. is_active=row["is_active"],
  553. is_superuser=row["is_superuser"],
  554. created_at=row["created_at"],
  555. updated_at=row["updated_at"],
  556. is_verified=row["is_verified"],
  557. collection_ids=row["collection_ids"] or [],
  558. name=row["name"],
  559. bio=row["bio"],
  560. profile_picture=row["profile_picture"],
  561. hashed_password=row["hashed_password"],
  562. limits_overrides=json.loads(row["limits_overrides"] or "{}"),
  563. )
  564. for row in results
  565. ]
  566. total_entries = results[0]["total_entries"] if results else 0
  567. return {"results": users_list, "total_entries": total_entries}
  568. async def mark_user_as_superuser(self, id: UUID):
  569. query = f"""
  570. UPDATE {self._get_table_name(PostgresUserHandler.TABLE_NAME)}
  571. SET is_superuser = TRUE, is_verified = TRUE,
  572. verification_code = NULL, verification_code_expiry = NULL
  573. WHERE id = $1
  574. """
  575. await self.connection_manager.execute_query(query, [id])
  576. async def get_user_id_by_verification_code(
  577. self, verification_code: str
  578. ) -> UUID:
  579. query = f"""
  580. SELECT id FROM {self._get_table_name(PostgresUserHandler.TABLE_NAME)}
  581. WHERE verification_code = $1 AND verification_code_expiry > NOW()
  582. """
  583. result = await self.connection_manager.fetchrow_query(
  584. query, [verification_code]
  585. )
  586. if not result:
  587. raise R2RException(
  588. status_code=400, message="Invalid or expired verification code"
  589. )
  590. return result["id"]
  591. async def mark_user_as_verified(self, id: UUID):
  592. query = f"""
  593. UPDATE {self._get_table_name(PostgresUserHandler.TABLE_NAME)}
  594. SET is_verified = TRUE,
  595. verification_code = NULL,
  596. verification_code_expiry = NULL
  597. WHERE id = $1
  598. """
  599. await self.connection_manager.execute_query(query, [id])
  600. async def get_users_overview(
  601. self,
  602. offset: int,
  603. limit: int,
  604. user_ids: Optional[list[UUID]] = None,
  605. ) -> dict[str, list[User] | int]:
  606. """
  607. Return users with document usage and total entries.
  608. """
  609. query = f"""
  610. WITH user_document_ids AS (
  611. SELECT
  612. u.id as user_id,
  613. ARRAY_AGG(d.id) FILTER (WHERE d.id IS NOT NULL) AS doc_ids
  614. FROM {self._get_table_name(PostgresUserHandler.TABLE_NAME)} u
  615. LEFT JOIN {self._get_table_name('documents')} d ON u.id = d.owner_id
  616. GROUP BY u.id
  617. ),
  618. user_docs AS (
  619. SELECT
  620. u.id,
  621. u.email,
  622. u.is_superuser,
  623. u.is_active,
  624. u.is_verified,
  625. u.name,
  626. u.bio,
  627. u.profile_picture,
  628. u.collection_ids,
  629. u.created_at,
  630. u.updated_at,
  631. COUNT(d.id) AS num_files,
  632. COALESCE(SUM(d.size_in_bytes), 0) AS total_size_in_bytes,
  633. ud.doc_ids as document_ids
  634. FROM {self._get_table_name(PostgresUserHandler.TABLE_NAME)} u
  635. LEFT JOIN {self._get_table_name('documents')} d ON u.id = d.owner_id
  636. LEFT JOIN user_document_ids ud ON u.id = ud.user_id
  637. {' WHERE u.id = ANY($3::uuid[])' if user_ids else ''}
  638. GROUP BY u.id, u.email, u.is_superuser, u.is_active, u.is_verified,
  639. u.created_at, u.updated_at, u.collection_ids, ud.doc_ids
  640. )
  641. SELECT
  642. user_docs.*,
  643. COUNT(*) OVER() AS total_entries
  644. FROM user_docs
  645. ORDER BY email
  646. OFFSET $1
  647. """
  648. params: list = [offset]
  649. if limit != -1:
  650. query += " LIMIT $2"
  651. params.append(limit)
  652. if user_ids:
  653. params.append(user_ids)
  654. results = await self.connection_manager.fetch_query(query, params)
  655. if not results:
  656. raise R2RException(status_code=404, message="No users found")
  657. users_list = []
  658. for row in results:
  659. users_list.append(
  660. User(
  661. id=row["id"],
  662. email=row["email"],
  663. is_superuser=row["is_superuser"],
  664. is_active=row["is_active"],
  665. is_verified=row["is_verified"],
  666. name=row["name"],
  667. bio=row["bio"],
  668. created_at=row["created_at"],
  669. updated_at=row["updated_at"],
  670. profile_picture=row["profile_picture"],
  671. collection_ids=row["collection_ids"] or [],
  672. num_files=row["num_files"],
  673. total_size_in_bytes=row["total_size_in_bytes"],
  674. document_ids=(
  675. list(row["document_ids"])
  676. if row["document_ids"]
  677. else []
  678. ),
  679. )
  680. )
  681. total_entries = results[0]["total_entries"]
  682. return {"results": users_list, "total_entries": total_entries}
  683. async def _collection_exists(self, collection_id: UUID) -> bool:
  684. """Check if a collection exists."""
  685. query = f"""
  686. SELECT 1 FROM {self._get_table_name(PostgresCollectionsHandler.TABLE_NAME)}
  687. WHERE id = $1
  688. """
  689. result = await self.connection_manager.fetchrow_query(
  690. query, [collection_id]
  691. )
  692. return result is not None
  693. async def get_user_validation_data(
  694. self,
  695. user_id: UUID,
  696. ) -> dict:
  697. """
  698. Get verification data for a specific user.
  699. This method should be called after superuser authorization has been verified.
  700. """
  701. query = f"""
  702. SELECT
  703. verification_code,
  704. verification_code_expiry,
  705. reset_token,
  706. reset_token_expiry
  707. FROM {self._get_table_name("users")}
  708. WHERE id = $1
  709. """
  710. result = await self.connection_manager.fetchrow_query(query, [user_id])
  711. if not result:
  712. raise R2RException(status_code=404, message="User not found")
  713. return {
  714. "verification_data": {
  715. "verification_code": result["verification_code"],
  716. "verification_code_expiry": (
  717. result["verification_code_expiry"].isoformat()
  718. if result["verification_code_expiry"]
  719. else None
  720. ),
  721. "reset_token": result["reset_token"],
  722. "reset_token_expiry": (
  723. result["reset_token_expiry"].isoformat()
  724. if result["reset_token_expiry"]
  725. else None
  726. ),
  727. }
  728. }
  729. # API Key methods
  730. async def store_user_api_key(
  731. self,
  732. user_id: UUID,
  733. key_id: str,
  734. hashed_key: str,
  735. name: Optional[str] = None,
  736. ) -> UUID:
  737. """Store a new API key for a user."""
  738. query = f"""
  739. INSERT INTO {self._get_table_name(PostgresUserHandler.API_KEYS_TABLE_NAME)}
  740. (user_id, public_key, hashed_key, name)
  741. VALUES ($1, $2, $3, $4)
  742. RETURNING id
  743. """
  744. result = await self.connection_manager.fetchrow_query(
  745. query, [user_id, key_id, hashed_key, name]
  746. )
  747. if not result:
  748. raise R2RException(
  749. status_code=500, message="Failed to store API key"
  750. )
  751. return result["id"]
  752. async def get_api_key_record(self, key_id: str) -> Optional[dict]:
  753. """
  754. Get API key record by 'public_key' and update 'updated_at' to now.
  755. Returns { "user_id", "hashed_key" } or None if not found.
  756. """
  757. query = f"""
  758. UPDATE {self._get_table_name(PostgresUserHandler.API_KEYS_TABLE_NAME)}
  759. SET updated_at = NOW()
  760. WHERE public_key = $1
  761. RETURNING user_id, hashed_key
  762. """
  763. result = await self.connection_manager.fetchrow_query(query, [key_id])
  764. if not result:
  765. return None
  766. return {
  767. "user_id": result["user_id"],
  768. "hashed_key": result["hashed_key"],
  769. }
  770. async def get_user_api_keys(self, user_id: UUID) -> list[dict]:
  771. """Get all API keys for a user."""
  772. query = f"""
  773. SELECT id, public_key, name, created_at, updated_at
  774. FROM {self._get_table_name(PostgresUserHandler.API_KEYS_TABLE_NAME)}
  775. WHERE user_id = $1
  776. ORDER BY created_at DESC
  777. """
  778. results = await self.connection_manager.fetch_query(query, [user_id])
  779. return [
  780. {
  781. "key_id": str(row["id"]),
  782. "public_key": row["public_key"],
  783. "name": row["name"] or "",
  784. "updated_at": row["updated_at"],
  785. }
  786. for row in results
  787. ]
  788. async def delete_api_key(self, user_id: UUID, key_id: UUID) -> dict:
  789. """Delete a specific API key."""
  790. query = f"""
  791. DELETE FROM {self._get_table_name(PostgresUserHandler.API_KEYS_TABLE_NAME)}
  792. WHERE id = $1 AND user_id = $2
  793. RETURNING id, public_key, name
  794. """
  795. result = await self.connection_manager.fetchrow_query(
  796. query, [key_id, user_id]
  797. )
  798. if result is None:
  799. raise R2RException(status_code=404, message="API key not found")
  800. return {
  801. "key_id": str(result["id"]),
  802. "public_key": str(result["public_key"]),
  803. "name": result["name"] or "",
  804. }
  805. async def update_api_key_name(
  806. self, user_id: UUID, key_id: UUID, name: str
  807. ) -> bool:
  808. """Update the name of an existing API key."""
  809. query = f"""
  810. UPDATE {self._get_table_name(PostgresUserHandler.API_KEYS_TABLE_NAME)}
  811. SET name = $1, updated_at = NOW()
  812. WHERE id = $2 AND user_id = $3
  813. RETURNING id
  814. """
  815. result = await self.connection_manager.fetchrow_query(
  816. query, [name, key_id, user_id]
  817. )
  818. if result is None:
  819. raise R2RException(status_code=404, message="API key not found")
  820. return True
  821. async def export_to_csv(
  822. self,
  823. columns: Optional[list[str]] = None,
  824. filters: Optional[dict] = None,
  825. include_header: bool = True,
  826. ) -> tuple[str, IO]:
  827. """
  828. Creates a CSV file from the PostgreSQL data and returns the path to the temp file.
  829. """
  830. valid_columns = {
  831. "id",
  832. "email",
  833. "is_superuser",
  834. "is_active",
  835. "is_verified",
  836. "name",
  837. "bio",
  838. "collection_ids",
  839. "created_at",
  840. "updated_at",
  841. }
  842. if not columns:
  843. columns = list(valid_columns)
  844. elif invalid_cols := set(columns) - valid_columns:
  845. raise ValueError(f"Invalid columns: {invalid_cols}")
  846. select_stmt = f"""
  847. SELECT
  848. id::text,
  849. email,
  850. is_superuser,
  851. is_active,
  852. is_verified,
  853. name,
  854. bio,
  855. to_char(created_at, 'YYYY-MM-DD HH24:MI:SS') AS created_at,
  856. to_char(updated_at, 'YYYY-MM-DD HH24:MI:SS') AS updated_at
  857. FROM {self._get_table_name(self.TABLE_NAME)}
  858. """
  859. params = []
  860. if filters:
  861. conditions = []
  862. param_index = 1
  863. for field, value in filters.items():
  864. if field not in valid_columns:
  865. continue
  866. if isinstance(value, dict):
  867. for op, val in value.items():
  868. if op == "$eq":
  869. conditions.append(f"{field} = ${param_index}")
  870. params.append(val)
  871. param_index += 1
  872. elif op == "$gt":
  873. conditions.append(f"{field} > ${param_index}")
  874. params.append(val)
  875. param_index += 1
  876. elif op == "$lt":
  877. conditions.append(f"{field} < ${param_index}")
  878. params.append(val)
  879. param_index += 1
  880. else:
  881. # Direct equality
  882. conditions.append(f"{field} = ${param_index}")
  883. params.append(value)
  884. param_index += 1
  885. if conditions:
  886. select_stmt = f"{select_stmt} WHERE {' AND '.join(conditions)}"
  887. select_stmt = f"{select_stmt} ORDER BY created_at DESC"
  888. temp_file = None
  889. try:
  890. temp_file = tempfile.NamedTemporaryFile(
  891. mode="w", delete=True, suffix=".csv"
  892. )
  893. writer = csv.writer(temp_file, quoting=csv.QUOTE_ALL)
  894. async with self.connection_manager.pool.get_connection() as conn: # type: ignore
  895. async with conn.transaction():
  896. cursor = await conn.cursor(select_stmt, *params)
  897. if include_header:
  898. writer.writerow(columns)
  899. chunk_size = 1000
  900. while True:
  901. rows = await cursor.fetch(chunk_size)
  902. if not rows:
  903. break
  904. for row in rows:
  905. writer.writerow(row)
  906. temp_file.flush()
  907. return temp_file.name, temp_file
  908. except Exception as e:
  909. if temp_file:
  910. temp_file.close()
  911. raise HTTPException(
  912. status_code=500,
  913. detail=f"Failed to export data: {str(e)}",
  914. )