
    Architecture of AI-Powered Marketing Automation

    Berner Setterwall · January 21, 2025

    When people ask "how does Cogny work?", they usually want to know about the AI.

    But the AI is maybe 20% of the system.

    The real architecture challenge is everything around the AI: how you ingest data, manage context, stream responses, handle errors, control costs, ensure security, and scale to thousands of concurrent users.

    After six months of building Cogny in production, we've learned what works, what doesn't, and where the complexity really lives. Here's the complete technical architecture, including the parts that aren't sexy but are essential.

    The High-Level Picture

    Before diving into details, here's the 30,000-foot view:

    User → Frontend (React) → API (Next.js) → AI Agent (Claude) → Data Warehouse (BigQuery) → Results → User

    Simple, right?

    Here's what that actually looks like in production:

    User
      ↓
    Web App (React + TypeScript)
      ↓
    API Layer (Next.js API Routes)
    ├→ Authentication (Supabase Auth)
    ├→ Session Management (Redis)
    └→ Request Queue (for rate limiting)
      ↓
    AI Orchestration Layer
    ├→ Context Builder (assembles schema, history, etc.)
    ├→ Prompt Engineering (system prompts + few-shot examples)
    ├→ Claude API (via Anthropic SDK)
    ├→ Tool Execution Engine
    │   ├→ BigQuery Client
    │   ├→ Query Validator
    │   ├→ Cost Estimator
    │   ├→ Cache Layer (Redis)
    │   └→ Result Processor
    └→ Response Stream (SSE)

    Monitoring & Logging
    ├→ Error tracking (Sentry)
    ├→ Metrics (Prometheus)
    ├→ Query logs (BigQuery)
    └→ Cost tracking

    Let's break down each layer.

    Layer 1: Frontend Architecture

    The frontend is a React/TypeScript application built with Vite. Nothing revolutionary here, but some interesting choices:

    Component Structure

    src/
    ├── components/
    │   ├── chat/        # Chat interface components
    │   ├── warehouse/   # Data warehouse management
    │   └── ui/          # shadcn/ui components
    ├── lib/
    │   ├── api/         # API client functions
    │   ├── hooks/       # Custom React hooks
    │   └── utils/       # Utility functions
    └── app/             # Next.js pages

    Real-Time Chat Interface

    The chat is the core UI. Key requirements:

    • Stream AI responses token-by-token
    • Show tool execution progress
    • Display data tables and visualizations
    • Handle long conversations efficiently

    Implementation:

    We use Server-Sent Events (SSE) for streaming:

    // Frontend
    const eventSource = new EventSource(`/api/chat/stream?threadId=${threadId}`);

    eventSource.onmessage = (event) => {
      const data = JSON.parse(event.data);

      switch (data.type) {
        case 'token':
          // Append token to current message
          appendToken(data.token);
          break;

        case 'tool_use':
          // Show tool execution
          showToolExecution(data.tool, data.input);
          break;

        case 'tool_result':
          // Display tool result
          showToolResult(data.result);
          break;

        case 'error':
          // Handle error
          showError(data.error);
          break;

        case 'done':
          // Close stream
          eventSource.close();
          break;
      }
    };
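
    On the server side, the matching endpoint is a Next.js route handler that writes these events to an SSE response. A minimal sketch, where runAgent is a stand-in for the orchestration layer described in Layer 3 and the event shapes match the switch above:

    // app/api/chat/stream/route.ts (sketch)
    import { NextRequest } from 'next/server';
    import { runAgent } from '@/lib/agent'; // hypothetical orchestration entry point (Layer 3)

    export async function GET(req: NextRequest) {
      const threadId = req.nextUrl.searchParams.get('threadId');
      const encoder = new TextEncoder();

      const stream = new ReadableStream({
        async start(controller) {
          // runAgent yields events like { type: 'token' | 'tool_use' | 'done', ... }
          for await (const event of runAgent(threadId)) {
            controller.enqueue(encoder.encode(`data: ${JSON.stringify(event)}\n\n`));
          }
          controller.close();
        }
      });

      return new Response(stream, {
        headers: {
          'Content-Type': 'text/event-stream',
          'Cache-Control': 'no-cache',
          'Connection': 'keep-alive'
        }
      });
    }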

    Challenge: Handling disconnections and retries.

    Solution: Implement exponential backoff retry with state recovery:

    let retryCount = 0;
    const maxRetries = 3;

    function connectStream() {
      const eventSource = new EventSource(url);

      eventSource.onerror = () => {
        eventSource.close();

        if (retryCount < maxRetries) {
          const delay = Math.pow(2, retryCount) * 1000;
          setTimeout(() => {
            retryCount++;
            connectStream();
          }, delay);
        } else {
          showError("Connection lost. Please refresh.");
        }
      };

      // Reset retry count on successful connection
      eventSource.onopen = () => {
        retryCount = 0;
      };
    }

    State Management

    We use a combination of:

    • React Context for global state (user, auth)
    • React Query for server state (API data)
    • Local state for UI state (modals, forms)

    Why not Redux/Zustand? For our use case, React Query handles most state management needs. It provides caching, refetching, and optimistic updates out of the box.
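
    Chat history is a good example of server state. A hook along these lines (the hook name and types are ours, not a library API) gives us caching, refetching, and loading states for free:

    import { useQuery } from '@tanstack/react-query';

    type Message = { id: string; role: string; content: string; timestamp: string };

    // Thin wrapper around the /api/chat/history route described below
    async function fetchHistory(threadId: string): Promise<Message[]> {
      const res = await fetch(`/api/chat/history?threadId=${threadId}`);
      if (!res.ok) throw new Error('Failed to load chat history');
      return res.json();
    }

    export function useChatHistory(threadId: string) {
      return useQuery({
        queryKey: ['chat-history', threadId],
        queryFn: () => fetchHistory(threadId),
        staleTime: 30_000 // history rarely changes behind the user's back
      });
    }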

    Performance Optimizations

    1. Virtual scrolling for long chat histories (react-window); a sketch follows the memoization example below

    2. Lazy loading of warehouse schemas (only load when needed)

    3. Code splitting by route

    4. Memoization of expensive computations

    const processedMessages = useMemo(() => {
      return messages.map(msg => ({
        ...msg,
        formattedTime: formatTime(msg.timestamp),
        parsedContent: parseMarkdown(msg.content)
      }));
    }, [messages]);
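
    For item 1 above, virtualization keeps only the visible rows mounted. A rough sketch with react-window's FixedSizeList (real chat messages vary in height, so production code would measure sizes or use VariableSizeList):

    import { FixedSizeList } from 'react-window';

    function MessageList({ messages }: { messages: { id: string; content: string }[] }) {
      return (
        <FixedSizeList
          height={600}        // viewport height in px
          width="100%"
          itemCount={messages.length}
          itemSize={96}       // approximate row height
        >
          {({ index, style }) => (
            <div style={style}>{messages[index].content}</div>
          )}
        </FixedSizeList>
      );
    }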

    Layer 2: API Layer

    The API is built with Next.js API routes (App Router). This gives us:

    • Flexible deployment (serverless on Vercel or containerized on GKE)
    • Edge runtime for some routes
    • Built-in middleware
    • TypeScript support

    Route Structure

    app/api/
    ├── auth/            # Authentication endpoints
    ├── chat/
    │   ├── stream/      # SSE streaming endpoint
    │   └── history/     # Chat history
    ├── warehouse/
    │   ├── connect/     # BigQuery connection
    │   ├── schema/      # Schema introspection
    │   └── query/       # Direct query execution
    └── admin/           # Admin endpoints

    Authentication Flow

    We use Supabase Auth with Row Level Security (RLS):

    // Middleware
    export async function middleware(req: NextRequest) {
      // The response object carries any refreshed auth cookies back to the client
      const res = NextResponse.next();
      const supabase = createMiddlewareClient({ req, res });

      // Refresh session if needed
      const { data: { session } } = await supabase.auth.getSession();

      if (!session && !isPublicRoute(req.url)) {
        return NextResponse.redirect(new URL('/login', req.url));
      }

      return res;
    }

    Why Supabase? Handles OAuth, magic links, and JWT management. RLS ensures users only see their data.

    Rate Limiting

    We implement rate limiting at multiple levels:

    1. Per-user API limits:

    const rateLimiter = new RateLimiter({
      redis: redisClient,
      limits: {
        '/api/chat': { max: 60, window: '1m' },
        '/api/warehouse/query': { max: 100, window: '1h' }
      }
    });

    // In API route
    const allowed = await rateLimiter.check(userId, route);
    if (!allowed) {
      return new Response('Rate limit exceeded', { status: 429 });
    }

    2. Query cost limits:

    // Track spending per user
    const dailySpend = await getCostFromCache(userId, 'daily');
    if (dailySpend > userCostLimit) {
      return new Response('Daily cost limit exceeded', { status: 429 });
    }

    3. Concurrent request limits:

    // Max 5 concurrent queries per user
    const activeQueries = await getActiveQueryCount(userId);
    if (activeQueries >= 5) {
      return new Response('Too many concurrent requests', { status: 429 });
    }
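
    The RateLimiter used in the per-user limits above is our own thin wrapper around Redis counters. A minimal fixed-window sketch of the idea, assuming ioredis (a production limiter would typically use sliding windows):

    import Redis from 'ioredis';

    const redis = new Redis(process.env.REDIS_URL!);

    // Returns true if the request is allowed within the current window
    async function checkLimit(userId: string, route: string, max: number, windowSec: number) {
      const windowStart = Math.floor(Date.now() / 1000 / windowSec);
      const key = `ratelimit:${userId}:${route}:${windowStart}`;

      const count = await redis.incr(key);
      if (count === 1) {
        await redis.expire(key, windowSec); // first hit in the window sets its TTL
      }
      return count <= max;
    }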

    Layer 3: AI Orchestration

    This is where it gets interesting. The AI orchestration layer is responsible for:

    • Building context for Claude
    • Managing conversation state
    • Executing tools
    • Streaming responses
    • Error handling

    Context Assembly

    For each user message, we build a context that includes:

    interface Context {
      conversation_history: Message[];       // Last N messages
      warehouse_schema: Schema;              // Relevant tables/columns
      user_metadata: UserProfile;            // User preferences, settings
      performance_hints: OptimizationTips;   // Query optimization guidance
      example_queries: QueryExamples;        // Few-shot examples
    }

    Challenge: Keeping context under token limits while including enough information.

    Solution: Progressive context pruning:

    async function buildContext(userId: string, threadId: string): Promise<Context> {
      // Start with full context
      let context = await getFullContext(userId, threadId);

      // If over token limit, prune progressively
      while (estimateTokens(context) > MAX_CONTEXT_TOKENS) {
        if (context.conversation_history.length > 10) {
          // Summarize old messages, keep the last 10 verbatim
          context.conversation_history = (await summarizeHistory(
            context.conversation_history.slice(0, -10)
          )).concat(context.conversation_history.slice(-10));
        } else if (context.warehouse_schema.tables.length > 20) {
          // Prune schema to the most relevant tables
          context.warehouse_schema = await pruneSchema(
            context.warehouse_schema,
            context.conversation_history
          );
        } else {
          // Trim example queries to the first five
          context.example_queries = context.example_queries.slice(0, 5);
        }
      }

      return context;
    }
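
    The estimateTokens helper above doesn't need to be exact; a cheap character-based heuristic is enough to decide when to prune. A sketch (roughly four characters per token is an approximation, not the real tokenizer, and the limit constant is illustrative):

    const MAX_CONTEXT_TOKENS = 150_000; // leave headroom under the model's context window

    function estimateTokens(context: Context): number {
      // Serialize everything that would end up in the prompt and approximate ~4 chars/token
      const serialized = JSON.stringify(context);
      return Math.ceil(serialized.length / 4);
    }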

    Prompt Engineering

    We use a multi-part system prompt:

    1. Role definition:

    You are a marketing analytics expert helping users understand their data.
    You have access to their BigQuery warehouse and can query it to answer questions.

    2. Behavioral guidelines:

    - Always explain your reasoning
    - Acknowledge uncertainty when appropriate
    - Prioritize actionable insights over raw data
    - Use business terminology, not technical jargon

    3. Tool usage guidelines:

    Available tools:
    - execute_query: Run SQL against BigQuery
    - get_schema: Retrieve table structure
    - list_tables: Show available tables

    When executing queries:
    1. Estimate cost before running expensive queries
    2. Use partition filters for time-based queries
    3. Limit results to what's needed

    4. Output format:

    Structure your responses:
    1. Brief answer to the question
    2. Supporting data/evidence
    3. Insights and recommendations
    4. Suggested next steps

    5. Few-shot examples:

    Example conversation:
    User: "Why did our ROAS drop?"
    Assistant: I'll analyze your campaign performance to identify the cause.

    [calls execute_query with SQL to compare recent vs historical performance]

    I found that your ROAS dropped 15% in the last 7 days. Here's what's happening:
    ...
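
    In code, these five parts are concatenated into a single system string per request; the result is the systemPrompt passed to the model in the streaming code below. A minimal sketch, with constants standing in for the prompt text above:

    // Each constant holds the corresponding prompt text shown above
    const ROLE_DEFINITION = `You are a marketing analytics expert...`;
    const BEHAVIORAL_GUIDELINES = `- Always explain your reasoning...`;
    const TOOL_USAGE_GUIDELINES = `Available tools:...`;
    const OUTPUT_FORMAT = `Structure your responses:...`;
    const FEW_SHOT_EXAMPLES = `Example conversation:...`;

    const systemPrompt = [
      ROLE_DEFINITION,
      BEHAVIORAL_GUIDELINES,
      TOOL_USAGE_GUIDELINES,
      OUTPUT_FORMAT,
      FEW_SHOT_EXAMPLES
    ].join('\n\n');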

    Tool Execution Engine

    Claude can call tools (functions) to interact with BigQuery. Here's how that works:

    // Tool definitions
    const tools = [
      {
        name: 'execute_query',
        description: 'Execute a SQL query against the user\'s BigQuery warehouse',
        input_schema: {
          type: 'object',
          properties: {
            sql: { type: 'string', description: 'The SQL query to execute' },
            reason: { type: 'string', description: 'Why you\'re running this query' }
          },
          required: ['sql', 'reason']
        }
      },
      // ... more tools
    ];

    // Tool execution
    async function executeTool(toolName: string, input: any, context: Context) {
      switch (toolName) {
        case 'execute_query':
          return await executeQuery(input.sql, context);

        case 'get_schema':
          return await getSchema(input.table_name, context);

        // ... more tools
      }
    }

    Important: Tool execution happens inside the streaming loop:

    async function* streamResponse(userMessage: string, context: Context) {
      const stream = await anthropic.messages.stream({
        model: 'claude-3-5-sonnet-20241022',
        max_tokens: 4096,
        messages: [{ role: 'user', content: userMessage }],
        system: systemPrompt,
        tools: tools
      });

      for await (const event of stream) {
        if (event.type === 'content_block_delta') {
          // Stream text token
          yield { type: 'token', token: event.delta.text };

        } else if (event.type === 'tool_use') {
          // Execute tool
          // (simplified: in the raw SDK stream, tool_use blocks arrive via
          // content_block_start events and their JSON input via input_json_delta)
          yield { type: 'tool_use', tool: event.name, input: event.input };

          const result = await executeTool(event.name, event.input, context);

          yield { type: 'tool_result', result };

          // Continue conversation with tool result
          // (Claude may call more tools or respond to user)
        }
      }

      yield { type: 'done' };
    }

    Query Execution Pipeline

    When Claude calls execute_query, we don't just run the SQL. We run it through a pipeline:

    async function executeQuery(sql: string, context: Context) {
      // 1. Validate SQL syntax
      const validation = await validateSQL(sql);
      if (!validation.valid) {
        return { error: `Invalid SQL: ${validation.error}` };
      }

      // 2. Estimate cost
      const cost = await estimateQueryCost(sql, context.warehouse);
      if (cost > context.user.cost_limit) {
        return {
          error: `Query would cost $${cost.toFixed(2)}, exceeding your limit of $${context.user.cost_limit}. Consider narrowing the query.`
        };
      }

      // 3. Check cache
      const cacheKey = hashQuery(sql);
      const cached = await getFromCache(cacheKey);
      if (cached && !isStale(cached, context.freshness_requirements)) {
        return { data: cached.results, cached: true };
      }

      // 4. Optimize query
      const optimized = await optimizeQuery(sql, context.warehouse.schema);

      // 5. Execute with timeout
      const results = await executeWithTimeout(optimized, context.timeout || 30000);

      // 6. Cache results
      await cacheResults(cacheKey, results);

      // 7. Track cost
      await trackQueryCost(context.userId, cost);

      // 8. Return results
      return { data: results, cost };
    }
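
    Step 2's estimateQueryCost isn't shown above; one way to implement it is BigQuery's dry-run mode, which plans the query and reports the bytes it would scan without executing anything. A sketch, with the signature simplified to take the client directly and the $5/TB figure matching the cost-tracking code later in this post:

    import { BigQuery } from '@google-cloud/bigquery';

    async function estimateQueryCost(sql: string, bigquery: BigQuery): Promise<number> {
      // A dry run returns statistics (including bytes processed) at no cost
      const [job] = await bigquery.createQueryJob({ query: sql, dryRun: true });
      const bytes = Number(job.metadata.statistics.totalBytesProcessed);

      return (bytes / 1e12) * 5; // on-demand pricing: ~$5 per TB scanned
    }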

    Error Handling

    We handle errors at multiple levels:

    1. Query errors:

    try {
      const results = await bigquery.query(sql);
      return results;
    } catch (error) {
      // Translate BigQuery error to user-friendly message
      const translated = translateBigQueryError(error);

      // Ask Claude to fix if possible (askClaudeToFix is sketched at the end of this section)
      if (translated.can_auto_fix) {
        const fixed = await askClaudeToFix(sql, translated.error);
        return await bigquery.query(fixed);
      }

      return { error: translated.message };
    }

    2. API errors:

    try {
      const response = await anthropic.messages.create(...);
      return response;
    } catch (error) {
      if (error.status === 429) {
        // Rate limited - retry with backoff
        await sleep(exponentialBackoff(retryCount));
        return retry();
      } else if (error.status === 500) {
        // Anthropic outage - fallback message
        return { error: 'AI service temporarily unavailable' };
      }
      throw error;
    }

    3. Timeout handling:

    async function executeWithTimeout(query: string, timeout: number) {
      const timeoutPromise = new Promise((_, reject) =>
        setTimeout(() => reject(new Error('Query timeout')), timeout)
      );

      const queryPromise = bigquery.query(query);

      try {
        return await Promise.race([queryPromise, timeoutPromise]);
      } catch (error) {
        if (error.message === 'Query timeout') {
          // Note: Promise.race only abandons the promise; the BigQuery job itself
          // keeps running unless it's cancelled separately
          // Offer to run async
          return {
            error: 'Query is taking longer than expected',
            suggestion: 'Run this as a background job?',
            can_async: true
          };
        }
        throw error;
      }
    }
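
    The askClaudeToFix helper referenced in the query-error handler above is a second, smaller model call. A minimal sketch, assuming the same Anthropic client used in the orchestration layer (the prompt wording is illustrative):

    async function askClaudeToFix(sql: string, errorMessage: string): Promise<string> {
      const response = await anthropic.messages.create({
        model: 'claude-3-5-sonnet-20241022',
        max_tokens: 1024,
        messages: [{
          role: 'user',
          content: `This BigQuery SQL failed with: "${errorMessage}".\n\n${sql}\n\nReturn only the corrected SQL, nothing else.`
        }]
      });

      // Take the first text block as the fixed query (sketch; production code validates it again)
      const block = response.content[0];
      return block.type === 'text' ? block.text : sql;
    }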

    Layer 4: Data Layer

    BigQuery Integration

    We use the official BigQuery client library:

    import { BigQuery } from '@google-cloud/bigquery';

    const bigquery = new BigQuery({
      projectId: process.env.GCP_PROJECT_ID,
      keyFilename: process.env.GCP_KEY_FILE
    });

    Per-user credentials:

    Each user brings their own BigQuery credentials. We don't have access to their data.

    // User provides service account JSON
    const userBigQuery = new BigQuery({
      projectId: user.gcp_project_id,
      credentials: user.gcp_credentials // stored encrypted
    });

    Security: Credentials are encrypted at rest (AES-256) and decrypted only during query execution.
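
    As a sketch of what that looks like with Node's built-in crypto module (AES-256-GCM; the key name and storage format here are illustrative, and the key itself should live in a secret manager):

    import crypto from 'crypto';

    const KEY = Buffer.from(process.env.CREDENTIALS_ENCRYPTION_KEY!, 'hex'); // 32-byte key

    export function encryptCredentials(plaintext: string): string {
      const iv = crypto.randomBytes(12);
      const cipher = crypto.createCipheriv('aes-256-gcm', KEY, iv);
      const ciphertext = Buffer.concat([cipher.update(plaintext, 'utf8'), cipher.final()]);
      const tag = cipher.getAuthTag();
      // Store IV, auth tag, and ciphertext together
      return [iv, tag, ciphertext].map(b => b.toString('base64')).join('.');
    }

    export function decryptCredentials(stored: string): string {
      const [iv, tag, ciphertext] = stored.split('.').map(s => Buffer.from(s, 'base64'));
      const decipher = crypto.createDecipheriv('aes-256-gcm', KEY, iv);
      decipher.setAuthTag(tag);
      return Buffer.concat([decipher.update(ciphertext), decipher.final()]).toString('utf8');
    }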

    Schema Introspection

    We need to understand the user's schema to generate good queries:

    async function introspectSchema(projectId: string, datasetId: string) {
      // Get all tables
      const [tables] = await bigquery
        .dataset(datasetId)
        .getTables();

      // Get metadata for each table
      const schema = await Promise.all(
        tables.map(async (table) => {
          const [metadata] = await table.getMetadata();

          return {
            name: table.id,
            schema: metadata.schema.fields,
            partitioning: metadata.timePartitioning,
            clustering: metadata.clustering,
            description: metadata.description,
            num_rows: metadata.numRows,
            size_bytes: metadata.numBytes
          };
        })
      );

      return schema;
    }

    Optimization: We cache schema for 24 hours and invalidate on detected changes.

    Caching Strategy

    We use Redis for multiple caching layers:

    1. Query result cache:

    // Key: hash(sql + parameters)
    // Value: { results, timestamp, cost }
    // TTL: based on data freshness requirements

    2. Schema cache:

    // Key: `schema:${projectId}:${datasetId}`
    // Value: full schema metadata
    // TTL: 24 hours

    3. Session cache:

    // Key: `session:${userId}:${threadId}`
    // Value: conversation context
    // TTL: 24 hours

    Cache invalidation:

    // On schema change
    await redis.del(`schema:${projectId}:${datasetId}`);

    // On new data
    await redis.del(`query:${hashQuery(sql)}`);
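
    All three layers follow the same cache-aside pattern. A minimal helper sketch, assuming an ioredis client (the helper name is ours, not a library API):

    // Generic cache-aside helper: read from Redis, fall back to the loader, then populate
    async function getOrSet<T>(key: string, ttlSeconds: number, loader: () => Promise<T>): Promise<T> {
      const hit = await redis.get(key);
      if (hit) return JSON.parse(hit) as T;

      const value = await loader();
      await redis.set(key, JSON.stringify(value), 'EX', ttlSeconds);
      return value;
    }

    // Usage (inside an async function): schema cache with a 24-hour TTL
    const schema = await getOrSet(
      `schema:${projectId}:${datasetId}`,
      60 * 60 * 24,
      () => introspectSchema(projectId, datasetId)
    );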

    Cost Tracking

    We track query costs in BigQuery itself:

    // Log every query
    await bigquery.dataset('cogny_internal').table('query_logs').insert([{
      user_id: userId,
      query_hash: hashQuery(sql),
      bytes_processed: results.totalBytesProcessed,
      cost: (results.totalBytesProcessed / 1e12) * 5, // $5 per TB
      duration_ms: results.jobStatistics.totalElapsedTime,
      timestamp: new Date()
    }]);

    This lets us:

    • Show users their query costs
    • Aggregate daily/monthly spending
    • Identify expensive queries
    • Optimize common patterns
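
    The daily spend check used in the rate-limiting section, for example, becomes a single aggregation over this log table. A sketch, reusing the table and column names from the insert above:

    async function getDailySpend(userId: string): Promise<number> {
      const [rows] = await bigquery.query({
        query: `
          SELECT COALESCE(SUM(cost), 0) AS total_cost
          FROM \`cogny_internal.query_logs\`
          WHERE user_id = @userId
            AND DATE(timestamp) = CURRENT_DATE()
        `,
        params: { userId }
      });

      return rows[0].total_cost;
    }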

    Layer 5: Deployment & Infrastructure

    Architecture

    We run on Google Kubernetes Engine (GKE):

    Load Balancer (GCP)
      ↓
    Ingress (Nginx)
    ├→ Frontend Service (2 replicas)
    ├→ API Service (3 replicas)
    └→ Background Workers (2 replicas)

    Container Structure

    # Frontend
    FROM node:20-alpine
    WORKDIR /app
    COPY package*.json ./
    RUN npm ci
    COPY . .
    RUN npm run build
    CMD ["npm", "start"]

    # API
    FROM node:20-alpine
    WORKDIR /app
    COPY package*.json ./
    RUN npm ci
    COPY . .
    RUN npm run build
    CMD ["node", "dist/server.js"]

    Configuration

    We use environment variables for configuration:

    # Core
    NODE_ENV=production
    API_URL=https://api.cogny.com

    # Database
    SUPABASE_URL=...
    SUPABASE_ANON_KEY=...
    SUPABASE_SERVICE_KEY=...

    # AI
    ANTHROPIC_API_KEY=...

    # BigQuery
    GCP_PROJECT_ID=...
    GCP_CREDENTIALS=...

    # Caching
    REDIS_URL=...

    # Monitoring
    SENTRY_DSN=...

    Scaling Strategy

    Horizontal scaling:

    • Frontend: Scale based on HTTP requests
    • API: Scale based on CPU utilization
    • Workers: Scale based on queue depth

    # Kubernetes HPA
    apiVersion: autoscaling/v2
    kind: HorizontalPodAutoscaler
    metadata:
      name: api-hpa
    spec:
      scaleTargetRef:
        apiVersion: apps/v1
        kind: Deployment
        name: api
      minReplicas: 3
      maxReplicas: 10
      metrics:
        - type: Resource
          resource:
            name: cpu
            target:
              type: Utilization
              averageUtilization: 70

    Vertical scaling:

    • BigQuery: Auto-scales (managed service)
    • Redis: Upgrade instance size as needed

    Monitoring

    1. Application metrics:

    • Request latency (p50, p95, p99)
    • Error rates by endpoint
    • Active users
    • Query execution time
    • AI response time
    • Cache hit rates

    2. Infrastructure metrics:

    • CPU/memory utilization
    • Network throughput
    • Disk I/O
    • Pod health

    3. Business metrics:

    • Daily active users
    • Queries per user
    • Average query cost
    • Conversation length
    • User satisfaction (NPS)

    Stack:

    • Metrics: Prometheus + Grafana
    • Errors: Sentry
    • Logs: GCP Cloud Logging
    • Uptime: UptimeRobot
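
    On the application side, most of this instrumentation boils down to a handful of prom-client histograms and counters. A sketch (metric names are illustrative; register.metrics() would be exposed on a /metrics endpoint for Prometheus to scrape):

    import { Histogram, Counter } from 'prom-client';

    const queryDuration = new Histogram({
      name: 'cogny_query_duration_seconds',
      help: 'BigQuery query execution time',
      labelNames: ['cached'],
      buckets: [0.5, 1, 2, 5, 10, 30, 60]
    });

    const queryErrors = new Counter({
      name: 'cogny_query_errors_total',
      help: 'Failed queries by error type',
      labelNames: ['type']
    });

    // Around query execution
    const end = queryDuration.startTimer({ cached: 'false' });
    try {
      // ... execute query ...
    } catch (err) {
      queryErrors.inc({ type: 'bigquery' });
      throw err;
    } finally {
      end();
    }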

    Incident Response

    We use PagerDuty for on-call alerting:

    P0 (immediate response):

    • API down
    • Database unavailable
    • Critical error rate > 5%

    P1 (1 hour response):

    • Elevated error rate (1-5%)
    • Query latency > 30s p95
    • Cache failure

    P2 (next business day):

    • Non-critical bugs
    • Performance degradation
    • Feature requests

    Layer 6: Security

    Data Security

    1. Encryption:

    • Data in transit: TLS 1.3
    • Data at rest: AES-256
    • User credentials: Encrypted in database

    2. Access control:

    • Row Level Security (RLS) in Supabase
    • Service account with minimal permissions
    • User credentials scoped to their project

    3. Audit logging:

    // Log all data access
    await auditLog.create({
      user_id: userId,
      action: 'query_executed',
      resource: `bigquery:${projectId}:${datasetId}:${table}`,
      ip_address: req.ip,
      user_agent: req.headers['user-agent'],
      timestamp: new Date()
    });

    API Security

    1. Authentication:

    • JWT tokens (Supabase)
    • Refresh token rotation
    • Session timeout (24 hours)

    2. Authorization:

    • Role-based access control (RBAC)
    • Resource ownership validation
    • Scope-limited API keys

    3. Input validation:

    // Validate all inputs
    const schema = z.object({
      query: z.string().max(10000),
      projectId: z.string().regex(/^[a-z0-9-]+$/),
      datasetId: z.string().regex(/^[a-zA-Z0-9_]+$/)
    });

    const validated = schema.parse(req.body);

    4. SQL injection prevention:

    // Use parameterized queries for values; identifiers (project, dataset, table)
    // can't be parameterized, so they're validated against the regexes above
    const query = `
      SELECT * FROM \`${projectId}.${datasetId}.${table}\`
      WHERE date = @date
    `;

    await bigquery.query({
      query,
      params: { date: userInput }
    });

    Rate Limiting

    Multi-level rate limiting:

    • Per-user API limits
    • Per-IP limits (prevent abuse)
    • Query cost limits
    • Concurrent request limits

    What I'd Do Differently

    After six months of running this in production, here's what I'd change:

    1. Build Observability First

    We added monitoring after launch. Big mistake. Should have been there from day one.

    Next time:

    • Instrument all code paths
    • Set up dashboards before shipping
    • Define SLOs upfront
    • Build alerting early

    2. Design for Multi-Model from the Start

    We built for Claude specifically. Adding support for other models (GPT-4, local models) is now a major refactor.

    Next time:

    • Abstract model interface
    • Make prompts model-agnostic
    • Build model selection logic early

    3. Invest More in Testing Infrastructure

    We have decent test coverage, but not enough integration tests for the full pipeline.

    Next time:

    • Build end-to-end test suite early
    • Automate performance testing
    • Test error scenarios thoroughly
    • Load test before launch

    4. Plan for Data Migrations

    Our database schema has evolved. Migrations are painful.

    Next time:

    • Design for schema evolution
    • Build migration tooling early
    • Version APIs from the start

    5. Cost Optimization Earlier

    We optimized for functionality, then performance, then cost. Cost should have come earlier.

    Next time:

    • Build cost tracking from day one
    • Set budgets per feature
    • Optimize expensive operations early

    The Honest Complexity Assessment

    Building production AI systems is harder than most people think.

    The AI itself is maybe 20% of the work:

    • Data infrastructure: 30%
    • Error handling: 20%
    • Security: 15%
    • Monitoring: 10%
    • Optimization: 5%

    Total engineering time to launch: ~6 months with 2 engineers.

    Ongoing maintenance: ~20% of engineering time.

    It's doable, but don't underestimate the operational complexity of running AI in production.

    Final Thoughts

    This architecture has served us well. We're processing thousands of queries daily with:

    • p95 response time: 8 seconds
    • Uptime: 99.9%
    • Average query cost: $0.18
    • User satisfaction: 4.7/5

    But it's still evolving. Every week we find new optimizations, fix new edge cases, and improve reliability.

    The biggest lesson: AI systems are fundamentally different from traditional software. They're probabilistic, context-dependent, and require different engineering approaches.

    But when you get it right, the capabilities are transformative.

    ---

    About Berner Setterwall

    Berner is CTO and co-founder of Cogny, where he's built the technical architecture for AI-powered marketing automation. Previously, he was a senior engineer at Campanja, building systems for Netflix, Zalando, and other major brands. He specializes in AI architecture, distributed systems, and making complex technology reliable at scale.

    Want to see this architecture in action?

    Cogny brings AI-powered marketing analytics to your BigQuery warehouse. We've built the infrastructure so you don't have to. Book a demo to see how we turn complex architecture into simple, powerful user experiences.