Initial commit with translated description

This commit is contained in:
2026-03-29 14:33:06 +08:00
commit 3fbcdb9641
7 changed files with 2287 additions and 0 deletions

338
README.md Normal file
View File

@@ -0,0 +1,338 @@
# n8n Enhanced Workflow Management Skill
Comprehensive n8n automation skill with workflow creation, testing, execution monitoring, and performance optimization capabilities.
## Features
### ✨ Workflow Creation
- **Template Library:** 6 pre-built SaaS automation workflows
- **Interactive Builder:** Guided workflow creation
- **JSON Import:** Create from existing workflow files
- **Programmatic API:** Build workflows in Python
### 🧪 Testing & Validation
- **Structure Validation:** Check workflow integrity
- **Dry-Run Testing:** Test with sample data before activation
- **Test Suites:** Run multiple test cases
- **Validation Reports:** Detailed error and warning reports
### 📊 Execution Monitoring
- **Real-time Tracking:** Monitor workflow execution status
- **Execution Logs:** Detailed execution history
- **Error Analysis:** Identify and debug failures
- **Retry Logic:** Built-in failure handling
### ⚡ Performance Optimization
- **Performance Analysis:** Comprehensive workflow metrics
- **Bottleneck Detection:** Identify slow operations
- **Optimization Suggestions:** Actionable improvement recommendations
- **Performance Scoring:** 0-100 workflow health score
## Quick Start
### 1. Setup
```bash
# Set environment variables
export N8N_API_KEY="your-api-key"
export N8N_BASE_URL="your-n8n-url-here"
# Verify connection
python3 scripts/n8n_api.py list-workflows --pretty
```
### 2. Create Your First Workflow
```bash
# Deploy a template
python3 scripts/n8n_api.py create --from-template waitlist-pipeline
# Output: {"id": "123", "name": "Waitlist to Customer Pipeline", ...}
```
### 3. Test Before Activating
```bash
# Validate structure
python3 scripts/n8n_tester.py validate --id 123
# Dry run with test data
python3 scripts/n8n_tester.py dry-run --id 123 \
--data '{"email": "test@example.com", "name": "Test User"}'
```
### 4. Activate & Monitor
```bash
# Activate workflow
python3 scripts/n8n_api.py activate --id 123
# Monitor executions
python3 scripts/n8n_api.py list-executions --id 123 --limit 10 --pretty
```
### 5. Optimize Performance
```bash
# Generate optimization report
python3 scripts/n8n_optimizer.py report --id 123
```
## Available Templates
| Template | Description | Trigger | Key Features |
|----------|-------------|---------|--------------|
| **waitlist-pipeline** | Waitlist to customer automation | Webhook | Email validation, CRM integration, welcome emails |
| **product-hunt** | Monitor Product Hunt launches | Schedule (hourly) | Vote filtering, Slack notifications, Sheets logging |
| **social-media-crosspost** | Multi-platform posting | Webhook | Twitter, LinkedIn, Facebook parallel posting |
| **revenue-dashboard** | Revenue data collection | Schedule (daily) | Stripe integration, Sheets updates |
| **customer-onboarding** | Multi-day email sequence | Webhook | Time-delayed follow-ups, progressive engagement |
| **lead-scraping** | Lead generation pipeline | Schedule (daily) | Web scraping, data enrichment, DB storage |
## CLI Commands
### Workflow Management
```bash
# List all workflows
python3 scripts/n8n_api.py list-workflows --pretty
# Get workflow details
python3 scripts/n8n_api.py get-workflow --id <id> --pretty
# Create from template
python3 scripts/n8n_api.py create --from-template waitlist-pipeline
# Create from file
python3 scripts/n8n_api.py create --from-file workflow.json
# Activate/Deactivate
python3 scripts/n8n_api.py activate --id <id>
python3 scripts/n8n_api.py deactivate --id <id>
```
### Testing & Validation
```bash
# Validate workflow
python3 scripts/n8n_tester.py validate --id <id> --pretty
# Validate from file
python3 scripts/n8n_tester.py validate --file workflow.json --pretty
# Dry run with data
python3 scripts/n8n_tester.py dry-run --id <id> --data '{"key": "value"}'
# Dry run with file
python3 scripts/n8n_tester.py dry-run --id <id> --data-file test-data.json
# Generate test report
python3 scripts/n8n_tester.py report --id <id>
# Run test suite
python3 scripts/n8n_tester.py test-suite --id <id> --test-suite tests.json
```
### Execution Monitoring
```bash
# List executions
python3 scripts/n8n_api.py list-executions --limit 20 --pretty
# Get execution details
python3 scripts/n8n_api.py get-execution --id <exec-id> --pretty
# Execute manually
python3 scripts/n8n_api.py execute --id <workflow-id>
# Execute with data
python3 scripts/n8n_api.py execute --id <id> --data '{"key": "value"}'
# Get statistics
python3 scripts/n8n_api.py stats --id <id> --days 7 --pretty
```
### Performance Optimization
```bash
# Analyze performance
python3 scripts/n8n_optimizer.py analyze --id <id> --pretty
# Get suggestions
python3 scripts/n8n_optimizer.py suggest --id <id> --pretty
# Generate report
python3 scripts/n8n_optimizer.py report --id <id>
```
## Python API
### Basic Usage
```python
from scripts.n8n_api import N8nClient
client = N8nClient()
# List workflows
workflows = client.list_workflows(active=True)
print(f"Active workflows: {len(workflows)}")
# Create workflow
workflow = client.create_workflow({
'name': 'My Workflow',
'nodes': [...],
'connections': {...}
})
# Execute workflow
result = client.execute_workflow(workflow['id'], data={'test': True})
```
### Testing
```python
from scripts.n8n_tester import WorkflowTester
tester = WorkflowTester()
# Validate
validation = tester.validate_workflow(workflow_id='123')
if validation['valid']:
print("✓ Workflow is valid")
else:
print(f"✗ Errors: {validation['errors']}")
# Dry run
result = tester.dry_run('123', test_data={'email': 'test@example.com'})
print(f"Status: {result['status']}")
```
### Optimization
```python
from scripts.n8n_optimizer import WorkflowOptimizer
optimizer = WorkflowOptimizer()
# Analyze
analysis = optimizer.analyze_performance('123', days=30)
print(f"Performance Score: {analysis['performance_score']}/100")
print(f"Health: {analysis['execution_metrics']['health']}")
# Get suggestions
suggestions = optimizer.suggest_optimizations('123')
print(f"Priority Actions: {len(suggestions['priority_actions'])}")
```
## Common Workflows
### Create → Test → Deploy
```bash
# 1. Create from template
python3 scripts/n8n_api.py create --from-template waitlist-pipeline > workflow.json
WORKFLOW_ID=$(cat workflow.json | jq -r '.id')
# 2. Validate structure
python3 scripts/n8n_tester.py validate --id $WORKFLOW_ID
# 3. Test with sample data
python3 scripts/n8n_tester.py dry-run --id $WORKFLOW_ID \
--data '{"email": "test@example.com", "name": "Test User"}' --report
# 4. If tests pass, activate
python3 scripts/n8n_api.py activate --id $WORKFLOW_ID
```
### Debug Failed Workflow
```bash
# 1. Check recent executions
python3 scripts/n8n_api.py list-executions --id $WORKFLOW_ID --limit 5
# 2. Get execution details
python3 scripts/n8n_api.py get-execution --id $EXEC_ID --pretty
# 3. Validate workflow structure
python3 scripts/n8n_tester.py report --id $WORKFLOW_ID
# 4. Check for optimization issues
python3 scripts/n8n_optimizer.py report --id $WORKFLOW_ID
```
### Optimize Performance
```bash
# 1. Analyze current state
python3 scripts/n8n_optimizer.py analyze --id $WORKFLOW_ID --days 30 --pretty
# 2. Get recommendations
python3 scripts/n8n_optimizer.py suggest --id $WORKFLOW_ID --pretty
# 3. Generate full report
python3 scripts/n8n_optimizer.py report --id $WORKFLOW_ID > optimization-report.txt
# 4. Review execution stats
python3 scripts/n8n_api.py stats --id $WORKFLOW_ID --days 30 --pretty
```
## File Structure
```
~/clawd/skills/n8n/
├── README.md # This file
├── SKILL.md # Comprehensive documentation
├── scripts/
│ ├── n8n_api.py # Core API client
│ ├── n8n_tester.py # Testing & validation
│ └── n8n_optimizer.py # Performance optimization
├── templates/
│ ├── README.md # Template documentation
│ ├── *.json # Workflow templates
│ └── test-data-*.json # Test data files
└── references/
└── api.md # API reference
```
## Best Practices
1. **Always validate before activating:** Catch errors early
2. **Test with sample data:** Use dry-run to verify behavior
3. **Monitor execution metrics:** Track success rates and failures
4. **Regular optimization reviews:** Monthly performance analysis
5. **Use templates as starting points:** Proven patterns save time
6. **Document customizations:** Keep changelog of modifications
7. **Implement error handling:** Add error nodes for reliability
8. **Gradual rollout:** Test with limited traffic initially
## Troubleshooting
| Issue | Solution |
|-------|----------|
| Authentication error | Set `N8N_API_KEY` environment variable |
| Connection error | Verify `N8N_BASE_URL` and network access |
| Validation errors | Check workflow JSON structure |
| Execution timeout | Optimize expensive operations, reduce dataset size |
| Rate limiting | Add Wait nodes, implement backoff |
| Missing credentials | Configure in n8n UI, assign to nodes |
## Resources
- **Documentation:** [SKILL.md](SKILL.md)
- **Templates:** [templates/README.md](templates/README.md)
- **n8n Docs:** https://docs.n8n.io
- **n8n API:** https://docs.n8n.io/api/
- **n8n Community:** https://community.n8n.io
## Support
For issues or questions:
1. Check validation output: `python3 scripts/n8n_tester.py validate --id <id>`
2. Review execution logs: `python3 scripts/n8n_api.py get-execution --id <exec-id>`
3. Generate optimization report: `python3 scripts/n8n_optimizer.py report --id <id>`
4. Consult [SKILL.md](SKILL.md) for detailed documentation
## License
Part of the Clawdbot skills library.

