Skip to content

βš™οΈ Distributed Worker SystemΒΆ

🎯 Overview¢

The Animation Designer Bot uses a sophisticated distributed worker system to handle the computational demands of motion graphics generation and rendering. This system provides horizontal scalability, fault tolerance, and efficient resource utilization.

πŸ—οΈ System ArchitectureΒΆ

graph TB
    subgraph JOB_MANAGEMENT["πŸ“‹ Job Management"]
        JOB_QUEUE[πŸ“‹ Job Queue<br/>Redis + Bull Queue<br/>Priority Scheduling]
        JOB_SCHEDULER[⏰ Job Scheduler<br/>Node.js + Cron<br/>Resource Allocation]
        QUEUE_MONITOR[πŸ“Š Queue Monitor<br/>Prometheus + Grafana<br/>Real-time Metrics]

        JOB_QUEUE --> JOB_SCHEDULER
        JOB_QUEUE --> QUEUE_MONITOR
    end

    subgraph WORKER_CLUSTER["βš™οΈ Worker Cluster"]
        WORKER1[βš™οΈ Worker 1<br/>Node.js + PM2<br/>Docker Container]
        WORKER2[βš™οΈ Worker 2<br/>Python + systemd<br/>Resource Management]
        WORKER3[βš™οΈ Worker 3<br/>GPU Support<br/>CUDA + OpenCL]
        WORKERN[βš™οΈ Worker N<br/>Auto-scaling<br/>Kubernetes Pods]

        JOB_SCHEDULER --> WORKER1
        JOB_SCHEDULER --> WORKER2
        JOB_SCHEDULER --> WORKER3
        JOB_SCHEDULER --> WORKERN
    end

    subgraph NATRON_ENGINES["πŸŽ₯ Natron Engines"]
        NATRON1[πŸŽ₯ Natron Renderer 1<br/>Natron Engine<br/>OpenImageIO + OpenEXR]
        NATRON2[πŸŽ₯ Natron Renderer 2<br/>GPU Acceleration<br/>CUDA + OpenCL]
        NATRON_CLI[πŸ’» Natron CLI<br/>NatronRenderer<br/>Command Line Interface]
        PYTHON_API[🐍 Natron Python API<br/>PyNatron Bindings<br/>Automation Scripts]

        WORKER1 --> NATRON1
        WORKER2 --> NATRON2
        WORKER3 --> NATRON_CLI
        WORKERN --> PYTHON_API
    end

    subgraph MONITORING["πŸ“Š Monitoring & Health"]
        HEALTH_CHECKS[πŸ₯ Health Checks<br/>Service Discovery<br/>Fault Detection]
        METRICS_COLLECTION[πŸ“ˆ Metrics Collection<br/>Performance Data<br/>Resource Usage]
        ALERTING[🚨 Alerting System<br/>Threshold Monitoring<br/>Incident Response]

        WORKER1 --> HEALTH_CHECKS
        WORKER2 --> HEALTH_CHECKS
        WORKER3 --> HEALTH_CHECKS
        WORKERN --> HEALTH_CHECKS

        HEALTH_CHECKS --> METRICS_COLLECTION
        METRICS_COLLECTION --> ALERTING
    end

    classDef job fill:#e1f5fe,stroke:#01579b
    classDef worker fill:#f3e5f5,stroke:#4a148c
    classDef natron fill:#fce4ec,stroke:#880e4f
    classDef monitor fill:#e8f5e8,stroke:#1b5e20

    class JOB_MANAGEMENT job
    class WORKER_CLUSTER worker
    class NATRON_ENGINES natron
    class MONITORING monitor

πŸ“‹ Job Queue SystemΒΆ

Redis + Bull Queue ImplementationΒΆ

// Job Queue Configuration
const Queue = require('bull');
const redis = require('redis');

const renderQueue = new Queue('motion graphics rendering', {
  redis: {
    host: process.env.REDIS_HOST,
    port: process.env.REDIS_PORT,
    password: process.env.REDIS_PASSWORD
  },
  defaultJobOptions: {
    removeOnComplete: 100,
    removeOnFail: 50,
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 2000
    }
  }
});

// Job Types
const JOB_TYPES = {
  MOTION_GRAPHICS_RENDER: 'motion_graphics_render',
  ASSET_PROCESSING: 'asset_processing',
  QUALITY_VALIDATION: 'quality_validation',
  OUTPUT_CONVERSION: 'output_conversion'
};

// Job Priority Levels
const PRIORITY_LEVELS = {
  CRITICAL: 1,    // Real-time requests
  HIGH: 2,        // Premium users
  NORMAL: 3,      // Standard users
  LOW: 4          // Batch processing
};

Job SchedulerΒΆ

