Architecture of AI-Powered Marketing Automation
When people ask "how does Cogny work?", they usually want to know about the AI.
But the AI is maybe 20% of the system.
The real architecture challenge is everything around the AI: how you ingest data, manage context, stream responses, handle errors, control costs, ensure security, and scale to thousands of concurrent users.
After six months of building Cogny in production, we've learned what works, what doesn't, and where the complexity really lives. Here's the complete technical architecture, including the parts that aren't sexy but are essential.
The High-Level Picture
Before diving into details, here's the 30,000-foot view:
User → Frontend (React) → API (Next.js) → AI Agent (Claude) → Data Warehouse (BigQuery) → Results → User
Simple, right?
Here's what that actually looks like in production:
User
↓
Web App (React + TypeScript)
↓
API Layer (Next.js API Routes)
↓
Authentication (Supabase Auth)
↓
Session Management (Redis)
↓
Request Queue (for rate limiting)
↓
AI Orchestration Layer
├→ Context Builder (assembles schema, history, etc.)
├→ Prompt Engineering (system prompts + few-shot examples)
├→ Claude API (via Anthropic SDK)
├→ Tool Execution Engine
│ ├→ BigQuery Client
│ ├→ Query Validator
│ ├→ Cost Estimator
│ ├→ Cache Layer (Redis)
│ └→ Result Processor
└→ Response Stream (SSE)
↓
Monitoring & Logging
├→ Error tracking (Sentry)
├→ Metrics (Prometheus)
├→ Query logs (BigQuery)
└→ Cost tracking
Let's break down each layer.
Layer 1: Frontend Architecture
The frontend is a React/TypeScript application built with Vite. Nothing revolutionary here, but some interesting choices:
Component Structure
src/
├── components/
│ ├── chat/ # Chat interface components
│ ├── warehouse/ # Data warehouse management
│ └── ui/ # shadcn/ui components
├── lib/
│ ├── api/ # API client functions
│ ├── hooks/ # Custom React hooks
│ └── utils/ # Utility functions
└── app/ # Next.js pages
Real-Time Chat Interface
The chat is the core UI. Key requirements:
- Stream AI responses token-by-token
- Show tool execution progress
- Display data tables and visualizations
- Handle long conversations efficiently
Implementation:
We use Server-Sent Events (SSE) for streaming:
// Frontend
const eventSource = new EventSource(`/api/chat/stream?threadId=${threadId}`);
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
switch (data.type) {
case 'token':
// Append token to current message
appendToken(data.token);
break;
case 'tool_use':
// Show tool execution
showToolExecution(data.tool, data.input);
break;
case 'tool_result':
// Display tool result
showToolResult(data.result);
break;
case 'error':
// Handle error
showError(data.error);
break;
case 'done':
// Close stream
eventSource.close();
break;
}
};
Challenge: Handling disconnections and retries.
Solution: Implement exponential backoff retry with state recovery:
let retryCount = 0;
const maxRetries = 3;
function connectStream() {
const eventSource = new EventSource(url);
eventSource.onerror = () => {
eventSource.close();
if (retryCount < maxRetries) {
const delay = Math.pow(2, retryCount) * 1000;
setTimeout(() => {
retryCount++;
connectStream();
}, delay);
} else {
showError("Connection lost. Please refresh.");
}
};
// Reset retry count on successful connection
eventSource.onopen = () => {
retryCount = 0;
};
}
State Management
We use a combination of:
- React Context for global state (user, auth)
- React Query for server state (API data)
- Local state for UI state (modals, forms)
Why not Redux/Zustand? For our use case, React Query handles most state management needs. It provides caching, refetching, and optimistic updates out of the box.
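As a concrete illustration, a chat-history hook built on React Query might look like this. A minimal sketch, assuming our /api/chat/history endpoint returns an array of messages; useChatHistory and the ChatMessage shape are illustrative, not our exact code:
import { useQuery } from '@tanstack/react-query';

interface ChatMessage {
  id: string;
  role: 'user' | 'assistant';
  content: string;
  timestamp: string;
}

// Hypothetical hook: server state for one thread, cached and refetched by React Query
function useChatHistory(threadId: string) {
  return useQuery({
    queryKey: ['chat-history', threadId],
    queryFn: async (): Promise<ChatMessage[]> => {
      const res = await fetch(`/api/chat/history?threadId=${threadId}`);
      if (!res.ok) throw new Error(`Failed to load history: ${res.status}`);
      return res.json();
    },
    staleTime: 30_000 // avoid refetching on every window focus
  });
}
Caching, background refetching, and loading/error states come for free, which covers most of what we'd otherwise reach for a store for.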
Performance Optimizations
1. Virtual scrolling for long chat histories with react-window (see the sketch below)
2. Lazy loading of warehouse schemas (only load when needed)
3. Code splitting by route
4. Memoization of expensive computations
const processedMessages = useMemo(() => {
return messages.map(msg => ({
...msg,
formattedTime: formatTime(msg.timestamp),
parsedContent: parseMarkdown(msg.content)
}));
}, [messages]);
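For the virtual scrolling in item 1, react-window does the heavy lifting. A stripped-down sketch; the fixed 88px row height is an assumption, and real chat bubbles with variable heights would need VariableSizeList:
import { FixedSizeList } from 'react-window';

function MessageList({ messages }: { messages: { content: string }[] }) {
  return (
    <FixedSizeList
      height={600}          // viewport height in px
      width="100%"
      itemCount={messages.length}
      itemSize={88}         // fixed row height
    >
      {({ index, style }) => (
        // Only the rows currently in view are mounted
        <div style={style}>{messages[index].content}</div>
      )}
    </FixedSizeList>
  );
}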
Layer 2: API Layer
The API is built with Next.js API routes (App Router). This gives us:
- Flexible deployment (serverless on Vercel, containers on GKE)
- Edge runtime for some routes
- Built-in middleware
- TypeScript support
Route Structure
app/api/
├── auth/ # Authentication endpoints
├── chat/
│ ├── stream/ # SSE streaming endpoint
│ └── history/ # Chat history
├── warehouse/
│ ├── connect/ # BigQuery connection
│ ├── schema/ # Schema introspection
│ └── query/ # Direct query execution
└── admin/ # Admin endpoints
Authentication Flow
We use Supabase Auth with Row Level Security (RLS):
// Middleware
export async function middleware(req: NextRequest) {
  const res = NextResponse.next();
  const supabase = createMiddlewareClient({ req, res });
  // Refresh session if needed
  const { data: { session } } = await supabase.auth.getSession();
  if (!session && !isPublicRoute(req.url)) {
    return NextResponse.redirect(new URL('/login', req.url));
  }
  return res;
}
Why Supabase? Handles OAuth, magic links, and JWT management. RLS ensures users only see their data.
Rate Limiting
We implement rate limiting at multiple levels:
1. Per-user API limits:
const rateLimiter = new RateLimiter({
redis: redisClient,
limits: {
'/api/chat': { max: 60, window: '1m' },
'/api/warehouse/query': { max: 100, window: '1h' }
}
});
// In API route
const allowed = await rateLimiter.check(userId, route);
if (!allowed) {
return new Response('Rate limit exceeded', { status: 429 });
}
2. Query cost limits:
// Track spending per user
const dailySpend = await getCostFromCache(userId, 'daily');
if (dailySpend > userCostLimit) {
return new Response('Daily cost limit exceeded', { status: 429 });
}
3. Concurrent request limits:
// Max 5 concurrent queries per user
const activeQueries = await getActiveQueryCount(userId);
if (activeQueries >= 5) {
return new Response('Too many concurrent requests', { status: 429 });
}
Layer 3: AI Orchestration
This is where it gets interesting. The AI orchestration layer is responsible for:
- Building context for Claude
- Managing conversation state
- Executing tools
- Streaming responses
- Error handling
Context Assembly
For each user message, we build a context that includes:
interface Context {
conversation_history: Message[]; // Last N messages
warehouse_schema: Schema; // Relevant tables/columns
user_metadata: UserProfile; // User preferences, settings
performance_hints: OptimizationTips; // Query optimization guidance
example_queries: QueryExamples; // Few-shot examples
}
Challenge: Keeping context under token limits while including enough information.
Solution: Progressive context pruning:
async function buildContext(userId: string, threadId: string): Promise<Context> {
// Start with full context
let context = await getFullContext(userId, threadId);
// If over token limit, prune
while (estimateTokens(context) > MAX_CONTEXT_TOKENS) {
if (context.conversation_history.length > 10) {
// Summarize old messages
      context.conversation_history = (await summarizeHistory(
        context.conversation_history.slice(0, -10)
      )).concat(context.conversation_history.slice(-10));
} else if (context.warehouse_schema.tables.length > 20) {
// Prune schema to most relevant tables
context.warehouse_schema = await pruneSchema(
context.warehouse_schema,
context.conversation_history
);
    } else {
      // Trim example queries as a last resort, then stop so pruning can't loop forever
      context.example_queries = context.example_queries.slice(0, 5);
      break;
    }
}
return context;
}
Prompt Engineering
We use a multi-part system prompt:
1. Role definition:
You are a marketing analytics expert helping users understand their data.
You have access to their BigQuery warehouse and can query it to answer questions.
2. Behavioral guidelines:
- Always explain your reasoning
- Acknowledge uncertainty when appropriate
- Prioritize actionable insights over raw data
- Use business terminology, not technical jargon
3. Tool usage guidelines:
Available tools:
- execute_query: Run SQL against BigQuery
- get_schema: Retrieve table structure
- list_tables: Show available tables
When executing queries:
1. Estimate cost before running expensive queries
2. Use partition filters for time-based queries
3. Limit results to what's needed
4. Output format:
Structure your responses:
1. Brief answer to the question
2. Supporting data/evidence
3. Insights and recommendations
4. Suggested next steps
5. Few-shot examples:
Example conversation:
User: "Why did our ROAS drop?"
Assistant: I'll analyze your campaign performance to identify the cause.
[calls execute_query with SQL to compare recent vs historical performance]
I found that your ROAS dropped 15% in the last 7 days. Here's what's happening:
...
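In code, assembling those parts is plain string composition. A simplified sketch; the constant and function names are illustrative:
// Hypothetical assembly of the multi-part system prompt described above
const ROLE = `You are a marketing analytics expert helping users understand their data.
You have access to their BigQuery warehouse and can query it to answer questions.`;

const GUIDELINES = `- Always explain your reasoning
- Acknowledge uncertainty when appropriate
- Prioritize actionable insights over raw data
- Use business terminology, not technical jargon`;

function buildSystemPrompt(
  toolGuidelines: string,
  outputFormat: string,
  fewShotExamples: string[]
): string {
  return [ROLE, GUIDELINES, toolGuidelines, outputFormat, ...fewShotExamples].join('\n\n');
}
Keeping each part as its own string makes it easy to version and test the pieces independently.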
Tool Execution Engine
Claude can call tools (functions) to interact with BigQuery. Here's how that works:
// Tool definitions
const tools = [
{
name: 'execute_query',
description: 'Execute a SQL query against the user\'s BigQuery warehouse',
input_schema: {
type: 'object',
properties: {
sql: { type: 'string', description: 'The SQL query to execute' },
reason: { type: 'string', description: 'Why you\'re running this query' }
},
required: ['sql', 'reason']
}
},
// ... more tools
];
// Tool execution
async function executeTool(toolName: string, input: any, context: Context) {
switch (toolName) {
case 'execute_query':
return await executeQuery(input.sql, context);
case 'get_schema':
return await getSchema(input.table_name, context);
// ... more tools
}
}
Important: Tool execution happens inside the same streaming response:
async function* streamResponse(userMessage: string, context: Context) {
  const stream = anthropic.messages.stream({
    model: 'claude-3-5-sonnet-20241022',
    max_tokens: 4096,
    messages: [{ role: 'user', content: userMessage }],
    system: systemPrompt,
    tools: tools
  });

  // Stream text tokens as they arrive
  for await (const event of stream) {
    if (event.type === 'content_block_delta' && event.delta.type === 'text_delta') {
      yield { type: 'token', token: event.delta.text };
    }
  }

  // Once the turn is complete, execute any tool calls Claude made
  const message = await stream.finalMessage();
  for (const block of message.content) {
    if (block.type === 'tool_use') {
      yield { type: 'tool_use', tool: block.name, input: block.input };
      const result = await executeTool(block.name, block.input, context);
      yield { type: 'tool_result', result };
      // Continue the conversation with the tool result
      // (Claude may call more tools or respond to the user)
    }
  }

  yield { type: 'done' };
}
Query Execution Pipeline
When Claude calls execute_query, we don't just run the SQL. We run it through a pipeline:
async function executeQuery(sql: string, context: Context) {
// 1. Validate SQL syntax
const validation = await validateSQL(sql);
if (!validation.valid) {
return { error: `Invalid SQL: ${validation.error}` };
}
// 2. Estimate cost
const cost = await estimateQueryCost(sql, context.warehouse);
if (cost > context.user.cost_limit) {
return {
error: `Query would cost $${cost.toFixed(2)}, exceeding your limit of $${context.user.cost_limit}. Consider narrowing the query.`
};
}
// 3. Check cache
const cacheKey = hashQuery(sql);
const cached = await getFromCache(cacheKey);
if (cached && !isStale(cached, context.freshness_requirements)) {
return { data: cached.results, cached: true };
}
// 4. Optimize query
const optimized = await optimizeQuery(sql, context.warehouse.schema);
// 5. Execute with timeout
const results = await executeWithTimeout(optimized, context.timeout || 30000);
// 6. Cache results
await cacheResults(cacheKey, results);
// 7. Track cost
await trackQueryCost(context.userId, cost);
// 8. Return results
return { data: results, cost };
}
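The cost estimate in step 2 leans on BigQuery's dry-run mode, which reports the bytes a query would process without running it. A sketch of what estimateQueryCost could look like (the signature is simplified to take a client, and the $5/TB on-demand rate matches the tracking code later):
import { BigQuery } from '@google-cloud/bigquery';

// Sketch: estimate on-demand cost via a dry run (the dry run itself is free)
async function estimateQueryCost(sql: string, bigquery: BigQuery): Promise<number> {
  const [job] = await bigquery.createQueryJob({ query: sql, dryRun: true });
  const bytes = Number(job.metadata.statistics.totalBytesProcessed ?? 0);
  return (bytes / 1e12) * 5; // ~$5 per TB scanned
}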
Error Handling
We handle errors at multiple levels:
1. Query errors:
try {
const results = await bigquery.query(sql);
return results;
} catch (error) {
// Translate BigQuery error to user-friendly message
const translated = translateBigQueryError(error);
// Ask Claude to fix if possible
if (translated.can_auto_fix) {
const fixed = await askClaudeToFix(sql, translated.error);
return await bigquery.query(fixed);
}
return { error: translated.message };
}
2. API errors:
try {
const response = await anthropic.messages.create(...);
return response;
} catch (error) {
if (error.status === 429) {
// Rate limited - retry with backoff
await sleep(exponentialBackoff(retryCount));
return retry();
} else if (error.status === 500) {
// Anthropic outage - fallback message
return { error: 'AI service temporarily unavailable' };
}
throw error;
}
3. Timeout handling:
async function executeWithTimeout(query: string, timeout: number) {
const timeoutPromise = new Promise((_, reject) =>
setTimeout(() => reject(new Error('Query timeout')), timeout)
);
const queryPromise = bigquery.query(query);
try {
return await Promise.race([queryPromise, timeoutPromise]);
} catch (error) {
if (error.message === 'Query timeout') {
// Offer to run async
return {
error: 'Query is taking longer than expected',
suggestion: 'Run this as a background job?',
can_async: true
};
}
throw error;
}
}
Layer 4: Data Layer
BigQuery Integration
We use the official BigQuery client library:
import { BigQuery } from '@google-cloud/bigquery';
const bigquery = new BigQuery({
projectId: process.env.GCP_PROJECT_ID,
keyFilename: process.env.GCP_KEY_FILE
});
Per-user credentials:
Each user brings their own BigQuery credentials. We don't have access to their data.
// User provides service account JSON
const userBigQuery = new BigQuery({
projectId: user.gcp_project_id,
credentials: user.gcp_credentials // stored encrypted
});
Security: Credentials are encrypted at rest (AES-256) and decrypted only during query execution.
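For reference, the encrypt/decrypt helpers follow the standard AES-256-GCM pattern with Node's crypto module. A minimal sketch, not our exact implementation; key management (KMS, rotation) is out of scope here:
import { createCipheriv, createDecipheriv, randomBytes } from 'crypto';

// Sketch: encrypt a credentials JSON blob with AES-256-GCM
// (key is a 32-byte secret loaded from a secret manager)
function encryptCredentials(plaintext: string, key: Buffer): string {
  const iv = randomBytes(12);
  const cipher = createCipheriv('aes-256-gcm', key, iv);
  const ciphertext = Buffer.concat([cipher.update(plaintext, 'utf8'), cipher.final()]);
  // Store IV + auth tag + ciphertext as one opaque string
  return Buffer.concat([iv, cipher.getAuthTag(), ciphertext]).toString('base64');
}

function decryptCredentials(payload: string, key: Buffer): string {
  const buf = Buffer.from(payload, 'base64');
  const iv = buf.subarray(0, 12);
  const tag = buf.subarray(12, 28);
  const ciphertext = buf.subarray(28);
  const decipher = createDecipheriv('aes-256-gcm', key, iv);
  decipher.setAuthTag(tag);
  return Buffer.concat([decipher.update(ciphertext), decipher.final()]).toString('utf8');
}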
Schema Introspection
We need to understand the user's schema to generate good queries:
async function introspectSchema(projectId: string, datasetId: string) {
// Get all tables
const [tables] = await bigquery
.dataset(datasetId)
.getTables();
// Get metadata for each table
const schema = await Promise.all(
tables.map(async (table) => {
const [metadata] = await table.getMetadata();
return {
name: table.id,
schema: metadata.schema.fields,
partitioning: metadata.timePartitioning,
clustering: metadata.clustering,
description: metadata.description,
num_rows: metadata.numRows,
size_bytes: metadata.numBytes
};
})
);
return schema;
}
Optimization: We cache schema for 24 hours and invalidate on detected changes.
Caching Strategy
We use Redis for multiple caching layers:
1. Query result cache:
// Key: hash(sql + parameters)
// Value: { results, timestamp, cost }
// TTL: based on data freshness requirements
2. Schema cache:
// Key: `schema:${projectId}:${datasetId}`
// Value: full schema metadata
// TTL: 24 hours
3. Session cache:
// Key: `session:${userId}:${threadId}`
// Value: conversation context
// TTL: 24 hours
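The query-result layer (the first one above) is a plain read-through cache. Roughly, assuming an ioredis-style client; getFromCache and cacheResults are the helpers referenced in the execution pipeline, shown here in sketch form:
import { createHash } from 'crypto';
import Redis from 'ioredis';

const redis = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379');

function hashQuery(sql: string): string {
  return createHash('sha256').update(sql).digest('hex');
}

async function getFromCache(cacheKey: string) {
  const raw = await redis.get(`query:${cacheKey}`);
  return raw ? JSON.parse(raw) : null;
}

async function cacheResults(cacheKey: string, results: unknown, ttlSeconds = 3600) {
  // TTL depends on the data's freshness requirements
  await redis.set(
    `query:${cacheKey}`,
    JSON.stringify({ results, timestamp: Date.now() }),
    'EX',
    ttlSeconds
  );
}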
Cache invalidation:
// On schema change
await redis.del(`schema:${projectId}:${datasetId}`);
// On new data
await redis.del(`query:${hashQuery(sql)}`);
Cost Tracking
We track query costs in BigQuery itself:
// Log every query
await bigquery.dataset('cogny_internal').table('query_logs').insert([{
user_id: userId,
query_hash: hashQuery(sql),
bytes_processed: results.totalBytesProcessed,
cost: (results.totalBytesProcessed / 1e12) * 5, // $5 per TB
duration_ms: results.jobStatistics.totalElapsedTime,
timestamp: new Date()
}]);
This lets us:
- Show users their query costs
- Aggregate daily/monthly spending
- Identify expensive queries
- Optimize common patterns
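Aggregating daily spend from that log, for instance, is a single query. A sketch of what could back the daily-limit check in the API layer, reusing the bigquery client from above:
// Sketch: sum today's query cost for a user from the internal query_logs table
async function getDailySpend(userId: string): Promise<number> {
  const [rows] = await bigquery.query({
    query: `
      SELECT COALESCE(SUM(cost), 0) AS daily_cost
      FROM \`cogny_internal.query_logs\`
      WHERE user_id = @userId
        AND DATE(timestamp) = CURRENT_DATE()
    `,
    params: { userId }
  });
  return rows[0].daily_cost;
}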
Layer 5: Deployment & Infrastructure
Architecture
We run on Google Kubernetes Engine (GKE):
Load Balancer (GCP)
↓
Ingress (Nginx)
↓
Frontend Service (2 replicas)
↓
API Service (3 replicas)
↓
Background Workers (2 replicas)
Container Structure
# Frontend
FROM node:20-alpine
WORKDIR /app
COPY package*.json ./
RUN npm ci
COPY . .
RUN npm run build
CMD ["npm", "start"]
# API
FROM node:20-alpine
WORKDIR /app
COPY package*.json ./
RUN npm ci
COPY . .
RUN npm run build
CMD ["node", "dist/server.js"]
Configuration
We use environment variables for configuration:
# Core
NODE_ENV=production
API_URL=https://api.cogny.com
# Database
SUPABASE_URL=...
SUPABASE_ANON_KEY=...
SUPABASE_SERVICE_KEY=...
# AI
ANTHROPIC_API_KEY=...
# BigQuery
GCP_PROJECT_ID=...
GCP_CREDENTIALS=...
# Caching
REDIS_URL=...
# Monitoring
SENTRY_DSN=...
Scaling Strategy
Horizontal scaling:
- Frontend: Scale based on HTTP requests
- API: Scale based on CPU utilization
- Workers: Scale based on queue depth
# Kubernetes HPA
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
name: api-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: api
minReplicas: 3
maxReplicas: 10
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 70
Vertical scaling:
- BigQuery: Auto-scales (managed service)
- Redis: Upgrade instance size as needed
Monitoring
1. Application metrics:
- Request latency (p50, p95, p99)
- Error rates by endpoint
- Active users
- Query execution time
- AI response time
- Cache hit rates
2. Infrastructure metrics:
- CPU/memory utilization
- Network throughput
- Disk I/O
- Pod health
3. Business metrics:
- Daily active users
- Queries per user
- Average query cost
- Conversation length
- User satisfaction (NPS)
Stack:
- Metrics: Prometheus + Grafana
- Errors: Sentry
- Logs: GCP Cloud Logging
- Uptime: UptimeRobot
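On the application side, the latency and cache-hit metrics above are plain prom-client instruments. For example (metric names and buckets are illustrative):
import { Histogram, Counter } from 'prom-client';

// Request latency histogram by route and status (drives the p50/p95/p99 panels)
const httpLatency = new Histogram({
  name: 'cogny_http_request_duration_seconds',
  help: 'HTTP request latency',
  labelNames: ['route', 'status'],
  buckets: [0.05, 0.1, 0.25, 0.5, 1, 2.5, 5, 10, 30]
});

// Cache hit/miss counter
const cacheLookups = new Counter({
  name: 'cogny_cache_lookups_total',
  help: 'Query cache lookups',
  labelNames: ['result'] // 'hit' | 'miss'
});

// In a request handler
const end = httpLatency.startTimer({ route: '/api/chat' });
// ... handle the request ...
end({ status: '200' });
cacheLookups.inc({ result: 'hit' });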
Incident Response
We use PagerDuty for on-call alerting:
P0 (immediate response):
- API down
- Database unavailable
- Critical error rate > 5%
P1 (1 hour response):
- Elevated error rate (1-5%)
- Query latency > 30s p95
- Cache failure
P2 (next business day):
- Non-critical bugs
- Performance degradation
- Feature requests
Layer 6: Security
Data Security
1. Encryption:
- Data in transit: TLS 1.3
- Data at rest: AES-256
- User credentials: Encrypted in database
2. Access control:
- Row Level Security (RLS) in Supabase
- Service account with minimal permissions
- User credentials scoped to their project
3. Audit logging:
// Log all data access
await auditLog.create({
user_id: userId,
action: 'query_executed',
resource: `bigquery:${projectId}:${datasetId}:${table}`,
ip_address: req.ip,
user_agent: req.headers['user-agent'],
timestamp: new Date()
});
API Security
1. Authentication:
- JWT tokens (Supabase)
- Refresh token rotation
- Session timeout (24 hours)
2. Authorization:
- Role-based access control (RBAC)
- Resource ownership validation
- Scope-limited API keys
3. Input validation:
// Validate all inputs
const schema = z.object({
query: z.string().max(10000),
projectId: z.string().regex(/^[a-z0-9-]+$/),
datasetId: z.string().regex(/^[a-zA-Z0-9_]+$/)
});
const validated = schema.parse(req.body);
4. SQL injection prevention:
// Use parameterized queries for values; identifiers (project, dataset, table)
// can't be parameterized, so they're validated against the regex schema above
const query = `
SELECT * FROM \`${projectId}.${datasetId}.${table}\`
WHERE date = @date
`;
await bigquery.query({
query,
params: { date: userInput }
});
Rate Limiting
Multi-level rate limiting:
- Per-user API limits
- Per-IP limits (prevent abuse)
- Query cost limits
- Concurrent request limits
What I'd Do Differently
After six months of running this in production, here's what I'd change:
1. Build Observability First
We added monitoring after launch. Big mistake. Should have been there from day one.
Next time:
- Instrument all code paths
- Set up dashboards before shipping
- Define SLOs upfront
- Build alerting early
2. Design for Multi-Model from the Start
We built for Claude specifically. Adding support for other models (GPT-4, local models) is now a major refactor.
Next time:
- Abstract the model interface (see the sketch below)
- Make prompts model-agnostic
- Build model selection logic early
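Concretely, the abstraction we wish we'd started with looks something like this. A sketch, not what we have today:
// Hypothetical model-agnostic interface: the orchestration layer codes against this,
// and Claude, GPT-4, or a local model each get an adapter
interface ToolDefinition {
  name: string;
  description: string;
  inputSchema: object;
}

type ModelEvent =
  | { type: 'token'; token: string }
  | { type: 'tool_use'; tool: string; input: unknown }
  | { type: 'done' };

interface ModelProvider {
  name: string;
  stream(request: {
    system: string;
    messages: { role: 'user' | 'assistant'; content: string }[];
    tools: ToolDefinition[];
    maxTokens: number;
  }): AsyncIterable<ModelEvent>;
}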
3. Invest More in Testing Infrastructure
We have decent test coverage, but not enough integration tests for the full pipeline.
Next time:
- Build end-to-end test suite early
- Automate performance testing
- Test error scenarios thoroughly
- Load test before launch
4. Plan for Data Migrations
Our database schema has evolved. Migrations are painful.
Next time:
- Design for schema evolution
- Build migration tooling early
- Version APIs from the start
5. Cost Optimization Earlier
We optimized for functionality, then performance, then cost. Cost should have come earlier.
Next time:
- Build cost tracking from day one
- Set budgets per feature
- Optimize expensive operations early
The Honest Complexity Assessment
Building production AI systems is harder than most people think.
The AI itself is maybe 20% of the work. The rest breaks down roughly as:
- Data infrastructure: 30%
- Error handling: 20%
- Security: 15%
- Monitoring: 10%
- Optimization: 5%
Total engineering time to launch: ~6 months with 2 engineers.
Ongoing maintenance: ~20% of engineering time.
It's doable, but don't underestimate the operational complexity of running AI in production.
Final Thoughts
This architecture has served us well. We're processing thousands of queries daily with:
- p95 response time: 8 seconds
- Uptime: 99.9%
- Average query cost: $0.18
- User satisfaction: 4.7/5
But it's still evolving. Every week we find new optimizations, fix new edge cases, and improve reliability.
The biggest lesson: AI systems are fundamentally different from traditional software. They're probabilistic, context-dependent, and require different engineering approaches.
But when you get it right, the capabilities are transformative.
---
About Berner Setterwall
Berner is CTO and co-founder of Cogny, where he's built the technical architecture for AI-powered marketing automation. Previously, he was a senior engineer at Campanja, building systems for Netflix, Zalando, and other major brands. He specializes in AI architecture, distributed systems, and making complex technology reliable at scale.
Want to see this architecture in action?
Cogny brings AI-powered marketing analytics to your BigQuery warehouse. We've built the infrastructure so you don't have to. Book a demo to see how we turn complex architecture into simple, powerful user experiences.