537
SKILL.md Normal file
View File

@@ -0,0 +1,537 @@
---
name: n8n
description: "通过API管理n8n工作流和自动化。"
metadata: {"openclaw":{"emoji":"\u2699\ufe0f","requires":{"env":["N8N_API_KEY","N8N_BASE_URL"]},"primaryEnv":"N8N_API_KEY"}}
---
# n8n Workflow Management
Comprehensive workflow automation management for n8n platform with creation, testing, execution monitoring, and performance optimization capabilities.
## ⚠️ CRITICAL: Workflow Creation Rules
**When creating n8n workflows, ALWAYS:**
1.**Generate COMPLETE workflows** with all functional nodes
2.**Include actual HTTP Request nodes** for API calls (ImageFX, Gemini, Veo, Suno, etc.)
3.**Add Code nodes** for data transformation and logic
4.**Create proper connections** between all nodes
5.**Use real node types** (n8n-nodes-base.httpRequest, n8n-nodes-base.code, n8n-nodes-base.set)
**NEVER:**
- ❌ Create "Setup Instructions" placeholder nodes
- ❌ Generate workflows with only TODO comments
- ❌ Make incomplete workflows requiring manual node addition
- ❌ Use text-only nodes as substitutes for real functionality
**Example GOOD workflow:**
```
Manual Trigger → Set Config → HTTP Request (API call) → Code (parse) → Response
```
**Example BAD workflow:**
```
Manual Trigger → Code ("Add HTTP nodes here, configure APIs...")
```
Always build the complete, functional workflow with all necessary nodes configured and connected.
## Setup
**Required environment variables:**
- `N8N_API_KEY` — Your n8n API key (Settings → API in the n8n UI)
- `N8N_BASE_URL` — Your n8n instance URL
**Configure credentials via OpenClaw settings:**
Add to `~/.config/openclaw/settings.json`:
```json
{
"skills": {
"n8n": {
"env": {
"N8N_API_KEY": "your-api-key-here",
"N8N_BASE_URL": "your-n8n-url-here"
}
}
}
}
```
Or set per-session (do **not** persist secrets in shell rc files):
```bash
export N8N_API_KEY="your-api-key-here"
export N8N_BASE_URL="your-n8n-url-here"
```
**Verify connection:**
```bash
python3 scripts/n8n_api.py list-workflows --pretty
```
> **Security note:** Never store API keys in plaintext shell config files (`~/.bashrc`, `~/.zshrc`). Use the OpenClaw settings file or a secure secret manager.
## Quick Reference
### Workflow Management
#### List Workflows
```bash
python3 scripts/n8n_api.py list-workflows --pretty
python3 scripts/n8n_api.py list-workflows --active true --pretty
```
#### Get Workflow Details
```bash
python3 scripts/n8n_api.py get-workflow --id <workflow-id> --pretty
```
#### Create Workflows
```bash
# From JSON file
python3 scripts/n8n_api.py create --from-file workflow.json
```
#### Activate/Deactivate
```bash
python3 scripts/n8n_api.py activate --id <workflow-id>
python3 scripts/n8n_api.py deactivate --id <workflow-id>
```
### Testing & Validation
#### Validate Workflow Structure
```bash
# Validate existing workflow
python3 scripts/n8n_tester.py validate --id <workflow-id>
# Validate from file
python3 scripts/n8n_tester.py validate --file workflow.json --pretty
# Generate validation report
python3 scripts/n8n_tester.py report --id <workflow-id>
```
#### Dry Run Testing
```bash
# Test with data
python3 scripts/n8n_tester.py dry-run --id <workflow-id> --data '{"email": "test@example.com"}'
# Test with data file
python3 scripts/n8n_tester.py dry-run --id <workflow-id> --data-file test-data.json
# Full test report (validation + dry run)
python3 scripts/n8n_tester.py dry-run --id <workflow-id> --data-file test.json --report
```
#### Test Suite
```bash
# Run multiple test cases
python3 scripts/n8n_tester.py test-suite --id <workflow-id> --test-suite test-cases.json
```
### Execution Monitoring
#### List Executions
```bash
# Recent executions (all workflows)
python3 scripts/n8n_api.py list-executions --limit 10 --pretty
# Specific workflow executions
python3 scripts/n8n_api.py list-executions --id <workflow-id> --limit 20 --pretty
```
#### Get Execution Details
```bash
python3 scripts/n8n_api.py get-execution --id <execution-id> --pretty
```
#### Manual Execution
```bash
# Trigger workflow
python3 scripts/n8n_api.py execute --id <workflow-id>
# Execute with data
python3 scripts/n8n_api.py execute --id <workflow-id> --data '{"key": "value"}'
```
### Performance Optimization
#### Analyze Performance
```bash
# Full performance analysis
python3 scripts/n8n_optimizer.py analyze --id <workflow-id> --pretty
# Analyze specific period
python3 scripts/n8n_optimizer.py analyze --id <workflow-id> --days 30 --pretty
```
#### Get Optimization Suggestions
```bash
# Priority-ranked suggestions
python3 scripts/n8n_optimizer.py suggest --id <workflow-id> --pretty
```
#### Generate Optimization Report
```bash
# Human-readable report with metrics, bottlenecks, and suggestions
python3 scripts/n8n_optimizer.py report --id <workflow-id>
```
#### Get Workflow Statistics
```bash
# Execution statistics
python3 scripts/n8n_api.py stats --id <workflow-id> --days 7 --pretty
```
## Python API
### Basic Usage
```python
from scripts.n8n_api import N8nClient
client = N8nClient()
# List workflows
workflows = client.list_workflows(active=True)
# Get workflow
workflow = client.get_workflow('workflow-id')
# Create workflow
new_workflow = client.create_workflow({
'name': 'My Workflow',
'nodes': [...],
'connections': {...}
})
# Activate/deactivate
client.activate_workflow('workflow-id')
client.deactivate_workflow('workflow-id')
# Executions
executions = client.list_executions(workflow_id='workflow-id', limit=10)
execution = client.get_execution('execution-id')
# Execute workflow
result = client.execute_workflow('workflow-id', data={'key': 'value'})
```
### Validation & Testing
```python
from scripts.n8n_api import N8nClient
from scripts.n8n_tester import WorkflowTester
client = N8nClient()
tester = WorkflowTester(client)
# Validate workflow
validation = tester.validate_workflow(workflow_id='123')
print(f"Valid: {validation['valid']}")
print(f"Errors: {validation['errors']}")
print(f"Warnings: {validation['warnings']}")
# Dry run
result = tester.dry_run(
workflow_id='123',
test_data={'email': 'test@example.com'}
)
print(f"Status: {result['status']}")
# Test suite
test_cases = [
{'name': 'Test 1', 'input': {...}, 'expected': {...}},
{'name': 'Test 2', 'input': {...}, 'expected': {...}}
]
results = tester.test_suite('123', test_cases)
print(f"Passed: {results['passed']}/{results['total_tests']}")
# Generate report
report = tester.generate_test_report(validation, result)
print(report)
```
### Performance Optimization
```python
from scripts.n8n_optimizer import WorkflowOptimizer
optimizer = WorkflowOptimizer()
# Analyze performance
analysis = optimizer.analyze_performance('workflow-id', days=7)
print(f"Performance Score: {analysis['performance_score']}/100")
print(f"Health: {analysis['execution_metrics']['health']}")
# Get suggestions
suggestions = optimizer.suggest_optimizations('workflow-id')
print(f"Priority Actions: {len(suggestions['priority_actions'])}")
print(f"Quick Wins: {len(suggestions['quick_wins'])}")
# Generate report
report = optimizer.generate_optimization_report(analysis)
print(report)
```
## Common Workflows
### 1. Validate and Test Workflow
```bash
# Validate workflow structure
python3 scripts/n8n_tester.py validate --id <workflow-id> --pretty
# Test with sample data
python3 scripts/n8n_tester.py dry-run --id <workflow-id> \
--data '{"email": "test@example.com", "name": "Test User"}'
# If tests pass, activate
python3 scripts/n8n_api.py activate --id <workflow-id>
```
### 2. Debug Failed Workflow
```bash
# Check recent executions
python3 scripts/n8n_api.py list-executions --id <workflow-id> --limit 10 --pretty
# Get specific execution details
python3 scripts/n8n_api.py get-execution --id <execution-id> --pretty
# Validate workflow structure
python3 scripts/n8n_tester.py validate --id <workflow-id>
# Generate test report
python3 scripts/n8n_tester.py report --id <workflow-id>
# Check for optimization issues
python3 scripts/n8n_optimizer.py report --id <workflow-id>
```
### 3. Optimize Workflow Performance
```bash
# Analyze current performance
python3 scripts/n8n_optimizer.py analyze --id <workflow-id> --days 30 --pretty
# Get actionable suggestions
python3 scripts/n8n_optimizer.py suggest --id <workflow-id> --pretty
# Generate comprehensive report
python3 scripts/n8n_optimizer.py report --id <workflow-id>
# Review execution statistics
python3 scripts/n8n_api.py stats --id <workflow-id> --days 30 --pretty
# Test optimizations with dry run
python3 scripts/n8n_tester.py dry-run --id <workflow-id> --data-file test-data.json
```
### 4. Monitor Workflow Health
```bash
# Check active workflows
python3 scripts/n8n_api.py list-workflows --active true --pretty
# Review recent execution status
python3 scripts/n8n_api.py list-executions --limit 20 --pretty
# Get statistics for each critical workflow
python3 scripts/n8n_api.py stats --id <workflow-id> --pretty
# Generate health reports
python3 scripts/n8n_optimizer.py report --id <workflow-id>
```
## Validation Checks
The testing module performs comprehensive validation:
### Structure Validation
- ✓ Required fields present (nodes, connections)
- ✓ All nodes have names and types
- ✓ Connection targets exist
- ✓ No disconnected nodes (warning)
### Configuration Validation
- ✓ Nodes requiring credentials are configured
- ✓ Required parameters are set
- ✓ HTTP nodes have URLs
- ✓ Webhook nodes have paths
- ✓ Email nodes have content
### Flow Validation
- ✓ Workflow has trigger nodes
- ✓ Proper execution flow
- ✓ No circular dependencies
- ✓ End nodes identified
## Optimization Analysis
The optimizer analyzes multiple dimensions:
### Execution Metrics
- Total executions
- Success/failure rates
- Health status (excellent/good/fair/poor)
- Error patterns
### Performance Metrics
- Node count and complexity
- Connection patterns
- Expensive operations (API calls, database queries)
- Parallel execution opportunities
### Bottleneck Detection
- Sequential expensive operations
- High failure rates
- Missing error handling
- Rate limit issues
### Optimization Opportunities
- **Parallel Execution:** Identify nodes that can run concurrently
- **Caching:** Suggest caching for repeated API calls
- **Batch Processing:** Recommend batching for large datasets
- **Error Handling:** Add error recovery mechanisms
- **Complexity Reduction:** Split complex workflows
- **Timeout Settings:** Configure execution limits
## Performance Scoring
Workflows receive a performance score (0-100) based on:
- **Success Rate:** Higher is better (50% weight)
- **Complexity:** Lower is better (30% weight)
- **Bottlenecks:** Fewer is better (critical: -20, high: -10, medium: -5)
- **Optimizations:** Implemented best practices (+5 each)
Score interpretation:
- **90-100:** Excellent - Well-optimized
- **70-89:** Good - Minor improvements possible
- **50-69:** Fair - Optimization recommended
- **0-49:** Poor - Significant issues
## Best Practices
### Development
1. **Plan Structure:** Design workflow nodes and connections before building
2. **Validate First:** Always validate before deployment
3. **Test Thoroughly:** Use dry-run with multiple test cases
4. **Error Handling:** Add error nodes for reliability
5. **Documentation:** Comment complex logic in Code nodes
### Testing
1. **Sample Data:** Create realistic test data files
2. **Edge Cases:** Test boundary conditions and errors
3. **Incremental:** Test each node addition
4. **Regression:** Retest after changes
5. **Production-like:** Use staging environment that mirrors production
### Deployment
1. **Inactive First:** Deploy workflows in inactive state
2. **Gradual Rollout:** Test with limited traffic initially
3. **Monitor Closely:** Watch first executions carefully
4. **Quick Rollback:** Be ready to deactivate if issues arise
5. **Document Changes:** Keep changelog of modifications
### Optimization
1. **Baseline Metrics:** Capture performance before changes
2. **One Change at a Time:** Isolate optimization impacts
3. **Measure Results:** Compare before/after metrics
4. **Regular Reviews:** Schedule monthly optimization reviews
5. **Cost Awareness:** Monitor API usage and execution costs
### Maintenance
1. **Health Checks:** Weekly execution statistics review
2. **Error Analysis:** Investigate failure patterns
3. **Performance Monitoring:** Track execution times
4. **Credential Rotation:** Update credentials regularly
5. **Cleanup:** Archive or delete unused workflows
## Troubleshooting
### Authentication Error
```
Error: N8N_API_KEY not found in environment
```
**Solution:** Set environment variable:
```bash
export N8N_API_KEY="your-api-key"
```
### Connection Error
```
Error: HTTP 401: Unauthorized
```
**Solution:**
1. Verify API key is correct
2. Check N8N_BASE_URL is set correctly
3. Confirm API access is enabled in n8n
### Validation Errors
```
Validation failed: Node missing 'name' field
```
**Solution:** Check workflow JSON structure, ensure all required fields present
### Execution Timeout
```
Status: timeout - Execution did not complete
```
**Solution:**
1. Check workflow for infinite loops
2. Reduce dataset size for testing
3. Optimize expensive operations
4. Set execution timeout in workflow settings
### Rate Limiting
```
Error: HTTP 429: Too Many Requests
```
**Solution:**
1. Add Wait nodes between API calls
2. Implement exponential backoff
3. Use batch processing
4. Check API rate limits
### Missing Credentials
```
Warning: Node 'HTTP_Request' may require credentials
```
**Solution:**
1. Configure credentials in n8n UI
2. Assign credentials to node
3. Test connection before activating
## File Structure
```
~/clawd/skills/n8n/
├── SKILL.md # This file
├── scripts/
│ ├── n8n_api.py # Core API client (extended)
│ ├── n8n_tester.py # Testing & validation
│ └── n8n_optimizer.py # Performance optimization
└── references/
└── api.md # n8n API reference
```
## API Reference
For detailed n8n REST API documentation, see [references/api.md](references/api.md) or visit:
https://docs.n8n.io/api/
## Support
**Documentation:**
- n8n Official Docs: https://docs.n8n.io
- n8n Community Forum: https://community.n8n.io
- n8n API Reference: https://docs.n8n.io/api/
**Debugging:**
1. Use validation: `python3 scripts/n8n_tester.py validate --id <workflow-id>`
2. Check execution logs: `python3 scripts/n8n_api.py get-execution --id <execution-id>`
3. Review optimization report: `python3 scripts/n8n_optimizer.py report --id <workflow-id>`
4. Test with dry-run: `python3 scripts/n8n_tester.py dry-run --id <workflow-id> --data-file test.json`