class JobScheduler {
  constructor() {
    this.queue = renderQueue;
    this.workerRegistry = new Map();
    this.resourceMonitor = new ResourceMonitor();
  }

  async scheduleJob(jobData, priority = PRIORITY_LEVELS.NORMAL) {
    const job = await this.queue.add(jobData.type, jobData, {
      priority: priority,
      delay: this.calculateDelay(jobData),
      jobId: this.generateJobId(jobData)
    });

    return job;
  }

  calculateDelay(jobData) {
    // Calculate delay based on resource availability
    const currentLoad = this.resourceMonitor.getCurrentLoad();
    const estimatedDuration = this.estimateJobDuration(jobData);

    if (currentLoad > 0.8) {
      return estimatedDuration * 0.5; // Delay for resource availability
    }

    return 0; // Immediate execution
  }

  async distributeJob(job) {
    const availableWorkers = await this.getAvailableWorkers();
    const bestWorker = this.selectBestWorker(availableWorkers, job);

    if (bestWorker) {
      await this.assignJobToWorker(job, bestWorker);
    } else {
      await this.queueJobForLater(job);
    }
  }
}

βš™οΈ Worker ImplementationΒΆ

Node.js WorkerΒΆ

// Worker Implementation
class MotionGraphicsWorker {
  constructor(workerId, config) {
    this.workerId = workerId;
    this.config = config;
    this.status = 'idle';
    this.currentJob = null;
    this.resourceUsage = new ResourceMonitor();
  }

  async start() {
    console.log(`Worker ${this.workerId} starting...`);

    // Register with job scheduler
    await this.registerWithScheduler();

    // Start job processing
    this.processJobs();

    // Start health monitoring
    this.startHealthMonitoring();
  }

  async processJobs() {
    const processor = async (job) => {
      try {
        this.status = 'processing';
        this.currentJob = job;

        console.log(`Worker ${this.workerId} processing job ${job.id}`);

        // Process based on job type
        const result = await this.processJobByType(job.data);

        this.status = 'idle';
        this.currentJob = null;

        return result;
      } catch (error) {
        console.error(`Worker ${this.workerId} job failed:`, error);
        this.status = 'error';
        throw error;
      }
    };

    renderQueue.process(processor);
  }

  async processJobByType(jobData) {
    switch (jobData.type) {
      case JOB_TYPES.MOTION_GRAPHICS_RENDER:
        return await this.renderMotionGraphics(jobData);
      case JOB_TYPES.ASSET_PROCESSING:
        return await this.processAssets(jobData);
      case JOB_TYPES.QUALITY_VALIDATION:
        return await this.validateQuality(jobData);
      default:
        throw new Error(`Unknown job type: ${jobData.type}`);
    }
  }

  async renderMotionGraphics(jobData) {
    const { timeline, outputSpecs } = jobData;

    // Create Natron project
    const natronProject = await this.createNatronProject(timeline);

    // Render via Natron CLI
    const outputPath = await this.renderWithNatron(natronProject, outputSpecs);

    // Post-process output
    const finalOutput = await this.postProcessOutput(outputPath, outputSpecs);

    return {
      success: true,
      outputPath: finalOutput,
      metadata: {
        duration: outputSpecs.duration,
        resolution: outputSpecs.resolution,
        format: outputSpecs.format
      }
    };
  }
}

Python WorkerΒΆ

# Python Worker Implementation
import asyncio
import subprocess
import psutil
from typing import Dict, Any

class PythonMotionGraphicsWorker:
    def __init__(self, worker_id: str, config: Dict[str, Any]):
        self.worker_id = worker_id
        self.config = config
        self.status = 'idle'
        self.current_job = None
        self.resource_monitor = ResourceMonitor()

    async def start(self):
        """Start the worker and begin processing jobs"""
        print(f"Python Worker {self.worker_id} starting...")

        # Register with job scheduler
        await self.register_with_scheduler()

        # Start job processing loop
        await self.process_jobs()

    async def process_jobs(self):
        """Main job processing loop"""
        while True:
            try:
                # Get next job from queue
                job = await self.get_next_job()

                if job:
                    await self.process_job(job)
                else:
                    # No jobs available, wait
                    await asyncio.sleep(1)

            except Exception as e:
                print(f"Worker {self.worker_id} error: {e}")
                await asyncio.sleep(5)  # Wait before retrying

    async def process_job(self, job: Dict[str, Any]):
        """Process a single job"""
        try:
            self.status = 'processing'
            self.current_job = job

            print(f"Worker {self.worker_id} processing job {job['id']}")

            # Process based on job type
            result = await self.process_job_by_type(job['data'])

            # Mark job as completed
            await self.mark_job_completed(job['id'], result)

            self.status = 'idle'
            self.current_job = None

        except Exception as e:
            print(f"Worker {self.worker_id} job failed: {e}")
            self.status = 'error'
            await self.mark_job_failed(job['id'], str(e))

    async def process_job_by_type(self, job_data: Dict[str, Any]):
        """Process job based on its type"""
        job_type = job_data['type']

        if job_type == 'motion_graphics_render':
            return await self.render_motion_graphics(job_data)
        elif job_type == 'asset_processing':
            return await self.process_assets(job_data)
        elif job_type == 'quality_validation':
            return await self.validate_quality(job_data)
        else:
            raise ValueError(f"Unknown job type: {job_type}")

    async def render_motion_graphics(self, job_data: Dict[str, Any]):
        """Render motion graphics using Natron"""
        timeline = job_data['timeline']
        output_specs = job_data['output_specs']

        # Create Natron project
        natron_project = await self.create_natron_project(timeline)

        # Render via Natron CLI
        output_path = await self.render_with_natron(natron_project, output_specs)

        # Post-process output
        final_output = await self.post_process_output(output_path, output_specs)

        return {
            'success': True,
            'output_path': final_output,
            'metadata': {
                'duration': output_specs['duration'],
                'resolution': output_specs['resolution'],
                'format': output_specs['format']
            }
        }

