database.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. import sys
  2. import asyncclick as click
  3. from ..utils.database_utils import (
  4. check_database_connection,
  5. get_database_url_from_env,
  6. run_alembic_command,
  7. )
  8. @click.group()
  9. def db():
  10. """Database management commands."""
  11. pass
  12. @db.command()
  13. @click.option(
  14. "--schema", help="Schema name to operate on (defaults to R2R_PROJECT_NAME)"
  15. )
  16. async def history(schema):
  17. """Show database migration history for a specific schema."""
  18. try:
  19. db_url = get_database_url_from_env(False)
  20. if not await check_database_connection(db_url):
  21. click.secho(
  22. "Database connection failed. Please check your environment variables.",
  23. fg="red",
  24. )
  25. sys.exit(1)
  26. result = await run_alembic_command("history", schema_name=schema)
  27. if result != 0:
  28. click.secho("Failed to get migration history.", fg="red")
  29. sys.exit(1)
  30. except Exception as e:
  31. click.secho(f"Error getting migration history: {str(e)}", fg="red")
  32. sys.exit(1)
  33. @db.command()
  34. @click.option(
  35. "--schema", help="Schema name to operate on (defaults to R2R_PROJECT_NAME)"
  36. )
  37. async def current(schema):
  38. """Show current database revision for a specific schema."""
  39. try:
  40. db_url = get_database_url_from_env(False)
  41. if not await check_database_connection(db_url):
  42. click.secho(
  43. "Database connection failed. Please check your environment variables.",
  44. fg="red",
  45. )
  46. sys.exit(1)
  47. result = await run_alembic_command("current", schema_name=schema)
  48. if result != 0:
  49. click.secho("Failed to get current revision.", fg="red")
  50. sys.exit(1)
  51. except Exception as e:
  52. click.secho(f"Error getting current revision: {str(e)}", fg="red")
  53. sys.exit(1)
  54. @db.command()
  55. @click.option(
  56. "--schema", help="Schema name to operate on (defaults to R2R_PROJECT_NAME)"
  57. )
  58. @click.option("--revision", help="Upgrade to a specific revision")
  59. async def upgrade(schema, revision):
  60. """Upgrade database schema to the latest revision or a specific revision."""
  61. try:
  62. db_url = get_database_url_from_env(False)
  63. if not await check_database_connection(db_url):
  64. click.secho(
  65. "Database connection failed. Please check your environment variables.",
  66. fg="red",
  67. )
  68. sys.exit(1)
  69. click.echo(
  70. f"Running database upgrade for schema {schema or 'default'}..."
  71. )
  72. print(f"Upgrading revision = {revision}")
  73. command = f"upgrade {revision}" if revision else "upgrade"
  74. result = await run_alembic_command(command, schema_name=schema)
  75. if result == 0:
  76. click.secho("Database upgrade completed successfully.", fg="green")
  77. else:
  78. click.secho("Database upgrade failed.", fg="red")
  79. sys.exit(1)
  80. except Exception as e:
  81. click.secho(f"Unexpected error: {str(e)}", fg="red")
  82. sys.exit(1)
  83. @db.command()
  84. @click.option(
  85. "--schema", help="Schema name to operate on (defaults to R2R_PROJECT_NAME)"
  86. )
  87. @click.option("--revision", help="Downgrade to a specific revision")
  88. async def downgrade(schema, revision):
  89. """Downgrade database schema to the previous revision or a specific revision."""
  90. if not revision:
  91. if not click.confirm(
  92. "No revision specified. This will downgrade the database by one revision. Continue?"
  93. ):
  94. return
  95. try:
  96. db_url = get_database_url_from_env(log=False)
  97. if not await check_database_connection(db_url):
  98. click.secho(
  99. "Database connection failed. Please check your environment variables.",
  100. fg="red",
  101. )
  102. sys.exit(1)
  103. click.echo(
  104. f"Running database downgrade for schema {schema or 'default'}..."
  105. )
  106. command = f"downgrade {revision}" if revision else "downgrade"
  107. result = await run_alembic_command(command, schema_name=schema)
  108. if result == 0:
  109. click.secho(
  110. "Database downgrade completed successfully.", fg="green"
  111. )
  112. else:
  113. click.secho("Database downgrade failed.", fg="red")
  114. sys.exit(1)
  115. except Exception as e:
  116. click.secho(f"Unexpected error: {str(e)}", fg="red")
  117. sys.exit(1)