6
_meta.json Normal file
View File

@@ -0,0 +1,6 @@
{
"ownerId": "kn797ds1ncqqb6bf6c9w26wtcn7z640c",
"slug": "n8n",
"version": "2.0.0",
"publishedAt": 1770748031005
}

156
references/api.md Normal file
View File

@@ -0,0 +1,156 @@
# n8n API Reference
## Authentication
The n8n API uses API key authentication via the `X-N8N-API-KEY` header.
API keys are managed in the n8n UI:
- Settings → API
- User profile → API
## Base URL
`N8N_BASE_URL`
Default: `${N8N_BASE_URL}$/api/v1`
## Endpoints
### Workflows
#### List Workflows
```
GET /workflows
Query params: ?active=true|false
```
Response:
```json
{
"data": [
{
"id": "123",
"name": "My Workflow",
"active": true,
"createdAt": "2026-01-14T10:00:00.000Z",
"updatedAt": "2026-01-14T12:00:00.000Z"
}
]
}
```
#### Get Workflow
```
GET /workflows/{id}
```
#### Create Workflow
```
POST /workflows
Body: workflow JSON
```
#### Update Workflow
```
PATCH /workflows/{id}
Body: partial workflow JSON
```
#### Activate/Deactivate
```
PATCH /workflows/{id}
Body: {"active": true|false}
```
#### Delete Workflow
```
DELETE /workflows/{id}
```
### Executions
#### List Executions
```
GET /executions?limit=20&workflowId={id}
```
Response:
```json
{
"data": [
{
"id": "456",
"finished": true,
"mode": "trigger",
"startedAt": "2026-01-14T12:00:00.000Z",
"stoppedAt": "2026-01-14T12:00:05.000Z",
"workflowId": "123",
"status": "success"
}
]
}
```
#### Get Execution
```
GET /executions/{id}
```
#### Delete Execution
```
DELETE /executions/{id}
```
#### Manual Execution
```
POST /workflows/{id}/execute
Body: {"data": {...}}
```
## Common Patterns
### List Active Workflows
```bash
python3 scripts/n8n_api.py list-workflows --active true --pretty
```
### Get Workflow Details
```bash
python3 scripts/n8n_api.py get-workflow --id <workflow-id> --pretty
```
### Activate/Deactivate Workflow
```bash
python3 scripts/n8n_api.py activate --id <workflow-id>
python3 scripts/n8n_api.py deactivate --id <workflow-id>
```
### List Recent Executions
```bash
python3 scripts/n8n_api.py list-executions --limit 10 --pretty
```
### Manually Execute Workflow
```bash
python3 scripts/n8n_api.py execute --id <workflow-id>
```
With data:
```bash
python3 scripts/n8n_api.py execute --id <workflow-id> --data '{"key": "value"}'
```
## Error Handling
HTTP status codes:
- `200` - Success
- `400` - Bad request
- `401` - Unauthorized (invalid API key)
- `404` - Not found
- `500` - Server error
## Environment Variables
Required:
- `N8N_API_KEY` - n8n API key
- `N8N_BASE_URL` - Base URL

