Distributed Worker System
Overview
The Animation Designer Bot uses a sophisticated distributed worker system to handle the computational demands of motion graphics generation and rendering. This system provides horizontal scalability, fault tolerance, and efficient resource utilization.
System Architecture
graph TB
%% Top-to-bottom flowchart with four subgraphs: job management, worker cluster,
%% Natron engines, and monitoring.

%% Job management: the queue feeds both the scheduler and the queue monitor.
subgraph JOB_MANAGEMENT["π Job Management"]
JOB_QUEUE[π Job Queue<br/>Redis + Bull Queue<br/>Priority Scheduling]
JOB_SCHEDULER[β° Job Scheduler<br/>Node.js + Cron<br/>Resource Allocation]
QUEUE_MONITOR[π Queue Monitor<br/>Prometheus + Grafana<br/>Real-time Metrics]
JOB_QUEUE --> JOB_SCHEDULER
JOB_QUEUE --> QUEUE_MONITOR
end
%% Worker cluster: the scheduler has an edge to every worker node.
subgraph WORKER_CLUSTER["βοΈ Worker Cluster"]
WORKER1[βοΈ Worker 1<br/>Node.js + PM2<br/>Docker Container]
WORKER2[βοΈ Worker 2<br/>Python + systemd<br/>Resource Management]
WORKER3[βοΈ Worker 3<br/>GPU Support<br/>CUDA + OpenCL]
WORKERN[βοΈ Worker N<br/>Auto-scaling<br/>Kubernetes Pods]
JOB_SCHEDULER --> WORKER1
JOB_SCHEDULER --> WORKER2
JOB_SCHEDULER --> WORKER3
JOB_SCHEDULER --> WORKERN
end
%% Natron engines: each worker node is linked to exactly one Natron node.
subgraph NATRON_ENGINES["π₯ Natron Engines"]
NATRON1[π₯ Natron Renderer 1<br/>Natron Engine<br/>OpenImageIO + OpenEXR]
NATRON2[π₯ Natron Renderer 2<br/>GPU Acceleration<br/>CUDA + OpenCL]
NATRON_CLI[π» Natron CLI<br/>NatronRenderer<br/>Command Line Interface]
PYTHON_API[π Natron Python API<br/>PyNatron Bindings<br/>Automation Scripts]
WORKER1 --> NATRON1
WORKER2 --> NATRON2
WORKER3 --> NATRON_CLI
WORKERN --> PYTHON_API
end
%% Monitoring: every worker points at health checks, which chain into
%% metrics collection and then alerting.
subgraph MONITORING["π Monitoring & Health"]
HEALTH_CHECKS[π₯ Health Checks<br/>Service Discovery<br/>Fault Detection]
METRICS_COLLECTION[π Metrics Collection<br/>Performance Data<br/>Resource Usage]
ALERTING[π¨ Alerting System<br/>Threshold Monitoring<br/>Incident Response]
WORKER1 --> HEALTH_CHECKS
WORKER2 --> HEALTH_CHECKS
WORKER3 --> HEALTH_CHECKS
WORKERN --> HEALTH_CHECKS
HEALTH_CHECKS --> METRICS_COLLECTION
METRICS_COLLECTION --> ALERTING
end
%% Style classes (fill and stroke colours), one per subgraph.
classDef job fill:#e1f5fe,stroke:#01579b
classDef worker fill:#f3e5f5,stroke:#4a148c
classDef natron fill:#fce4ec,stroke:#880e4f
classDef monitor fill:#e8f5e8,stroke:#1b5e20
%% Attach each style class to its subgraph id.
class JOB_MANAGEMENT job
class WORKER_CLUSTER worker
class NATRON_ENGINES natron
class MONITORING monitor
Job Queue System
Redis + Bull Queue Implementation
// Job Queue Configuration
const Queue = require('bull');
const redis = require('redis');
// Job type identifiers carried in each job's `type` field.
const JOB_TYPES = {
  MOTION_GRAPHICS_RENDER: 'motion_graphics_render',
  ASSET_PROCESSING: 'asset_processing',
  QUALITY_VALIDATION: 'quality_validation',
  OUTPUT_CONVERSION: 'output_conversion'
};

