From 4018f02d962882993d4f7d348ab739fed8bd8e3b Mon Sep 17 00:00:00 2001 From: buua436 Date: Tue, 2 Jun 2026 15:04:33 +0800 Subject: [PATCH] Feat: mark mysql migrations as applied (#15504) ### What problem does this PR solve? mark mysql migrations as applied ### Type of change - [x] New Feature (non-breaking change which adds functionality) --- docker/launch_backend_service.sh | 18 ++++++ tools/scripts/mysql_migration.py | 106 +++++++++++++++++++++++++++++++ 2 files changed, 124 insertions(+) diff --git a/docker/launch_backend_service.sh b/docker/launch_backend_service.sh index 2f5ddb14c8..b20ddc973c 100755 --- a/docker/launch_backend_service.sh +++ b/docker/launch_backend_service.sh @@ -114,6 +114,24 @@ run_server(){ fi } +ensure_db_init() { + echo "Initializing database tables..." + "$PY" -c "from api.db.db_models import init_database_tables as init_web_db; init_web_db()" + echo "Database tables initialized." +} + +run_mysql_migrations() { + echo "Running model provider table migrations..." + "$PY" tools/scripts/mysql_migration.py --stages tenant_model_provider --config conf/service_conf.yaml --execute + "$PY" tools/scripts/mysql_migration.py --stages tenant_model_instance --config conf/service_conf.yaml --execute + "$PY" tools/scripts/mysql_migration.py --stages tenant_model --config conf/service_conf.yaml --execute + "$PY" tools/scripts/mysql_migration.py --stages model_id_config --config conf/service_conf.yaml --execute + echo "Model provider table migrations completed." +} + +ensure_db_init +run_mysql_migrations + # Start task executors for ((i=0;i 0 + def get_system_setting_value(self, name: str) -> str | None: + if not self.table_exists("system_settings"): + logger.info("Table 'system_settings' does not exist, migration marker is unavailable") + return None + cursor = self.execute_sql( + "SELECT `value` FROM `system_settings` WHERE `name` = %s", + (name,), + ) + row = cursor.fetchone() + return row[0] if row else None + + def upsert_system_setting(self, name: str, value: str, source: str = "migration", data_type: str = "string"): + if not self.table_exists("system_settings"): + logger.warning("Table 'system_settings' does not exist, migration marker was not saved") + return + + current_ts = int(time.time()) + self.execute_sql( + """ + INSERT INTO `system_settings` + (`name`, `source`, `data_type`, `value`, `create_time`, `create_date`, `update_time`, `update_date`) + VALUES (%s, %s, %s, %s, %s, FROM_UNIXTIME(%s), %s, FROM_UNIXTIME(%s)) + ON DUPLICATE KEY UPDATE + `source` = VALUES(`source`), + `data_type` = VALUES(`data_type`), + `value` = VALUES(`value`), + `update_time` = VALUES(`update_time`), + `update_date` = VALUES(`update_date`) + """, + ( + name, + source, + data_type, + value, + current_ts * 1000, + current_ts, + current_ts * 1000, + current_ts, + ), + ) + # Define model classes for migration (not importing from api.db.db_models) class BaseModel(Model): @@ -225,11 +266,14 @@ class MigrationStage: description = "Base migration stage" source_tables = [] target_tables = [] + migration_version = None + migration_marker_prefix = "mysql_migration" def __init__(self, db: MigrationDatabase, dry_run: bool = True, create_table_only: bool = False): self.db = db self.dry_run = dry_run self.create_table_only = create_table_only + self._noop_completes_migration = False def check(self) -> bool: """Check if migration is needed""" @@ -243,6 +287,52 @@ class MigrationStage: """Create target table (override in subclass if needed)""" pass + def migration_marker_name(self) -> str: + return f"{self.migration_marker_prefix}.{self.name}.version" + + def is_migration_version_applied(self) -> bool: + if not self.migration_version: + return False + + marker_name = self.migration_marker_name() + current_version = self.db.get_system_setting_value(marker_name) + if current_version == self.migration_version: + logger.info( + "Stage '%s' already applied at version %s, skipping", + self.name, + self.migration_version, + ) + return True + + if current_version: + logger.info( + "Stage '%s' marker version is %s, target version is %s", + self.name, + current_version, + self.migration_version, + ) + return False + + def mark_migration_version_applied(self): + if not self.migration_version: + return + + self.db.upsert_system_setting( + self.migration_marker_name(), + self.migration_version, + ) + logger.info( + "Marked stage '%s' as applied at version %s", + self.name, + self.migration_version, + ) + + def mark_noop_completes_migration(self): + self._noop_completes_migration = True + + def noop_completes_migration(self) -> bool: + return self._noop_completes_migration + class TenantModelProviderStage(MigrationStage): """Migrate tenant_llm to tenant_model_provider""" @@ -251,6 +341,7 @@ class TenantModelProviderStage(MigrationStage): description = "Migrate tenant_llm.llm_factory to tenant_model_provider.provider_name" source_tables = ["tenant_llm"] target_tables = ["tenant_model_provider"] + migration_version = "1" def current_timestamp(self) -> int: return int(time.time()) @@ -286,6 +377,7 @@ class TenantModelProviderStage(MigrationStage): count = cursor.fetchone()[0] if count == 0: + self.mark_noop_completes_migration() logger.info("No new data to migrate from tenant_llm to tenant_model_provider") return False @@ -388,6 +480,7 @@ class TenantModelInstanceStage(MigrationStage): description = "Migrate tenant_llm to tenant_model_instance with provider_id lookup" source_tables = ["tenant_llm", "tenant_model_provider"] target_tables = ["tenant_model_instance"] + migration_version = "1" def current_timestamp(self) -> int: return int(time.time()) @@ -438,6 +531,7 @@ class TenantModelInstanceStage(MigrationStage): count = cursor.fetchone()[0] if count == 0: + self.mark_noop_completes_migration() logger.info("No new data to migrate from tenant_llm to tenant_model_instance") return False @@ -553,6 +647,7 @@ class TenantModelStage(MigrationStage): description = "Migrate tenant_llm to tenant_model (status='0' records, plus status='1' for empty-llm factories)" source_tables = ["tenant_llm", "tenant_model_provider", "tenant_model_instance"] target_tables = ["tenant_model"] + migration_version = "1" @staticmethod def _get_empty_llm_factories() -> list[str]: @@ -640,6 +735,7 @@ class TenantModelStage(MigrationStage): count = cursor.fetchone()[0] if count == 0: + self.mark_noop_completes_migration() logger.info("No new data to migrate from tenant_llm to tenant_model") return False @@ -765,6 +861,7 @@ class ModelIdConfigStage(MigrationStage): name = "model_id_config" description = "Normalize stored model IDs in config columns to model@default@provider" + migration_version = "1" source_tables = [ "tenant", "knowledgebase", @@ -938,6 +1035,7 @@ class ModelIdConfigStage(MigrationStage): def check(self) -> bool: rows, tables = self.count_changes() if rows == 0: + self.mark_noop_completes_migration() logger.info("No stored model IDs need normalization") return False logger.info( @@ -1046,6 +1144,10 @@ def run_migration(config: MigrationConfig, stages: list, dry_run: bool = True, stage = stage_cls(db, dry_run=dry_run, create_table_only=create_table_only) stage_start = time.time() + + if not create_table_only and stage.is_migration_version_applied(): + stats.add_stage_stats(stage_name, [], 0, time.time() - stage_start) + continue # For create_table_only mode, skip check and directly execute if create_table_only: @@ -1055,11 +1157,15 @@ def run_migration(config: MigrationConfig, stages: list, dry_run: bool = True, # Check if migration is needed if not stage.check(): logger.info(f"Stage '{stage_name}' check: no migration needed") + if not dry_run and stage.noop_completes_migration(): + stage.mark_migration_version_applied() stats.add_stage_stats(stage_name, [], 0, time.time() - stage_start) continue # Execute migration rows, tables = stage.execute() + if not dry_run: + stage.mark_migration_version_applied() stage_duration = time.time() - stage_start