355
scripts/n8n_api.py Normal file
View File

@@ -0,0 +1,355 @@
#!/usr/bin/env python3
"""
n8n API client for Clawdbot
Manages workflows, executions, and credentials via n8n REST API
"""
import os
import sys
import json
import argparse
import requests
from pathlib import Path
from typing import Optional, Dict, Any, List
class N8nClient:
"""n8n API client"""
def __init__(self, base_url: str = None, api_key: str = None):
self.base_url = base_url or os.getenv('N8N_BASE_URL')
self.api_key = api_key or os.getenv('N8N_API_KEY')
if not self.api_key:
raise ValueError("N8N_API_KEY not found in environment")
self.session = requests.Session()
self.session.headers.update({
'X-N8N-API-KEY': self.api_key,
'Accept': 'application/json',
'Content-Type': 'application/json'
})
def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
"""Make API request"""
url = f"{self.base_url}/api/v1/{endpoint.lstrip('/')}"
response = self.session.request(method, url, **kwargs)
try:
response.raise_for_status()
return response.json() if response.content else {}
except requests.exceptions.HTTPError as e:
error_msg = f"HTTP {response.status_code}: {response.text}"
raise Exception(error_msg) from e
# Workflows
def list_workflows(self, active: bool = None) -> List[Dict]:
"""List all workflows"""
params = {}
if active is not None:
params['active'] = str(active).lower()
return self._request('GET', 'workflows', params=params)
def get_workflow(self, workflow_id: str) -> Dict:
"""Get workflow details"""
return self._request('GET', f'workflows/{workflow_id}')
def create_workflow(self, workflow_data: Dict) -> Dict:
"""Create new workflow"""
# Remove read-only fields that n8n API doesn't accept on create
clean_data = workflow_data.copy()
clean_data.pop('active', None)
clean_data.pop('id', None)
return self._request('POST', 'workflows', json=clean_data)
def update_workflow(self, workflow_id: str, workflow_data: Dict) -> Dict:
"""Update existing workflow"""
return self._request('PATCH', f'workflows/{workflow_id}', json=workflow_data)
def delete_workflow(self, workflow_id: str) -> Dict:
"""Delete workflow"""
return self._request('DELETE', f'workflows/{workflow_id}')
def activate_workflow(self, workflow_id: str) -> Dict:
"""Activate workflow"""
return self._request('PATCH', f'workflows/{workflow_id}', json={'active': True})
def deactivate_workflow(self, workflow_id: str) -> Dict:
"""Deactivate workflow"""
return self._request('PATCH', f'workflows/{workflow_id}', json={'active': False})
# Executions
def list_executions(self, workflow_id: str = None, limit: int = 20) -> List[Dict]:
"""List workflow executions"""
params = {'limit': limit}
if workflow_id:
params['workflowId'] = workflow_id
return self._request('GET', 'executions', params=params)
def get_execution(self, execution_id: str) -> Dict:
"""Get execution details"""
return self._request('GET', f'executions/{execution_id}')
def delete_execution(self, execution_id: str) -> Dict:
"""Delete execution"""
return self._request('DELETE', f'executions/{execution_id}')
# Manual execution
def execute_workflow(self, workflow_id: str, data: Dict = None) -> Dict:
"""Manually trigger workflow execution"""
payload = {'workflowId': workflow_id}
if data:
payload['data'] = data
return self._request('POST', f'workflows/{workflow_id}/execute', json=payload)
# Testing & Validation
def validate_workflow(self, workflow_data: Dict) -> Dict:
"""Validate workflow structure and configuration"""
issues = {
'errors': [],
'warnings': [],
'valid': True
}
# Check required fields
if 'nodes' not in workflow_data:
issues['errors'].append("Missing 'nodes' field")
issues['valid'] = False
return issues
nodes = workflow_data.get('nodes', [])
connections = workflow_data.get('connections', {})
# Validate nodes
node_names = set()
for node in nodes:
if 'name' not in node:
issues['errors'].append("Node missing 'name' field")
issues['valid'] = False
else:
node_names.add(node['name'])
if 'type' not in node:
issues['errors'].append(f"Node '{node.get('name', 'unknown')}' missing 'type' field")
issues['valid'] = False
# Check for required credentials
if node.get('type', '').startswith('n8n-nodes-base'):
credentials = node.get('credentials', {})
if not credentials and node['type'] not in ['n8n-nodes-base.start', 'n8n-nodes-base.set']:
issues['warnings'].append(f"Node '{node['name']}' may require credentials")
# Validate connections
for source_node, targets in connections.items():
if source_node not in node_names:
issues['errors'].append(f"Connection references non-existent source node: {source_node}")
issues['valid'] = False
for output_type, output_connections in targets.items():
for conn_list in output_connections:
for conn in conn_list:
target_node = conn.get('node')
if target_node and target_node not in node_names:
issues['errors'].append(f"Connection references non-existent target node: {target_node}")
issues['valid'] = False
# Check for disconnected nodes
connected_nodes = set(connections.keys())
for targets in connections.values():
for output_connections in targets.values():
for conn_list in output_connections:
for conn in conn_list:
connected_nodes.add(conn.get('node'))
disconnected = node_names - connected_nodes
if disconnected and len(nodes) > 1:
for node in disconnected:
issues['warnings'].append(f"Node '{node}' appears to be disconnected")
return issues
def dry_run_workflow(self, workflow_id: str, test_data: Dict = None) -> Dict:
"""Test workflow execution with mock data (creates temp execution)"""
# Execute workflow with test data and return results
result = self.execute_workflow(workflow_id, data=test_data)
return {
'execution_id': result.get('data', {}).get('executionId'),
'status': 'initiated',
'test_data': test_data
}
# Optimization & Analytics
def get_workflow_statistics(self, workflow_id: str, days: int = 7) -> Dict:
"""Get workflow execution statistics"""
executions = self.list_executions(workflow_id=workflow_id, limit=100)
stats = {
'total_executions': len(executions),
'successful': 0,
'failed': 0,
'execution_times': [],
'error_patterns': {}
}
for execution in executions:
status = execution.get('finished')
if status:
stats['successful'] += 1
else:
stats['failed'] += 1
error = execution.get('data', {}).get('resultData', {}).get('error', {}).get('message', 'Unknown error')
stats['error_patterns'][error] = stats['error_patterns'].get(error, 0) + 1
# Execution time
start = execution.get('startedAt')
stop = execution.get('stoppedAt')
if start and stop:
# Calculate duration (simplified)
stats['execution_times'].append({'start': start, 'stop': stop})
# Calculate success rate
if stats['total_executions'] > 0:
stats['success_rate'] = (stats['successful'] / stats['total_executions']) * 100
else:
stats['success_rate'] = 0
return stats
def analyze_workflow_performance(self, workflow_id: str) -> Dict:
"""Analyze workflow performance and identify bottlenecks"""
workflow = self.get_workflow(workflow_id)
executions = self.list_executions(workflow_id=workflow_id, limit=10)
analysis = {
'node_count': len(workflow.get('nodes', [])),
'connection_count': sum(len(conns) for conns in workflow.get('connections', {}).values()),
'parallel_opportunities': [],
'bottlenecks': [],
'optimization_suggestions': []
}
# Analyze node structure for parallel opportunities
nodes = workflow.get('nodes', [])
connections = workflow.get('connections', {})
# Find nodes that could be parallelized
for node in nodes:
node_name = node['name']
if node_name in connections:
outputs = connections[node_name]
# If node has multiple outputs, suggest parallel execution
total_connections = sum(len(conn_list) for output in outputs.values() for conn_list in output)
if total_connections > 1:
analysis['parallel_opportunities'].append({
'node': node_name,
'connection_count': total_connections,
'suggestion': 'Consider using Split In Batches for parallel processing'
})
# Analyze execution patterns
if executions:
analysis['optimization_suggestions'].append({
'type': 'monitoring',
'suggestion': 'Enable execution data retention for better debugging'
})
# Check for common anti-patterns
for node in nodes:
if node.get('type') == 'n8n-nodes-base.httpRequest':
analysis['optimization_suggestions'].append({
'type': 'caching',
'node': node['name'],
'suggestion': 'Consider caching HTTP responses for frequently accessed data'
})
return analysis
def main():
parser = argparse.ArgumentParser(description='n8n API Client')
parser.add_argument('action', choices=[
'list-workflows', 'get-workflow', 'create', 'activate', 'deactivate',
'list-executions', 'get-execution', 'execute', 'validate', 'stats'
])
parser.add_argument('--id', help='Workflow or execution ID')
parser.add_argument('--active', type=lambda x: x.lower() == 'true', help='Filter by active status')
parser.add_argument('--limit', type=int, default=20, help='Limit results')
parser.add_argument('--data', help='JSON data for execution')
parser.add_argument('--from-file', help='Create workflow from JSON file')
parser.add_argument('--from-template', help='Create workflow from template name')
parser.add_argument('--days', type=int, default=7, help='Days for statistics')
parser.add_argument('--pretty', action='store_true', help='Pretty print JSON output')
args = parser.parse_args()
try:
client = N8nClient()
result = None
if args.action == 'list-workflows':
result = client.list_workflows(active=args.active)
elif args.action == 'get-workflow':
if not args.id:
raise ValueError("--id required for get-workflow")
result = client.get_workflow(args.id)
elif args.action == 'create':
if args.from_file:
with open(args.from_file, 'r') as f:
workflow_data = json.load(f)
result = client.create_workflow(workflow_data)
elif args.from_template:
# Load template
template_path = Path(__file__).parent.parent / 'templates' / f'{args.from_template}.json'
if not template_path.exists():
raise ValueError(f"Template not found: {args.from_template}")
with open(template_path, 'r') as f:
workflow_data = json.load(f)
result = client.create_workflow(workflow_data)
else:
raise ValueError("--from-file or --from-template required for create")
elif args.action == 'activate':
if not args.id:
raise ValueError("--id required for activate")
result = client.activate_workflow(args.id)
elif args.action == 'deactivate':
if not args.id:
raise ValueError("--id required for deactivate")
result = client.deactivate_workflow(args.id)
elif args.action == 'list-executions':
result = client.list_executions(workflow_id=args.id, limit=args.limit)
elif args.action == 'get-execution':
if not args.id:
raise ValueError("--id required for get-execution")
result = client.get_execution(args.id)
elif args.action == 'execute':
if not args.id:
raise ValueError("--id required for execute")
data = json.loads(args.data) if args.data else None
result = client.execute_workflow(args.id, data=data)
elif args.action == 'validate':
if args.from_file:
with open(args.from_file, 'r') as f:
workflow_data = json.load(f)
elif args.id:
workflow_data = client.get_workflow(args.id)
else:
raise ValueError("--id or --from-file required for validate")
result = client.validate_workflow(workflow_data)
elif args.action == 'stats':
if not args.id:
raise ValueError("--id required for stats")
result = client.get_workflow_statistics(args.id, days=args.days)
# Output
if args.pretty:
print(json.dumps(result, indent=2))
else:
print(json.dumps(result))
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()