// Priority values passed to the queue; 1 is labelled the most urgent.
const PRIORITY_LEVELS = {
  CRITICAL: 1, // Real-time requests
  HIGH: 2, // Premium users
  NORMAL: 3, // Standard users
  LOW: 4 // Batch processing
};

// Shared render queue. Connection settings come from the environment;
// every job inherits the retry and clean-up options below unless overridden.
const renderQueue = new Queue('motion graphics rendering', {
  redis: {
    host: process.env.REDIS_HOST,
    port: process.env.REDIS_PORT,
    password: process.env.REDIS_PASSWORD
  },
  defaultJobOptions: {
    // Retention settings for finished jobs (completed / failed).
    removeOnComplete: 100,
    removeOnFail: 50,
    // Up to three attempts with exponential backoff starting from 2000.
    attempts: 3,
    backoff: { type: 'exponential', delay: 2000 }
  }
});
Job Scheduler
/**
 * Adds jobs to the shared render queue and hands them to workers.
 *
 * Relies on helper methods that are not part of this snippet
 * (`generateJobId`, `estimateJobDuration`, `getAvailableWorkers`,
 * `selectBestWorker`, `assignJobToWorker`, `queueJobForLater`).
 */
class JobScheduler {
  constructor() {
    this.queue = renderQueue;
    this.workerRegistry = new Map();
    this.resourceMonitor = new ResourceMonitor();
  }

  /**
   * Enqueue a job under its own type name.
   *
   * @param {object} jobData - Job payload; `jobData.type` becomes the job name.
   * @param {number} [priority=PRIORITY_LEVELS.NORMAL] - Queue priority.
   * @returns {Promise<object>} The job returned by the queue.
   */
  async scheduleJob(jobData, priority = PRIORITY_LEVELS.NORMAL) {
    const { type } = jobData;
    const options = {
      priority,
      delay: this.calculateDelay(jobData),
      jobId: this.generateJobId(jobData)
    };
    return this.queue.add(type, jobData, options);
  }

  /**
   * Decide how long to hold a job back before it may run.
   *
   * @param {object} jobData - Job payload used for the duration estimate.
   * @returns {number} Half the estimated duration when load exceeds 0.8, else 0.
   */
  calculateDelay(jobData) {
    const load = this.resourceMonitor.getCurrentLoad();
    const expectedDuration = this.estimateJobDuration(jobData);
    // Above 80% load the job is postponed; otherwise it runs immediately.
    return load > 0.8 ? expectedDuration * 0.5 : 0;
  }

  /**
   * Assign a job to the best available worker, or park it when none fits.
   *
   * @param {object} job - Job to hand out.
   * @returns {Promise<void>}
   */
  async distributeJob(job) {
    const candidates = await this.getAvailableWorkers();
    const chosen = this.selectBestWorker(candidates, job);
    if (!chosen) {
      await this.queueJobForLater(job);
      return;
    }
    await this.assignJobToWorker(job, chosen);
  }
}
Worker Implementation
Node.js Worker
// Worker Implementation
/**
 * Node.js worker that consumes jobs from the shared render queue.
 *
 * Helper methods used here but not defined in this snippet must be provided
 * elsewhere: `registerWithScheduler`, `startHealthMonitoring`,
 * `processAssets`, `validateQuality`, `createNatronProject`,
 * `renderWithNatron`, `postProcessOutput`.
 */
class MotionGraphicsWorker {
  /**
   * @param {string} workerId - Identifier used in log messages.
   * @param {object} config - Worker configuration (stored, not read here).
   */
  constructor(workerId, config) {
    this.workerId = workerId;
    this.config = config;
    this.status = 'idle';
    this.currentJob = null;
    this.resourceUsage = new ResourceMonitor();
  }

  /**
   * Register the worker, attach the queue processor and start health monitoring.
   *
   * @returns {Promise<void>}
   */
  async start() {
    console.log(`Worker ${this.workerId} starting...`);
    // Register with job scheduler
    await this.registerWithScheduler();
    // Start job processing. Awaited so a failure while registering the
    // processor surfaces here instead of as an unhandled rejection.
    await this.processJobs();
    // Start health monitoring
    this.startHealthMonitoring();
  }

