# d342e632358a_migrate_to_asyncpg.py
  1. """migrate_to_asyncpg
  2. Revision ID: d342e632358a
  3. Revises:
  4. Create Date: 2024-10-22 11:55:49.461015
  5. """
  6. import os
  7. from typing import Sequence, Union
  8. import sqlalchemy as sa
  9. from alembic import op
  10. from sqlalchemy import inspect
  11. from sqlalchemy.dialects import postgresql
  12. from sqlalchemy.types import UserDefinedType
  13. # revision identifiers, used by Alembic.
  14. revision: str = "d342e632358a"
  15. down_revision: Union[str, None] = None
  16. branch_labels: Union[str, Sequence[str], None] = None
  17. depends_on: Union[str, Sequence[str], None] = None
  18. project_name = os.getenv("R2R_PROJECT_NAME") or "r2r_default"
  19. new_vector_table_name = "vectors"
  20. old_vector_table_name = project_name
  21. class Vector(UserDefinedType):
  22. def get_col_spec(self, **kw):
  23. return "vector"
  24. def check_if_upgrade_needed():
  25. """Check if the upgrade has already been applied"""
  26. # Get database connection
  27. connection = op.get_bind()
  28. inspector = inspect(connection)
  29. # Check if the new vectors table exists
  30. has_new_table = inspector.has_table(
  31. new_vector_table_name, schema=project_name
  32. )
  33. if has_new_table:
  34. print(
  35. f"Migration not needed: '{new_vector_table_name}' table already exists"
  36. )
  37. return False
  38. print(f"Migration needed: '{new_vector_table_name}' table does not exist")
  39. return True
  40. def upgrade() -> None:
  41. if check_if_upgrade_needed():
  42. # Create required extensions
  43. op.execute("CREATE EXTENSION IF NOT EXISTS vector")
  44. op.execute("CREATE EXTENSION IF NOT EXISTS pg_trgm")
  45. op.execute("CREATE EXTENSION IF NOT EXISTS btree_gin")
  46. # KG table migrations
  47. op.execute(
  48. f"ALTER TABLE IF EXISTS {project_name}.entity_raw RENAME TO chunk_entity"
  49. )
  50. op.execute(
  51. f"ALTER TABLE IF EXISTS {project_name}.triple_raw RENAME TO chunk_triple"
  52. )
  53. op.execute(
  54. f"ALTER TABLE IF EXISTS {project_name}.entity_embedding RENAME TO document_entity"
  55. )
  56. op.execute(
  57. f"ALTER TABLE IF EXISTS {project_name}.community RENAME TO community_info"
  58. )
  59. # Create the new table
  60. op.create_table(
  61. new_vector_table_name,
  62. sa.Column("extraction_id", postgresql.UUID(), nullable=False),
  63. sa.Column("document_id", postgresql.UUID(), nullable=False),
  64. sa.Column("user_id", postgresql.UUID(), nullable=False),
  65. sa.Column(
  66. "collection_ids",
  67. postgresql.ARRAY(postgresql.UUID()),
  68. server_default="{}",
  69. ),
  70. sa.Column("vec", Vector), # This will be handled as a vector type
  71. sa.Column("text", sa.Text(), nullable=True),
  72. sa.Column(
  73. "fts",
  74. postgresql.TSVECTOR,
  75. nullable=False,
  76. server_default=sa.text(
  77. "to_tsvector('english'::regconfig, '')"
  78. ),
  79. ),
  80. sa.Column(
  81. "metadata",
  82. postgresql.JSONB(),
  83. server_default="{}",
  84. nullable=False,
  85. ),
  86. sa.PrimaryKeyConstraint("extraction_id"),
  87. schema=project_name,
  88. )
  89. # Create indices
  90. op.create_index(
  91. "idx_vectors_document_id",
  92. new_vector_table_name,
  93. ["document_id"],
  94. schema=project_name,
  95. )
  96. op.create_index(
  97. "idx_vectors_user_id",
  98. new_vector_table_name,
  99. ["user_id"],
  100. schema=project_name,
  101. )
  102. op.create_index(
  103. "idx_vectors_collection_ids",
  104. new_vector_table_name,
  105. ["collection_ids"],
  106. schema=project_name,
  107. postgresql_using="gin",
  108. )
  109. op.create_index(
  110. "idx_vectors_fts",
  111. new_vector_table_name,
  112. ["fts"],
  113. schema=project_name,
  114. postgresql_using="gin",
  115. )
  116. # Migrate data from old table (assuming old table name is 'old_vectors')
  117. # Note: You'll need to replace 'old_schema' and 'old_vectors' with your actual names
  118. op.execute(
  119. f"""
  120. INSERT INTO {project_name}.{new_vector_table_name}
  121. (extraction_id, document_id, user_id, collection_ids, vec, text, metadata)
  122. SELECT
  123. extraction_id,
  124. document_id,
  125. user_id,
  126. collection_ids,
  127. vec,
  128. text,
  129. metadata
  130. FROM {project_name}.{old_vector_table_name}
  131. """
  132. )
  133. # Verify data migration
  134. op.execute(
  135. f"""
  136. SELECT COUNT(*) old_count FROM {project_name}.{old_vector_table_name};
  137. SELECT COUNT(*) new_count FROM {project_name}.{new_vector_table_name};
  138. """
  139. )
  140. # If we get here, migration was successful, so drop the old table
  141. op.execute(
  142. f"""
  143. DROP TABLE IF EXISTS {project_name}.{old_vector_table_name};
  144. """
  145. )
  146. def downgrade() -> None:
  147. # Drop all indices
  148. op.drop_index("idx_vectors_fts", schema=project_name)
  149. op.drop_index("idx_vectors_collection_ids", schema=project_name)
  150. op.drop_index("idx_vectors_user_id", schema=project_name)
  151. op.drop_index("idx_vectors_document_id", schema=project_name)
  152. # Drop the new table
  153. op.drop_table(new_vector_table_name, schema=project_name)
  154. # Revert KG table migrations
  155. op.execute(
  156. f"ALTER TABLE IF EXISTS {project_name}.chunk_entity RENAME TO entity_raw"
  157. )
  158. op.execute(
  159. f"ALTER TABLE IF EXISTS {project_name}.chunk_relationship RENAME TO relationship_raw"
  160. )
  161. op.execute(
  162. f"ALTER TABLE IF EXISTS {project_name}.document_entity RENAME TO entity_embedding"
  163. )
  164. op.execute(
  165. f"ALTER TABLE IF EXISTS {project_name}.community_info RENAME TO community"
  166. )