Data & Analytics
Overview
The Data & Analytics component provides comprehensive data management, analytics, and reporting capabilities for the Email Marketing system. It handles contact management, campaign analytics, database operations, and report generation with real-time insights and performance tracking.
Architecture
Core Components
1. Contact Manager
Responsibility: Contact sourcing, validation, and management
- LinkedIn Integration: Professional contact scraping and enrichment
- Zero Bounce Integration: Email validation and verification
- Contact Enrichment: Data enhancement and quality improvement
- List Management: Contact segmentation and organization
2. Analytics Engine
Responsibility: Campaign performance analysis and insights
- Real-time Analytics: Live campaign performance monitoring
- Performance Metrics: Comprehensive KPI tracking and analysis
- Trend Analysis: Historical data analysis and trend identification
- Predictive Analytics: Machine learning-based performance prediction
3. Database Manager
Responsibility: Data persistence and retrieval operations
- PostgreSQL Management: Complex database operations and optimization
- Data Modeling: Schema design and relationship management
- Query Optimization: Performance tuning and query optimization
- Backup & Recovery: Data protection and disaster recovery
4. Reporting System
Responsibility: Report generation and distribution
- Automated Reports: Scheduled report generation and delivery
- Custom Dashboards: Interactive dashboard creation and management
- Export Capabilities: Multiple format export (PDF, Excel, CSV)
- Alert System: Proactive notification and alerting
System Workflows
Contact Management Workflow
sequenceDiagram
participant CM as Contact Manager
participant LI as LinkedIn API
participant ZB as Zero Bounce
participant DM as Database Manager
participant AE as Analytics Engine
CM->>LI: Scrape Contacts
LI-->>CM: Raw Contact Data
CM->>ZB: Validate Emails
ZB-->>CM: Validation Results
CM->>DM: Store Validated Contacts
DM-->>CM: Storage Confirmed
CM->>AE: Update Analytics
AE-->>CM: Analytics Updated
Analytics Processing Workflow
graph TD
A[Campaign Data] --> B[Data Collection]
B --> C[Data Validation]
C --> D[Data Processing]
D --> E[Metric Calculation]
E --> F[Trend Analysis]
F --> G[Report Generation]
G --> H[Distribution]
Technical Implementation
Contact Manager
class ContactManager {
constructor(linkedInAPI, zeroBounceAPI, databaseManager) {
this.linkedInAPI = linkedInAPI;
this.zeroBounceAPI = zeroBounceAPI;
this.databaseManager = databaseManager;
}
async scrapeLinkedInContacts(searchCriteria) {
const {
keywords,
location,
industry,
companySize,
limit = 1000
} = searchCriteria;
try {
// 1. Search LinkedIn profiles
const profiles = await this.linkedInAPI.searchProfiles({
keywords,
location,
industry,
companySize,
limit
});
// 2. Extract contact information
const contacts = await this.extractContactInfo(profiles);
// 3. Validate emails
const validatedContacts = await this.validateEmails(contacts);
// 4. Store in database
await this.databaseManager.storeContacts(validatedContacts);
return {
total: contacts.length,
valid: validatedContacts.length,
invalid: contacts.length - validatedContacts.length
};
} catch (error) {
console.error('LinkedIn scraping failed:', error);
throw error;
}
}
async validateEmails(contacts) {
const emails = contacts.map(contact => contact.email);
const validationResults = await this.zeroBounceAPI.validateBatch(emails);
// Index results by email so each lookup is O(1) instead of a scan per contact
const statusByEmail = new Map(validationResults.map(r => [r.email, r.status]));
return contacts.filter(contact => statusByEmail.get(contact.email) === 'valid');
}
async enrichContactData(contact) {
const enrichmentData = await this.linkedInAPI.getProfileDetails(contact.linkedinUrl);
return {
...contact,
company: enrichmentData.company,
position: enrichmentData.position,
experience: enrichmentData.experience,
education: enrichmentData.education,
skills: enrichmentData.skills,
connections: enrichmentData.connections
};
}
}
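Batch validation endpoints usually cap the number of addresses per request, so a large scrape (the default `limit` above is 1000) may need to be split before it reaches `validateBatch`. The following is a minimal sketch under stated assumptions: the batch size of 100 is hypothetical, `validateBatch` is assumed to return `{ email, status }` objects as the class above implies, and `chunk`, `filterValidContacts`, and `validateInBatches` are illustrative names that do not appear in the original code.

```javascript
// Split an array into consecutive slices of at most `size` items.
function chunk(items, size) {
  if (!Number.isInteger(size) || size <= 0) {
    throw new RangeError('size must be a positive integer');
  }
  const batches = [];
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Keep only contacts whose email came back with status 'valid'.
// `results` is assumed to be an array of { email, status } objects.
function filterValidContacts(contacts, results) {
  const statusByEmail = new Map(results.map(r => [r.email, r.status]));
  return contacts.filter(c => statusByEmail.get(c.email) === 'valid');
}

// Assumed request cap; check the validation provider's documentation.
const BATCH_SIZE = 100;

// Validate contacts one batch at a time. `validateBatch` is injected so the
// sketch does not depend on any particular Zero Bounce client library.
async function validateInBatches(contacts, validateBatch, batchSize = BATCH_SIZE) {
  const valid = [];
  for (const batch of chunk(contacts, batchSize)) {
    const results = await validateBatch(batch.map(c => c.email));
    valid.push(...filterValidContacts(batch, results));
  }
  return valid;
}
```

Processing batches sequentially keeps the request rate predictable; running them in parallel would be faster but is more likely to hit provider rate limits.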
Analytics Engine
class AnalyticsEngine {
constructor(databaseManager, reportingSystem) {
this.db = databaseManager;
this.reporting = reportingSystem;
}
async calculateCampaignMetrics(campaignId) {
const campaign = await this.db.getCampaign(campaignId);
const activities = await this.db.getCampaignActivities(campaignId);
const metrics = {
delivery: this.calculateDeliveryMetrics(activities),
engagement: this.calculateEngagementMetrics(activities),
performance: this.calculatePerformanceMetrics(activities),
trends: await this.calculateTrends(campaignId)
};
return metrics;
}
calculateDeliveryMetrics(activities) {
const total = activities.length;
const delivered = activities.filter(a => a.status === 'delivered').length;
const bounced = activities.filter(a => a.status === 'bounced').length;
const failed = activities.filter(a => a.status === 'failed').length;
// Return 0 rather than NaN when the campaign has no activities yet
const rate = count => (total > 0 ? (count / total) * 100 : 0);
return {
total,
delivered,
bounced,
failed,
deliveryRate: rate(delivered),
bounceRate: rate(bounced),
failureRate: rate(failed)
};
}
calculateEngagementMetrics(activities) {
const delivered = activities.filter(a => a.status === 'delivered');
const opened = delivered.filter(a => a.opened).length;
const clicked = delivered.filter(a => a.clicked).length;
const unsubscribed = delivered.filter(a => a.unsubscribed).length;
// Return 0 rather than NaN or Infinity when a denominator is 0
const rate = (count, base) => (base > 0 ? (count / base) * 100 : 0);
return {
openRate: rate(opened, delivered.length),
clickRate: rate(clicked, delivered.length),
unsubscribeRate: rate(unsubscribed, delivered.length),
clickToOpenRate: rate(clicked, opened)
};
}
async calculateTrends(campaignId) {
const historicalData = await this.db.getHistoricalCampaignData(campaignId);
return {
deliveryTrend: this.calculateTrend(historicalData, 'deliveryRate'),
openTrend: this.calculateTrend(historicalData, 'openRate'),
clickTrend: this.calculateTrend(historicalData, 'clickRate'),
bounceTrend: this.calculateTrend(historicalData, 'bounceRate')
};
}
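calculateTrend(data, metric) {
const recent = data.slice(-7); // Last 7 data points (one per day is assumed)
const previous = data.slice(-14, -7); // The 7 data points before those
// Both windows need data, otherwise the averages below would be NaN
if (recent.length === 0 || previous.length === 0) return 'insufficient_data';
const average = rows => rows.reduce((sum, d) => sum + d[metric], 0) / rows.length;
const recentAvg = average(recent);
const previousAvg = average(previous);
// A zero baseline makes the percentage change undefined
if (previousAvg === 0) return recentAvg > 0 ? 'increasing' : 'stable';
const change = ((recentAvg - previousAvg) / previousAvg) * 100;
if (change > 5) return 'increasing';
if (change < -5) return 'decreasing';
return 'stable';
}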
}
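`calculateCampaignMetrics` calls `this.calculatePerformanceMetrics(activities)`, which the class above does not define. The sketch below shows one way it could look, written as a standalone function. It is an assumption throughout: the field names `status` and `responseTime`, the choice to count both `failed` and `bounced` as errors, and the `percent` helper are illustrative and not taken from the original.

```javascript
// Percentage helper that returns 0 instead of NaN when the denominator is 0.
function percent(part, whole) {
  return whole > 0 ? (part / whole) * 100 : 0;
}

// Hypothetical performance metrics: average response time plus error and
// success rates. Assumes each activity has a `status` string and an optional
// numeric `responseTime`; neither field name is confirmed by the source.
function calculatePerformanceMetrics(activities) {
  const timed = activities.filter(a => Number.isFinite(a.responseTime));
  const totalTime = timed.reduce((sum, a) => sum + a.responseTime, 0);
  const errors = activities.filter(
    a => a.status === 'failed' || a.status === 'bounced'
  ).length;
  const succeeded = activities.filter(a => a.status === 'delivered').length;
  return {
    // null signals "no timing data" instead of a misleading 0
    avgResponseTime: timed.length > 0 ? totalTime / timed.length : null,
    errorRate: percent(errors, activities.length),
    successRate: percent(succeeded, activities.length)
  };
}

const sample = [
  { status: 'delivered', responseTime: 120 },
  { status: 'delivered', responseTime: 80 },
  { status: 'bounced' },
  { status: 'failed', responseTime: 400 }
];
console.log(calculatePerformanceMetrics(sample));
// avgResponseTime: 200, errorRate: 50, successRate: 50
console.log(calculatePerformanceMetrics([]));
// avgResponseTime: null, errorRate: 0, successRate: 0
```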
Database Manager
class DatabaseManager {
constructor(pool, redis) {
this.pool = pool;
this.redis = redis;
}
async storeContacts(contacts) {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
for (const contact of contacts) {
await client.query(`
INSERT INTO contacts (
email, first_name, last_name, company, position,
linkedin_url, phone, location, industry, source,
created_at, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, NOW(), NOW())
ON CONFLICT (email) DO UPDATE SET
first_name = EXCLUDED.first_name,
last_name = EXCLUDED.last_name,
company = EXCLUDED.company,
position = EXCLUDED.position,
updated_at = NOW()
`, [
contact.email,
contact.firstName,
contact.lastName,
contact.company,
contact.position,
contact.linkedinUrl,
contact.phone,
contact.location,
contact.industry,
'linkedin'
]);
}
await client.query('COMMIT');
// Invalidate the cached contact count so the next read recomputes it
await this.redis.del('contacts:count');
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
async getCampaignMetrics(campaignId, dateRange) {
const cacheKey = `campaign:${campaignId}:metrics:${dateRange.start}:${dateRange.end}`;
// Check cache first
const cached = await this.redis.get(cacheKey);
if (cached) {
return JSON.parse(cached);
}
const query = `
SELECT
COUNT(*) as total_sent,
COUNT(CASE WHEN status = 'delivered' THEN 1 END) as delivered,
COUNT(CASE WHEN status = 'bounced' THEN 1 END) as bounced,
COUNT(CASE WHEN status = 'failed' THEN 1 END) as failed,
COUNT(opened_at) as opened, -- campaign_activities stores timestamps, not boolean flags
COUNT(clicked_at) as clicked,
COUNT(unsubscribed_at) as unsubscribed,
AVG(response_time) as avg_response_time
FROM campaign_activities
WHERE campaign_id = $1
AND created_at BETWEEN $2 AND $3
`;
const result = await this.pool.query(query, [
campaignId,
dateRange.start,
dateRange.end
]);
const metrics = result.rows[0];
// Cache for 5 minutes
await this.redis.setex(cacheKey, 300, JSON.stringify(metrics));
return metrics;
}
async getAgentPerformance(agentId, dateRange) {
const query = `
SELECT
COUNT(*) as total_sessions,
AVG(session_duration) as avg_session_duration,
AVG(actions_performed) as avg_actions,
AVG(typing_speed_wpm) as avg_typing_speed,
AVG(error_rate) as avg_error_rate,
COUNT(CASE WHEN session_completed = true THEN 1 END) as completed_sessions
FROM activity_sessions
WHERE agent_id = $1
AND session_start BETWEEN $2 AND $3
`;
const result = await this.pool.query(query, [
agentId,
dateRange.start,
dateRange.end
]);
return result.rows[0];
}
}
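`storeContacts` issues one `INSERT` per contact, which costs a round trip for every row. A possible alternative is a single multi-row statement with generated placeholders. The sketch below only builds the `{ text, values }` object that node-postgres `query` accepts; `buildContactUpsert` and `CONTACT_COLUMNS` are hypothetical names, and only five of the original columns are included to keep the example short.

```javascript
// Columns supplied per contact, in placeholder order.
const CONTACT_COLUMNS = ['email', 'first_name', 'last_name', 'company', 'position'];

// Build one multi-row INSERT ... ON CONFLICT statement for a batch of contacts.
// Returns { text, values }, or null when there is nothing to insert.
function buildContactUpsert(contacts) {
  // PostgreSQL rejects an ON CONFLICT DO UPDATE that touches the same row
  // twice, so keep only the last occurrence of each email within the batch.
  const unique = [...new Map(contacts.map(c => [c.email, c])).values()];
  if (unique.length === 0) return null;

  const values = [];
  const rows = unique.map((c, rowIndex) => {
    values.push(
      c.email,
      c.firstName ?? null,
      c.lastName ?? null,
      c.company ?? null,
      c.position ?? null
    );
    // Placeholders are numbered continuously across rows: $1..$5, $6..$10, ...
    const base = rowIndex * CONTACT_COLUMNS.length;
    const placeholders = CONTACT_COLUMNS.map((_, i) => `$${base + i + 1}`);
    return `(${placeholders.join(', ')})`;
  });

  const text = `INSERT INTO contacts (${CONTACT_COLUMNS.join(', ')})
VALUES ${rows.join(', ')}
ON CONFLICT (email) DO UPDATE SET
  first_name = EXCLUDED.first_name,
  last_name = EXCLUDED.last_name,
  company = EXCLUDED.company,
  position = EXCLUDED.position,
  updated_at = NOW()`;
  return { text, values };
}
```

A statement can carry at most 65535 bind parameters, so very large batches would still need to be split into several statements inside the same transaction.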
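Reporting System
// Assumed dependencies: the calls below match the pdfkit and exceljs packages
const PDFDocument = require('pdfkit');
const ExcelJS = require('exceljs');
const { PassThrough } = require('stream');
class ReportingSystem {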
constructor(databaseManager, analyticsEngine) {
this.db = databaseManager;
this.analytics = analyticsEngine;
}
async generateCampaignReport(campaignId, options = {}) {
const {
format = 'pdf',
includeCharts = true,
dateRange = null
} = options;
// 1. Get campaign data
const campaign = await this.db.getCampaign(campaignId);
const metrics = await this.analytics.calculateCampaignMetrics(campaignId);
// 2. Generate report data
const reportData = {
campaign: campaign,
metrics: metrics,
dateRange: dateRange || {
start: campaign.created_at,
end: new Date()
},
generatedAt: new Date()
};
// 3. Generate report based on format
switch (format) {
case 'pdf':
return await this.generatePDFReport(reportData, includeCharts);
case 'excel':
return await this.generateExcelReport(reportData);
case 'csv':
return await this.generateCSVReport(reportData);
default:
throw new Error(`Unsupported format: ${format}`);
}
}
async generatePDFReport(data, includeCharts) {
const pdf = new PDFDocument();
const stream = new PassThrough();
pdf.pipe(stream);
// Header
pdf.fontSize(20).text('Campaign Report', 50, 50);
pdf.fontSize(12).text(`Campaign: ${data.campaign.name}`, 50, 80);
pdf.text(`Generated: ${data.generatedAt.toLocaleDateString()}`, 50, 100);
// Metrics section
pdf.fontSize(16).text('Performance Metrics', 50, 130);
const metrics = data.metrics;
let y = 160;
pdf.fontSize(12);
pdf.text(`Total Sent: ${metrics.delivery.total}`, 50, y);
y += 20;
pdf.text(`Delivery Rate: ${metrics.delivery.deliveryRate.toFixed(2)}%`, 50, y);
y += 20;
pdf.text(`Open Rate: ${metrics.engagement.openRate.toFixed(2)}%`, 50, y);
y += 20;
pdf.text(`Click Rate: ${metrics.engagement.clickRate.toFixed(2)}%`, 50, y);
y += 20;
pdf.text(`Bounce Rate: ${metrics.delivery.bounceRate.toFixed(2)}%`, 50, y);
// Charts (if requested)
if (includeCharts) {
y += 40;
pdf.text('Performance Charts', 50, y);
// Add chart generation logic here
}
pdf.end();
return stream;
}
async generateExcelReport(data) {
const workbook = new ExcelJS.Workbook();
const worksheet = workbook.addWorksheet('Campaign Report');
// Add headers
worksheet.addRow(['Metric', 'Value']);
// Add metrics
const metrics = data.metrics;
worksheet.addRow(['Total Sent', metrics.delivery.total]);
worksheet.addRow(['Delivery Rate', `${metrics.delivery.deliveryRate.toFixed(2)}%`]);
worksheet.addRow(['Open Rate', `${metrics.engagement.openRate.toFixed(2)}%`]);
worksheet.addRow(['Click Rate', `${metrics.engagement.clickRate.toFixed(2)}%`]);
worksheet.addRow(['Bounce Rate', `${metrics.delivery.bounceRate.toFixed(2)}%`]);
// ExcelJS has no chart-creation API (worksheet.addChart does not exist), so the
// workbook carries the data only; a chart has to be added in Excel or by another tool.
return workbook;
}
async scheduleReport(reportConfig) {
const {
campaignId,
frequency,
format,
recipients,
time
} = reportConfig;
const schedule = {
id: generateId(),
campaignId,
frequency,
format,
recipients,
time,
nextRun: this.calculateNextRun(frequency, time),
createdAt: new Date()
};
await this.db.storeScheduledReport(schedule);
// Schedule in job queue
await this.scheduleJob(schedule);
return schedule;
}
}
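`generateCampaignReport` dispatches to `generateCSVReport` for the `csv` format, but that method is not shown. The following standalone sketch is one way to produce it. It assumes the same `data` shape that `generateExcelReport` reads, adds a campaign name row, and uses RFC 4180 style quoting; `escapeCsvField` is a hypothetical helper.

```javascript
// Quote a field when it contains a comma, a double quote, or a line break,
// and double any embedded quotes (RFC 4180 style).
function escapeCsvField(value) {
  const text = String(value ?? '');
  return /[",\r\n]/.test(text) ? `"${text.replace(/"/g, '""')}"` : text;
}

// Hypothetical CSV counterpart to generateExcelReport: the same metric rows,
// returned as a single string with CRLF line endings.
function generateCSVReport(data) {
  const { delivery, engagement } = data.metrics;
  const rows = [
    ['Metric', 'Value'],
    ['Campaign', data.campaign.name],
    ['Total Sent', delivery.total],
    ['Delivery Rate', `${delivery.deliveryRate.toFixed(2)}%`],
    ['Open Rate', `${engagement.openRate.toFixed(2)}%`],
    ['Click Rate', `${engagement.clickRate.toFixed(2)}%`],
    ['Bounce Rate', `${delivery.bounceRate.toFixed(2)}%`]
  ];
  return rows.map(row => row.map(escapeCsvField).join(',')).join('\r\n');
}

const csv = generateCSVReport({
  campaign: { name: 'Spring, 2024 "Launch"' },
  metrics: {
    delivery: { total: 200, deliveryRate: 95, bounceRate: 5 },
    engagement: { openRate: 40.123, clickRate: 10 }
  }
});
console.log(csv);
```

The campaign name in the usage example contains both a comma and quotes, so its row is emitted as `Campaign,"Spring, 2024 ""Launch"""`.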
Database Schema
Core Tables
-- Contacts table
CREATE TABLE contacts (
id BIGSERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
first_name VARCHAR(100),
last_name VARCHAR(100),
company VARCHAR(200),
position VARCHAR(200),
linkedin_url VARCHAR(500),
phone VARCHAR(50),
location VARCHAR(200),
industry VARCHAR(100),
source VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Campaigns table
CREATE TABLE campaigns (
id BIGSERIAL PRIMARY KEY,
name VARCHAR(200) NOT NULL,
description TEXT,
status VARCHAR(50) DEFAULT 'draft',
target_audience JSONB,
content JSONB,
scheduling JSONB,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Campaign activities table
CREATE TABLE campaign_activities (
id BIGSERIAL PRIMARY KEY,
campaign_id BIGINT REFERENCES campaigns(id),
contact_id BIGINT REFERENCES contacts(id),
agent_id BIGINT,
status VARCHAR(50),
sent_at TIMESTAMP,
opened_at TIMESTAMP,
clicked_at TIMESTAMP,
bounced_at TIMESTAMP,
unsubscribed_at TIMESTAMP,
response_time INTEGER,
error_message TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Analytics cache table
CREATE TABLE analytics_cache (
id BIGSERIAL PRIMARY KEY,
cache_key VARCHAR(255) UNIQUE NOT NULL,
data JSONB NOT NULL,
expires_at TIMESTAMP NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
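The `campaign_activities` table records events as nullable timestamps (`opened_at`, `clicked_at`, `unsubscribed_at`), while `calculateEngagementMetrics` reads boolean flags such as `a.opened`. Some mapping step has to sit between the two. The sketch below is one possible version; `toActivity` is a hypothetical name and the camelCase output shape is an assumption based on how the Analytics Engine reads activities.

```javascript
// Map a campaign_activities row (snake_case, timestamp columns) to the
// activity shape the metric calculations read. A non-null timestamp means
// the event happened.
function toActivity(row) {
  return {
    id: row.id,
    campaignId: row.campaign_id,
    contactId: row.contact_id,
    status: row.status,
    opened: row.opened_at != null,
    clicked: row.clicked_at != null,
    unsubscribed: row.unsubscribed_at != null,
    responseTime: row.response_time
  };
}

const activity = toActivity({
  id: 1,
  campaign_id: 7,
  contact_id: 42,
  status: 'delivered',
  opened_at: new Date('2024-01-02T10:00:00Z'),
  clicked_at: null,
  unsubscribed_at: null,
  response_time: 150
});
console.log(activity.opened, activity.clicked); // true false
```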
Indexes
-- Performance indexes
CREATE INDEX idx_campaign_activities_campaign_id ON campaign_activities(campaign_id);
CREATE INDEX idx_campaign_activities_contact_id ON campaign_activities(contact_id);
CREATE INDEX idx_campaign_activities_sent_at ON campaign_activities(sent_at);
CREATE INDEX idx_campaign_activities_status ON campaign_activities(status);
-- contacts(email) is already indexed by its UNIQUE constraint, so no separate index is needed
CREATE INDEX idx_contacts_company ON contacts(company);
-- analytics_cache(cache_key) is likewise covered by its UNIQUE constraint
CREATE INDEX idx_analytics_cache_expires ON analytics_cache(expires_at);
Analytics & Metrics
Campaign Metrics
- Delivery Metrics: Total sent, delivered, bounced, failed
- Engagement Metrics: Open rate, click rate, unsubscribe rate
- Performance Metrics: Response time, error rate, success rate
- Trend Analysis: Historical performance trends
- Comparative Analysis: Campaign performance comparison
Agent Metrics
- Session Metrics: Total sessions, completion rate, duration
- Performance Metrics: Typing speed, error rate, accuracy
- Behavioral Metrics: Activity patterns, engagement levels
- Evolution Metrics: Learning and adaptation over time
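The session metrics listed above can be derived from the row that `getAgentPerformance` returns. One detail to watch: node-postgres returns `COUNT(*)` (a `bigint`) as a string by default, and `AVG` over integer or numeric columns also arrives as a string. The sketch below converts those values and computes a completion rate; `summarizeAgentRow` and `toNumber` are hypothetical names, and the column types are an assumption since the `activity_sessions` table is not shown.

```javascript
// Convert a numeric string (or null) from the driver into a number (or null).
function toNumber(value) {
  return value === null || value === undefined ? null : Number(value);
}

// Turn the aggregate row from getAgentPerformance into plain numbers and add
// a completion rate in percent.
function summarizeAgentRow(row) {
  const totalSessions = toNumber(row.total_sessions) ?? 0;
  const completedSessions = toNumber(row.completed_sessions) ?? 0;
  return {
    totalSessions,
    completedSessions,
    completionRate:
      totalSessions > 0 ? (completedSessions / totalSessions) * 100 : 0,
    avgSessionDuration: toNumber(row.avg_session_duration),
    avgTypingSpeed: toNumber(row.avg_typing_speed),
    avgErrorRate: toNumber(row.avg_error_rate)
  };
}

const summary = summarizeAgentRow({
  total_sessions: '8',
  completed_sessions: '6',
  avg_session_duration: '1250.5',
  avg_typing_speed: null, // AVG over zero non-null values is NULL
  avg_error_rate: '0.02'
});
console.log(summary.completionRate); // 75
```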
System Metrics
- Database Performance: Query performance, connection usage
- API Performance: Response times, error rates
- Resource Usage: CPU, memory, storage utilization
- Scalability Metrics: Performance under load
Configuration
Database Configuration
{
"database": {
"host": "localhost",
"port": 5432,
"database": "email_marketing",
"username": "postgres",
"password": "password",
"pool": {
"min": 5,
"max": 20,
"idleTimeoutMillis": 30000
}
}
}
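The configuration above does not map one-to-one onto node-postgres `Pool` options: the driver expects `user` rather than `username`, and a literal password in a config file is best treated as a development default. The sketch below shows one possible translation; `toPoolOptions` is a hypothetical helper, and letting the standard `PGHOST`, `PGPORT`, and `PGPASSWORD` environment variables override the file is an assumption about how deployments would supply secrets.

```javascript
// Translate the `database` block into options for `new Pool(...)` from the
// pg package. `env` is injectable so the function can be tested without
// touching the real environment.
function toPoolOptions(config, env = process.env) {
  const db = config.database;
  return {
    host: env.PGHOST || db.host,
    port: Number(env.PGPORT || db.port),
    database: db.database,
    user: db.username, // pg expects `user`, not `username`
    // Prefer a secret from the environment over the value stored in the file
    password: env.PGPASSWORD || db.password,
    max: db.pool.max,
    idleTimeoutMillis: db.pool.idleTimeoutMillis
    // `min` is left out on purpose: support for it depends on the pg version
  };
}

const exampleConfig = {
  database: {
    host: 'localhost',
    port: 5432,
    database: 'email_marketing',
    username: 'postgres',
    password: 'password',
    pool: { min: 5, max: 20, idleTimeoutMillis: 30000 }
  }
};
const poolOptions = toPoolOptions(exampleConfig, { PGPASSWORD: 's3cret' });
console.log(poolOptions.user, poolOptions.max); // postgres 20
```

The result would then be passed to the driver as `new Pool(toPoolOptions(config))`.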
Analytics Configuration
{
"analytics": {
"cache": {
"enabled": true,
"ttl": 300,
"maxSize": 1000
},
"reports": {
"defaultFormat": "pdf",
"includeCharts": true,
"scheduledReports": true
},
"metrics": {
"realTime": true,
"historical": true,
"trends": true
}
}
}
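To make the `cache` settings concrete, the sketch below implements a small in-memory cache driven by the same two values. It assumes `ttl` is in seconds (consistent with the 300-second `setex` call in the Database Manager) and that `maxSize` is a maximum number of entries; both readings are assumptions, and `TtlCache` is an illustrative class rather than the Redis-backed cache the system actually uses.

```javascript
// A bounded cache with per-entry expiry. `now` is injectable so expiry can be
// tested without waiting.
class TtlCache {
  constructor({ ttl, maxSize }, now = () => Date.now()) {
    this.ttlMs = ttl * 1000;
    this.maxSize = maxSize;
    this.now = now;
    this.entries = new Map(); // Map iterates in insertion order
  }

  get(key) {
    const entry = this.entries.get(key);
    if (!entry) return undefined;
    if (entry.expiresAt <= this.now()) {
      this.entries.delete(key); // drop expired entries lazily on read
      return undefined;
    }
    return entry.value;
  }

  set(key, value) {
    this.entries.delete(key); // re-inserting moves the key to the newest slot
    if (this.entries.size >= this.maxSize) {
      // Evict the entry that was inserted first
      const oldestKey = this.entries.keys().next().value;
      this.entries.delete(oldestKey);
    }
    this.entries.set(key, { value, expiresAt: this.now() + this.ttlMs });
  }
}

let fakeTime = 0;
const cache = new TtlCache({ ttl: 300, maxSize: 2 }, () => fakeTime);
cache.set('a', 1);
cache.set('b', 2);
cache.set('c', 3); // evicts 'a', the oldest entry
console.log(cache.get('a'), cache.get('b')); // undefined 2
fakeTime = 300000; // 300 seconds later
console.log(cache.get('b')); // undefined
```

Eviction here is first-in, first-out rather than least-recently-used, which keeps the sketch short; reads do not refresh an entry's position.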
Security & Compliance
Data Protection
- Encryption: Sensitive data encryption at rest and in transit
- Access Control: Role-based database access
- Audit Logging: Comprehensive data access logging
- Data Retention: Configurable retention policies
Privacy Compliance
- GDPR Compliance: Data protection and privacy controls
- Data Minimization: Only necessary data collection
- Right to Deletion: Contact data deletion capabilities
- Consent Management: Data usage consent tracking
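As an illustration of the deletion capability, the sketch below builds the statements needed to remove a contact from the schema shown earlier. Because `campaign_activities.contact_id` references `contacts(id)` without a cascade rule, the activity rows have to be detached (or deleted) first. Detaching them so aggregate campaign metrics survive is a design assumption, not a statement of what GDPR requires, and `buildErasureStatements` and `eraseContact` are hypothetical names.

```javascript
// Build the statements that remove one contact, in execution order.
// Each item has the { text, values } shape accepted by node-postgres `query`.
function buildErasureStatements(email) {
  return [
    {
      // Detach activity rows first so the foreign key does not block the delete
      text: `UPDATE campaign_activities SET contact_id = NULL
WHERE contact_id = (SELECT id FROM contacts WHERE email = $1)`,
      values: [email]
    },
    {
      text: 'DELETE FROM contacts WHERE email = $1',
      values: [email]
    }
  ];
}

// Run the statements in one transaction. `client` is assumed to be a
// connected pg client, for example from `pool.connect()`.
async function eraseContact(client, email) {
  await client.query('BEGIN');
  try {
    for (const statement of buildErasureStatements(email)) {
      await client.query(statement);
    }
    await client.query('COMMIT');
  } catch (error) {
    await client.query('ROLLBACK');
    throw error;
  }
}
```

Free-text columns such as `error_message` may also contain personal data and would need their own review.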
Deployment
Infrastructure Requirements
- Database: PostgreSQL cluster with read replicas
- Cache: Redis cluster for analytics caching
- Storage: Large storage for historical data
- Compute: High-performance servers for analytics
Scaling Strategy
- Database Scaling: Read replicas and connection pooling
- Cache Scaling: Redis cluster with sharding
- Analytics Scaling: Distributed analytics processing
- Storage Scaling: Data partitioning and archiving