🐳 Containerization and Orchestration¢

Docker ConfigurationΒΆ

# Dockerfile for Node.js Worker
FROM node:18-alpine

# Install system dependencies
RUN apk add --no-cache \
    python3 \
    py3-pip \
    ffmpeg \
    imagemagick

# Install Natron
RUN wget -O natron.deb https://github.com/NatronGitHub/Natron/releases/download/v2.4.0/Natron-2.4.0-Linux-x86_64.deb && \
    dpkg -i natron.deb || apt-get install -f -y

# Set working directory
WORKDIR /app

# Copy package files
COPY package*.json ./

# Install dependencies
RUN npm ci --only=production

# Copy application code
COPY . .

# Create non-root user
RUN addgroup -g 1001 -S nodejs && \
    adduser -S worker -u 1001

# Switch to non-root user
USER worker

# Expose port
EXPOSE 3000

# Health check
HEALTHCHECK --interval=30s --timeout=3s --start-period=5s --retries=3 \
    CMD curl -f http://localhost:3000/health || exit 1

# Start worker
CMD ["npm", "start"]

Kubernetes DeploymentΒΆ

# kubernetes/worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: motion-graphics-worker
  labels:
    app: motion-graphics-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: motion-graphics-worker
  template:
    metadata:
      labels:
        app: motion-graphics-worker
    spec:
      containers:
      - name: worker
        image: motion-graphics-worker:latest
        ports:
        - containerPort: 3000
        env:
        - name: REDIS_HOST
          value: "redis-service"
        - name: REDIS_PORT
          value: "6379"
        - name: WORKER_ID
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        resources:
          requests:
            memory: "2Gi"
            cpu: "1000m"
          limits:
            memory: "4Gi"
            cpu: "2000m"
        livenessProbe:
          httpGet:
            path: /health
            port: 3000
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 3000
          initialDelaySeconds: 5
          periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: motion-graphics-worker-service
spec:
  selector:
    app: motion-graphics-worker
  ports:
  - port: 3000
    targetPort: 3000
  type: ClusterIP

πŸ“Š Monitoring and Health ChecksΒΆ

Health Check ImplementationΒΆ

// Health Check System
class HealthCheckSystem {
  constructor() {
    this.checks = new Map();
    this.setupDefaultChecks();
  }

  setupDefaultChecks() {
    // System resource check
    this.addCheck('system_resources', async () => {
      const cpuUsage = await this.getCpuUsage();
      const memoryUsage = await this.getMemoryUsage();
      const diskUsage = await this.getDiskUsage();

      return {
        status: cpuUsage < 90 && memoryUsage < 90 && diskUsage < 90 ? 'healthy' : 'unhealthy',
        details: { cpuUsage, memoryUsage, diskUsage }
      };
    });

    // Natron availability check
    this.addCheck('natron_availability', async () => {
      try {
        const result = await this.checkNatronInstallation();
        return {
          status: result ? 'healthy' : 'unhealthy',
          details: { natronVersion: result }
        };
      } catch (error) {
        return {
          status: 'unhealthy',
          details: { error: error.message }
        };
      }
    });

    // Redis connection check
    this.addCheck('redis_connection', async () => {
      try {
        await redis.ping();
        return { status: 'healthy', details: {} };
      } catch (error) {
        return {
          status: 'unhealthy',
          details: { error: error.message }
        };
      }
    });
  }

