commit 3fbcdb96410c898df55511321b1a6b1187df143f Author: zlei9 Date: Sun Mar 29 14:33:06 2026 +0800 Initial commit with translated description diff --git a/README.md b/README.md new file mode 100644 index 0000000..9b37cc8 --- /dev/null +++ b/README.md @@ -0,0 +1,338 @@ +# n8n Enhanced Workflow Management Skill + +Comprehensive n8n automation skill with workflow creation, testing, execution monitoring, and performance optimization capabilities. + +## Features + +### โœจ Workflow Creation +- **Template Library:** 6 pre-built SaaS automation workflows +- **Interactive Builder:** Guided workflow creation +- **JSON Import:** Create from existing workflow files +- **Programmatic API:** Build workflows in Python + +### ๐Ÿงช Testing & Validation +- **Structure Validation:** Check workflow integrity +- **Dry-Run Testing:** Test with sample data before activation +- **Test Suites:** Run multiple test cases +- **Validation Reports:** Detailed error and warning reports + +### ๐Ÿ“Š Execution Monitoring +- **Real-time Tracking:** Monitor workflow execution status +- **Execution Logs:** Detailed execution history +- **Error Analysis:** Identify and debug failures +- **Retry Logic:** Built-in failure handling + +### โšก Performance Optimization +- **Performance Analysis:** Comprehensive workflow metrics +- **Bottleneck Detection:** Identify slow operations +- **Optimization Suggestions:** Actionable improvement recommendations +- **Performance Scoring:** 0-100 workflow health score + +## Quick Start + +### 1. Setup + +```bash +# Set environment variables +export N8N_API_KEY="your-api-key" +export N8N_BASE_URL="your-n8n-url-here" + +# Verify connection +python3 scripts/n8n_api.py list-workflows --pretty +``` + +### 2. Create Your First Workflow + +```bash +# Deploy a template +python3 scripts/n8n_api.py create --from-template waitlist-pipeline + +# Output: {"id": "123", "name": "Waitlist to Customer Pipeline", ...} +``` + +### 3. Test Before Activating + +```bash +# Validate structure +python3 scripts/n8n_tester.py validate --id 123 + +# Dry run with test data +python3 scripts/n8n_tester.py dry-run --id 123 \ + --data '{"email": "test@example.com", "name": "Test User"}' +``` + +### 4. Activate & Monitor + +```bash +# Activate workflow +python3 scripts/n8n_api.py activate --id 123 + +# Monitor executions +python3 scripts/n8n_api.py list-executions --id 123 --limit 10 --pretty +``` + +### 5. Optimize Performance + +```bash +# Generate optimization report +python3 scripts/n8n_optimizer.py report --id 123 +``` + +## Available Templates + +| Template | Description | Trigger | Key Features | +|----------|-------------|---------|--------------| +| **waitlist-pipeline** | Waitlist to customer automation | Webhook | Email validation, CRM integration, welcome emails | +| **product-hunt** | Monitor Product Hunt launches | Schedule (hourly) | Vote filtering, Slack notifications, Sheets logging | +| **social-media-crosspost** | Multi-platform posting | Webhook | Twitter, LinkedIn, Facebook parallel posting | +| **revenue-dashboard** | Revenue data collection | Schedule (daily) | Stripe integration, Sheets updates | +| **customer-onboarding** | Multi-day email sequence | Webhook | Time-delayed follow-ups, progressive engagement | +| **lead-scraping** | Lead generation pipeline | Schedule (daily) | Web scraping, data enrichment, DB storage | + +## CLI Commands + +### Workflow Management + +```bash +# List all workflows +python3 scripts/n8n_api.py list-workflows --pretty + +# Get workflow details +python3 scripts/n8n_api.py get-workflow --id --pretty + +# Create from template +python3 scripts/n8n_api.py create --from-template waitlist-pipeline + +# Create from file +python3 scripts/n8n_api.py create --from-file workflow.json + +# Activate/Deactivate +python3 scripts/n8n_api.py activate --id +python3 scripts/n8n_api.py deactivate --id +``` + +### Testing & Validation + +```bash +# Validate workflow +python3 scripts/n8n_tester.py validate --id --pretty + +# Validate from file +python3 scripts/n8n_tester.py validate --file workflow.json --pretty + +# Dry run with data +python3 scripts/n8n_tester.py dry-run --id --data '{"key": "value"}' + +# Dry run with file +python3 scripts/n8n_tester.py dry-run --id --data-file test-data.json + +# Generate test report +python3 scripts/n8n_tester.py report --id + +# Run test suite +python3 scripts/n8n_tester.py test-suite --id --test-suite tests.json +``` + +### Execution Monitoring + +```bash +# List executions +python3 scripts/n8n_api.py list-executions --limit 20 --pretty + +# Get execution details +python3 scripts/n8n_api.py get-execution --id --pretty + +# Execute manually +python3 scripts/n8n_api.py execute --id + +# Execute with data +python3 scripts/n8n_api.py execute --id --data '{"key": "value"}' + +# Get statistics +python3 scripts/n8n_api.py stats --id --days 7 --pretty +``` + +### Performance Optimization + +```bash +# Analyze performance +python3 scripts/n8n_optimizer.py analyze --id --pretty + +# Get suggestions +python3 scripts/n8n_optimizer.py suggest --id --pretty + +# Generate report +python3 scripts/n8n_optimizer.py report --id +``` + +## Python API + +### Basic Usage + +```python +from scripts.n8n_api import N8nClient + +client = N8nClient() + +# List workflows +workflows = client.list_workflows(active=True) +print(f"Active workflows: {len(workflows)}") + +# Create workflow +workflow = client.create_workflow({ + 'name': 'My Workflow', + 'nodes': [...], + 'connections': {...} +}) + +# Execute workflow +result = client.execute_workflow(workflow['id'], data={'test': True}) +``` + +### Testing + +```python +from scripts.n8n_tester import WorkflowTester + +tester = WorkflowTester() + +# Validate +validation = tester.validate_workflow(workflow_id='123') +if validation['valid']: + print("โœ“ Workflow is valid") +else: + print(f"โœ— Errors: {validation['errors']}") + +# Dry run +result = tester.dry_run('123', test_data={'email': 'test@example.com'}) +print(f"Status: {result['status']}") +``` + +### Optimization + +```python +from scripts.n8n_optimizer import WorkflowOptimizer + +optimizer = WorkflowOptimizer() + +# Analyze +analysis = optimizer.analyze_performance('123', days=30) +print(f"Performance Score: {analysis['performance_score']}/100") +print(f"Health: {analysis['execution_metrics']['health']}") + +# Get suggestions +suggestions = optimizer.suggest_optimizations('123') +print(f"Priority Actions: {len(suggestions['priority_actions'])}") +``` + +## Common Workflows + +### Create โ†’ Test โ†’ Deploy + +```bash +# 1. Create from template +python3 scripts/n8n_api.py create --from-template waitlist-pipeline > workflow.json +WORKFLOW_ID=$(cat workflow.json | jq -r '.id') + +# 2. Validate structure +python3 scripts/n8n_tester.py validate --id $WORKFLOW_ID + +# 3. Test with sample data +python3 scripts/n8n_tester.py dry-run --id $WORKFLOW_ID \ + --data '{"email": "test@example.com", "name": "Test User"}' --report + +# 4. If tests pass, activate +python3 scripts/n8n_api.py activate --id $WORKFLOW_ID +``` + +### Debug Failed Workflow + +```bash +# 1. Check recent executions +python3 scripts/n8n_api.py list-executions --id $WORKFLOW_ID --limit 5 + +# 2. Get execution details +python3 scripts/n8n_api.py get-execution --id $EXEC_ID --pretty + +# 3. Validate workflow structure +python3 scripts/n8n_tester.py report --id $WORKFLOW_ID + +# 4. Check for optimization issues +python3 scripts/n8n_optimizer.py report --id $WORKFLOW_ID +``` + +### Optimize Performance + +```bash +# 1. Analyze current state +python3 scripts/n8n_optimizer.py analyze --id $WORKFLOW_ID --days 30 --pretty + +# 2. Get recommendations +python3 scripts/n8n_optimizer.py suggest --id $WORKFLOW_ID --pretty + +# 3. Generate full report +python3 scripts/n8n_optimizer.py report --id $WORKFLOW_ID > optimization-report.txt + +# 4. Review execution stats +python3 scripts/n8n_api.py stats --id $WORKFLOW_ID --days 30 --pretty +``` + +## File Structure + +``` +~/clawd/skills/n8n/ +โ”œโ”€โ”€ README.md # This file +โ”œโ”€โ”€ SKILL.md # Comprehensive documentation +โ”œโ”€โ”€ scripts/ +โ”‚ โ”œโ”€โ”€ n8n_api.py # Core API client +โ”‚ โ”œโ”€โ”€ n8n_tester.py # Testing & validation +โ”‚ โ””โ”€โ”€ n8n_optimizer.py # Performance optimization +โ”œโ”€โ”€ templates/ +โ”‚ โ”œโ”€โ”€ README.md # Template documentation +โ”‚ โ”œโ”€โ”€ *.json # Workflow templates +โ”‚ โ””โ”€โ”€ test-data-*.json # Test data files +โ””โ”€โ”€ references/ + โ””โ”€โ”€ api.md # API reference +``` + +## Best Practices + +1. **Always validate before activating:** Catch errors early +2. **Test with sample data:** Use dry-run to verify behavior +3. **Monitor execution metrics:** Track success rates and failures +4. **Regular optimization reviews:** Monthly performance analysis +5. **Use templates as starting points:** Proven patterns save time +6. **Document customizations:** Keep changelog of modifications +7. **Implement error handling:** Add error nodes for reliability +8. **Gradual rollout:** Test with limited traffic initially + +## Troubleshooting + +| Issue | Solution | +|-------|----------| +| Authentication error | Set `N8N_API_KEY` environment variable | +| Connection error | Verify `N8N_BASE_URL` and network access | +| Validation errors | Check workflow JSON structure | +| Execution timeout | Optimize expensive operations, reduce dataset size | +| Rate limiting | Add Wait nodes, implement backoff | +| Missing credentials | Configure in n8n UI, assign to nodes | + +## Resources + +- **Documentation:** [SKILL.md](SKILL.md) +- **Templates:** [templates/README.md](templates/README.md) +- **n8n Docs:** https://docs.n8n.io +- **n8n API:** https://docs.n8n.io/api/ +- **n8n Community:** https://community.n8n.io + +## Support + +For issues or questions: +1. Check validation output: `python3 scripts/n8n_tester.py validate --id ` +2. Review execution logs: `python3 scripts/n8n_api.py get-execution --id ` +3. Generate optimization report: `python3 scripts/n8n_optimizer.py report --id ` +4. Consult [SKILL.md](SKILL.md) for detailed documentation + +## License + +Part of the Clawdbot skills library. diff --git a/SKILL.md b/SKILL.md new file mode 100644 index 0000000..7772341 --- /dev/null +++ b/SKILL.md @@ -0,0 +1,537 @@ +--- +name: n8n +description: "้€š่ฟ‡API็ฎก็†n8nๅทฅไฝœๆตๅ’Œ่‡ชๅŠจๅŒ–ใ€‚" +metadata: {"openclaw":{"emoji":"\u2699\ufe0f","requires":{"env":["N8N_API_KEY","N8N_BASE_URL"]},"primaryEnv":"N8N_API_KEY"}} +--- + +# n8n Workflow Management + +Comprehensive workflow automation management for n8n platform with creation, testing, execution monitoring, and performance optimization capabilities. + +## โš ๏ธ CRITICAL: Workflow Creation Rules + +**When creating n8n workflows, ALWAYS:** + +1. โœ… **Generate COMPLETE workflows** with all functional nodes +2. โœ… **Include actual HTTP Request nodes** for API calls (ImageFX, Gemini, Veo, Suno, etc.) +3. โœ… **Add Code nodes** for data transformation and logic +4. โœ… **Create proper connections** between all nodes +5. โœ… **Use real node types** (n8n-nodes-base.httpRequest, n8n-nodes-base.code, n8n-nodes-base.set) + +**NEVER:** +- โŒ Create "Setup Instructions" placeholder nodes +- โŒ Generate workflows with only TODO comments +- โŒ Make incomplete workflows requiring manual node addition +- โŒ Use text-only nodes as substitutes for real functionality + +**Example GOOD workflow:** +``` +Manual Trigger โ†’ Set Config โ†’ HTTP Request (API call) โ†’ Code (parse) โ†’ Response +``` + +**Example BAD workflow:** +``` +Manual Trigger โ†’ Code ("Add HTTP nodes here, configure APIs...") +``` + +Always build the complete, functional workflow with all necessary nodes configured and connected. + +## Setup + +**Required environment variables:** +- `N8N_API_KEY` โ€” Your n8n API key (Settings โ†’ API in the n8n UI) +- `N8N_BASE_URL` โ€” Your n8n instance URL + +**Configure credentials via OpenClaw settings:** + +Add to `~/.config/openclaw/settings.json`: +```json +{ + "skills": { + "n8n": { + "env": { + "N8N_API_KEY": "your-api-key-here", + "N8N_BASE_URL": "your-n8n-url-here" + } + } + } +} +``` + +Or set per-session (do **not** persist secrets in shell rc files): +```bash +export N8N_API_KEY="your-api-key-here" +export N8N_BASE_URL="your-n8n-url-here" +``` + +**Verify connection:** +```bash +python3 scripts/n8n_api.py list-workflows --pretty +``` + +> **Security note:** Never store API keys in plaintext shell config files (`~/.bashrc`, `~/.zshrc`). Use the OpenClaw settings file or a secure secret manager. + +## Quick Reference + +### Workflow Management + +#### List Workflows +```bash +python3 scripts/n8n_api.py list-workflows --pretty +python3 scripts/n8n_api.py list-workflows --active true --pretty +``` + +#### Get Workflow Details +```bash +python3 scripts/n8n_api.py get-workflow --id --pretty +``` + +#### Create Workflows +```bash +# From JSON file +python3 scripts/n8n_api.py create --from-file workflow.json +``` + +#### Activate/Deactivate +```bash +python3 scripts/n8n_api.py activate --id +python3 scripts/n8n_api.py deactivate --id +``` + +### Testing & Validation + +#### Validate Workflow Structure +```bash +# Validate existing workflow +python3 scripts/n8n_tester.py validate --id + +# Validate from file +python3 scripts/n8n_tester.py validate --file workflow.json --pretty + +# Generate validation report +python3 scripts/n8n_tester.py report --id +``` + +#### Dry Run Testing +```bash +# Test with data +python3 scripts/n8n_tester.py dry-run --id --data '{"email": "test@example.com"}' + +# Test with data file +python3 scripts/n8n_tester.py dry-run --id --data-file test-data.json + +# Full test report (validation + dry run) +python3 scripts/n8n_tester.py dry-run --id --data-file test.json --report +``` + +#### Test Suite +```bash +# Run multiple test cases +python3 scripts/n8n_tester.py test-suite --id --test-suite test-cases.json +``` + +### Execution Monitoring + +#### List Executions +```bash +# Recent executions (all workflows) +python3 scripts/n8n_api.py list-executions --limit 10 --pretty + +# Specific workflow executions +python3 scripts/n8n_api.py list-executions --id --limit 20 --pretty +``` + +#### Get Execution Details +```bash +python3 scripts/n8n_api.py get-execution --id --pretty +``` + +#### Manual Execution +```bash +# Trigger workflow +python3 scripts/n8n_api.py execute --id + +# Execute with data +python3 scripts/n8n_api.py execute --id --data '{"key": "value"}' +``` + +### Performance Optimization + +#### Analyze Performance +```bash +# Full performance analysis +python3 scripts/n8n_optimizer.py analyze --id --pretty + +# Analyze specific period +python3 scripts/n8n_optimizer.py analyze --id --days 30 --pretty +``` + +#### Get Optimization Suggestions +```bash +# Priority-ranked suggestions +python3 scripts/n8n_optimizer.py suggest --id --pretty +``` + +#### Generate Optimization Report +```bash +# Human-readable report with metrics, bottlenecks, and suggestions +python3 scripts/n8n_optimizer.py report --id +``` + +#### Get Workflow Statistics +```bash +# Execution statistics +python3 scripts/n8n_api.py stats --id --days 7 --pretty +``` + +## Python API + +### Basic Usage + +```python +from scripts.n8n_api import N8nClient + +client = N8nClient() + +# List workflows +workflows = client.list_workflows(active=True) + +# Get workflow +workflow = client.get_workflow('workflow-id') + +# Create workflow +new_workflow = client.create_workflow({ + 'name': 'My Workflow', + 'nodes': [...], + 'connections': {...} +}) + +# Activate/deactivate +client.activate_workflow('workflow-id') +client.deactivate_workflow('workflow-id') + +# Executions +executions = client.list_executions(workflow_id='workflow-id', limit=10) +execution = client.get_execution('execution-id') + +# Execute workflow +result = client.execute_workflow('workflow-id', data={'key': 'value'}) +``` + +### Validation & Testing + +```python +from scripts.n8n_api import N8nClient +from scripts.n8n_tester import WorkflowTester + +client = N8nClient() +tester = WorkflowTester(client) + +# Validate workflow +validation = tester.validate_workflow(workflow_id='123') +print(f"Valid: {validation['valid']}") +print(f"Errors: {validation['errors']}") +print(f"Warnings: {validation['warnings']}") + +# Dry run +result = tester.dry_run( + workflow_id='123', + test_data={'email': 'test@example.com'} +) +print(f"Status: {result['status']}") + +# Test suite +test_cases = [ + {'name': 'Test 1', 'input': {...}, 'expected': {...}}, + {'name': 'Test 2', 'input': {...}, 'expected': {...}} +] +results = tester.test_suite('123', test_cases) +print(f"Passed: {results['passed']}/{results['total_tests']}") + +# Generate report +report = tester.generate_test_report(validation, result) +print(report) +``` + +### Performance Optimization + +```python +from scripts.n8n_optimizer import WorkflowOptimizer + +optimizer = WorkflowOptimizer() + +# Analyze performance +analysis = optimizer.analyze_performance('workflow-id', days=7) +print(f"Performance Score: {analysis['performance_score']}/100") +print(f"Health: {analysis['execution_metrics']['health']}") + +# Get suggestions +suggestions = optimizer.suggest_optimizations('workflow-id') +print(f"Priority Actions: {len(suggestions['priority_actions'])}") +print(f"Quick Wins: {len(suggestions['quick_wins'])}") + +# Generate report +report = optimizer.generate_optimization_report(analysis) +print(report) +``` + +## Common Workflows + +### 1. Validate and Test Workflow + +```bash +# Validate workflow structure +python3 scripts/n8n_tester.py validate --id --pretty + +# Test with sample data +python3 scripts/n8n_tester.py dry-run --id \ + --data '{"email": "test@example.com", "name": "Test User"}' + +# If tests pass, activate +python3 scripts/n8n_api.py activate --id +``` + +### 2. Debug Failed Workflow + +```bash +# Check recent executions +python3 scripts/n8n_api.py list-executions --id --limit 10 --pretty + +# Get specific execution details +python3 scripts/n8n_api.py get-execution --id --pretty + +# Validate workflow structure +python3 scripts/n8n_tester.py validate --id + +# Generate test report +python3 scripts/n8n_tester.py report --id + +# Check for optimization issues +python3 scripts/n8n_optimizer.py report --id +``` + +### 3. Optimize Workflow Performance + +```bash +# Analyze current performance +python3 scripts/n8n_optimizer.py analyze --id --days 30 --pretty + +# Get actionable suggestions +python3 scripts/n8n_optimizer.py suggest --id --pretty + +# Generate comprehensive report +python3 scripts/n8n_optimizer.py report --id + +# Review execution statistics +python3 scripts/n8n_api.py stats --id --days 30 --pretty + +# Test optimizations with dry run +python3 scripts/n8n_tester.py dry-run --id --data-file test-data.json +``` + +### 4. Monitor Workflow Health + +```bash +# Check active workflows +python3 scripts/n8n_api.py list-workflows --active true --pretty + +# Review recent execution status +python3 scripts/n8n_api.py list-executions --limit 20 --pretty + +# Get statistics for each critical workflow +python3 scripts/n8n_api.py stats --id --pretty + +# Generate health reports +python3 scripts/n8n_optimizer.py report --id +``` + +## Validation Checks + +The testing module performs comprehensive validation: + +### Structure Validation +- โœ“ Required fields present (nodes, connections) +- โœ“ All nodes have names and types +- โœ“ Connection targets exist +- โœ“ No disconnected nodes (warning) + +### Configuration Validation +- โœ“ Nodes requiring credentials are configured +- โœ“ Required parameters are set +- โœ“ HTTP nodes have URLs +- โœ“ Webhook nodes have paths +- โœ“ Email nodes have content + +### Flow Validation +- โœ“ Workflow has trigger nodes +- โœ“ Proper execution flow +- โœ“ No circular dependencies +- โœ“ End nodes identified + +## Optimization Analysis + +The optimizer analyzes multiple dimensions: + +### Execution Metrics +- Total executions +- Success/failure rates +- Health status (excellent/good/fair/poor) +- Error patterns + +### Performance Metrics +- Node count and complexity +- Connection patterns +- Expensive operations (API calls, database queries) +- Parallel execution opportunities + +### Bottleneck Detection +- Sequential expensive operations +- High failure rates +- Missing error handling +- Rate limit issues + +### Optimization Opportunities +- **Parallel Execution:** Identify nodes that can run concurrently +- **Caching:** Suggest caching for repeated API calls +- **Batch Processing:** Recommend batching for large datasets +- **Error Handling:** Add error recovery mechanisms +- **Complexity Reduction:** Split complex workflows +- **Timeout Settings:** Configure execution limits + +## Performance Scoring + +Workflows receive a performance score (0-100) based on: + +- **Success Rate:** Higher is better (50% weight) +- **Complexity:** Lower is better (30% weight) +- **Bottlenecks:** Fewer is better (critical: -20, high: -10, medium: -5) +- **Optimizations:** Implemented best practices (+5 each) + +Score interpretation: +- **90-100:** Excellent - Well-optimized +- **70-89:** Good - Minor improvements possible +- **50-69:** Fair - Optimization recommended +- **0-49:** Poor - Significant issues + +## Best Practices + +### Development +1. **Plan Structure:** Design workflow nodes and connections before building +2. **Validate First:** Always validate before deployment +3. **Test Thoroughly:** Use dry-run with multiple test cases +4. **Error Handling:** Add error nodes for reliability +5. **Documentation:** Comment complex logic in Code nodes + +### Testing +1. **Sample Data:** Create realistic test data files +2. **Edge Cases:** Test boundary conditions and errors +3. **Incremental:** Test each node addition +4. **Regression:** Retest after changes +5. **Production-like:** Use staging environment that mirrors production + +### Deployment +1. **Inactive First:** Deploy workflows in inactive state +2. **Gradual Rollout:** Test with limited traffic initially +3. **Monitor Closely:** Watch first executions carefully +4. **Quick Rollback:** Be ready to deactivate if issues arise +5. **Document Changes:** Keep changelog of modifications + +### Optimization +1. **Baseline Metrics:** Capture performance before changes +2. **One Change at a Time:** Isolate optimization impacts +3. **Measure Results:** Compare before/after metrics +4. **Regular Reviews:** Schedule monthly optimization reviews +5. **Cost Awareness:** Monitor API usage and execution costs + +### Maintenance +1. **Health Checks:** Weekly execution statistics review +2. **Error Analysis:** Investigate failure patterns +3. **Performance Monitoring:** Track execution times +4. **Credential Rotation:** Update credentials regularly +5. **Cleanup:** Archive or delete unused workflows + +## Troubleshooting + +### Authentication Error +``` +Error: N8N_API_KEY not found in environment +``` +**Solution:** Set environment variable: +```bash +export N8N_API_KEY="your-api-key" +``` + +### Connection Error +``` +Error: HTTP 401: Unauthorized +``` +**Solution:** +1. Verify API key is correct +2. Check N8N_BASE_URL is set correctly +3. Confirm API access is enabled in n8n + +### Validation Errors +``` +Validation failed: Node missing 'name' field +``` +**Solution:** Check workflow JSON structure, ensure all required fields present + +### Execution Timeout +``` +Status: timeout - Execution did not complete +``` +**Solution:** +1. Check workflow for infinite loops +2. Reduce dataset size for testing +3. Optimize expensive operations +4. Set execution timeout in workflow settings + +### Rate Limiting +``` +Error: HTTP 429: Too Many Requests +``` +**Solution:** +1. Add Wait nodes between API calls +2. Implement exponential backoff +3. Use batch processing +4. Check API rate limits + +### Missing Credentials +``` +Warning: Node 'HTTP_Request' may require credentials +``` +**Solution:** +1. Configure credentials in n8n UI +2. Assign credentials to node +3. Test connection before activating + +## File Structure + +``` +~/clawd/skills/n8n/ +โ”œโ”€โ”€ SKILL.md # This file +โ”œโ”€โ”€ scripts/ +โ”‚ โ”œโ”€โ”€ n8n_api.py # Core API client (extended) +โ”‚ โ”œโ”€โ”€ n8n_tester.py # Testing & validation +โ”‚ โ””โ”€โ”€ n8n_optimizer.py # Performance optimization +โ””โ”€โ”€ references/ + โ””โ”€โ”€ api.md # n8n API reference +``` + +## API Reference + +For detailed n8n REST API documentation, see [references/api.md](references/api.md) or visit: +https://docs.n8n.io/api/ + +## Support + +**Documentation:** +- n8n Official Docs: https://docs.n8n.io +- n8n Community Forum: https://community.n8n.io +- n8n API Reference: https://docs.n8n.io/api/ + +**Debugging:** +1. Use validation: `python3 scripts/n8n_tester.py validate --id ` +2. Check execution logs: `python3 scripts/n8n_api.py get-execution --id ` +3. Review optimization report: `python3 scripts/n8n_optimizer.py report --id ` +4. Test with dry-run: `python3 scripts/n8n_tester.py dry-run --id --data-file test.json` diff --git a/_meta.json b/_meta.json new file mode 100644 index 0000000..daa4d71 --- /dev/null +++ b/_meta.json @@ -0,0 +1,6 @@ +{ + "ownerId": "kn797ds1ncqqb6bf6c9w26wtcn7z640c", + "slug": "n8n", + "version": "2.0.0", + "publishedAt": 1770748031005 +} \ No newline at end of file diff --git a/references/api.md b/references/api.md new file mode 100644 index 0000000..2ffe908 --- /dev/null +++ b/references/api.md @@ -0,0 +1,156 @@ +# n8n API Reference + +## Authentication + +The n8n API uses API key authentication via the `X-N8N-API-KEY` header. + +API keys are managed in the n8n UI: +- Settings โ†’ API +- User profile โ†’ API + +## Base URL + +`N8N_BASE_URL` + +Default: `${N8N_BASE_URL}$/api/v1` + +## Endpoints + +### Workflows + +#### List Workflows +``` +GET /workflows +Query params: ?active=true|false +``` + +Response: +```json +{ + "data": [ + { + "id": "123", + "name": "My Workflow", + "active": true, + "createdAt": "2026-01-14T10:00:00.000Z", + "updatedAt": "2026-01-14T12:00:00.000Z" + } + ] +} +``` + +#### Get Workflow +``` +GET /workflows/{id} +``` + +#### Create Workflow +``` +POST /workflows +Body: workflow JSON +``` + +#### Update Workflow +``` +PATCH /workflows/{id} +Body: partial workflow JSON +``` + +#### Activate/Deactivate +``` +PATCH /workflows/{id} +Body: {"active": true|false} +``` + +#### Delete Workflow +``` +DELETE /workflows/{id} +``` + +### Executions + +#### List Executions +``` +GET /executions?limit=20&workflowId={id} +``` + +Response: +```json +{ + "data": [ + { + "id": "456", + "finished": true, + "mode": "trigger", + "startedAt": "2026-01-14T12:00:00.000Z", + "stoppedAt": "2026-01-14T12:00:05.000Z", + "workflowId": "123", + "status": "success" + } + ] +} +``` + +#### Get Execution +``` +GET /executions/{id} +``` + +#### Delete Execution +``` +DELETE /executions/{id} +``` + +#### Manual Execution +``` +POST /workflows/{id}/execute +Body: {"data": {...}} +``` + +## Common Patterns + +### List Active Workflows +```bash +python3 scripts/n8n_api.py list-workflows --active true --pretty +``` + +### Get Workflow Details +```bash +python3 scripts/n8n_api.py get-workflow --id --pretty +``` + +### Activate/Deactivate Workflow +```bash +python3 scripts/n8n_api.py activate --id +python3 scripts/n8n_api.py deactivate --id +``` + +### List Recent Executions +```bash +python3 scripts/n8n_api.py list-executions --limit 10 --pretty +``` + +### Manually Execute Workflow +```bash +python3 scripts/n8n_api.py execute --id +``` + +With data: +```bash +python3 scripts/n8n_api.py execute --id --data '{"key": "value"}' +``` + +## Error Handling + +HTTP status codes: +- `200` - Success +- `400` - Bad request +- `401` - Unauthorized (invalid API key) +- `404` - Not found +- `500` - Server error + +## Environment Variables + +Required: +- `N8N_API_KEY` - n8n API key +- `N8N_BASE_URL` - Base URL \ No newline at end of file diff --git a/scripts/n8n_api.py b/scripts/n8n_api.py new file mode 100644 index 0000000..2fce8df --- /dev/null +++ b/scripts/n8n_api.py @@ -0,0 +1,355 @@ +#!/usr/bin/env python3 +""" +n8n API client for Clawdbot +Manages workflows, executions, and credentials via n8n REST API +""" + +import os +import sys +import json +import argparse +import requests +from pathlib import Path +from typing import Optional, Dict, Any, List + + +class N8nClient: + """n8n API client""" + + def __init__(self, base_url: str = None, api_key: str = None): + self.base_url = base_url or os.getenv('N8N_BASE_URL') + self.api_key = api_key or os.getenv('N8N_API_KEY') + + if not self.api_key: + raise ValueError("N8N_API_KEY not found in environment") + + self.session = requests.Session() + self.session.headers.update({ + 'X-N8N-API-KEY': self.api_key, + 'Accept': 'application/json', + 'Content-Type': 'application/json' + }) + + def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]: + """Make API request""" + url = f"{self.base_url}/api/v1/{endpoint.lstrip('/')}" + response = self.session.request(method, url, **kwargs) + + try: + response.raise_for_status() + return response.json() if response.content else {} + except requests.exceptions.HTTPError as e: + error_msg = f"HTTP {response.status_code}: {response.text}" + raise Exception(error_msg) from e + + # Workflows + def list_workflows(self, active: bool = None) -> List[Dict]: + """List all workflows""" + params = {} + if active is not None: + params['active'] = str(active).lower() + return self._request('GET', 'workflows', params=params) + + def get_workflow(self, workflow_id: str) -> Dict: + """Get workflow details""" + return self._request('GET', f'workflows/{workflow_id}') + + def create_workflow(self, workflow_data: Dict) -> Dict: + """Create new workflow""" + # Remove read-only fields that n8n API doesn't accept on create + clean_data = workflow_data.copy() + clean_data.pop('active', None) + clean_data.pop('id', None) + return self._request('POST', 'workflows', json=clean_data) + + def update_workflow(self, workflow_id: str, workflow_data: Dict) -> Dict: + """Update existing workflow""" + return self._request('PATCH', f'workflows/{workflow_id}', json=workflow_data) + + def delete_workflow(self, workflow_id: str) -> Dict: + """Delete workflow""" + return self._request('DELETE', f'workflows/{workflow_id}') + + def activate_workflow(self, workflow_id: str) -> Dict: + """Activate workflow""" + return self._request('PATCH', f'workflows/{workflow_id}', json={'active': True}) + + def deactivate_workflow(self, workflow_id: str) -> Dict: + """Deactivate workflow""" + return self._request('PATCH', f'workflows/{workflow_id}', json={'active': False}) + + # Executions + def list_executions(self, workflow_id: str = None, limit: int = 20) -> List[Dict]: + """List workflow executions""" + params = {'limit': limit} + if workflow_id: + params['workflowId'] = workflow_id + return self._request('GET', 'executions', params=params) + + def get_execution(self, execution_id: str) -> Dict: + """Get execution details""" + return self._request('GET', f'executions/{execution_id}') + + def delete_execution(self, execution_id: str) -> Dict: + """Delete execution""" + return self._request('DELETE', f'executions/{execution_id}') + + # Manual execution + def execute_workflow(self, workflow_id: str, data: Dict = None) -> Dict: + """Manually trigger workflow execution""" + payload = {'workflowId': workflow_id} + if data: + payload['data'] = data + return self._request('POST', f'workflows/{workflow_id}/execute', json=payload) + + # Testing & Validation + def validate_workflow(self, workflow_data: Dict) -> Dict: + """Validate workflow structure and configuration""" + issues = { + 'errors': [], + 'warnings': [], + 'valid': True + } + + # Check required fields + if 'nodes' not in workflow_data: + issues['errors'].append("Missing 'nodes' field") + issues['valid'] = False + return issues + + nodes = workflow_data.get('nodes', []) + connections = workflow_data.get('connections', {}) + + # Validate nodes + node_names = set() + for node in nodes: + if 'name' not in node: + issues['errors'].append("Node missing 'name' field") + issues['valid'] = False + else: + node_names.add(node['name']) + + if 'type' not in node: + issues['errors'].append(f"Node '{node.get('name', 'unknown')}' missing 'type' field") + issues['valid'] = False + + # Check for required credentials + if node.get('type', '').startswith('n8n-nodes-base'): + credentials = node.get('credentials', {}) + if not credentials and node['type'] not in ['n8n-nodes-base.start', 'n8n-nodes-base.set']: + issues['warnings'].append(f"Node '{node['name']}' may require credentials") + + # Validate connections + for source_node, targets in connections.items(): + if source_node not in node_names: + issues['errors'].append(f"Connection references non-existent source node: {source_node}") + issues['valid'] = False + + for output_type, output_connections in targets.items(): + for conn_list in output_connections: + for conn in conn_list: + target_node = conn.get('node') + if target_node and target_node not in node_names: + issues['errors'].append(f"Connection references non-existent target node: {target_node}") + issues['valid'] = False + + # Check for disconnected nodes + connected_nodes = set(connections.keys()) + for targets in connections.values(): + for output_connections in targets.values(): + for conn_list in output_connections: + for conn in conn_list: + connected_nodes.add(conn.get('node')) + + disconnected = node_names - connected_nodes + if disconnected and len(nodes) > 1: + for node in disconnected: + issues['warnings'].append(f"Node '{node}' appears to be disconnected") + + return issues + + def dry_run_workflow(self, workflow_id: str, test_data: Dict = None) -> Dict: + """Test workflow execution with mock data (creates temp execution)""" + # Execute workflow with test data and return results + result = self.execute_workflow(workflow_id, data=test_data) + return { + 'execution_id': result.get('data', {}).get('executionId'), + 'status': 'initiated', + 'test_data': test_data + } + + # Optimization & Analytics + def get_workflow_statistics(self, workflow_id: str, days: int = 7) -> Dict: + """Get workflow execution statistics""" + executions = self.list_executions(workflow_id=workflow_id, limit=100) + + stats = { + 'total_executions': len(executions), + 'successful': 0, + 'failed': 0, + 'execution_times': [], + 'error_patterns': {} + } + + for execution in executions: + status = execution.get('finished') + if status: + stats['successful'] += 1 + else: + stats['failed'] += 1 + error = execution.get('data', {}).get('resultData', {}).get('error', {}).get('message', 'Unknown error') + stats['error_patterns'][error] = stats['error_patterns'].get(error, 0) + 1 + + # Execution time + start = execution.get('startedAt') + stop = execution.get('stoppedAt') + if start and stop: + # Calculate duration (simplified) + stats['execution_times'].append({'start': start, 'stop': stop}) + + # Calculate success rate + if stats['total_executions'] > 0: + stats['success_rate'] = (stats['successful'] / stats['total_executions']) * 100 + else: + stats['success_rate'] = 0 + + return stats + + def analyze_workflow_performance(self, workflow_id: str) -> Dict: + """Analyze workflow performance and identify bottlenecks""" + workflow = self.get_workflow(workflow_id) + executions = self.list_executions(workflow_id=workflow_id, limit=10) + + analysis = { + 'node_count': len(workflow.get('nodes', [])), + 'connection_count': sum(len(conns) for conns in workflow.get('connections', {}).values()), + 'parallel_opportunities': [], + 'bottlenecks': [], + 'optimization_suggestions': [] + } + + # Analyze node structure for parallel opportunities + nodes = workflow.get('nodes', []) + connections = workflow.get('connections', {}) + + # Find nodes that could be parallelized + for node in nodes: + node_name = node['name'] + if node_name in connections: + outputs = connections[node_name] + # If node has multiple outputs, suggest parallel execution + total_connections = sum(len(conn_list) for output in outputs.values() for conn_list in output) + if total_connections > 1: + analysis['parallel_opportunities'].append({ + 'node': node_name, + 'connection_count': total_connections, + 'suggestion': 'Consider using Split In Batches for parallel processing' + }) + + # Analyze execution patterns + if executions: + analysis['optimization_suggestions'].append({ + 'type': 'monitoring', + 'suggestion': 'Enable execution data retention for better debugging' + }) + + # Check for common anti-patterns + for node in nodes: + if node.get('type') == 'n8n-nodes-base.httpRequest': + analysis['optimization_suggestions'].append({ + 'type': 'caching', + 'node': node['name'], + 'suggestion': 'Consider caching HTTP responses for frequently accessed data' + }) + + return analysis + + +def main(): + parser = argparse.ArgumentParser(description='n8n API Client') + parser.add_argument('action', choices=[ + 'list-workflows', 'get-workflow', 'create', 'activate', 'deactivate', + 'list-executions', 'get-execution', 'execute', 'validate', 'stats' + ]) + parser.add_argument('--id', help='Workflow or execution ID') + parser.add_argument('--active', type=lambda x: x.lower() == 'true', help='Filter by active status') + parser.add_argument('--limit', type=int, default=20, help='Limit results') + parser.add_argument('--data', help='JSON data for execution') + parser.add_argument('--from-file', help='Create workflow from JSON file') + parser.add_argument('--from-template', help='Create workflow from template name') + parser.add_argument('--days', type=int, default=7, help='Days for statistics') + parser.add_argument('--pretty', action='store_true', help='Pretty print JSON output') + + args = parser.parse_args() + + try: + client = N8nClient() + result = None + + if args.action == 'list-workflows': + result = client.list_workflows(active=args.active) + elif args.action == 'get-workflow': + if not args.id: + raise ValueError("--id required for get-workflow") + result = client.get_workflow(args.id) + elif args.action == 'create': + if args.from_file: + with open(args.from_file, 'r') as f: + workflow_data = json.load(f) + result = client.create_workflow(workflow_data) + elif args.from_template: + # Load template + template_path = Path(__file__).parent.parent / 'templates' / f'{args.from_template}.json' + if not template_path.exists(): + raise ValueError(f"Template not found: {args.from_template}") + with open(template_path, 'r') as f: + workflow_data = json.load(f) + result = client.create_workflow(workflow_data) + else: + raise ValueError("--from-file or --from-template required for create") + elif args.action == 'activate': + if not args.id: + raise ValueError("--id required for activate") + result = client.activate_workflow(args.id) + elif args.action == 'deactivate': + if not args.id: + raise ValueError("--id required for deactivate") + result = client.deactivate_workflow(args.id) + elif args.action == 'list-executions': + result = client.list_executions(workflow_id=args.id, limit=args.limit) + elif args.action == 'get-execution': + if not args.id: + raise ValueError("--id required for get-execution") + result = client.get_execution(args.id) + elif args.action == 'execute': + if not args.id: + raise ValueError("--id required for execute") + data = json.loads(args.data) if args.data else None + result = client.execute_workflow(args.id, data=data) + elif args.action == 'validate': + if args.from_file: + with open(args.from_file, 'r') as f: + workflow_data = json.load(f) + elif args.id: + workflow_data = client.get_workflow(args.id) + else: + raise ValueError("--id or --from-file required for validate") + result = client.validate_workflow(workflow_data) + elif args.action == 'stats': + if not args.id: + raise ValueError("--id required for stats") + result = client.get_workflow_statistics(args.id, days=args.days) + + # Output + if args.pretty: + print(json.dumps(result, indent=2)) + else: + print(json.dumps(result)) + + except Exception as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/scripts/n8n_optimizer.py b/scripts/n8n_optimizer.py new file mode 100644 index 0000000..509387c --- /dev/null +++ b/scripts/n8n_optimizer.py @@ -0,0 +1,479 @@ +#!/usr/bin/env python3 +""" +n8n Workflow Optimizer +Analyze and optimize workflow performance +""" + +import sys +import json +import argparse +from datetime import datetime, timedelta +from typing import Dict, List, Any +from collections import defaultdict + +# Import N8nClient - handle both direct and module imports +try: + from n8n_api import N8nClient +except ImportError: + from scripts.n8n_api import N8nClient + + +class WorkflowOptimizer: + """Workflow performance analyzer and optimizer""" + + def __init__(self, client: N8nClient = None): + self.client = client or N8nClient() + + def analyze_performance(self, workflow_id: str, days: int = 7) -> Dict: + """Comprehensive performance analysis""" + workflow = self.client.get_workflow(workflow_id) + statistics = self.client.get_workflow_statistics(workflow_id, days=days) + + analysis = { + 'workflow_id': workflow_id, + 'workflow_name': workflow.get('name'), + 'analysis_period_days': days, + 'execution_metrics': self._analyze_execution_metrics(statistics), + 'node_analysis': self._analyze_nodes(workflow), + 'connection_analysis': self._analyze_connections(workflow), + 'performance_score': 0, + 'bottlenecks': [], + 'optimization_opportunities': [] + } + + # Identify bottlenecks + analysis['bottlenecks'] = self._identify_bottlenecks(workflow, statistics) + + # Find optimization opportunities + analysis['optimization_opportunities'] = self._find_optimizations(workflow, statistics) + + # Calculate performance score (0-100) + analysis['performance_score'] = self._calculate_performance_score(analysis) + + return analysis + + def _analyze_execution_metrics(self, statistics: Dict) -> Dict: + """Analyze execution metrics""" + metrics = { + 'total_executions': statistics.get('total_executions', 0), + 'successful_executions': statistics.get('successful', 0), + 'failed_executions': statistics.get('failed', 0), + 'success_rate': statistics.get('success_rate', 0), + 'failure_rate': 0 + } + + if metrics['total_executions'] > 0: + metrics['failure_rate'] = (metrics['failed_executions'] / metrics['total_executions']) * 100 + + # Categorize health + if metrics['success_rate'] >= 95: + metrics['health'] = 'excellent' + elif metrics['success_rate'] >= 80: + metrics['health'] = 'good' + elif metrics['success_rate'] >= 60: + metrics['health'] = 'fair' + else: + metrics['health'] = 'poor' + + return metrics + + def _analyze_nodes(self, workflow: Dict) -> Dict: + """Analyze workflow nodes""" + nodes = workflow.get('nodes', []) + + analysis = { + 'total_nodes': len(nodes), + 'node_types': defaultdict(int), + 'complexity_score': 0, + 'expensive_nodes': [] + } + + # Count node types + for node in nodes: + node_type = node.get('type', '').split('.')[-1] + analysis['node_types'][node_type] += 1 + + # Identify potentially expensive operations + expensive_types = [ + 'httpRequest', + 'postgres', + 'mysql', + 'mongodb', + 'googleSheets', + 'airtable', + 'webhook' + ] + + for node in nodes: + node_type = node.get('type', '') + for exp_type in expensive_types: + if exp_type in node_type: + analysis['expensive_nodes'].append({ + 'name': node.get('name'), + 'type': node_type, + 'reason': self._get_expense_reason(exp_type) + }) + + # Calculate complexity score + analysis['complexity_score'] = self._calculate_complexity(workflow) + + return analysis + + def _get_expense_reason(self, node_type: str) -> str: + """Get reason why node type is expensive""" + reasons = { + 'httpRequest': 'External API calls can be slow and rate-limited', + 'postgres': 'Database queries can be slow with large datasets', + 'mysql': 'Database queries can be slow with large datasets', + 'mongodb': 'Database queries can be slow with large datasets', + 'googleSheets': 'Google Sheets API has rate limits and can be slow', + 'airtable': 'Airtable API has rate limits', + 'webhook': 'Waiting for webhook responses can cause delays' + } + return reasons.get(node_type, 'Potentially expensive operation') + + def _analyze_connections(self, workflow: Dict) -> Dict: + """Analyze workflow connections""" + connections = workflow.get('connections', {}) + + analysis = { + 'total_connections': 0, + 'parallel_paths': 0, + 'sequential_paths': 0, + 'max_path_length': 0 + } + + # Count connections + for source, targets in connections.items(): + for output_type, output_conns in targets.items(): + for conn_list in output_conns: + analysis['total_connections'] += len(conn_list) + + # Check for parallel paths + if len(conn_list) > 1: + analysis['parallel_paths'] += 1 + + return analysis + + def _calculate_complexity(self, workflow: Dict) -> int: + """Calculate workflow complexity score (0-100)""" + nodes = workflow.get('nodes', []) + connections = workflow.get('connections', {}) + + # Base complexity from node count + complexity = min(len(nodes) * 5, 50) + + # Add complexity for connections + total_connections = sum( + len(conn) + for targets in connections.values() + for output_conns in targets.values() + for conn in output_conns + ) + complexity += min(total_connections * 3, 30) + + # Add complexity for conditional logic + for node in nodes: + if node.get('type') == 'n8n-nodes-base.if': + complexity += 5 + elif node.get('type') == 'n8n-nodes-base.switch': + complexity += 10 + + return min(complexity, 100) + + def _identify_bottlenecks(self, workflow: Dict, statistics: Dict) -> List[Dict]: + """Identify performance bottlenecks""" + bottlenecks = [] + nodes = workflow.get('nodes', []) + + # Check for sequential expensive operations + expensive_types = ['httpRequest', 'postgres', 'mysql', 'mongodb'] + expensive_nodes = [ + node for node in nodes + if any(exp in node.get('type', '') for exp in expensive_types) + ] + + if len(expensive_nodes) > 3: + bottlenecks.append({ + 'type': 'sequential_expensive_operations', + 'severity': 'high', + 'description': f'Workflow has {len(expensive_nodes)} potentially expensive operations running sequentially', + 'affected_nodes': [node['name'] for node in expensive_nodes], + 'impact': 'High execution time' + }) + + # Check for high failure rate + if statistics.get('failed', 0) > statistics.get('successful', 0): + bottlenecks.append({ + 'type': 'high_failure_rate', + 'severity': 'critical', + 'description': 'Workflow has more failures than successes', + 'impact': 'Unreliable execution' + }) + + # Check for missing error handling + has_error_handling = any( + node.get('type') in ['n8n-nodes-base.errorTrigger', 'n8n-nodes-base.if'] + for node in nodes + ) + + if not has_error_handling and len(nodes) > 3: + bottlenecks.append({ + 'type': 'missing_error_handling', + 'severity': 'medium', + 'description': 'Workflow lacks error handling nodes', + 'impact': 'Failures may not be handled gracefully' + }) + + return bottlenecks + + def _find_optimizations(self, workflow: Dict, statistics: Dict) -> List[Dict]: + """Find optimization opportunities""" + optimizations = [] + nodes = workflow.get('nodes', []) + connections = workflow.get('connections', {}) + + # Opportunity 1: Parallel execution + for source_node, targets in connections.items(): + for output_conns in targets.values(): + for conn_list in output_conns: + if len(conn_list) > 1: + optimizations.append({ + 'type': 'parallel_execution', + 'priority': 'high', + 'description': f'Node "{source_node}" branches to multiple nodes - already optimized for parallel execution', + 'node': source_node, + 'benefit': 'Reduced execution time through parallelization' + }) + + # Opportunity 2: Caching + http_nodes = [node for node in nodes if 'httpRequest' in node.get('type', '')] + if http_nodes: + optimizations.append({ + 'type': 'caching', + 'priority': 'medium', + 'description': f'Found {len(http_nodes)} HTTP request nodes - consider caching responses', + 'affected_nodes': [node['name'] for node in http_nodes], + 'benefit': 'Reduced API calls and faster execution', + 'implementation': 'Use Function or Code nodes to implement simple caching' + }) + + # Opportunity 3: Batch processing + loop_nodes = [node for node in nodes if 'loop' in node.get('type', '').lower()] + if not loop_nodes: + optimizations.append({ + 'type': 'batch_processing', + 'priority': 'low', + 'description': 'Consider using "Split In Batches" node for processing large datasets', + 'benefit': 'Better memory management and parallel processing', + 'implementation': 'Add "Split In Batches" node before expensive operations' + }) + + # Opportunity 4: Error handling + error_nodes = [node for node in nodes if 'error' in node.get('type', '').lower()] + if not error_nodes and len(nodes) > 3: + optimizations.append({ + 'type': 'error_handling', + 'priority': 'high', + 'description': 'Add error handling to improve reliability', + 'benefit': 'Graceful error recovery and better debugging', + 'implementation': 'Add "Error Trigger" or "IF" nodes to handle failures' + }) + + # Opportunity 5: Reduce complexity + complexity = self._calculate_complexity(workflow) + if complexity > 70: + optimizations.append({ + 'type': 'reduce_complexity', + 'priority': 'medium', + 'description': f'Workflow complexity score is {complexity}/100 - consider splitting into sub-workflows', + 'benefit': 'Easier maintenance and debugging', + 'implementation': 'Break workflow into smaller, reusable workflows' + }) + + # Opportunity 6: Execution settings + workflow_settings = workflow.get('settings', {}) + if not workflow_settings.get('executionTimeout'): + optimizations.append({ + 'type': 'execution_timeout', + 'priority': 'low', + 'description': 'Set execution timeout to prevent hanging workflows', + 'benefit': 'Prevent resource waste from stuck executions', + 'implementation': 'Add timeout in workflow settings' + }) + + return optimizations + + def _calculate_performance_score(self, analysis: Dict) -> int: + """Calculate overall performance score (0-100)""" + score = 100 + + # Deduct for execution failures + metrics = analysis.get('execution_metrics', {}) + success_rate = metrics.get('success_rate', 100) + score -= (100 - success_rate) * 0.5 + + # Deduct for complexity + complexity = analysis.get('node_analysis', {}).get('complexity_score', 0) + if complexity > 70: + score -= (complexity - 70) * 0.3 + + # Deduct for bottlenecks + bottlenecks = analysis.get('bottlenecks', []) + for bottleneck in bottlenecks: + severity = bottleneck.get('severity', 'low') + if severity == 'critical': + score -= 20 + elif severity == 'high': + score -= 10 + elif severity == 'medium': + score -= 5 + + # Deduct for high-priority optimizations not implemented + optimizations = analysis.get('optimization_opportunities', []) + high_priority = [opt for opt in optimizations if opt.get('priority') == 'high'] + score -= len(high_priority) * 5 + + return max(0, min(100, int(score))) + + def suggest_optimizations(self, workflow_id: str) -> Dict: + """Generate optimization suggestions""" + analysis = self.analyze_performance(workflow_id) + + suggestions = { + 'workflow_id': workflow_id, + 'performance_score': analysis['performance_score'], + 'health': analysis['execution_metrics']['health'], + 'priority_actions': [], + 'quick_wins': [], + 'long_term_improvements': [] + } + + # Categorize optimizations by effort and impact + for opt in analysis['optimization_opportunities']: + priority = opt.get('priority', 'low') + + if priority == 'high': + suggestions['priority_actions'].append(opt) + elif priority == 'medium': + suggestions['quick_wins'].append(opt) + else: + suggestions['long_term_improvements'].append(opt) + + # Add bottleneck fixes as priority actions + for bottleneck in analysis['bottlenecks']: + if bottleneck.get('severity') in ['critical', 'high']: + suggestions['priority_actions'].append({ + 'type': 'fix_bottleneck', + 'priority': 'critical', + 'description': f"Fix bottleneck: {bottleneck['description']}", + 'benefit': f"Resolve: {bottleneck['impact']}" + }) + + return suggestions + + def generate_optimization_report(self, analysis: Dict) -> str: + """Generate human-readable optimization report""" + report = [] + report.append("=" * 70) + report.append("n8n Workflow Optimization Report") + report.append("=" * 70) + + report.append(f"\nWorkflow: {analysis['workflow_name']}") + report.append(f"Analysis Period: {analysis['analysis_period_days']} days") + report.append(f"Performance Score: {analysis['performance_score']}/100") + + # Execution Metrics + metrics = analysis['execution_metrics'] + report.append(f"\n## Execution Metrics") + report.append(f"Health Status: {metrics['health'].upper()}") + report.append(f"Total Executions: {metrics['total_executions']}") + report.append(f"Success Rate: {metrics['success_rate']:.1f}%") + report.append(f"Failure Rate: {metrics['failure_rate']:.1f}%") + + # Node Analysis + node_analysis = analysis['node_analysis'] + report.append(f"\n## Workflow Structure") + report.append(f"Total Nodes: {node_analysis['total_nodes']}") + report.append(f"Complexity Score: {node_analysis['complexity_score']}/100") + + if node_analysis['expensive_nodes']: + report.append(f"\nExpensive Operations ({len(node_analysis['expensive_nodes'])}):") + for node in node_analysis['expensive_nodes'][:5]: + report.append(f" โ€ข {node['name']}: {node['reason']}") + + # Bottlenecks + if analysis['bottlenecks']: + report.append(f"\n## Bottlenecks ({len(analysis['bottlenecks'])})") + for bottleneck in analysis['bottlenecks']: + severity = bottleneck['severity'].upper() + report.append(f"\n[{severity}] {bottleneck['type']}") + report.append(f" Description: {bottleneck['description']}") + report.append(f" Impact: {bottleneck['impact']}") + + # Optimization Opportunities + optimizations = analysis['optimization_opportunities'] + if optimizations: + report.append(f"\n## Optimization Opportunities ({len(optimizations)})") + + # Group by priority + high_priority = [opt for opt in optimizations if opt.get('priority') == 'high'] + medium_priority = [opt for opt in optimizations if opt.get('priority') == 'medium'] + low_priority = [opt for opt in optimizations if opt.get('priority') == 'low'] + + if high_priority: + report.append(f"\n### High Priority ({len(high_priority)})") + for opt in high_priority: + report.append(f"\nโ€ข {opt['type'].replace('_', ' ').title()}") + report.append(f" {opt['description']}") + report.append(f" Benefit: {opt['benefit']}") + if 'implementation' in opt: + report.append(f" How: {opt['implementation']}") + + if medium_priority: + report.append(f"\n### Medium Priority ({len(medium_priority)})") + for opt in medium_priority: + report.append(f"\nโ€ข {opt['type'].replace('_', ' ').title()}") + report.append(f" {opt['description']}") + + if low_priority: + report.append(f"\n### Low Priority ({len(low_priority)})") + for opt in low_priority: + report.append(f" โ€ข {opt['description']}") + + report.append("\n" + "=" * 70) + + return "\n".join(report) + + +def main(): + parser = argparse.ArgumentParser(description='n8n Workflow Optimizer') + parser.add_argument('action', choices=['analyze', 'suggest', 'report']) + parser.add_argument('--id', required=True, help='Workflow ID') + parser.add_argument('--days', type=int, default=7, help='Analysis period in days') + parser.add_argument('--pretty', action='store_true', help='Pretty print JSON output') + + args = parser.parse_args() + + try: + optimizer = WorkflowOptimizer() + + if args.action == 'analyze': + result = optimizer.analyze_performance(args.id, days=args.days) + print(json.dumps(result, indent=2 if args.pretty else None)) + + elif args.action == 'suggest': + result = optimizer.suggest_optimizations(args.id) + print(json.dumps(result, indent=2 if args.pretty else None)) + + elif args.action == 'report': + analysis = optimizer.analyze_performance(args.id, days=args.days) + print(optimizer.generate_optimization_report(analysis)) + + except Exception as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +if __name__ == '__main__': + main() diff --git a/scripts/n8n_tester.py b/scripts/n8n_tester.py new file mode 100644 index 0000000..c91713d --- /dev/null +++ b/scripts/n8n_tester.py @@ -0,0 +1,416 @@ +#!/usr/bin/env python3 +""" +n8n Workflow Testing & Validation +Test workflows before activation with validation and dry-runs +""" + +import sys +import json +import argparse +import time +from pathlib import Path +from typing import Dict, List, Any + +# Import N8nClient - handle both direct and module imports +try: + from n8n_api import N8nClient +except ImportError: + from scripts.n8n_api import N8nClient + + +class WorkflowTester: + """Workflow testing and validation""" + + def __init__(self, client: N8nClient = None): + self.client = client # Only initialize when needed + + def validate_workflow(self, workflow_id: str = None, workflow_file: str = None) -> Dict: + """Validate workflow structure and configuration""" + if workflow_id: + if not self.client: + self.client = N8nClient() + workflow_data = self.client.get_workflow(workflow_id) + elif workflow_file: + with open(workflow_file, 'r') as f: + workflow_data = json.load(f) + else: + raise ValueError("Either workflow_id or workflow_file required") + + # Perform validation - use standalone validation for files + validation = self._perform_validation(workflow_data) + + # Additional validation checks + self._check_credentials(workflow_data, validation) + self._check_node_configurations(workflow_data, validation) + self._check_execution_flow(workflow_data, validation) + + return validation + + def _perform_validation(self, workflow_data: Dict) -> Dict: + """Perform standalone workflow validation""" + issues = { + 'errors': [], + 'warnings': [], + 'valid': True + } + + # Check required fields + if 'nodes' not in workflow_data: + issues['errors'].append("Missing 'nodes' field") + issues['valid'] = False + return issues + + nodes = workflow_data.get('nodes', []) + connections = workflow_data.get('connections', {}) + + # Validate nodes + node_names = set() + for node in nodes: + if 'name' not in node: + issues['errors'].append("Node missing 'name' field") + issues['valid'] = False + else: + node_names.add(node['name']) + + if 'type' not in node: + issues['errors'].append(f"Node '{node.get('name', 'unknown')}' missing 'type' field") + issues['valid'] = False + + # Validate connections + for source_node, targets in connections.items(): + if source_node not in node_names: + issues['errors'].append(f"Connection references non-existent source node: {source_node}") + issues['valid'] = False + + for output_type, output_connections in targets.items(): + for conn_list in output_connections: + for conn in conn_list: + target_node = conn.get('node') + if target_node and target_node not in node_names: + issues['errors'].append(f"Connection references non-existent target node: {target_node}") + issues['valid'] = False + + # Check for disconnected nodes + connected_nodes = set(connections.keys()) + for targets in connections.values(): + for output_connections in targets.values(): + for conn_list in output_connections: + for conn in conn_list: + connected_nodes.add(conn.get('node')) + + disconnected = node_names - connected_nodes + if disconnected and len(nodes) > 1: + for node in disconnected: + issues['warnings'].append(f"Node '{node}' appears to be disconnected") + + return issues + + def _check_credentials(self, workflow_data: Dict, validation: Dict): + """Check for missing or invalid credentials""" + nodes = workflow_data.get('nodes', []) + + # Nodes that typically require credentials + credential_nodes = [ + 'n8n-nodes-base.httpRequest', + 'n8n-nodes-base.googleSheets', + 'n8n-nodes-base.slack', + 'n8n-nodes-base.twitter', + 'n8n-nodes-base.stripe', + 'n8n-nodes-base.postgres', + 'n8n-nodes-base.mysql', + 'n8n-nodes-base.emailSend' + ] + + for node in nodes: + node_type = node.get('type', '') + if node_type in credential_nodes: + credentials = node.get('credentials', {}) + if not credentials: + validation['warnings'].append( + f"Node '{node['name']}' ({node_type}) likely requires credentials" + ) + + def _check_node_configurations(self, workflow_data: Dict, validation: Dict): + """Check for invalid node configurations""" + nodes = workflow_data.get('nodes', []) + + for node in nodes: + node_type = node.get('type', '') + parameters = node.get('parameters', {}) + + # Check HTTP Request nodes + if node_type == 'n8n-nodes-base.httpRequest': + if not parameters.get('url'): + validation['errors'].append( + f"Node '{node['name']}' missing required URL parameter" + ) + validation['valid'] = False + + # Check webhook nodes + elif node_type == 'n8n-nodes-base.webhook': + if not parameters.get('path'): + validation['errors'].append( + f"Node '{node['name']}' missing required path parameter" + ) + validation['valid'] = False + + # Check email nodes + elif node_type == 'n8n-nodes-base.emailSend': + if not parameters.get('subject') and not parameters.get('text'): + validation['warnings'].append( + f"Node '{node['name']}' missing subject or text" + ) + + def _check_execution_flow(self, workflow_data: Dict, validation: Dict): + """Check workflow execution flow for issues""" + nodes = workflow_data.get('nodes', []) + connections = workflow_data.get('connections', {}) + + # Check for trigger nodes + trigger_types = [ + 'n8n-nodes-base.webhook', + 'n8n-nodes-base.scheduleTrigger', + 'n8n-nodes-base.manualTrigger', + 'n8n-nodes-base.start' + ] + + has_trigger = any(node.get('type') in trigger_types for node in nodes) + if not has_trigger and len(nodes) > 0: + validation['warnings'].append( + "Workflow has no trigger node. It can only be executed manually." + ) + + # Check for end nodes (nodes with no outgoing connections) + node_names = {node['name'] for node in nodes} + connected_as_source = set(connections.keys()) + end_nodes = node_names - connected_as_source + + if not end_nodes and len(nodes) > 1: + validation['warnings'].append( + "Workflow has no end nodes. This may indicate circular dependencies." + ) + + def dry_run(self, workflow_id: str, test_data: Dict = None, test_data_file: str = None) -> Dict: + """Execute workflow with test data""" + # Load test data if from file + if test_data_file: + with open(test_data_file, 'r') as f: + test_data = json.load(f) + + print(f"Running workflow {workflow_id} with test data...") + + # Execute workflow + execution_result = self.client.execute_workflow(workflow_id, data=test_data) + execution_id = execution_result.get('data', {}).get('executionId') + + if not execution_id: + return { + 'status': 'failed', + 'error': 'No execution ID returned', + 'result': execution_result + } + + print(f"Execution started: {execution_id}") + print("Waiting for execution to complete...") + + # Poll for execution completion + max_attempts = 30 + attempt = 0 + + while attempt < max_attempts: + time.sleep(2) + attempt += 1 + + try: + execution = self.client.get_execution(execution_id) + finished = execution.get('finished', False) + + if finished: + # Execution completed + success = execution.get('data', {}).get('resultData', {}).get('error') is None + + result = { + 'status': 'success' if success else 'failed', + 'execution_id': execution_id, + 'finished': True, + 'started_at': execution.get('startedAt'), + 'stopped_at': execution.get('stoppedAt'), + 'mode': execution.get('mode'), + 'data': execution.get('data', {}) + } + + if not success: + error_data = execution.get('data', {}).get('resultData', {}).get('error', {}) + result['error'] = { + 'message': error_data.get('message'), + 'description': error_data.get('description') + } + + return result + except Exception as e: + print(f"Error checking execution status: {e}") + continue + + return { + 'status': 'timeout', + 'execution_id': execution_id, + 'message': 'Execution did not complete within expected time' + } + + def test_suite(self, workflow_id: str, test_cases: List[Dict]) -> Dict: + """Run multiple test cases against workflow""" + results = { + 'workflow_id': workflow_id, + 'total_tests': len(test_cases), + 'passed': 0, + 'failed': 0, + 'test_results': [] + } + + for i, test_case in enumerate(test_cases, 1): + print(f"\nRunning test case {i}/{len(test_cases)}: {test_case.get('name', 'Unnamed')}") + + test_data = test_case.get('input', {}) + expected_output = test_case.get('expected', {}) + + # Run test + result = self.dry_run(workflow_id, test_data=test_data) + + # Check result + passed = result.get('status') == 'success' + + test_result = { + 'test_name': test_case.get('name'), + 'passed': passed, + 'input': test_data, + 'output': result, + 'expected': expected_output + } + + results['test_results'].append(test_result) + + if passed: + results['passed'] += 1 + print(f"โœ“ Test passed") + else: + results['failed'] += 1 + print(f"โœ— Test failed: {result.get('error', 'Unknown error')}") + + return results + + def generate_test_report(self, validation: Dict, dry_run: Dict = None) -> str: + """Generate human-readable test report""" + report = [] + report.append("=" * 60) + report.append("n8n Workflow Test Report") + report.append("=" * 60) + + # Validation results + report.append("\n## Validation Results") + report.append(f"Status: {'โœ“ VALID' if validation['valid'] else 'โœ— INVALID'}") + + if validation['errors']: + report.append(f"\nErrors ({len(validation['errors'])}):") + for error in validation['errors']: + report.append(f" โœ— {error}") + + if validation['warnings']: + report.append(f"\nWarnings ({len(validation['warnings'])}):") + for warning in validation['warnings']: + report.append(f" โš  {warning}") + + if not validation['errors'] and not validation['warnings']: + report.append("\nโœ“ No issues found") + + # Dry run results + if dry_run: + report.append("\n## Dry Run Results") + report.append(f"Status: {dry_run.get('status', 'unknown').upper()}") + report.append(f"Execution ID: {dry_run.get('execution_id', 'N/A')}") + + if dry_run.get('started_at'): + report.append(f"Started: {dry_run['started_at']}") + if dry_run.get('stopped_at'): + report.append(f"Stopped: {dry_run['stopped_at']}") + + if dry_run.get('error'): + report.append(f"\nError: {dry_run['error'].get('message', 'Unknown error')}") + if dry_run['error'].get('description'): + report.append(f"Description: {dry_run['error']['description']}") + + report.append("\n" + "=" * 60) + + return "\n".join(report) + + +def main(): + parser = argparse.ArgumentParser(description='n8n Workflow Testing & Validation') + parser.add_argument('action', choices=['validate', 'dry-run', 'test-suite', 'report']) + parser.add_argument('--id', help='Workflow ID') + parser.add_argument('--file', help='Workflow JSON file') + parser.add_argument('--data', help='Test data JSON string') + parser.add_argument('--data-file', help='Test data JSON file') + parser.add_argument('--test-suite', help='Test suite JSON file') + parser.add_argument('--pretty', action='store_true', help='Pretty print output') + parser.add_argument('--report', action='store_true', help='Generate human-readable report') + + args = parser.parse_args() + + try: + tester = WorkflowTester() + result = None + + if args.action == 'validate': + result = tester.validate_workflow(workflow_id=args.id, workflow_file=args.file) + + if args.report: + print(tester.generate_test_report(result)) + else: + print(json.dumps(result, indent=2 if args.pretty else None)) + + elif args.action == 'dry-run': + if not args.id: + raise ValueError("--id required for dry-run") + + test_data = None + if args.data: + test_data = json.loads(args.data) + + result = tester.dry_run( + workflow_id=args.id, + test_data=test_data, + test_data_file=args.data_file + ) + + if args.report: + validation = tester.validate_workflow(workflow_id=args.id) + print(tester.generate_test_report(validation, result)) + else: + print(json.dumps(result, indent=2 if args.pretty else None)) + + elif args.action == 'test-suite': + if not args.id: + raise ValueError("--id required for test-suite") + if not args.test_suite: + raise ValueError("--test-suite required for test-suite") + + with open(args.test_suite, 'r') as f: + test_cases = json.load(f) + + result = tester.test_suite(args.id, test_cases) + print(json.dumps(result, indent=2 if args.pretty else None)) + + elif args.action == 'report': + if not args.id: + raise ValueError("--id required for report") + + validation = tester.validate_workflow(workflow_id=args.id) + print(tester.generate_test_report(validation)) + + except Exception as e: + print(f"Error: {e}", file=sys.stderr) + sys.exit(1) + + +if __name__ == '__main__': + main()