  /**
   * Attach this worker's processor to the render queue.
   *
   * The scheduler enqueues jobs under a name (`queue.add(jobData.type, ...)`).
   * A processor registered without a name only serves unnamed jobs, so named
   * jobs would fail for lack of a handler. Registering under `'*'` makes this
   * processor handle every job name; dispatch by type happens in
   * `processJobByType`.
   *
   * @returns {Promise<void>}
   */
  async processJobs() {
    const processor = async (job) => {
      try {
        this.status = 'processing';
        this.currentJob = job;
        console.log(`Worker ${this.workerId} processing job ${job.id}`);
        // Process based on job type
        const result = await this.processJobByType(job.data);
        this.status = 'idle';
        this.currentJob = null;
        return result;
      } catch (error) {
        console.error(`Worker ${this.workerId} job failed:`, error);
        this.status = 'error';
        // Rethrow so the queue records the failure and applies its retry policy.
        throw error;
      }
    };
    renderQueue.process('*', processor);
  }

  /**
   * Dispatch a job payload to the handler for its type.
   *
   * @param {object} jobData - Job payload with a `type` field.
   * @returns {Promise<object>} The handler's result.
   * @throws {Error} When `jobData.type` has no handler.
   */
  async processJobByType(jobData) {
    // NOTE(review): JOB_TYPES.OUTPUT_CONVERSION has no case here and falls
    // through to the error below — confirm whether a handler is missing.
    switch (jobData.type) {
      case JOB_TYPES.MOTION_GRAPHICS_RENDER:
        return await this.renderMotionGraphics(jobData);
      case JOB_TYPES.ASSET_PROCESSING:
        return await this.processAssets(jobData);
      case JOB_TYPES.QUALITY_VALIDATION:
        return await this.validateQuality(jobData);
      default:
        throw new Error(`Unknown job type: ${jobData.type}`);
    }
  }

  /**
   * Build a Natron project from the timeline, render it and post-process the output.
   *
   * @param {object} jobData - Payload carrying `timeline` and `outputSpecs`.
   * @returns {Promise<{success: boolean, outputPath: *, metadata: object}>}
   *   Result with the post-processed output and the duration, resolution and
   *   format copied from `outputSpecs`.
   */
  async renderMotionGraphics(jobData) {
    const { timeline, outputSpecs } = jobData;
    // Create Natron project
    const natronProject = await this.createNatronProject(timeline);
    // Render via Natron CLI
    const outputPath = await this.renderWithNatron(natronProject, outputSpecs);
    // Post-process output
    const finalOutput = await this.postProcessOutput(outputPath, outputSpecs);
    return {
      success: true,
      outputPath: finalOutput,
      metadata: {
        duration: outputSpecs.duration,
        resolution: outputSpecs.resolution,
        format: outputSpecs.format
      }
    };
  }
}
Python Worker
# Python Worker Implementation
import asyncio
import subprocess
import psutil
from typing import Dict, Any
class PythonMotionGraphicsWorker:
    """Queue worker that pulls jobs in a loop and dispatches them by type.

    The coroutines it awaits but does not define here (``register_with_scheduler``,
    ``get_next_job``, ``mark_job_completed``, ``mark_job_failed``,
    ``process_assets``, ``validate_quality``, ``create_natron_project``,
    ``render_with_natron``, ``post_process_output``) must be supplied elsewhere.
    """

    def __init__(self, worker_id: str, config: Dict[str, Any]):
        self.worker_id = worker_id
        self.config = config
        self.status = 'idle'
        self.current_job = None
        self.resource_monitor = ResourceMonitor()

    async def start(self):
        """Register with the scheduler, then run the processing loop."""
        print(f"Python Worker {self.worker_id} starting...")
        await self.register_with_scheduler()
        await self.process_jobs()

    async def process_jobs(self):
        """Poll for jobs forever; idle 1s when empty, back off 5s after an error."""
        while True:
            try:
                job = await self.get_next_job()
                if not job:
                    # Nothing queued right now.
                    await asyncio.sleep(1)
                    continue
                await self.process_job(job)
            except Exception as exc:
                print(f"Worker {self.worker_id} error: {exc}")
                await asyncio.sleep(5)

    async def process_job(self, job: Dict[str, Any]):
        """Run one job and report completion or failure for it.

        ``job`` is read through its ``'id'`` and ``'data'`` keys.
        """
        try:
            self.status = 'processing'
            self.current_job = job
            print(f"Worker {self.worker_id} processing job {job['id']}")
            outcome = await self.process_job_by_type(job['data'])
            await self.mark_job_completed(job['id'], outcome)
            self.status = 'idle'
            self.current_job = None
        except Exception as exc:
            print(f"Worker {self.worker_id} job failed: {exc}")
            self.status = 'error'
            await self.mark_job_failed(job['id'], str(exc))

    async def process_job_by_type(self, job_data: Dict[str, Any]):
        """Route ``job_data`` to the handler for ``job_data['type']``.

        Raises:
            ValueError: if the type has no handler.
        """
        job_type = job_data['type']
        handler_names = {
            'motion_graphics_render': 'render_motion_graphics',
            'asset_processing': 'process_assets',
            'quality_validation': 'validate_quality',
        }
        handler_name = handler_names.get(job_type)
        if handler_name is None:
            raise ValueError(f"Unknown job type: {job_type}")
        # Looked up lazily so only the handler actually needed must exist.
        return await getattr(self, handler_name)(job_data)

    async def render_motion_graphics(self, job_data: Dict[str, Any]):
        """Create a Natron project, render it, post-process, and describe the result.

        Reads ``job_data['timeline']`` and ``job_data['output_specs']``.
        """
        timeline = job_data['timeline']
        output_specs = job_data['output_specs']
        project = await self.create_natron_project(timeline)
        rendered_path = await self.render_with_natron(project, output_specs)
        final_output = await self.post_process_output(rendered_path, output_specs)
        metadata = {
            key: output_specs[key]
            for key in ('duration', 'resolution', 'format')
        }
        return {
            'success': True,
            'output_path': final_output,
            'metadata': metadata,
        }