  async runAllChecks() {
    const results = {};

    for (const [name, check] of this.checks) {
      try {
        results[name] = await check();
      } catch (error) {
        results[name] = {
          status: 'unhealthy',
          details: { error: error.message }
        };
      }
    }

    return results;
  }
}

Metrics CollectionΒΆ

// Metrics Collection
class MetricsCollector {
  constructor() {
    this.metrics = new Map();
    this.setupPrometheusMetrics();
  }

  setupPrometheusMetrics() {
    const prometheus = require('prom-client');

    // Job metrics
    this.jobCounter = new prometheus.Counter({
      name: 'jobs_processed_total',
      help: 'Total number of jobs processed',
      labelNames: ['worker_id', 'job_type', 'status']
    });

    this.jobDuration = new prometheus.Histogram({
      name: 'job_duration_seconds',
      help: 'Duration of job processing',
      labelNames: ['worker_id', 'job_type'],
      buckets: [1, 5, 15, 30, 60, 120, 300, 600]
    });

    // Resource metrics
    this.cpuUsage = new prometheus.Gauge({
      name: 'worker_cpu_usage_percent',
      help: 'CPU usage percentage',
      labelNames: ['worker_id']
    });

    this.memoryUsage = new prometheus.Gauge({
      name: 'worker_memory_usage_bytes',
      help: 'Memory usage in bytes',
      labelNames: ['worker_id']
    });
  }

  recordJobCompletion(workerId, jobType, duration, success) {
    this.jobCounter.inc({
      worker_id: workerId,
      job_type: jobType,
      status: success ? 'success' : 'failure'
    });

    this.jobDuration.observe(
      { worker_id: workerId, job_type: jobType },
      duration
    );
  }

  updateResourceMetrics(workerId, cpuUsage, memoryUsage) {
    this.cpuUsage.set({ worker_id: workerId }, cpuUsage);
    this.memoryUsage.set({ worker_id: workerId }, memoryUsage);
  }
}

πŸš€ Auto-scaling ConfigurationΒΆ

Horizontal Pod AutoscalerΒΆ

# kubernetes/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: motion-graphics-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: motion-graphics-worker
  minReplicas: 2
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  behavior:
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 100
        periodSeconds: 30

Custom Scaling MetricsΒΆ

// Custom Scaling Logic
class CustomScaler {
  constructor() {
    this.queueMonitor = new QueueMonitor();
    this.resourceMonitor = new ResourceMonitor();
  }

  async shouldScaleUp() {
    const queueLength = await this.queueMonitor.getQueueLength();
    const averageJobDuration = await this.queueMonitor.getAverageJobDuration();
    const currentWorkers = await this.getCurrentWorkerCount();

    // Scale up if queue is getting long and jobs are taking too long
    const estimatedWaitTime = (queueLength * averageJobDuration) / currentWorkers;

    return estimatedWaitTime > 300; // 5 minutes
  }

  async shouldScaleDown() {
    const queueLength = await this.queueMonitor.getQueueLength();
    const currentWorkers = await this.getCurrentWorkerCount();
    const averageUtilization = await this.resourceMonitor.getAverageUtilization();

    // Scale down if queue is short and utilization is low
    return queueLength < 5 && averageUtilization < 30 && currentWorkers > 2;
  }
}

πŸ”§ Configuration and DeploymentΒΆ

Environment ConfigurationΒΆ

# .env.example
# Redis Configuration
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=

# Worker Configuration
WORKER_ID=worker-001
WORKER_TYPE=nodejs
MAX_CONCURRENT_JOBS=3
JOB_TIMEOUT=300000

# Natron Configuration
NATRON_PATH=/usr/bin/NatronRenderer
NATRON_PYTHON_PATH=/usr/lib/python3.8/site-packages
NATRON_PLUGIN_PATH=/usr/lib/Natron/Plugins

# Resource Limits
MAX_MEMORY_USAGE=4GB
MAX_CPU_USAGE=80%
MAX_DISK_USAGE=10GB

# Monitoring
PROMETHEUS_PORT=9090
GRAFANA_PORT=3000
HEALTH_CHECK_INTERVAL=30

Deployment ScriptsΒΆ

#!/bin/bash
# deploy-workers.sh

# Deploy Redis
kubectl apply -f kubernetes/redis-deployment.yaml

# Deploy Workers
kubectl apply -f kubernetes/worker-deployment.yaml

# Deploy HPA
kubectl apply -f kubernetes/hpa.yaml

# Deploy Monitoring
kubectl apply -f kubernetes/monitoring-stack.yaml

# Wait for deployment
kubectl wait --for=condition=available --timeout=300s deployment/motion-graphics-worker

# Verify deployment
kubectl get pods -l app=motion-graphics-worker
kubectl get hpa motion-graphics-worker-hpa

This distributed worker system provides the scalability and reliability needed for professional motion graphics generation at scale.