Feat: migrate script (#13976)

### What problem does this PR solve?

Add stage for migrate tenant_llm data into table tenant_model_instance
and tenant_model.

### Type of change

- [x] Other (please describe): tool script


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **Chores**
* Added two new migration stages to move tenant model and instance
records into new target tables, with dry-run, full-execute, and "create
table only" modes; migration skips already-migrated rows to avoid
duplicates.
* **Bug Fixes**
  * Cleaned up migration header logging for clearer output.
* **Documentation**
* Added usage guide describing stages, options, modes, config format,
examples, and expected logs.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
Lynn
2026-04-09 11:03:39 +08:00
committed by GitHub
parent c5871c1078
commit dbfb439239
2 changed files with 547 additions and 1 deletions

193
tools/scripts/README.md Normal file
View File

@@ -0,0 +1,193 @@
# MySQL Data Migration Script
A flexible MySQL data migration tool for migrating data between tables with stage-based execution.
## Overview
This script provides stage-based data migration between MySQL tables. Currently supports:
- `tenant_model_provider`
- `tenant_model_instance`
- `tenant_model`
### Migration Stages
| Stage | Source Table | Target Table | Description |
|-------|-------------|--------------|-------------|
| `tenant_model_provider` | `tenant_llm` | `tenant_model_provider` | Extracts distinct `(tenant_id, llm_factory)` pairs |
| `tenant_model_instance` | `tenant_llm` + `tenant_model_provider` | `tenant_model_instance` | Creates instances with distinct `(tenant_id, llm_factory, api_key)` |
| `tenant_model` | `tenant_llm` + `tenant_model_provider` + `tenant_model_instance` | `tenant_model` | Migrates model configurations (only `status='0'` records) |
### Stage Dependencies
```
tenant_model_provider (no dependencies)
tenant_model_instance (depends on tenant_model_provider)
tenant_model (depends on tenant_model_provider and tenant_model_instance)
```
### Field Mapping Rules
#### tenant_model_provider
| Target Field | Source | Rule |
|--------------|--------|------|
| `id` | - | Random 32-character UUID1 |
| `provider_name` | `tenant_llm.llm_factory` | Direct mapping |
| `tenant_id` | `tenant_llm.tenant_id` | Direct mapping |
- **Deduplication**: Groups by `(tenant_id, llm_factory)` and takes distinct pairs
#### tenant_model_instance
| Target Field | Source | Rule |
|--------------|--------|------|
| `id` | - | Random 32-character UUID1 |
| `instance_name` | `tenant_llm.llm_factory` | Direct mapping |
| `provider_id` | `tenant_model_provider.id` | JOIN on `tenant_id` and `provider_name=llm_factory` |
| `api_key` | `tenant_llm.api_key` | Direct mapping |
| `status` | `tenant_llm.status` | Direct mapping |
- **Deduplication**: Groups by `(tenant_id, llm_factory, api_key)` and takes distinct records
#### tenant_model
| Target Field | Source | Rule |
|--------------|--------|------|
| `id` | - | Random 32-character UUID1 |
| `model_name` | `tenant_llm.llm_name` | Direct mapping |
| `provider_id` | `tenant_model_provider.id` | JOIN on `tenant_id` and `provider_name=llm_factory` |
| `instance_id` | `tenant_model_instance.id` | JOIN on `provider_id` and `api_key` |
| `model_type` | `tenant_llm.model_type` | Direct mapping |
| `status` | `tenant_llm.status` | Direct mapping |
- **Filter**: Only migrates records where `tenant_llm.status='0'`
## Usage
### Command Line Arguments
```
python mysql_migration.py [OPTIONS]
```
| Option | Short | Description |
|--------|-------|-------------|
| `--config` | `-c` | Path to YAML config file (required) |
| `--stages` | `-s` | Comma-separated list of stages to run |
| `--list-stages` | `-l` | List available stages and exit |
| `--execute` | `-e` | Execute full migration (create tables and migrate data) |
| `--create-table-only` | - | Only create target tables, skip data migration |
### Execution Modes
The script has three mutually exclusive modes:
1. **Dry-Run Mode** (default): Check only, no database writes
```bash
python mysql_migration.py --stages tenant_model_provider --config config.yaml
```
2. **Create Table Only Mode**: Create target tables without migrating data
```bash
python mysql_migration.py --stages tenant_model_provider --config config.yaml --create-table-only
```
3. **Execute Mode**: Create tables and migrate data
```bash
python mysql_migration.py --stages tenant_model_provider --config config.yaml --execute
```
### Configuration File
Create a YAML configuration file with MySQL connection settings:
```yaml
database:
host: localhost
port: 3306
user: root
password: your_password
name: rag_flow
```
Alternative keys are also supported:
```yaml
mysql:
host: localhost
port: 3306
user: root
password: your_password
database: rag_flow
```
### Examples
```bash
# List all available stages
python mysql_migration.py --list-stages
# Dry run single stage
python mysql_migration.py --stages tenant_model_provider --config /path/to/config.yaml
# Create tables only for multiple stages
python mysql_migration.py --stages tenant_model_provider,tenant_model_instance --config /path/to/config.yaml --create-table-only
# Execute full migration for all stages (in dependency order)
python mysql_migration.py --stages tenant_model_provider,tenant_model_instance,tenant_model --config /path/to/config.yaml --execute
```
## Output Interpretation
### Stage Execution Log
Each stage displays a header showing progress:
```
============================================================
Stage [1/3]: tenant_model_provider
============================================================
```
The stage then performs:
1. Check phase: Verifies source/target tables exist and counts records to migrate
2. Execute phase: Creates tables (if needed) and migrates data in batches
### Dry-Run Output
In dry-run mode, the script outputs what it would do without writing:
```
[DRY RUN] Would insert 150 records
instance_name=OpenAI, provider_id=abc123, api_key=***
... and 145 more records
```
### Migration Summary
After all stages complete, a summary is printed:
```
============================================================
Migration Summary
============================================================
Total Duration: 2.45s
Total Rows Processed: 350
Tables Operated: tenant_model_provider, tenant_model_instance
------------------------------------------------------------
Stage Details:
[tenant_model_provider] Tables: tenant_model_provider, Rows: 50, Duration: 0.82s
[tenant_model_instance] Tables: tenant_model_instance, Rows: 300, Duration: 1.63s
============================================================
```
### Common Messages
| Message | Meaning |
|---------|-------------------------------------------------------------------------|
| `No new data to migrate` | All records already exist in target table |
| `[DRY RUN] Target table does not exist` | Target table missing, use `--execute` or `--create-table-only`to create |
| `Dependency table does not exist` | Required table from previous stage missing |
| `Inserted batch X: Y records` | Successfully inserted batch of records |

View File

@@ -365,9 +365,362 @@ class TenantModelProviderStage(MigrationStage):
logger.info("Created tenant_model_provider table")
class TenantModelInstanceStage(MigrationStage):
"""Migrate tenant_llm to tenant_model_instance"""
name = "tenant_model_instance"
description = "Migrate tenant_llm to tenant_model_instance with provider_id lookup"
source_tables = ["tenant_llm", "tenant_model_provider"]
target_tables = ["tenant_model_instance"]
def current_timestamp(self) -> int:
return int(time.time())
def generate_uuid(self) -> str:
"""Generate 32-character UUID1"""
return uuid.uuid1().hex
def check(self) -> bool:
"""Check if migration is needed"""
# Check if source table exists
if not self.db.table_exists("tenant_llm"):
logger.warning("Source table 'tenant_llm' does not exist")
return False
# Check if tenant_model_provider exists (dependency)
if not self.db.table_exists("tenant_model_provider"):
if self.dry_run:
logger.info("[DRY RUN] Dependency table 'tenant_model_provider' does not exist. "
"Run 'tenant_model_provider' stage first or use --execute.")
return False
logger.warning("Dependency table 'tenant_model_provider' does not exist. "
"Please run 'tenant_model_provider' stage first.")
return False
# Check if target table exists
if not self.db.table_exists("tenant_model_instance"):
if self.dry_run:
logger.info("[DRY RUN] Target table 'tenant_model_instance' does not exist. "
"Use --execute to create and populate the table.")
return False
logger.info("Target table 'tenant_model_instance' does not exist, will create")
return True
# Check if there's data to migrate (distinct by tenant_id, llm_factory, api_key)
cursor = self.db.execute_sql(
"SELECT COUNT(*) FROM ("
" SELECT tl.tenant_id, tl.llm_factory, tl.api_key, tmp.id as provider_id "
" FROM tenant_llm tl "
" INNER JOIN tenant_model_provider tmp ON tmp.tenant_id = tl.tenant_id AND tmp.provider_name = tl.llm_factory "
" WHERE NOT EXISTS ("
" SELECT 1 FROM tenant_model_instance tmi "
" WHERE tmi.provider_id = tmp.id AND tmi.api_key = tl.api_key"
" ) "
" GROUP BY tl.tenant_id, tl.llm_factory, tl.api_key, tmp.id"
") AS distinct_records"
)
count = cursor.fetchone()[0]
if count == 0:
logger.info("No new data to migrate from tenant_llm to tenant_model_instance")
return False
logger.info(f"Found {count} rows to migrate from tenant_llm to tenant_model_instance")
return True
def execute(self) -> tuple[int, list]:
"""Execute migration"""
current_ts = self.current_timestamp()
rows_inserted = 0
# Check if tenant_model_provider exists (dependency)
if not self.db.table_exists("tenant_model_provider"):
logger.error("Dependency table 'tenant_model_provider' does not exist. "
"Please run 'tenant_model_provider' stage first.")
return 0, []
# Check if target table exists
if not self.db.table_exists("tenant_model_instance"):
if self.dry_run:
logger.info("[DRY RUN] Target table 'tenant_model_instance' does not exist. "
"Use --execute to create and populate the table.")
return 0, []
logger.info("Target table 'tenant_model_instance' does not exist, will create")
self.create_target_table()
# If create_table_only mode, skip data migration
if self.create_table_only:
logger.info("[CREATE TABLE ONLY] Target table created/verified, skipping data migration")
return 0, self.target_tables
# Get records from tenant_llm with provider_id lookup
# Group by tenant_id, llm_factory, api_key to get distinct records
# instance_name = llm_factory, provider_id from tenant_model_provider, api_key from tenant_llm
cursor = self.db.execute_sql(
"SELECT tl.tenant_id, tl.llm_factory, tl.api_key, MAX(tl.status) as status, tmp.id as provider_id "
"FROM tenant_llm tl "
"INNER JOIN tenant_model_provider tmp ON tmp.tenant_id = tl.tenant_id AND tmp.provider_name = tl.llm_factory "
"WHERE NOT EXISTS ("
" SELECT 1 FROM tenant_model_instance tmi "
" WHERE tmi.provider_id = tmp.id AND tmi.api_key = tl.api_key"
") "
"GROUP BY tl.tenant_id, tl.llm_factory, tl.api_key, tmp.id"
)
records = cursor.fetchall()
if not records:
logger.info("No records to migrate")
return 0, []
logger.info(f"Migrating {len(records)} tenant_model_instance records...")
if self.dry_run:
logger.info(f"[DRY RUN] Would insert {len(records)} records")
for tenant_id, llm_factory, api_key, status, provider_id in records[:5]:
logger.info(f" instance_name={llm_factory}, provider_id={provider_id}, api_key=***")
if len(records) > 5:
logger.info(f" ... and {len(records) - 5} more records")
return len(records), self.target_tables
# Insert records in batches
batch_size = 100
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
values = []
for tenant_id, llm_factory, api_key, status, provider_id in batch:
record_id = self.generate_uuid()
instance_name = llm_factory.replace("'", "''") if llm_factory else ""
api_key_escaped = api_key.replace("'", "''") if api_key else ""
status_val = status if status else "active"
values.append(f"('{record_id}', '{instance_name}', '{provider_id}', "
f"'{api_key_escaped}', '{status_val}', "
f"{current_ts}, FROM_UNIXTIME({current_ts}), "
f"{current_ts}, FROM_UNIXTIME({current_ts}))")
insert_sql = f"""
INSERT INTO tenant_model_instance
(id, instance_name, provider_id, api_key, status, create_time, create_date, update_time, update_date)
VALUES {', '.join(values)}
"""
self.db.execute_sql(insert_sql)
rows_inserted += len(batch)
logger.info(f"Inserted batch {i // batch_size + 1}: {len(batch)} records")
return rows_inserted, self.target_tables
def create_target_table(self):
"""Create tenant_model_instance table"""
create_sql = """
CREATE TABLE IF NOT EXISTS tenant_model_instance (
id VARCHAR(32) NOT NULL PRIMARY KEY,
instance_name VARCHAR(128) NOT NULL,
provider_id VARCHAR(32) NOT NULL,
api_key VARCHAR(512) NOT NULL,
status VARCHAR(32) DEFAULT 'active',
create_time BIGINT,
create_date DATETIME,
update_time BIGINT,
update_date DATETIME,
UNIQUE INDEX idx_api_key_provider_id (api_key, provider_id),
INDEX idx_provider_id (provider_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
"""
self.db.execute_sql(create_sql)
logger.info("Created tenant_model_instance table")
class TenantModelStage(MigrationStage):
"""Migrate tenant_llm to tenant_model"""
name = "tenant_model"
description = "Migrate tenant_llm to tenant_model (only status='0' records)"
source_tables = ["tenant_llm", "tenant_model_provider", "tenant_model_instance"]
target_tables = ["tenant_model"]
def current_timestamp(self) -> int:
return int(time.time())
def generate_uuid(self) -> str:
"""Generate 32-character UUID1"""
return uuid.uuid1().hex
def check(self) -> bool:
"""Check if migration is needed"""
# Check if source table exists
if not self.db.table_exists("tenant_llm"):
logger.warning("Source table 'tenant_llm' does not exist")
return False
# Check if tenant_model_provider exists (dependency)
if not self.db.table_exists("tenant_model_provider"):
if self.dry_run:
logger.info("[DRY RUN] Dependency table 'tenant_model_provider' does not exist. "
"Run 'tenant_model_provider' stage first or use --execute.")
return False
logger.warning("Dependency table 'tenant_model_provider' does not exist. "
"Please run 'tenant_model_provider' stage first.")
return False
# Check if tenant_model_instance exists (dependency)
if not self.db.table_exists("tenant_model_instance"):
if self.dry_run:
logger.info("[DRY RUN] Dependency table 'tenant_model_instance' does not exist. "
"Run 'tenant_model_instance' stage first or use --execute.")
return False
logger.warning("Dependency table 'tenant_model_instance' does not exist. "
"Please run 'tenant_model_instance' stage first.")
return False
# Check if target table exists
if not self.db.table_exists("tenant_model"):
if self.dry_run:
logger.info("[DRY RUN] Target table 'tenant_model' does not exist. "
"Use --execute to create and populate the table.")
return False
logger.info("Target table 'tenant_model' does not exist, will create")
return True
# Check if there's data to migrate (only status='0' records)
cursor = self.db.execute_sql(
"SELECT COUNT(*) FROM ("
" SELECT tl.id "
" FROM tenant_llm tl "
" INNER JOIN tenant_model_provider tmp ON tmp.tenant_id = tl.tenant_id AND tmp.provider_name = tl.llm_factory "
" INNER JOIN tenant_model_instance tmi ON tmi.provider_id = tmp.id AND tmi.api_key = tl.api_key "
" WHERE tl.status = '0' "
" AND NOT EXISTS ("
" SELECT 1 FROM tenant_model tm "
" WHERE tm.provider_id = tmp.id AND tm.model_name = tl.llm_name AND tm.instance_id = tmi.id"
" )"
") AS distinct_records"
)
count = cursor.fetchone()[0]
if count == 0:
logger.info("No new data to migrate from tenant_llm to tenant_model (status='0' only)")
return False
logger.info(f"Found {count} rows to migrate from tenant_llm to tenant_model")
return True
def execute(self) -> tuple[int, list]:
"""Execute migration"""
current_ts = self.current_timestamp()
rows_inserted = 0
# Check if tenant_model_provider exists (dependency)
if not self.db.table_exists("tenant_model_provider"):
logger.error("Dependency table 'tenant_model_provider' does not exist. "
"Please run 'tenant_model_provider' stage first.")
return 0, []
# Check if tenant_model_instance exists (dependency)
if not self.db.table_exists("tenant_model_instance"):
logger.error("Dependency table 'tenant_model_instance' does not exist. "
"Please run 'tenant_model_instance' stage first.")
return 0, []
# Check if target table exists
if not self.db.table_exists("tenant_model"):
if self.dry_run:
logger.info("[DRY RUN] Target table 'tenant_model' does not exist. "
"Use --execute to create and populate the table.")
return 0, []
logger.info("Target table 'tenant_model' does not exist, will create")
self.create_target_table()
# If create_table_only mode, skip data migration
if self.create_table_only:
logger.info("[CREATE TABLE ONLY] Target table created/verified, skipping data migration")
return 0, self.target_tables
# Get records from tenant_llm with provider_id and instance_id lookup
# Only migrate records where status='0'
cursor = self.db.execute_sql(
"SELECT tl.id, tl.llm_name, tmp.id as provider_id, tmi.id as instance_id, "
" tl.model_type, tl.status "
"FROM tenant_llm tl "
"INNER JOIN tenant_model_provider tmp ON tmp.tenant_id = tl.tenant_id AND tmp.provider_name = tl.llm_factory "
"INNER JOIN tenant_model_instance tmi ON tmi.provider_id = tmp.id AND tmi.api_key = tl.api_key "
"WHERE tl.status = '0' "
"AND NOT EXISTS ("
" SELECT 1 FROM tenant_model tm "
" WHERE tm.provider_id = tmp.id AND tm.model_name = tl.llm_name AND tm.instance_id = tmi.id"
")"
)
records = cursor.fetchall()
if not records:
logger.info("No records to migrate")
return 0, []
logger.info(f"Migrating {len(records)} tenant_model records...")
if self.dry_run:
logger.info(f"[DRY RUN] Would insert {len(records)} records")
for source_id, llm_name, provider_id, instance_id, model_type, status in records[:5]:
logger.info(f" model_name={llm_name}, provider_id={provider_id}, "
f"instance_id={instance_id}, model_type={model_type}")
if len(records) > 5:
logger.info(f" ... and {len(records) - 5} more records")
return len(records), self.target_tables
# Insert records in batches
batch_size = 100
for i in range(0, len(records), batch_size):
batch = records[i:i + batch_size]
values = []
for source_id, llm_name, provider_id, instance_id, model_type, status in batch:
record_id = self.generate_uuid()
model_name_escaped = llm_name.replace("'", "''") if llm_name else ""
model_type_escaped = model_type.replace("'", "''") if model_type else ""
status_val = status if status else "active"
values.append(f"('{record_id}', '{model_name_escaped}', '{provider_id}', "
f"'{instance_id}', '{model_type_escaped}', '{status_val}', "
f"{current_ts}, FROM_UNIXTIME({current_ts}), "
f"{current_ts}, FROM_UNIXTIME({current_ts}))")
insert_sql = f"""
INSERT INTO tenant_model
(id, model_name, provider_id, instance_id, model_type, status,
create_time, create_date, update_time, update_date)
VALUES {', '.join(values)}
"""
self.db.execute_sql(insert_sql)
rows_inserted += len(batch)
logger.info(f"Inserted batch {i // batch_size + 1}: {len(batch)} records")
return rows_inserted, self.target_tables
def create_target_table(self):
"""Create tenant_model table"""
create_sql = """
CREATE TABLE IF NOT EXISTS tenant_model (
id VARCHAR(32) NOT NULL PRIMARY KEY,
model_name VARCHAR(128),
provider_id VARCHAR(32) NOT NULL,
instance_id VARCHAR(32) NOT NULL,
model_type VARCHAR(32) NOT NULL,
status VARCHAR(32) DEFAULT 'active',
create_time BIGINT,
create_date DATETIME,
update_time BIGINT,
update_date DATETIME,
INDEX idx_instance_id (instance_id),
UNIQUE INDEX idx_provider_model_instance (provider_id, model_name, instance_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
"""
self.db.execute_sql(create_sql)
logger.info("Created tenant_model table")
# Registry of available migration stages
MIGRATION_STAGES = {
'tenant_model_provider': TenantModelProviderStage,
'tenant_model_instance': TenantModelInstanceStage,
'tenant_model': TenantModelStage,
}
@@ -394,7 +747,7 @@ def run_migration(config: MigrationConfig, stages: list, dry_run: bool = True,
total_stages = len(stages)
for idx, stage_name in enumerate(stages, 1):
logger.info(f"\n{'=' * 60}")
logger.info(f"{'=' * 60}")
logger.info(f"Stage [{idx}/{total_stages}]: {stage_name}")
logger.info(f"{'=' * 60}")