Containerization and Orchestration
Docker Configuration
# Dockerfile for Node.js Worker
# Debian-based image: the Natron step below installs a .deb package, which
# needs dpkg/apt. Those tools do not exist on Alpine images.
FROM node:18-bookworm-slim

# Install system dependencies.
# curl is required by HEALTHCHECK; wget and ca-certificates by the Natron download.
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
        ca-certificates \
        curl \
        wget \
        python3 \
        python3-pip \
        ffmpeg \
        imagemagick && \
    rm -rf /var/lib/apt/lists/*

# Install Natron
# NOTE(review): confirm that this release asset exists under this exact name
# before relying on the build. Installing the file through apt-get (instead of
# `dpkg -i ... || apt-get install -f`) resolves dependencies in one step and
# fails the build if installation fails.
RUN wget -O /tmp/natron.deb https://github.com/NatronGitHub/Natron/releases/download/v2.4.0/Natron-2.4.0-Linux-x86_64.deb && \
    apt-get update && \
    apt-get install -y --no-install-recommends /tmp/natron.deb && \
    rm -rf /tmp/natron.deb /var/lib/apt/lists/*

# Set working directory
WORKDIR /app

# Copy package files first so the dependency layer is cached across code changes
COPY package*.json ./

# Install dependencies
RUN npm ci --only=production

# Copy application code
COPY . .

# Create non-root user (uid/gid 1001) and put it in the nodejs group
RUN groupadd --system --gid 1001 nodejs && \
    useradd --system --uid 1001 --gid nodejs --create-home worker

# Switch to non-root user
USER worker

# Expose port
EXPOSE 3000

# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:3000/health || exit 1

# Start worker
CMD ["npm", "start"]
Kubernetes Deployment
# kubernetes/worker-deployment.yaml
# Deployment: three replicas of the worker container.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: motion-graphics-worker
  labels: {app: motion-graphics-worker}
spec:
  replicas: 3
  selector:
    matchLabels: {app: motion-graphics-worker}
  template:
    metadata:
      labels: {app: motion-graphics-worker}
    spec:
      containers:
        - name: worker
          image: motion-graphics-worker:latest
          ports:
            - containerPort: 3000
          env:
            # Redis endpoint used by the job queue.
            - name: REDIS_HOST
              value: "redis-service"
            - name: REDIS_PORT
              value: "6379"
            # Each pod uses its own pod name as the worker id.
            - name: WORKER_ID
              valueFrom:
                fieldRef:
                  fieldPath: metadata.name
          resources:
            requests: {memory: "2Gi", cpu: "1000m"}
            limits: {memory: "4Gi", cpu: "2000m"}
          # Liveness: GET /health every 10s after a 30s initial delay.
          livenessProbe:
            httpGet: {path: /health, port: 3000}
            initialDelaySeconds: 30
            periodSeconds: 10
          # Readiness: GET /ready every 5s after a 5s initial delay.
          readinessProbe:
            httpGet: {path: /ready, port: 3000}
            initialDelaySeconds: 5
            periodSeconds: 5
---
# Service: cluster-internal access to the worker pods on port 3000.
apiVersion: v1
kind: Service
metadata:
  name: motion-graphics-worker-service
spec:
  selector: {app: motion-graphics-worker}
  ports:
    - port: 3000
      targetPort: 3000
  type: ClusterIP
Monitoring and Health Checks
Health Check Implementation
// Health Check System
/**
 * Registry of named health checks plus a runner that executes all of them.
 *
 * Each check is an async function resolving to `{ status, details }`, where
 * `status` is `'healthy'` or `'unhealthy'`.
 *
 * The probes `getCpuUsage`, `getMemoryUsage`, `getDiskUsage` and
 * `checkNatronInstallation` are not defined in this snippet and must be
 * provided elsewhere (for example by a subclass).
 */
