diff --git a/tools/scripts/README.md b/tools/scripts/README.md new file mode 100644 index 0000000000..19ce229402 --- /dev/null +++ b/tools/scripts/README.md @@ -0,0 +1,193 @@ +# MySQL Data Migration Script + +A flexible MySQL data migration tool for migrating data between tables with stage-based execution. + +## Overview + +This script provides stage-based data migration between MySQL tables. Currently supports: +- `tenant_model_provider` +- `tenant_model_instance` +- `tenant_model` + +### Migration Stages + +| Stage | Source Table | Target Table | Description | +|-------|-------------|--------------|-------------| +| `tenant_model_provider` | `tenant_llm` | `tenant_model_provider` | Extracts distinct `(tenant_id, llm_factory)` pairs | +| `tenant_model_instance` | `tenant_llm` + `tenant_model_provider` | `tenant_model_instance` | Creates instances with distinct `(tenant_id, llm_factory, api_key)` | +| `tenant_model` | `tenant_llm` + `tenant_model_provider` + `tenant_model_instance` | `tenant_model` | Migrates model configurations (only `status='0'` records) | + +### Stage Dependencies + +``` +tenant_model_provider (no dependencies) + ↓ +tenant_model_instance (depends on tenant_model_provider) + ↓ +tenant_model (depends on tenant_model_provider and tenant_model_instance) +``` + +### Field Mapping Rules + +#### tenant_model_provider + +| Target Field | Source | Rule | +|--------------|--------|------| +| `id` | - | Random 32-character UUID1 | +| `provider_name` | `tenant_llm.llm_factory` | Direct mapping | +| `tenant_id` | `tenant_llm.tenant_id` | Direct mapping | + +- **Deduplication**: Groups by `(tenant_id, llm_factory)` and takes distinct pairs + +#### tenant_model_instance + +| Target Field | Source | Rule | +|--------------|--------|------| +| `id` | - | Random 32-character UUID1 | +| `instance_name` | `tenant_llm.llm_factory` | Direct mapping | +| `provider_id` | `tenant_model_provider.id` | JOIN on `tenant_id` and `provider_name=llm_factory` | +| `api_key` | `tenant_llm.api_key` | Direct mapping | +| `status` | `tenant_llm.status` | Direct mapping | + +- **Deduplication**: Groups by `(tenant_id, llm_factory, api_key)` and takes distinct records + +#### tenant_model + +| Target Field | Source | Rule | +|--------------|--------|------| +| `id` | - | Random 32-character UUID1 | +| `model_name` | `tenant_llm.llm_name` | Direct mapping | +| `provider_id` | `tenant_model_provider.id` | JOIN on `tenant_id` and `provider_name=llm_factory` | +| `instance_id` | `tenant_model_instance.id` | JOIN on `provider_id` and `api_key` | +| `model_type` | `tenant_llm.model_type` | Direct mapping | +| `status` | `tenant_llm.status` | Direct mapping | + +- **Filter**: Only migrates records where `tenant_llm.status='0'` + +## Usage + +### Command Line Arguments + +``` +python mysql_migration.py [OPTIONS] +``` + +| Option | Short | Description | +|--------|-------|-------------| +| `--config` | `-c` | Path to YAML config file (required) | +| `--stages` | `-s` | Comma-separated list of stages to run | +| `--list-stages` | `-l` | List available stages and exit | +| `--execute` | `-e` | Execute full migration (create tables and migrate data) | +| `--create-table-only` | - | Only create target tables, skip data migration | + +### Execution Modes + +The script has three mutually exclusive modes: + +1. **Dry-Run Mode** (default): Check only, no database writes + ```bash + python mysql_migration.py --stages tenant_model_provider --config config.yaml + ``` + +2. **Create Table Only Mode**: Create target tables without migrating data + ```bash + python mysql_migration.py --stages tenant_model_provider --config config.yaml --create-table-only + ``` + +3. **Execute Mode**: Create tables and migrate data + ```bash + python mysql_migration.py --stages tenant_model_provider --config config.yaml --execute + ``` + +### Configuration File + +Create a YAML configuration file with MySQL connection settings: + +```yaml +database: + host: localhost + port: 3306 + user: root + password: your_password + name: rag_flow +``` + +Alternative keys are also supported: + +```yaml +mysql: + host: localhost + port: 3306 + user: root + password: your_password + database: rag_flow +``` + +### Examples + +```bash +# List all available stages +python mysql_migration.py --list-stages + +# Dry run single stage +python mysql_migration.py --stages tenant_model_provider --config /path/to/config.yaml + +# Create tables only for multiple stages +python mysql_migration.py --stages tenant_model_provider,tenant_model_instance --config /path/to/config.yaml --create-table-only + +# Execute full migration for all stages (in dependency order) +python mysql_migration.py --stages tenant_model_provider,tenant_model_instance,tenant_model --config /path/to/config.yaml --execute +``` + +## Output Interpretation + +### Stage Execution Log + +Each stage displays a header showing progress: + +``` +============================================================ +Stage [1/3]: tenant_model_provider +============================================================ +``` + +The stage then performs: +1. Check phase: Verifies source/target tables exist and counts records to migrate +2. Execute phase: Creates tables (if needed) and migrates data in batches + +### Dry-Run Output + +In dry-run mode, the script outputs what it would do without writing: + +``` +[DRY RUN] Would insert 150 records + instance_name=OpenAI, provider_id=abc123, api_key=*** + ... and 145 more records +``` + +### Migration Summary + +After all stages complete, a summary is printed: + +``` +============================================================ +Migration Summary +============================================================ +Total Duration: 2.45s +Total Rows Processed: 350 +Tables Operated: tenant_model_provider, tenant_model_instance +------------------------------------------------------------ +Stage Details: + [tenant_model_provider] Tables: tenant_model_provider, Rows: 50, Duration: 0.82s + [tenant_model_instance] Tables: tenant_model_instance, Rows: 300, Duration: 1.63s +============================================================ +``` + +### Common Messages + +| Message | Meaning | +|---------|-------------------------------------------------------------------------| +| `No new data to migrate` | All records already exist in target table | +| `[DRY RUN] Target table does not exist` | Target table missing, use `--execute` or `--create-table-only`to create | +| `Dependency table does not exist` | Required table from previous stage missing | +| `Inserted batch X: Y records` | Successfully inserted batch of records | diff --git a/tools/scripts/mysql_migration.py b/tools/scripts/mysql_migration.py index c43b1eb64d..1e7aeeb21e 100644 --- a/tools/scripts/mysql_migration.py +++ b/tools/scripts/mysql_migration.py @@ -365,9 +365,362 @@ class TenantModelProviderStage(MigrationStage): logger.info("Created tenant_model_provider table") +class TenantModelInstanceStage(MigrationStage): + """Migrate tenant_llm to tenant_model_instance""" + + name = "tenant_model_instance" + description = "Migrate tenant_llm to tenant_model_instance with provider_id lookup" + source_tables = ["tenant_llm", "tenant_model_provider"] + target_tables = ["tenant_model_instance"] + + def current_timestamp(self) -> int: + return int(time.time()) + + def generate_uuid(self) -> str: + """Generate 32-character UUID1""" + return uuid.uuid1().hex + + def check(self) -> bool: + """Check if migration is needed""" + # Check if source table exists + if not self.db.table_exists("tenant_llm"): + logger.warning("Source table 'tenant_llm' does not exist") + return False + + # Check if tenant_model_provider exists (dependency) + if not self.db.table_exists("tenant_model_provider"): + if self.dry_run: + logger.info("[DRY RUN] Dependency table 'tenant_model_provider' does not exist. " + "Run 'tenant_model_provider' stage first or use --execute.") + return False + logger.warning("Dependency table 'tenant_model_provider' does not exist. " + "Please run 'tenant_model_provider' stage first.") + return False + + # Check if target table exists + if not self.db.table_exists("tenant_model_instance"): + if self.dry_run: + logger.info("[DRY RUN] Target table 'tenant_model_instance' does not exist. " + "Use --execute to create and populate the table.") + return False + logger.info("Target table 'tenant_model_instance' does not exist, will create") + return True + + # Check if there's data to migrate (distinct by tenant_id, llm_factory, api_key) + cursor = self.db.execute_sql( + "SELECT COUNT(*) FROM (" + " SELECT tl.tenant_id, tl.llm_factory, tl.api_key, tmp.id as provider_id " + " FROM tenant_llm tl " + " INNER JOIN tenant_model_provider tmp ON tmp.tenant_id = tl.tenant_id AND tmp.provider_name = tl.llm_factory " + " WHERE NOT EXISTS (" + " SELECT 1 FROM tenant_model_instance tmi " + " WHERE tmi.provider_id = tmp.id AND tmi.api_key = tl.api_key" + " ) " + " GROUP BY tl.tenant_id, tl.llm_factory, tl.api_key, tmp.id" + ") AS distinct_records" + ) + count = cursor.fetchone()[0] + + if count == 0: + logger.info("No new data to migrate from tenant_llm to tenant_model_instance") + return False + + logger.info(f"Found {count} rows to migrate from tenant_llm to tenant_model_instance") + return True + + def execute(self) -> tuple[int, list]: + """Execute migration""" + current_ts = self.current_timestamp() + rows_inserted = 0 + + # Check if tenant_model_provider exists (dependency) + if not self.db.table_exists("tenant_model_provider"): + logger.error("Dependency table 'tenant_model_provider' does not exist. " + "Please run 'tenant_model_provider' stage first.") + return 0, [] + + # Check if target table exists + if not self.db.table_exists("tenant_model_instance"): + if self.dry_run: + logger.info("[DRY RUN] Target table 'tenant_model_instance' does not exist. " + "Use --execute to create and populate the table.") + return 0, [] + logger.info("Target table 'tenant_model_instance' does not exist, will create") + self.create_target_table() + + # If create_table_only mode, skip data migration + if self.create_table_only: + logger.info("[CREATE TABLE ONLY] Target table created/verified, skipping data migration") + return 0, self.target_tables + + # Get records from tenant_llm with provider_id lookup + # Group by tenant_id, llm_factory, api_key to get distinct records + # instance_name = llm_factory, provider_id from tenant_model_provider, api_key from tenant_llm + cursor = self.db.execute_sql( + "SELECT tl.tenant_id, tl.llm_factory, tl.api_key, MAX(tl.status) as status, tmp.id as provider_id " + "FROM tenant_llm tl " + "INNER JOIN tenant_model_provider tmp ON tmp.tenant_id = tl.tenant_id AND tmp.provider_name = tl.llm_factory " + "WHERE NOT EXISTS (" + " SELECT 1 FROM tenant_model_instance tmi " + " WHERE tmi.provider_id = tmp.id AND tmi.api_key = tl.api_key" + ") " + "GROUP BY tl.tenant_id, tl.llm_factory, tl.api_key, tmp.id" + ) + + records = cursor.fetchall() + + if not records: + logger.info("No records to migrate") + return 0, [] + + logger.info(f"Migrating {len(records)} tenant_model_instance records...") + + if self.dry_run: + logger.info(f"[DRY RUN] Would insert {len(records)} records") + for tenant_id, llm_factory, api_key, status, provider_id in records[:5]: + logger.info(f" instance_name={llm_factory}, provider_id={provider_id}, api_key=***") + if len(records) > 5: + logger.info(f" ... and {len(records) - 5} more records") + return len(records), self.target_tables + + # Insert records in batches + batch_size = 100 + for i in range(0, len(records), batch_size): + batch = records[i:i + batch_size] + values = [] + for tenant_id, llm_factory, api_key, status, provider_id in batch: + record_id = self.generate_uuid() + instance_name = llm_factory.replace("'", "''") if llm_factory else "" + api_key_escaped = api_key.replace("'", "''") if api_key else "" + status_val = status if status else "active" + values.append(f"('{record_id}', '{instance_name}', '{provider_id}', " + f"'{api_key_escaped}', '{status_val}', " + f"{current_ts}, FROM_UNIXTIME({current_ts}), " + f"{current_ts}, FROM_UNIXTIME({current_ts}))") + + insert_sql = f""" + INSERT INTO tenant_model_instance + (id, instance_name, provider_id, api_key, status, create_time, create_date, update_time, update_date) + VALUES {', '.join(values)} + """ + self.db.execute_sql(insert_sql) + rows_inserted += len(batch) + logger.info(f"Inserted batch {i // batch_size + 1}: {len(batch)} records") + + return rows_inserted, self.target_tables + + def create_target_table(self): + """Create tenant_model_instance table""" + create_sql = """ + CREATE TABLE IF NOT EXISTS tenant_model_instance ( + id VARCHAR(32) NOT NULL PRIMARY KEY, + instance_name VARCHAR(128) NOT NULL, + provider_id VARCHAR(32) NOT NULL, + api_key VARCHAR(512) NOT NULL, + status VARCHAR(32) DEFAULT 'active', + create_time BIGINT, + create_date DATETIME, + update_time BIGINT, + update_date DATETIME, + UNIQUE INDEX idx_api_key_provider_id (api_key, provider_id), + INDEX idx_provider_id (provider_id) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """ + self.db.execute_sql(create_sql) + logger.info("Created tenant_model_instance table") + + +class TenantModelStage(MigrationStage): + """Migrate tenant_llm to tenant_model""" + + name = "tenant_model" + description = "Migrate tenant_llm to tenant_model (only status='0' records)" + source_tables = ["tenant_llm", "tenant_model_provider", "tenant_model_instance"] + target_tables = ["tenant_model"] + + def current_timestamp(self) -> int: + return int(time.time()) + + def generate_uuid(self) -> str: + """Generate 32-character UUID1""" + return uuid.uuid1().hex + + def check(self) -> bool: + """Check if migration is needed""" + # Check if source table exists + if not self.db.table_exists("tenant_llm"): + logger.warning("Source table 'tenant_llm' does not exist") + return False + + # Check if tenant_model_provider exists (dependency) + if not self.db.table_exists("tenant_model_provider"): + if self.dry_run: + logger.info("[DRY RUN] Dependency table 'tenant_model_provider' does not exist. " + "Run 'tenant_model_provider' stage first or use --execute.") + return False + logger.warning("Dependency table 'tenant_model_provider' does not exist. " + "Please run 'tenant_model_provider' stage first.") + return False + + # Check if tenant_model_instance exists (dependency) + if not self.db.table_exists("tenant_model_instance"): + if self.dry_run: + logger.info("[DRY RUN] Dependency table 'tenant_model_instance' does not exist. " + "Run 'tenant_model_instance' stage first or use --execute.") + return False + logger.warning("Dependency table 'tenant_model_instance' does not exist. " + "Please run 'tenant_model_instance' stage first.") + return False + + # Check if target table exists + if not self.db.table_exists("tenant_model"): + if self.dry_run: + logger.info("[DRY RUN] Target table 'tenant_model' does not exist. " + "Use --execute to create and populate the table.") + return False + logger.info("Target table 'tenant_model' does not exist, will create") + return True + + # Check if there's data to migrate (only status='0' records) + cursor = self.db.execute_sql( + "SELECT COUNT(*) FROM (" + " SELECT tl.id " + " FROM tenant_llm tl " + " INNER JOIN tenant_model_provider tmp ON tmp.tenant_id = tl.tenant_id AND tmp.provider_name = tl.llm_factory " + " INNER JOIN tenant_model_instance tmi ON tmi.provider_id = tmp.id AND tmi.api_key = tl.api_key " + " WHERE tl.status = '0' " + " AND NOT EXISTS (" + " SELECT 1 FROM tenant_model tm " + " WHERE tm.provider_id = tmp.id AND tm.model_name = tl.llm_name AND tm.instance_id = tmi.id" + " )" + ") AS distinct_records" + ) + count = cursor.fetchone()[0] + + if count == 0: + logger.info("No new data to migrate from tenant_llm to tenant_model (status='0' only)") + return False + + logger.info(f"Found {count} rows to migrate from tenant_llm to tenant_model") + return True + + def execute(self) -> tuple[int, list]: + """Execute migration""" + current_ts = self.current_timestamp() + rows_inserted = 0 + + # Check if tenant_model_provider exists (dependency) + if not self.db.table_exists("tenant_model_provider"): + logger.error("Dependency table 'tenant_model_provider' does not exist. " + "Please run 'tenant_model_provider' stage first.") + return 0, [] + + # Check if tenant_model_instance exists (dependency) + if not self.db.table_exists("tenant_model_instance"): + logger.error("Dependency table 'tenant_model_instance' does not exist. " + "Please run 'tenant_model_instance' stage first.") + return 0, [] + + # Check if target table exists + if not self.db.table_exists("tenant_model"): + if self.dry_run: + logger.info("[DRY RUN] Target table 'tenant_model' does not exist. " + "Use --execute to create and populate the table.") + return 0, [] + logger.info("Target table 'tenant_model' does not exist, will create") + self.create_target_table() + + # If create_table_only mode, skip data migration + if self.create_table_only: + logger.info("[CREATE TABLE ONLY] Target table created/verified, skipping data migration") + return 0, self.target_tables + + # Get records from tenant_llm with provider_id and instance_id lookup + # Only migrate records where status='0' + cursor = self.db.execute_sql( + "SELECT tl.id, tl.llm_name, tmp.id as provider_id, tmi.id as instance_id, " + " tl.model_type, tl.status " + "FROM tenant_llm tl " + "INNER JOIN tenant_model_provider tmp ON tmp.tenant_id = tl.tenant_id AND tmp.provider_name = tl.llm_factory " + "INNER JOIN tenant_model_instance tmi ON tmi.provider_id = tmp.id AND tmi.api_key = tl.api_key " + "WHERE tl.status = '0' " + "AND NOT EXISTS (" + " SELECT 1 FROM tenant_model tm " + " WHERE tm.provider_id = tmp.id AND tm.model_name = tl.llm_name AND tm.instance_id = tmi.id" + ")" + ) + + records = cursor.fetchall() + + if not records: + logger.info("No records to migrate") + return 0, [] + + logger.info(f"Migrating {len(records)} tenant_model records...") + + if self.dry_run: + logger.info(f"[DRY RUN] Would insert {len(records)} records") + for source_id, llm_name, provider_id, instance_id, model_type, status in records[:5]: + logger.info(f" model_name={llm_name}, provider_id={provider_id}, " + f"instance_id={instance_id}, model_type={model_type}") + if len(records) > 5: + logger.info(f" ... and {len(records) - 5} more records") + return len(records), self.target_tables + + # Insert records in batches + batch_size = 100 + for i in range(0, len(records), batch_size): + batch = records[i:i + batch_size] + values = [] + for source_id, llm_name, provider_id, instance_id, model_type, status in batch: + record_id = self.generate_uuid() + model_name_escaped = llm_name.replace("'", "''") if llm_name else "" + model_type_escaped = model_type.replace("'", "''") if model_type else "" + status_val = status if status else "active" + values.append(f"('{record_id}', '{model_name_escaped}', '{provider_id}', " + f"'{instance_id}', '{model_type_escaped}', '{status_val}', " + f"{current_ts}, FROM_UNIXTIME({current_ts}), " + f"{current_ts}, FROM_UNIXTIME({current_ts}))") + + insert_sql = f""" + INSERT INTO tenant_model + (id, model_name, provider_id, instance_id, model_type, status, + create_time, create_date, update_time, update_date) + VALUES {', '.join(values)} + """ + self.db.execute_sql(insert_sql) + rows_inserted += len(batch) + logger.info(f"Inserted batch {i // batch_size + 1}: {len(batch)} records") + + return rows_inserted, self.target_tables + + def create_target_table(self): + """Create tenant_model table""" + create_sql = """ + CREATE TABLE IF NOT EXISTS tenant_model ( + id VARCHAR(32) NOT NULL PRIMARY KEY, + model_name VARCHAR(128), + provider_id VARCHAR(32) NOT NULL, + instance_id VARCHAR(32) NOT NULL, + model_type VARCHAR(32) NOT NULL, + status VARCHAR(32) DEFAULT 'active', + create_time BIGINT, + create_date DATETIME, + update_time BIGINT, + update_date DATETIME, + INDEX idx_instance_id (instance_id), + UNIQUE INDEX idx_provider_model_instance (provider_id, model_name, instance_id) + ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 + """ + self.db.execute_sql(create_sql) + logger.info("Created tenant_model table") + + # Registry of available migration stages MIGRATION_STAGES = { 'tenant_model_provider': TenantModelProviderStage, + 'tenant_model_instance': TenantModelInstanceStage, + 'tenant_model': TenantModelStage, } @@ -394,7 +747,7 @@ def run_migration(config: MigrationConfig, stages: list, dry_run: bool = True, total_stages = len(stages) for idx, stage_name in enumerate(stages, 1): - logger.info(f"\n{'=' * 60}") + logger.info(f"{'=' * 60}") logger.info(f"Stage [{idx}/{total_stages}]: {stage_name}") logger.info(f"{'=' * 60}")