479
scripts/n8n_optimizer.py Normal file
View File

@@ -0,0 +1,479 @@
#!/usr/bin/env python3
"""
n8n Workflow Optimizer
Analyze and optimize workflow performance
"""
import sys
import json
import argparse
from datetime import datetime, timedelta
from typing import Dict, List, Any
from collections import defaultdict
# Import N8nClient - handle both direct and module imports
try:
from n8n_api import N8nClient
except ImportError:
from scripts.n8n_api import N8nClient
class WorkflowOptimizer:
"""Workflow performance analyzer and optimizer"""
def __init__(self, client: N8nClient = None):
self.client = client or N8nClient()
def analyze_performance(self, workflow_id: str, days: int = 7) -> Dict:
"""Comprehensive performance analysis"""
workflow = self.client.get_workflow(workflow_id)
statistics = self.client.get_workflow_statistics(workflow_id, days=days)
analysis = {
'workflow_id': workflow_id,
'workflow_name': workflow.get('name'),
'analysis_period_days': days,
'execution_metrics': self._analyze_execution_metrics(statistics),
'node_analysis': self._analyze_nodes(workflow),
'connection_analysis': self._analyze_connections(workflow),
'performance_score': 0,
'bottlenecks': [],
'optimization_opportunities': []
}
# Identify bottlenecks
analysis['bottlenecks'] = self._identify_bottlenecks(workflow, statistics)
# Find optimization opportunities
analysis['optimization_opportunities'] = self._find_optimizations(workflow, statistics)
# Calculate performance score (0-100)
analysis['performance_score'] = self._calculate_performance_score(analysis)
return analysis
def _analyze_execution_metrics(self, statistics: Dict) -> Dict:
"""Analyze execution metrics"""
metrics = {
'total_executions': statistics.get('total_executions', 0),
'successful_executions': statistics.get('successful', 0),
'failed_executions': statistics.get('failed', 0),
'success_rate': statistics.get('success_rate', 0),
'failure_rate': 0
}
if metrics['total_executions'] > 0:
metrics['failure_rate'] = (metrics['failed_executions'] / metrics['total_executions']) * 100
# Categorize health
if metrics['success_rate'] >= 95:
metrics['health'] = 'excellent'
elif metrics['success_rate'] >= 80:
metrics['health'] = 'good'
elif metrics['success_rate'] >= 60:
metrics['health'] = 'fair'
else:
metrics['health'] = 'poor'
return metrics
def _analyze_nodes(self, workflow: Dict) -> Dict:
"""Analyze workflow nodes"""
nodes = workflow.get('nodes', [])
analysis = {
'total_nodes': len(nodes),
'node_types': defaultdict(int),
'complexity_score': 0,
'expensive_nodes': []
}
# Count node types
for node in nodes:
node_type = node.get('type', '').split('.')[-1]
analysis['node_types'][node_type] += 1
# Identify potentially expensive operations
expensive_types = [
'httpRequest',
'postgres',
'mysql',
'mongodb',
'googleSheets',
'airtable',
'webhook'
]
for node in nodes:
node_type = node.get('type', '')
for exp_type in expensive_types:
if exp_type in node_type:
analysis['expensive_nodes'].append({
'name': node.get('name'),
'type': node_type,
'reason': self._get_expense_reason(exp_type)
})
# Calculate complexity score
analysis['complexity_score'] = self._calculate_complexity(workflow)
return analysis
def _get_expense_reason(self, node_type: str) -> str:
"""Get reason why node type is expensive"""
reasons = {
'httpRequest': 'External API calls can be slow and rate-limited',
'postgres': 'Database queries can be slow with large datasets',
'mysql': 'Database queries can be slow with large datasets',
'mongodb': 'Database queries can be slow with large datasets',
'googleSheets': 'Google Sheets API has rate limits and can be slow',
'airtable': 'Airtable API has rate limits',
'webhook': 'Waiting for webhook responses can cause delays'
}
return reasons.get(node_type, 'Potentially expensive operation')
def _analyze_connections(self, workflow: Dict) -> Dict:
"""Analyze workflow connections"""
connections = workflow.get('connections', {})
analysis = {
'total_connections': 0,
'parallel_paths': 0,
'sequential_paths': 0,
'max_path_length': 0
}
# Count connections
for source, targets in connections.items():
for output_type, output_conns in targets.items():
for conn_list in output_conns:
analysis['total_connections'] += len(conn_list)
# Check for parallel paths
if len(conn_list) > 1:
analysis['parallel_paths'] += 1
return analysis
def _calculate_complexity(self, workflow: Dict) -> int:
"""Calculate workflow complexity score (0-100)"""
nodes = workflow.get('nodes', [])
connections = workflow.get('connections', {})
# Base complexity from node count
complexity = min(len(nodes) * 5, 50)
# Add complexity for connections
total_connections = sum(
len(conn)
for targets in connections.values()
for output_conns in targets.values()
for conn in output_conns
)
complexity += min(total_connections * 3, 30)
# Add complexity for conditional logic
for node in nodes:
if node.get('type') == 'n8n-nodes-base.if':
complexity += 5
elif node.get('type') == 'n8n-nodes-base.switch':
complexity += 10
return min(complexity, 100)
def _identify_bottlenecks(self, workflow: Dict, statistics: Dict) -> List[Dict]:
"""Identify performance bottlenecks"""
bottlenecks = []
nodes = workflow.get('nodes', [])
# Check for sequential expensive operations
expensive_types = ['httpRequest', 'postgres', 'mysql', 'mongodb']
expensive_nodes = [
node for node in nodes
if any(exp in node.get('type', '') for exp in expensive_types)
]
if len(expensive_nodes) > 3:
bottlenecks.append({
'type': 'sequential_expensive_operations',
'severity': 'high',
'description': f'Workflow has {len(expensive_nodes)} potentially expensive operations running sequentially',
'affected_nodes': [node['name'] for node in expensive_nodes],
'impact': 'High execution time'
})
# Check for high failure rate
if statistics.get('failed', 0) > statistics.get('successful', 0):
bottlenecks.append({
'type': 'high_failure_rate',
'severity': 'critical',
'description': 'Workflow has more failures than successes',
'impact': 'Unreliable execution'
})
# Check for missing error handling
has_error_handling = any(
node.get('type') in ['n8n-nodes-base.errorTrigger', 'n8n-nodes-base.if']
for node in nodes
)
if not has_error_handling and len(nodes) > 3:
bottlenecks.append({
'type': 'missing_error_handling',
'severity': 'medium',
'description': 'Workflow lacks error handling nodes',
'impact': 'Failures may not be handled gracefully'
})
return bottlenecks
def _find_optimizations(self, workflow: Dict, statistics: Dict) -> List[Dict]:
"""Find optimization opportunities"""
optimizations = []
nodes = workflow.get('nodes', [])
connections = workflow.get('connections', {})
# Opportunity 1: Parallel execution
for source_node, targets in connections.items():
for output_conns in targets.values():
for conn_list in output_conns:
if len(conn_list) > 1:
optimizations.append({
'type': 'parallel_execution',
'priority': 'high',
'description': f'Node "{source_node}" branches to multiple nodes - already optimized for parallel execution',
'node': source_node,
'benefit': 'Reduced execution time through parallelization'
})
# Opportunity 2: Caching
http_nodes = [node for node in nodes if 'httpRequest' in node.get('type', '')]
if http_nodes:
optimizations.append({
'type': 'caching',
'priority': 'medium',
'description': f'Found {len(http_nodes)} HTTP request nodes - consider caching responses',
'affected_nodes': [node['name'] for node in http_nodes],
'benefit': 'Reduced API calls and faster execution',
'implementation': 'Use Function or Code nodes to implement simple caching'
})
# Opportunity 3: Batch processing
loop_nodes = [node for node in nodes if 'loop' in node.get('type', '').lower()]
if not loop_nodes:
optimizations.append({
'type': 'batch_processing',
'priority': 'low',
'description': 'Consider using "Split In Batches" node for processing large datasets',
'benefit': 'Better memory management and parallel processing',
'implementation': 'Add "Split In Batches" node before expensive operations'
})
# Opportunity 4: Error handling
error_nodes = [node for node in nodes if 'error' in node.get('type', '').lower()]
if not error_nodes and len(nodes) > 3:
optimizations.append({
'type': 'error_handling',
'priority': 'high',
'description': 'Add error handling to improve reliability',
'benefit': 'Graceful error recovery and better debugging',
'implementation': 'Add "Error Trigger" or "IF" nodes to handle failures'
})
# Opportunity 5: Reduce complexity
complexity = self._calculate_complexity(workflow)
if complexity > 70:
optimizations.append({
'type': 'reduce_complexity',
'priority': 'medium',
'description': f'Workflow complexity score is {complexity}/100 - consider splitting into sub-workflows',
'benefit': 'Easier maintenance and debugging',
'implementation': 'Break workflow into smaller, reusable workflows'
})
# Opportunity 6: Execution settings
workflow_settings = workflow.get('settings', {})
if not workflow_settings.get('executionTimeout'):
optimizations.append({
'type': 'execution_timeout',
'priority': 'low',
'description': 'Set execution timeout to prevent hanging workflows',
'benefit': 'Prevent resource waste from stuck executions',
'implementation': 'Add timeout in workflow settings'
})
return optimizations
def _calculate_performance_score(self, analysis: Dict) -> int:
"""Calculate overall performance score (0-100)"""
score = 100
# Deduct for execution failures
metrics = analysis.get('execution_metrics', {})
success_rate = metrics.get('success_rate', 100)
score -= (100 - success_rate) * 0.5
# Deduct for complexity
complexity = analysis.get('node_analysis', {}).get('complexity_score', 0)
if complexity > 70:
score -= (complexity - 70) * 0.3
# Deduct for bottlenecks
bottlenecks = analysis.get('bottlenecks', [])
for bottleneck in bottlenecks:
severity = bottleneck.get('severity', 'low')
if severity == 'critical':
score -= 20
elif severity == 'high':
score -= 10
elif severity == 'medium':
score -= 5
# Deduct for high-priority optimizations not implemented
optimizations = analysis.get('optimization_opportunities', [])
high_priority = [opt for opt in optimizations if opt.get('priority') == 'high']
score -= len(high_priority) * 5
return max(0, min(100, int(score)))
def suggest_optimizations(self, workflow_id: str) -> Dict:
"""Generate optimization suggestions"""
analysis = self.analyze_performance(workflow_id)
suggestions = {
'workflow_id': workflow_id,
'performance_score': analysis['performance_score'],
'health': analysis['execution_metrics']['health'],
'priority_actions': [],
'quick_wins': [],
'long_term_improvements': []
}
# Categorize optimizations by effort and impact
for opt in analysis['optimization_opportunities']:
priority = opt.get('priority', 'low')
if priority == 'high':
suggestions['priority_actions'].append(opt)
elif priority == 'medium':
suggestions['quick_wins'].append(opt)
else:
suggestions['long_term_improvements'].append(opt)
# Add bottleneck fixes as priority actions
for bottleneck in analysis['bottlenecks']:
if bottleneck.get('severity') in ['critical', 'high']:
suggestions['priority_actions'].append({
'type': 'fix_bottleneck',
'priority': 'critical',
'description': f"Fix bottleneck: {bottleneck['description']}",
'benefit': f"Resolve: {bottleneck['impact']}"
})
return suggestions
def generate_optimization_report(self, analysis: Dict) -> str:
"""Generate human-readable optimization report"""
report = []
report.append("=" * 70)
report.append("n8n Workflow Optimization Report")
report.append("=" * 70)
report.append(f"\nWorkflow: {analysis['workflow_name']}")
report.append(f"Analysis Period: {analysis['analysis_period_days']} days")
report.append(f"Performance Score: {analysis['performance_score']}/100")
# Execution Metrics
metrics = analysis['execution_metrics']
report.append(f"\n## Execution Metrics")
report.append(f"Health Status: {metrics['health'].upper()}")
report.append(f"Total Executions: {metrics['total_executions']}")
report.append(f"Success Rate: {metrics['success_rate']:.1f}%")
report.append(f"Failure Rate: {metrics['failure_rate']:.1f}%")
# Node Analysis
node_analysis = analysis['node_analysis']
report.append(f"\n## Workflow Structure")
report.append(f"Total Nodes: {node_analysis['total_nodes']}")
report.append(f"Complexity Score: {node_analysis['complexity_score']}/100")
if node_analysis['expensive_nodes']:
report.append(f"\nExpensive Operations ({len(node_analysis['expensive_nodes'])}):")
for node in node_analysis['expensive_nodes'][:5]:
report.append(f"{node['name']}: {node['reason']}")
# Bottlenecks
if analysis['bottlenecks']:
report.append(f"\n## Bottlenecks ({len(analysis['bottlenecks'])})")
for bottleneck in analysis['bottlenecks']:
severity = bottleneck['severity'].upper()
report.append(f"\n[{severity}] {bottleneck['type']}")
report.append(f" Description: {bottleneck['description']}")
report.append(f" Impact: {bottleneck['impact']}")
# Optimization Opportunities
optimizations = analysis['optimization_opportunities']
if optimizations:
report.append(f"\n## Optimization Opportunities ({len(optimizations)})")
# Group by priority
high_priority = [opt for opt in optimizations if opt.get('priority') == 'high']
medium_priority = [opt for opt in optimizations if opt.get('priority') == 'medium']
low_priority = [opt for opt in optimizations if opt.get('priority') == 'low']
if high_priority:
report.append(f"\n### High Priority ({len(high_priority)})")
for opt in high_priority:
report.append(f"\n{opt['type'].replace('_', ' ').title()}")
report.append(f" {opt['description']}")
report.append(f" Benefit: {opt['benefit']}")
if 'implementation' in opt:
report.append(f" How: {opt['implementation']}")
if medium_priority:
report.append(f"\n### Medium Priority ({len(medium_priority)})")
for opt in medium_priority:
report.append(f"\n{opt['type'].replace('_', ' ').title()}")
report.append(f" {opt['description']}")
if low_priority:
report.append(f"\n### Low Priority ({len(low_priority)})")
for opt in low_priority:
report.append(f"{opt['description']}")
report.append("\n" + "=" * 70)
return "\n".join(report)
def main():
parser = argparse.ArgumentParser(description='n8n Workflow Optimizer')
parser.add_argument('action', choices=['analyze', 'suggest', 'report'])
parser.add_argument('--id', required=True, help='Workflow ID')
parser.add_argument('--days', type=int, default=7, help='Analysis period in days')
parser.add_argument('--pretty', action='store_true', help='Pretty print JSON output')
args = parser.parse_args()
try:
optimizer = WorkflowOptimizer()
if args.action == 'analyze':
result = optimizer.analyze_performance(args.id, days=args.days)
print(json.dumps(result, indent=2 if args.pretty else None))
elif args.action == 'suggest':
result = optimizer.suggest_optimizations(args.id)
print(json.dumps(result, indent=2 if args.pretty else None))
elif args.action == 'report':
analysis = optimizer.analyze_performance(args.id, days=args.days)
print(optimizer.generate_optimization_report(analysis))
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()