class HealthCheckSystem {
  /**
   * @param {{ping: Function}} [redisClient] - Connected Redis client used by
   *   the `redis_connection` check. When omitted, the render queue's own
   *   client is used.
   */
  constructor(redisClient) {
    this.checks = new Map();
    this.redisClient = redisClient;
    this.setupDefaultChecks();
  }

  /**
   * Register (or replace) a named health check.
   *
   * @param {string} name - Key under which the result is reported.
   * @param {function(): Promise<{status: string, details: object}>} check
   * @throws {TypeError} When `check` is not a function.
   */
  addCheck(name, check) {
    if (typeof check !== 'function') {
      throw new TypeError(`Health check "${name}" must be a function`);
    }
    this.checks.set(name, check);
  }

  /** Register the built-in system, Natron and Redis checks. */
  setupDefaultChecks() {
    // System resource check: healthy only while all three readings are below 90.
    this.addCheck('system_resources', async () => {
      const [cpuUsage, memoryUsage, diskUsage] = await Promise.all([
        this.getCpuUsage(),
        this.getMemoryUsage(),
        this.getDiskUsage()
      ]);
      const withinLimits = cpuUsage < 90 && memoryUsage < 90 && diskUsage < 90;
      return {
        status: withinLimits ? 'healthy' : 'unhealthy',
        details: { cpuUsage, memoryUsage, diskUsage }
      };
    });

    // Natron availability check: any truthy probe result counts as installed.
    this.addCheck('natron_availability', async () => {
      try {
        const result = await this.checkNatronInstallation();
        return {
          status: result ? 'healthy' : 'unhealthy',
          details: { natronVersion: result }
        };
      } catch (error) {
        return {
          status: 'unhealthy',
          details: { error: error.message }
        };
      }
    });

    // Redis connection check. `ping` must be called on a connected client;
    // the `redis` module itself has no `ping`, so calling it there always
    // reported the check as unhealthy.
    this.addCheck('redis_connection', async () => {
      try {
        // NOTE(review): the fallback assumes `renderQueue.client` is Bull's
        // underlying Redis client — confirm against the Bull version in use.
        const client = this.redisClient ?? renderQueue.client;
        await client.ping();
        return { status: 'healthy', details: {} };
      } catch (error) {
        return {
          status: 'unhealthy',
          details: { error: error.message }
        };
      }
    });
  }

