mirror of
https://github.com/infiniflow/ragflow.git
synced 2026-06-29 15:31:05 +08:00
Feat: mark mysql migrations as applied (#15504)
### What problem does this PR solve? mark mysql migrations as applied ### Type of change - [x] New Feature (non-breaking change which adds functionality)
This commit is contained in:
@@ -114,6 +114,24 @@ run_server(){
|
||||
fi
|
||||
}
|
||||
|
||||
ensure_db_init() {
    # Create the web database tables by running init_database_tables from
    # api.db.db_models with the interpreter stored in $PY.
    local init_snippet="from api.db.db_models import init_database_tables as init_web_db; init_web_db()"

    echo "Initializing database tables..."
    "$PY" -c "$init_snippet"
    echo "Database tables initialized."
}
|
||||
|
||||
run_mysql_migrations() {
    # Run each model-provider migration stage as its own invocation of
    # tools/scripts/mysql_migration.py, in the order listed below. Later stages
    # read tables written by earlier ones, so the order must be kept.
    local stage

    echo "Running model provider table migrations..."
    for stage in tenant_model_provider tenant_model_instance tenant_model model_id_config; do
        "$PY" tools/scripts/mysql_migration.py --stages "$stage" --config conf/service_conf.yaml --execute
    done
    echo "Model provider table migrations completed."
}
|
||||
|
||||
ensure_db_init
|
||||
run_mysql_migrations
|
||||
|
||||
# Start task executors
|
||||
for ((i=0;i<WS;i++))
|
||||
do
|
||||
|
||||
@@ -176,6 +176,47 @@ class MigrationDatabase:
|
||||
)
|
||||
return cursor.fetchone()[0] > 0
|
||||
|
||||
def get_system_setting_value(self, name: str) -> str | None:
|
||||
if not self.table_exists("system_settings"):
|
||||
logger.info("Table 'system_settings' does not exist, migration marker is unavailable")
|
||||
return None
|
||||
cursor = self.execute_sql(
|
||||
"SELECT `value` FROM `system_settings` WHERE `name` = %s",
|
||||
(name,),
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
return row[0] if row else None
|
||||
|
||||
def upsert_system_setting(self, name: str, value: str, source: str = "migration", data_type: str = "string") -> bool:
    """Insert a `system_settings` row, or update it if it already exists.

    The statement uses MySQL's ``INSERT ... ON DUPLICATE KEY UPDATE``: a row
    that collides on a unique key (presumably `name` -- confirm against the
    table schema) gets its `source`, `data_type`, `value`, `update_time` and
    `update_date` overwritten, while `create_time` / `create_date` are left
    untouched.

    Args:
        name: Value for the `name` column.
        value: Value for the `value` column.
        source: Value for the `source` column.
        data_type: Value for the `data_type` column.

    Returns:
        True when the statement was executed; False when the
        `system_settings` table does not exist and nothing was written.
        Callers that ignore the return value behave exactly as before.
    """
    if not self.table_exists("system_settings"):
        logger.warning("Table 'system_settings' does not exist, migration marker was not saved")
        return False

    # The *_time columns receive milliseconds; the *_date columns are derived
    # from the same instant in seconds through FROM_UNIXTIME.
    now_seconds = int(time.time())
    now_millis = now_seconds * 1000
    self.execute_sql(
        """
        INSERT INTO `system_settings`
        (`name`, `source`, `data_type`, `value`, `create_time`, `create_date`, `update_time`, `update_date`)
        VALUES (%s, %s, %s, %s, %s, FROM_UNIXTIME(%s), %s, FROM_UNIXTIME(%s))
        ON DUPLICATE KEY UPDATE
        `source` = VALUES(`source`),
        `data_type` = VALUES(`data_type`),
        `value` = VALUES(`value`),
        `update_time` = VALUES(`update_time`),
        `update_date` = VALUES(`update_date`)
        """,
        (
            name,
            source,
            data_type,
            value,
            now_millis,
            now_seconds,
            now_millis,
            now_seconds,
        ),
    )
    return True
|
||||
|
||||
|
||||
# Define model classes for migration (not importing from api.db.db_models)
|
||||
class BaseModel(Model):
|
||||
@@ -225,11 +266,14 @@ class MigrationStage:
|
||||
description = "Base migration stage"
|
||||
source_tables = []
|
||||
target_tables = []
|
||||
migration_version = None
|
||||
migration_marker_prefix = "mysql_migration"
|
||||
|
||||
def __init__(self, db: MigrationDatabase, dry_run: bool = True, create_table_only: bool = False):
|
||||
self.db = db
|
||||
self.dry_run = dry_run
|
||||
self.create_table_only = create_table_only
|
||||
self._noop_completes_migration = False
|
||||
|
||||
def check(self) -> bool:
|
||||
"""Check if migration is needed"""
|
||||
@@ -243,6 +287,52 @@ class MigrationStage:
|
||||
"""Create target table (override in subclass if needed)"""
|
||||
pass
|
||||
|
||||
def migration_marker_name(self) -> str:
    """Return the settings key for this stage's marker: ``<prefix>.<stage name>.version``."""
    return "{}.{}.version".format(self.migration_marker_prefix, self.name)
|
||||
|
||||
def is_migration_version_applied(self) -> bool:
    """Report whether the stored marker already equals this stage's version.

    Returns:
        True only when the marker value read through the database helper is
        equal to ``migration_version``. A stage with no ``migration_version``
        is never reported as applied and the marker is not read at all.
    """
    target = self.migration_version
    if not target:
        return False

    recorded = self.db.get_system_setting_value(self.migration_marker_name())
    if recorded == target:
        logger.info("Stage '%s' already applied at version %s, skipping", self.name, target)
        return True

    # A marker exists but carries a different version: log it so the
    # mismatch is visible, then report the stage as not applied.
    if recorded:
        logger.info("Stage '%s' marker version is %s, target version is %s", self.name, recorded, target)
    return False
|
||||
|
||||
def mark_migration_version_applied(self):
    """Store this stage's ``migration_version`` under its marker name.

    Does nothing for a stage without a ``migration_version``. The
    confirmation message is skipped when the database helper explicitly
    returns False, so the log never claims a marker that was not written.
    A None return (no status reported) is treated as success.
    """
    if not self.migration_version:
        return

    saved = self.db.upsert_system_setting(
        self.migration_marker_name(),
        self.migration_version,
    )
    if saved is False:
        return

    logger.info(
        "Marked stage '%s' as applied at version %s",
        self.name,
        self.migration_version,
    )
|
||||
|
||||
def mark_noop_completes_migration(self):
    """Set the flag saying that finding nothing to migrate counts as completing this stage."""
    self._noop_completes_migration = True
|
||||
|
||||
def noop_completes_migration(self) -> bool:
    """Return the flag set by ``mark_noop_completes_migration`` (False until it is called)."""
    return self._noop_completes_migration
|
||||
|
||||
|
||||
class TenantModelProviderStage(MigrationStage):
|
||||
"""Migrate tenant_llm to tenant_model_provider"""
|
||||
@@ -251,6 +341,7 @@ class TenantModelProviderStage(MigrationStage):
|
||||
description = "Migrate tenant_llm.llm_factory to tenant_model_provider.provider_name"
|
||||
source_tables = ["tenant_llm"]
|
||||
target_tables = ["tenant_model_provider"]
|
||||
migration_version = "1"
|
||||
|
||||
def current_timestamp(self) -> int:
|
||||
return int(time.time())
|
||||
@@ -286,6 +377,7 @@ class TenantModelProviderStage(MigrationStage):
|
||||
count = cursor.fetchone()[0]
|
||||
|
||||
if count == 0:
|
||||
self.mark_noop_completes_migration()
|
||||
logger.info("No new data to migrate from tenant_llm to tenant_model_provider")
|
||||
return False
|
||||
|
||||
@@ -388,6 +480,7 @@ class TenantModelInstanceStage(MigrationStage):
|
||||
description = "Migrate tenant_llm to tenant_model_instance with provider_id lookup"
|
||||
source_tables = ["tenant_llm", "tenant_model_provider"]
|
||||
target_tables = ["tenant_model_instance"]
|
||||
migration_version = "1"
|
||||
|
||||
def current_timestamp(self) -> int:
|
||||
return int(time.time())
|
||||
@@ -438,6 +531,7 @@ class TenantModelInstanceStage(MigrationStage):
|
||||
count = cursor.fetchone()[0]
|
||||
|
||||
if count == 0:
|
||||
self.mark_noop_completes_migration()
|
||||
logger.info("No new data to migrate from tenant_llm to tenant_model_instance")
|
||||
return False
|
||||
|
||||
@@ -553,6 +647,7 @@ class TenantModelStage(MigrationStage):
|
||||
description = "Migrate tenant_llm to tenant_model (status='0' records, plus status='1' for empty-llm factories)"
|
||||
source_tables = ["tenant_llm", "tenant_model_provider", "tenant_model_instance"]
|
||||
target_tables = ["tenant_model"]
|
||||
migration_version = "1"
|
||||
|
||||
@staticmethod
|
||||
def _get_empty_llm_factories() -> list[str]:
|
||||
@@ -640,6 +735,7 @@ class TenantModelStage(MigrationStage):
|
||||
count = cursor.fetchone()[0]
|
||||
|
||||
if count == 0:
|
||||
self.mark_noop_completes_migration()
|
||||
logger.info("No new data to migrate from tenant_llm to tenant_model")
|
||||
return False
|
||||
|
||||
@@ -765,6 +861,7 @@ class ModelIdConfigStage(MigrationStage):
|
||||
|
||||
name = "model_id_config"
|
||||
description = "Normalize stored model IDs in config columns to model@default@provider"
|
||||
migration_version = "1"
|
||||
source_tables = [
|
||||
"tenant",
|
||||
"knowledgebase",
|
||||
@@ -938,6 +1035,7 @@ class ModelIdConfigStage(MigrationStage):
|
||||
def check(self) -> bool:
|
||||
rows, tables = self.count_changes()
|
||||
if rows == 0:
|
||||
self.mark_noop_completes_migration()
|
||||
logger.info("No stored model IDs need normalization")
|
||||
return False
|
||||
logger.info(
|
||||
@@ -1046,6 +1144,10 @@ def run_migration(config: MigrationConfig, stages: list, dry_run: bool = True,
|
||||
stage = stage_cls(db, dry_run=dry_run, create_table_only=create_table_only)
|
||||
|
||||
stage_start = time.time()
|
||||
|
||||
if not create_table_only and stage.is_migration_version_applied():
|
||||
stats.add_stage_stats(stage_name, [], 0, time.time() - stage_start)
|
||||
continue
|
||||
|
||||
# For create_table_only mode, skip check and directly execute
|
||||
if create_table_only:
|
||||
@@ -1055,11 +1157,15 @@ def run_migration(config: MigrationConfig, stages: list, dry_run: bool = True,
|
||||
# Check if migration is needed
|
||||
if not stage.check():
|
||||
logger.info(f"Stage '{stage_name}' check: no migration needed")
|
||||
if not dry_run and stage.noop_completes_migration():
|
||||
stage.mark_migration_version_applied()
|
||||
stats.add_stage_stats(stage_name, [], 0, time.time() - stage_start)
|
||||
continue
|
||||
|
||||
# Execute migration
|
||||
rows, tables = stage.execute()
|
||||
if not dry_run:
|
||||
stage.mark_migration_version_applied()
|
||||
|
||||
stage_duration = time.time() - stage_start
|
||||
|
||||
|
||||
Reference in New Issue
Block a user