416
scripts/n8n_tester.py Normal file
View File

@@ -0,0 +1,416 @@
#!/usr/bin/env python3
"""
n8n Workflow Testing & Validation
Test workflows before activation with validation and dry-runs
"""
import sys
import json
import argparse
import time
from pathlib import Path
from typing import Dict, List, Any
# Import N8nClient - handle both direct and module imports
try:
from n8n_api import N8nClient
except ImportError:
from scripts.n8n_api import N8nClient
class WorkflowTester:
"""Workflow testing and validation"""
def __init__(self, client: N8nClient = None):
self.client = client # Only initialize when needed
def validate_workflow(self, workflow_id: str = None, workflow_file: str = None) -> Dict:
"""Validate workflow structure and configuration"""
if workflow_id:
if not self.client:
self.client = N8nClient()
workflow_data = self.client.get_workflow(workflow_id)
elif workflow_file:
with open(workflow_file, 'r') as f:
workflow_data = json.load(f)
else:
raise ValueError("Either workflow_id or workflow_file required")
# Perform validation - use standalone validation for files
validation = self._perform_validation(workflow_data)
# Additional validation checks
self._check_credentials(workflow_data, validation)
self._check_node_configurations(workflow_data, validation)
self._check_execution_flow(workflow_data, validation)
return validation
def _perform_validation(self, workflow_data: Dict) -> Dict:
"""Perform standalone workflow validation"""
issues = {
'errors': [],
'warnings': [],
'valid': True
}
# Check required fields
if 'nodes' not in workflow_data:
issues['errors'].append("Missing 'nodes' field")
issues['valid'] = False
return issues
nodes = workflow_data.get('nodes', [])
connections = workflow_data.get('connections', {})
# Validate nodes
node_names = set()
for node in nodes:
if 'name' not in node:
issues['errors'].append("Node missing 'name' field")
issues['valid'] = False
else:
node_names.add(node['name'])
if 'type' not in node:
issues['errors'].append(f"Node '{node.get('name', 'unknown')}' missing 'type' field")
issues['valid'] = False
# Validate connections
for source_node, targets in connections.items():
if source_node not in node_names:
issues['errors'].append(f"Connection references non-existent source node: {source_node}")
issues['valid'] = False
for output_type, output_connections in targets.items():
for conn_list in output_connections:
for conn in conn_list:
target_node = conn.get('node')
if target_node and target_node not in node_names:
issues['errors'].append(f"Connection references non-existent target node: {target_node}")
issues['valid'] = False
# Check for disconnected nodes
connected_nodes = set(connections.keys())
for targets in connections.values():
for output_connections in targets.values():
for conn_list in output_connections:
for conn in conn_list:
connected_nodes.add(conn.get('node'))
disconnected = node_names - connected_nodes
if disconnected and len(nodes) > 1:
for node in disconnected:
issues['warnings'].append(f"Node '{node}' appears to be disconnected")
return issues
def _check_credentials(self, workflow_data: Dict, validation: Dict):
"""Check for missing or invalid credentials"""
nodes = workflow_data.get('nodes', [])
# Nodes that typically require credentials
credential_nodes = [
'n8n-nodes-base.httpRequest',
'n8n-nodes-base.googleSheets',
'n8n-nodes-base.slack',
'n8n-nodes-base.twitter',
'n8n-nodes-base.stripe',
'n8n-nodes-base.postgres',
'n8n-nodes-base.mysql',
'n8n-nodes-base.emailSend'
]
for node in nodes:
node_type = node.get('type', '')
if node_type in credential_nodes:
credentials = node.get('credentials', {})
if not credentials:
validation['warnings'].append(
f"Node '{node['name']}' ({node_type}) likely requires credentials"
)
def _check_node_configurations(self, workflow_data: Dict, validation: Dict):
"""Check for invalid node configurations"""
nodes = workflow_data.get('nodes', [])
for node in nodes:
node_type = node.get('type', '')
parameters = node.get('parameters', {})
# Check HTTP Request nodes
if node_type == 'n8n-nodes-base.httpRequest':
if not parameters.get('url'):
validation['errors'].append(
f"Node '{node['name']}' missing required URL parameter"
)
validation['valid'] = False
# Check webhook nodes
elif node_type == 'n8n-nodes-base.webhook':
if not parameters.get('path'):
validation['errors'].append(
f"Node '{node['name']}' missing required path parameter"
)
validation['valid'] = False
# Check email nodes
elif node_type == 'n8n-nodes-base.emailSend':
if not parameters.get('subject') and not parameters.get('text'):
validation['warnings'].append(
f"Node '{node['name']}' missing subject or text"
)
def _check_execution_flow(self, workflow_data: Dict, validation: Dict):
"""Check workflow execution flow for issues"""
nodes = workflow_data.get('nodes', [])
connections = workflow_data.get('connections', {})
# Check for trigger nodes
trigger_types = [
'n8n-nodes-base.webhook',
'n8n-nodes-base.scheduleTrigger',
'n8n-nodes-base.manualTrigger',
'n8n-nodes-base.start'
]
has_trigger = any(node.get('type') in trigger_types for node in nodes)
if not has_trigger and len(nodes) > 0:
validation['warnings'].append(
"Workflow has no trigger node. It can only be executed manually."
)
# Check for end nodes (nodes with no outgoing connections)
node_names = {node['name'] for node in nodes}
connected_as_source = set(connections.keys())
end_nodes = node_names - connected_as_source
if not end_nodes and len(nodes) > 1:
validation['warnings'].append(
"Workflow has no end nodes. This may indicate circular dependencies."
)
def dry_run(self, workflow_id: str, test_data: Dict = None, test_data_file: str = None) -> Dict:
"""Execute workflow with test data"""
# Load test data if from file
if test_data_file:
with open(test_data_file, 'r') as f:
test_data = json.load(f)
print(f"Running workflow {workflow_id} with test data...")
# Execute workflow
execution_result = self.client.execute_workflow(workflow_id, data=test_data)
execution_id = execution_result.get('data', {}).get('executionId')
if not execution_id:
return {
'status': 'failed',
'error': 'No execution ID returned',
'result': execution_result
}
print(f"Execution started: {execution_id}")
print("Waiting for execution to complete...")
# Poll for execution completion
max_attempts = 30
attempt = 0
while attempt < max_attempts:
time.sleep(2)
attempt += 1
try:
execution = self.client.get_execution(execution_id)
finished = execution.get('finished', False)
if finished:
# Execution completed
success = execution.get('data', {}).get('resultData', {}).get('error') is None
result = {
'status': 'success' if success else 'failed',
'execution_id': execution_id,
'finished': True,
'started_at': execution.get('startedAt'),
'stopped_at': execution.get('stoppedAt'),
'mode': execution.get('mode'),
'data': execution.get('data', {})
}
if not success:
error_data = execution.get('data', {}).get('resultData', {}).get('error', {})
result['error'] = {
'message': error_data.get('message'),
'description': error_data.get('description')
}
return result
except Exception as e:
print(f"Error checking execution status: {e}")
continue
return {
'status': 'timeout',
'execution_id': execution_id,
'message': 'Execution did not complete within expected time'
}
def test_suite(self, workflow_id: str, test_cases: List[Dict]) -> Dict:
"""Run multiple test cases against workflow"""
results = {
'workflow_id': workflow_id,
'total_tests': len(test_cases),
'passed': 0,
'failed': 0,
'test_results': []
}
for i, test_case in enumerate(test_cases, 1):
print(f"\nRunning test case {i}/{len(test_cases)}: {test_case.get('name', 'Unnamed')}")
test_data = test_case.get('input', {})
expected_output = test_case.get('expected', {})
# Run test
result = self.dry_run(workflow_id, test_data=test_data)
# Check result
passed = result.get('status') == 'success'
test_result = {
'test_name': test_case.get('name'),
'passed': passed,
'input': test_data,
'output': result,
'expected': expected_output
}
results['test_results'].append(test_result)
if passed:
results['passed'] += 1
print(f"✓ Test passed")
else:
results['failed'] += 1
print(f"✗ Test failed: {result.get('error', 'Unknown error')}")
return results
def generate_test_report(self, validation: Dict, dry_run: Dict = None) -> str:
"""Generate human-readable test report"""
report = []
report.append("=" * 60)
report.append("n8n Workflow Test Report")
report.append("=" * 60)
# Validation results
report.append("\n## Validation Results")
report.append(f"Status: {'✓ VALID' if validation['valid'] else '✗ INVALID'}")
if validation['errors']:
report.append(f"\nErrors ({len(validation['errors'])}):")
for error in validation['errors']:
report.append(f"{error}")
if validation['warnings']:
report.append(f"\nWarnings ({len(validation['warnings'])}):")
for warning in validation['warnings']:
report.append(f"{warning}")
if not validation['errors'] and not validation['warnings']:
report.append("\n✓ No issues found")
# Dry run results
if dry_run:
report.append("\n## Dry Run Results")
report.append(f"Status: {dry_run.get('status', 'unknown').upper()}")
report.append(f"Execution ID: {dry_run.get('execution_id', 'N/A')}")
if dry_run.get('started_at'):
report.append(f"Started: {dry_run['started_at']}")
if dry_run.get('stopped_at'):
report.append(f"Stopped: {dry_run['stopped_at']}")
if dry_run.get('error'):
report.append(f"\nError: {dry_run['error'].get('message', 'Unknown error')}")
if dry_run['error'].get('description'):
report.append(f"Description: {dry_run['error']['description']}")
report.append("\n" + "=" * 60)
return "\n".join(report)
def main():
parser = argparse.ArgumentParser(description='n8n Workflow Testing & Validation')
parser.add_argument('action', choices=['validate', 'dry-run', 'test-suite', 'report'])
parser.add_argument('--id', help='Workflow ID')
parser.add_argument('--file', help='Workflow JSON file')
parser.add_argument('--data', help='Test data JSON string')
parser.add_argument('--data-file', help='Test data JSON file')
parser.add_argument('--test-suite', help='Test suite JSON file')
parser.add_argument('--pretty', action='store_true', help='Pretty print output')
parser.add_argument('--report', action='store_true', help='Generate human-readable report')
args = parser.parse_args()
try:
tester = WorkflowTester()
result = None
if args.action == 'validate':
result = tester.validate_workflow(workflow_id=args.id, workflow_file=args.file)
if args.report:
print(tester.generate_test_report(result))
else:
print(json.dumps(result, indent=2 if args.pretty else None))
elif args.action == 'dry-run':
if not args.id:
raise ValueError("--id required for dry-run")
test_data = None
if args.data:
test_data = json.loads(args.data)
result = tester.dry_run(
workflow_id=args.id,
test_data=test_data,
test_data_file=args.data_file
)
if args.report:
validation = tester.validate_workflow(workflow_id=args.id)
print(tester.generate_test_report(validation, result))
else:
print(json.dumps(result, indent=2 if args.pretty else None))
elif args.action == 'test-suite':
if not args.id:
raise ValueError("--id required for test-suite")
if not args.test_suite:
raise ValueError("--test-suite required for test-suite")
with open(args.test_suite, 'r') as f:
test_cases = json.load(f)
result = tester.test_suite(args.id, test_cases)
print(json.dumps(result, indent=2 if args.pretty else None))
elif args.action == 'report':
if not args.id:
raise ValueError("--id required for report")
validation = tester.validate_workflow(workflow_id=args.id)
print(tester.generate_test_report(validation))
except Exception as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
if __name__ == '__main__':
main()