  /**
   * Run every registered check in registration order.
   *
   * A check that throws is reported as unhealthy with the error message
   * rather than aborting the remaining checks.
   *
   * @returns {Promise<Object<string, {status: string, details: object}>>}
   */
  async runAllChecks() {
    const results = {};
    for (const [name, check] of this.checks) {
      try {
        results[name] = await check();
      } catch (error) {
        results[name] = {
          status: 'unhealthy',
          details: { error: error.message }
        };
      }
    }
    return results;
  }
}
Metrics Collection
// Metrics Collection
/**
 * Wraps the prom-client metrics emitted by a worker: a job counter, a job
 * duration histogram, and CPU / memory gauges.
 */
class MetricsCollector {
  constructor() {
    this.metrics = new Map();
    this.setupPrometheusMetrics();
  }

  /** Create the counter, histogram and two gauges used by this collector. */
  setupPrometheusMetrics() {
    const prometheus = require('prom-client');
    const { Counter, Histogram, Gauge } = prometheus;

    // Job metrics
    this.jobCounter = new Counter({
      name: 'jobs_processed_total',
      help: 'Total number of jobs processed',
      labelNames: ['worker_id', 'job_type', 'status']
    });
    this.jobDuration = new Histogram({
      name: 'job_duration_seconds',
      help: 'Duration of job processing',
      labelNames: ['worker_id', 'job_type'],
      buckets: [1, 5, 15, 30, 60, 120, 300, 600]
    });

    // Resource metrics
    this.cpuUsage = new Gauge({
      name: 'worker_cpu_usage_percent',
      help: 'CPU usage percentage',
      labelNames: ['worker_id']
    });
    this.memoryUsage = new Gauge({
      name: 'worker_memory_usage_bytes',
      help: 'Memory usage in bytes',
      labelNames: ['worker_id']
    });
  }

  /**
   * Count a finished job and record how long it took.
   *
   * @param {string} workerId - Value for the `worker_id` label.
   * @param {string} jobType - Value for the `job_type` label.
   * @param {number} duration - Value observed on the duration histogram.
   * @param {boolean} success - Truthy maps to status `success`, else `failure`.
   */
  recordJobCompletion(workerId, jobType, duration, success) {
    const status = success ? 'success' : 'failure';
    this.jobCounter.inc({ worker_id: workerId, job_type: jobType, status });
    this.jobDuration.observe({ worker_id: workerId, job_type: jobType }, duration);
  }

  /**
   * Set the CPU and memory gauges for one worker.
   *
   * @param {string} workerId - Value for the `worker_id` label.
   * @param {number} cpuUsage - New CPU gauge value.
   * @param {number} memoryUsage - New memory gauge value.
   */
  updateResourceMetrics(workerId, cpuUsage, memoryUsage) {
    const labels = () => ({ worker_id: workerId });
    this.cpuUsage.set(labels(), cpuUsage);
    this.memoryUsage.set(labels(), memoryUsage);
  }
}
Auto-scaling Configuration
Horizontal Pod Autoscaler
# kubernetes/hpa.yaml
# Autoscaler for the motion-graphics-worker Deployment (2 to 10 replicas).
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: motion-graphics-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: motion-graphics-worker
  minReplicas: 2
  maxReplicas: 10
  metrics:
    # Target 70% average CPU utilization.
    - type: Resource
      resource:
        name: cpu
        target: {type: Utilization, averageUtilization: 70}
    # Target 80% average memory utilization.
    - type: Resource
      resource:
        name: memory
        target: {type: Utilization, averageUtilization: 80}
  behavior:
    # Scale down: 300s stabilization window, at most 50% per 60s.
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
        - {type: Percent, value: 50, periodSeconds: 60}
    # Scale up: 60s stabilization window, at most 100% per 30s.
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
        - {type: Percent, value: 100, periodSeconds: 30}
Custom Scaling Metrics
// Custom Scaling Logic
/**
 * Queue-aware scaling decisions based on backlog, job duration and utilization.
 *
 * `getCurrentWorkerCount` is not defined in this snippet and must be provided
 * elsewhere.
 */
class CustomScaler {
  constructor() {
    this.queueMonitor = new QueueMonitor();
    this.resourceMonitor = new ResourceMonitor();
  }

  /**
   * Decide whether more workers are needed.
   *
   * @returns {Promise<boolean>} True when the estimated wait
   *   (`queueLength * averageJobDuration / currentWorkers`) exceeds 300, or
   *   when there are queued jobs but no workers at all.
   */
  async shouldScaleUp() {
    // The three readings are independent, so fetch them concurrently.
    const [queueLength, averageJobDuration, currentWorkers] = await Promise.all([
      this.queueMonitor.getQueueLength(),
      this.queueMonitor.getAverageJobDuration(),
      this.getCurrentWorkerCount()
    ]);

    // With no workers the estimate would divide by zero (Infinity or NaN).
    // Any backlog then needs a worker, even when no duration history exists.
    if (currentWorkers <= 0) {
      return queueLength > 0;
    }

    // Scale up if queue is getting long and jobs are taking too long
    const estimatedWaitTime = (queueLength * averageJobDuration) / currentWorkers;
    return estimatedWaitTime > 300; // 5 minutes
  }

  /**
   * Decide whether workers can be removed.
   *
   * @returns {Promise<boolean>} True when fewer than 5 jobs are queued, average
   *   utilization is below 30 and more than 2 workers are running.
   */
  async shouldScaleDown() {
    const [queueLength, currentWorkers, averageUtilization] = await Promise.all([
      this.queueMonitor.getQueueLength(),
      this.getCurrentWorkerCount(),
      this.resourceMonitor.getAverageUtilization()
    ]);
    // Scale down if queue is short and utilization is low
    return queueLength < 5 && averageUtilization < 30 && currentWorkers > 2;
  }
}
Configuration and Deployment
Environment Configuration
# .env.example
# Template of environment variables for a worker process.
# Redis Configuration
# Read by the job queue configuration via process.env.REDIS_HOST / REDIS_PORT / REDIS_PASSWORD.
REDIS_HOST=localhost
REDIS_PORT=6379
# Left empty here; set it when the Redis server requires a password.
REDIS_PASSWORD=
# Worker Configuration
# NOTE(review): the code shown in this document does not read the four values
# below directly — confirm where they are consumed.
WORKER_ID=worker-001
WORKER_TYPE=nodejs
MAX_CONCURRENT_JOBS=3
# NOTE(review): presumably milliseconds (300000 ms = 5 minutes) — confirm.
JOB_TIMEOUT=300000
# Natron Configuration
# NOTE(review): these paths depend on how Natron is installed in the image — verify.
NATRON_PATH=/usr/bin/NatronRenderer
NATRON_PYTHON_PATH=/usr/lib/python3.8/site-packages
NATRON_PLUGIN_PATH=/usr/lib/Natron/Plugins
# Resource Limits
# NOTE(review): the values carry unit suffixes (GB, %); whatever reads them must parse those.
MAX_MEMORY_USAGE=4GB
MAX_CPU_USAGE=80%
MAX_DISK_USAGE=10GB
# Monitoring
PROMETHEUS_PORT=9090
# NOTE(review): 3000 is also the port the worker container exposes — confirm
# the two never share a host.
GRAFANA_PORT=3000
# NOTE(review): presumably seconds — confirm.
HEALTH_CHECK_INTERVAL=30
Deployment Scripts
#!/bin/bash
# deploy-workers.sh
# Applies the Redis, worker, autoscaler and monitoring manifests in order,
# waits for the worker Deployment to become available, then prints its state.

# Abort on the first failing command, on unset variables, and on failures
# inside pipelines, so a failed `kubectl apply` is not silently followed by
# the remaining steps.
set -euo pipefail

# Deploy Redis
kubectl apply -f kubernetes/redis-deployment.yaml
# Deploy Workers
kubectl apply -f kubernetes/worker-deployment.yaml
# Deploy HPA
kubectl apply -f kubernetes/hpa.yaml
# Deploy Monitoring
kubectl apply -f kubernetes/monitoring-stack.yaml
# Wait for deployment (exits non-zero if not available within 300s)
kubectl wait --for=condition=available --timeout=300s deployment/motion-graphics-worker
# Verify deployment
kubectl get pods -l app=motion-graphics-worker
kubectl get hpa motion-graphics-worker-hpa
This distributed worker system provides the scalability and reliability needed for professional motion graphics generation at scale.