    Documentation · Features · Feb 19, 2025

    AI Report Generation: How It Works

    Technical deep dive into Cogny's AI-powered report generation system, including Claude integration, query planning, iterative analysis, and natural language generation.

    Overview

    Cogny's AI Report Builder uses advanced language models (Claude 3.5 Sonnet) combined with intelligent query planning to generate comprehensive growth reports from your analytics data.

    Architecture:

    • Natural language understanding of analysis requests
    • Intelligent SQL query generation
    • Iterative data exploration
    • Pattern recognition and insight extraction
    • Natural language report generation

    System Architecture

    ┌─────────────────────────────────────────────────────────────┐
    │                     User Input                               │
    │  "Analyze conversion funnel for Q1 2025"                    │
    └─────────────────┬───────────────────────────────────────────┘
                      │
             ┌────────▼───────────┐
             │  Prompt Analysis   │
             │  - Intent parsing  │
             │- Context extraction│
             └────────┬───────────┘
                      │
             ┌────────▼───────────┐
             │  Query Planner     │
             │  (Claude AI)       │
             └────────┬───────────┘
                      │
        ┌─────────────┼─────────────┐
        │             │             │
    ┌───▼────┐  ┌────▼─────┐  ┌───▼────┐
    │Dataset │  │Schema    │  │Context │
    │Lookup  │  │Inspection│  │Memory  │
    └───┬────┘  └────┬─────┘  └───┬────┘
        │             │             │
        └─────────────┴─────────────┘
                      │
             ┌────────▼───────────┐
             │  SQL Generation    │
             │  - Query synthesis │
             │  - Validation      │
             └────────┬───────────┘
                      │
             ┌────────▼───────────┐
             │  BigQuery Execute  │
             └────────┬───────────┘
                      │
             ┌────────▼───────────┐
             │  Result Analysis   │
             │  (Claude AI)       │
             │- Pattern detection │
             │- Insight extraction│
             └────────┬───────────┘
                      │
        ┌─────────────┴─────────────┐
        │                           │
    ┌───▼────────┐       ┌──────────▼────┐
    │Visualize   │       │ More Questions?│
    │Results     │       └──────────┬─────┘
    └───┬────────┘                  │
        │                           │ Yes
        │                           │
        │              ┌────────────▼─────┐
        │              │  Iterative Query │
        │              │  (max 50 loops)  │
        │              └────────────┬─────┘
        │                           │
        └───────────────┬───────────┘
                        │ No (Complete)
               ┌────────▼────────┐
               │ Report Assembly │
               │ - Sections      │
               │ - Insights      │
               │- Recommendations│
               └─────────────────┘
    

    Claude AI Integration

    Model Selection

    Primary Model: Claude 3.5 Sonnet

    • Context Window: 200K tokens
    • Reasoning: Advanced analytical capabilities
    • Tool Use: Native function calling
    • Streaming: Real-time response streaming

    System Prompt

    The system prompt defines Claude's role and capabilities:

    SYSTEM_PROMPT = """
    You are an expert growth analyst with deep knowledge of:
    - Google Analytics 4 data structure and schema
    - BigQuery SQL and query optimization
    - Conversion funnel analysis
    - User behavior patterns
    - Statistical analysis and A/B testing
    - Growth marketing strategies
    
    Your role is to:
    1. Understand user's analytical questions
    2. Plan and execute SQL queries against GA4 BigQuery data
    3. Analyze results to identify patterns and insights
    4. Provide actionable recommendations
    
    Available tools:
    - list_datasets: List available BigQuery datasets
    - list_tables: List tables in a dataset
    - inspect_schema: View table schema and sample data
    - execute_query: Run SQL query against BigQuery
    - create_visualization: Generate charts from query results
    
    Guidelines:
    - Always inspect schema before querying unfamiliar tables
    - Use efficient SQL with proper partitioning (_TABLE_SUFFIX)
    - Explain your analytical approach step-by-step
    - Provide confidence levels for insights
    - Include specific metrics and percentages in findings
    - Suggest follow-up analyses when relevant
    """
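
    The tools named in the system prompt are registered with Claude via the Messages API `tools` parameter as JSON Schema declarations. A minimal sketch for two of them (the tool names come from the prompt above; the input fields shown are illustrative assumptions, not Cogny's exact schemas):

    ```python
    # Tool declarations in the Anthropic tool-use format: each tool needs a
    # name, a description, and a JSON Schema describing its input. The
    # schema fields here are illustrative assumptions.
    TOOLS = [
        {
            "name": "execute_query",
            "description": "Run a read-only SQL query against BigQuery",
            "input_schema": {
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "GoogleSQL query"},
                    "max_results": {"type": "integer", "default": 1000},
                },
                "required": ["query"],
            },
        },
        {
            "name": "inspect_schema",
            "description": "View table schema and sample rows",
            "input_schema": {
                "type": "object",
                "properties": {
                    "dataset_id": {"type": "string"},
                    "table_id": {"type": "string"},
                },
                "required": ["dataset_id", "table_id"],
            },
        },
    ]
    ```

    This list is passed alongside the system prompt on each `messages.create` call, and Claude responds with `tool_use` blocks whose inputs conform to these schemas.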
    

    Context Management

    Maintain conversation context for iterative analysis:

    from datetime import datetime

    class ConversationContext:
        def __init__(self, warehouse_id, max_messages=50):
            self.warehouse_id = warehouse_id
            self.messages = []
            self.max_messages = max_messages
            self.query_history = []
            self.insights = []
    
        def add_message(self, role, content):
            """Add message to context"""
            self.messages.append({
                'role': role,
                'content': content,
                'timestamp': datetime.now().isoformat()
            })
    
            # Trim old messages if exceeding limit
            if len(self.messages) > self.max_messages:
                # Keep system prompt + recent messages
                self.messages = [self.messages[0]] + self.messages[-(self.max_messages-1):]
    
        def add_query(self, query, results):
            """Track executed queries"""
            self.query_history.append({
                'query': query,
                'row_count': len(results),
                'timestamp': datetime.now().isoformat()
            })
    
        def add_insight(self, insight):
            """Track discovered insights"""
            self.insights.append(insight)
    
        def get_context_summary(self):
            """Generate context summary for Claude"""
            return {
                'queries_executed': len(self.query_history),
                'insights_found': len(self.insights),
                'conversation_length': len(self.messages)
            }
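
    The trimming behavior is worth verifying: the first (system) message must survive no matter how long the conversation runs. A self-contained check, using a minimal copy of the class above:

    ```python
    from datetime import datetime


    class ConversationContext:
        """Minimal copy of the context class above, for demonstration."""

        def __init__(self, warehouse_id, max_messages=50):
            self.warehouse_id = warehouse_id
            self.messages = []
            self.max_messages = max_messages

        def add_message(self, role, content):
            self.messages.append({
                'role': role,
                'content': content,
                'timestamp': datetime.now().isoformat(),
            })
            if len(self.messages) > self.max_messages:
                # Keep the first (system) message plus the most recent ones
                self.messages = [self.messages[0]] + self.messages[-(self.max_messages - 1):]


    ctx = ConversationContext('wh_demo', max_messages=5)
    ctx.add_message('system', 'You are an expert growth analyst.')
    for i in range(10):
        ctx.add_message('user', f'question {i}')

    # After 11 messages with a limit of 5, the system prompt and the
    # four newest user messages remain
    ```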
    

    Tool Implementations

    list_datasets

    Discover available BigQuery datasets:

    async def list_datasets(project_id: str):
        """List BigQuery datasets"""
        from google.cloud import bigquery
    
        client = bigquery.Client(project=project_id)
    
        datasets = []
        for dataset in client.list_datasets():
            dataset_info = {
                'dataset_id': dataset.dataset_id,
                'project': dataset.project,
                'location': dataset.location,
                'full_id': f"{dataset.project}.{dataset.dataset_id}"
            }
            datasets.append(dataset_info)
    
        return {
            'datasets': datasets,
            'count': len(datasets)
        }
    

    inspect_schema

    Examine table structure and sample data:

    async def inspect_schema(project_id: str, dataset_id: str, table_id: str):
        """Inspect table schema and get sample data"""
        from google.cloud import bigquery
    
        client = bigquery.Client(project=project_id)
    
        # Get schema
        table_ref = f"{project_id}.{dataset_id}.{table_id}"
        table = client.get_table(table_ref)
    
        schema = []
        for field in table.schema:
            schema.append({
                'name': field.name,
                'type': field.field_type,
                'mode': field.mode,
                'description': field.description
            })
    
        # Get sample data. Note: the _TABLE_SUFFIX filter only applies when
        # table_id is a wildcard such as `events_*`; for a single table,
        # drop the WHERE clause.
        query = f"""
        SELECT *
        FROM `{table_ref}`
        WHERE _TABLE_SUFFIX = FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY))
        LIMIT 5
        """
    
        sample_results = client.query(query).result()
        sample_data = [dict(row) for row in sample_results]
    
        # Get table stats for yesterday (same wildcard caveat as above)
        stats_query = f"""
        SELECT
          COUNT(*) AS row_count,
          COUNT(DISTINCT user_pseudo_id) AS unique_users
        FROM `{table_ref}`
        WHERE _TABLE_SUFFIX = FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY))
        """
    
        stats = list(client.query(stats_query).result())[0]
    
        return {
            'schema': schema,
            'sample_data': sample_data,
            'stats': dict(stats),
            'table_info': {
                'num_rows': table.num_rows,
                'num_bytes': table.num_bytes,
                'created': table.created.isoformat(),
                'modified': table.modified.isoformat()
            }
        }
    

    execute_query

    Execute SQL query with safety checks:

    async def execute_query(
        project_id: str,
        query: str,
        max_results: int = 1000,
        dry_run: bool = False
    ):
        """Execute BigQuery SQL query"""
        from google.cloud import bigquery
        import sqlparse
    
        client = bigquery.Client(project=project_id)
    
        # Validate query
        if not validate_query_safety(query):
            raise ValueError("Query contains potentially unsafe operations")
    
        # Format query
        formatted_query = sqlparse.format(
            query,
            reindent=True,
            keyword_case='upper'
        )
    
        # Configure job
        job_config = bigquery.QueryJobConfig()
    
        if dry_run:
            job_config.dry_run = True
            job_config.use_query_cache = False
    
        # Execute
        query_job = client.query(formatted_query, job_config=job_config)
    
        if dry_run:
            return {
                'bytes_processed': query_job.total_bytes_processed,
                'estimated_cost': calculate_query_cost(query_job.total_bytes_processed)
            }
    
        # Get results
        results = query_job.result(max_results=max_results)
    
        rows = []
        for row in results:
            rows.append(dict(row))
    
        return {
            'rows': rows,
            'total_rows': results.total_rows,
            'schema': [
                {'name': field.name, 'type': field.field_type}
                for field in results.schema
            ],
            'job_stats': {
                'bytes_processed': query_job.total_bytes_processed,
                'bytes_billed': query_job.total_bytes_billed,
                'slot_millis': query_job.slot_millis,
                'execution_time_ms': (
                    query_job.ended - query_job.started
                ).total_seconds() * 1000
            }
        }
    
    import re

    def validate_query_safety(query: str) -> bool:
        """Validate query doesn't contain unsafe operations"""

        # Disallow DDL/DML operations. Match whole words so identifiers
        # like `created_at` don't trigger false positives.
        unsafe_keywords = [
            'DROP', 'DELETE', 'TRUNCATE', 'UPDATE',
            'INSERT', 'CREATE', 'ALTER', 'GRANT', 'REVOKE'
        ]

        pattern = r'\b(' + '|'.join(unsafe_keywords) + r')\b'
        return re.search(pattern, query, re.IGNORECASE) is None
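
    The dry-run path of `execute_query` references a `calculate_query_cost` helper that isn't shown above. A plausible sketch, assuming BigQuery on-demand pricing (the $6.25/TiB default is an assumption based on published US pricing; substitute your actual contracted rate and region):

    ```python
    def calculate_query_cost(bytes_processed: int, price_per_tib: float = 6.25) -> float:
        """Estimate on-demand query cost in USD.

        BigQuery bills on-demand queries per TiB scanned. The default rate
        here is an assumption; replace it with your actual rate.
        """
        if not bytes_processed:
            return 0.0
        tib = bytes_processed / (1024 ** 4)  # bytes -> tebibytes
        return round(tib * price_per_tib, 6)


    # A dry run reporting exactly 1 TiB scanned would estimate $6.25
    estimate = calculate_query_cost(1024 ** 4)
    ```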
    

    create_visualization

    Generate charts from query results:

    async def create_visualization(
        data: list,
        chart_type: str,
        title: str,
        x_axis: str,
        y_axis: str,
        options: dict = None
    ):
        """Create visualization from data"""
        import base64

        import pandas as pd
        import plotly.express as px
        import plotly.graph_objects as go
    
        df = pd.DataFrame(data)
    
        if chart_type == 'line':
            fig = px.line(df, x=x_axis, y=y_axis, title=title)
    
        elif chart_type == 'bar':
            fig = px.bar(df, x=x_axis, y=y_axis, title=title)
    
        elif chart_type == 'funnel':
            fig = go.Figure(go.Funnel(
                y=df[x_axis],
                x=df[y_axis],
                textposition="inside",
                textinfo="value+percent initial"
            ))
            fig.update_layout(title=title)
    
        elif chart_type == 'pie':
            fig = px.pie(df, names=x_axis, values=y_axis, title=title)

        elif chart_type == 'scatter':
            fig = px.scatter(df, x=x_axis, y=y_axis, title=title)

        else:
            raise ValueError(f"Unsupported chart type: {chart_type}")

        # Apply custom options
        if options:
            fig.update_layout(**options)

        # Export to various formats (PNG export requires the kaleido package)
        chart_html = fig.to_html()
        chart_json = fig.to_json()
        chart_image = fig.to_image(format='png')
    
        return {
            'html': chart_html,
            'json': chart_json,
            'image_base64': base64.b64encode(chart_image).decode(),
            'chart_type': chart_type
        }
    

    Query Planning

    Intent Recognition

    Parse user intent from natural language:

    async def analyze_user_intent(user_prompt: str, context: ConversationContext):
        """Analyze user's analytical intent"""
    
        # Use Claude to parse intent
        intent_prompt = f"""
        Analyze this analytical request and extract key information:
    
        Request: "{user_prompt}"
    
        Extract:
        1. Analysis type (funnel, cohort, trend, comparison, etc.)
        2. Metrics of interest
        3. Dimensions for breakdown
        4. Time period
        5. Specific questions to answer
    
        Return as JSON.
        """
    
        response = await call_claude(intent_prompt, context)

        # Parse Claude's JSON response (in production, strip any prose
        # wrapping before calling json.loads)
        intent = json.loads(response)
    
        return {
            'analysis_type': intent.get('analysis_type'),
            'metrics': intent.get('metrics', []),
            'dimensions': intent.get('dimensions', []),
            'time_period': intent.get('time_period'),
            'questions': intent.get('questions', [])
        }
    

    Query Strategy

    Plan optimal query approach:

    async def plan_query_strategy(intent: dict, schema: dict):
        """Plan query execution strategy"""
    
        strategy = {
            'queries': [],
            'sequence': []
        }
    
        # Determine query sequence based on analysis type
        if intent['analysis_type'] == 'funnel':
            strategy['queries'] = [
                'identify_funnel_events',
                'calculate_step_conversions',
                'analyze_drop_offs',
                'segment_by_dimensions'
            ]
    
        elif intent['analysis_type'] == 'cohort':
            strategy['queries'] = [
                'identify_cohort_dates',
                'calculate_retention_by_cohort',
                'analyze_cohort_behavior'
            ]
    
        elif intent['analysis_type'] == 'trend':
            strategy['queries'] = [
                'aggregate_by_time_period',
                'calculate_moving_averages',
                'identify_anomalies'
            ]
    
        # Add dimension breakdowns
        for dimension in intent['dimensions']:
            strategy['queries'].append(f'breakdown_by_{dimension}')
    
        return strategy
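
    For the funnel path, the first two planned steps typically compile into a single GA4 query. A sketch of such a query builder, with the table path and event names as placeholders. Note this counts users who triggered each event independently, which is a simplification of a true ordered funnel:

    ```python
    def build_funnel_query(table: str, steps: list, date_suffix: str) -> str:
        """Build a per-step user-count query over GA4 export data.

        `table` should be a wildcard path like `project.dataset.events_*`;
        `steps` is an ordered list of event names (placeholders here).
        """
        step_counts = ",\n  ".join(
            f"COUNT(DISTINCT IF(event_name = '{s}', user_pseudo_id, NULL)) AS step_{i}"
            for i, s in enumerate(steps, 1)
        )
        return (
            f"SELECT\n  {step_counts}\n"
            f"FROM `{table}`\n"
            f"WHERE _TABLE_SUFFIX = '{date_suffix}'"
        )


    sql = build_funnel_query(
        "my-project.analytics_123.events_*",       # placeholder table path
        ["page_view", "add_to_cart", "purchase"],  # placeholder funnel steps
        "20250101",
    )
    ```

    Step-over-step conversion rates then come from dividing adjacent `step_N` counts in the result row.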
    

    Adaptive Query Generation

    Generate queries that adapt based on results:

    async def generate_adaptive_query(
        intent: dict,
        previous_results: list,
        context: ConversationContext
    ):
        """Generate next query based on previous results"""
    
        # Analyze previous results
        insights = analyze_results(previous_results)
    
        # Determine next question
        if insights.get('unusual_patterns'):
            # Investigate unusual patterns
            next_query_type = 'investigate_anomaly'
    
        elif insights.get('incomplete_coverage'):
            # Fill data gaps
            next_query_type = 'expand_analysis'
    
        elif insights.get('interesting_segments'):
            # Deep dive into segments
            next_query_type = 'segment_deep_dive'
    
        else:
            # Analysis complete
            return None
    
        # Generate query
        query_prompt = f"""
        Based on these findings:
        {json.dumps(insights, indent=2)}
    
        Generate a SQL query to: {next_query_type}
    
        Consider:
        - Previous queries: {len(context.query_history)}
        - Time period: {intent['time_period']}
        - Focus areas: {', '.join(intent['metrics'])}
        """
    
        query = await call_claude(query_prompt, context)
    
        return query
    

    Insight Extraction

    Pattern Recognition

    Identify patterns in query results:

    def detect_patterns(data: list) -> list:
        """Detect patterns in data"""
        import pandas as pd
        import numpy as np
    
        df = pd.DataFrame(data)
        patterns = []
    
        # Trend detection
        for column in df.select_dtypes(include=[np.number]).columns:
            # Calculate trend
            x = np.arange(len(df))
            y = df[column].values
            slope, _ = np.polyfit(x, y, 1)
    
            if abs(slope) > 0.1:  # Significant trend
                direction = 'increasing' if slope > 0 else 'decreasing'
                patterns.append({
                    'type': 'trend',
                    'metric': column,
                    'direction': direction,
                    'magnitude': abs(slope),
                    'confidence': 0.85
                })
    
        # Anomaly detection
        for column in df.select_dtypes(include=[np.number]).columns:
            mean = df[column].mean()
            std = df[column].std()
            if std == 0:  # constant column: no anomalies possible
                continue
            z_scores = (df[column] - mean) / std

            anomalies = df[abs(z_scores) > 2]  # beyond 2 standard deviations
    
            if len(anomalies) > 0:
                patterns.append({
                    'type': 'anomaly',
                    'metric': column,
                    'values': anomalies[column].tolist(),
                    'indices': anomalies.index.tolist(),
                    'confidence': 0.75
                })
    
        # Correlation detection
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        if len(numeric_cols) >= 2:
            corr_matrix = df[numeric_cols].corr()
    
            for i in range(len(numeric_cols)):
                for j in range(i + 1, len(numeric_cols)):
                    corr = corr_matrix.iloc[i, j]
    
                    if abs(corr) > 0.7:  # Strong correlation
                        patterns.append({
                            'type': 'correlation',
                            'metric_1': numeric_cols[i],
                            'metric_2': numeric_cols[j],
                            'correlation': corr,
                            'confidence': 0.9
                        })
    
        return patterns
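
    The trend branch can be sanity-checked on synthetic data. A small self-contained demo of the slope logic, using the same 0.1 threshold as above:

    ```python
    import numpy as np

    # Synthetic metric with a clear upward trend
    y = np.array([10.0, 12.0, 13.5, 15.0, 17.0, 19.0])
    x = np.arange(len(y))

    # Degree-1 polyfit returns (slope, intercept)
    slope, intercept = np.polyfit(x, y, 1)

    # Slope is well above the 0.1 threshold used in detect_patterns,
    # so this series would be reported as an 'increasing' trend
    direction = 'increasing' if slope > 0 else 'decreasing'
    ```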
    

    Insight Generation

    Transform patterns into actionable insights:

    async def generate_insights(patterns: list, context: dict):
        """Generate insights from detected patterns"""
    
        insights = []
    
        for pattern in patterns:
            if pattern['type'] == 'trend':
                insight = {
                    'title': f"{pattern['metric'].replace('_', ' ').title()} {pattern['direction'].title()}",
                    'description': await generate_trend_insight(pattern, context),
                    'type': 'trend',
                    'priority': calculate_priority(pattern),
                    'confidence': pattern['confidence'],
                    'data': pattern
                }
                insights.append(insight)
    
            elif pattern['type'] == 'anomaly':
                insight = {
                    'title': f"Unusual {pattern['metric'].replace('_', ' ').title()} Detected",
                    'description': await generate_anomaly_insight(pattern, context),
                    'type': 'anomaly',
                    'priority': 'high',
                    'confidence': pattern['confidence'],
                    'data': pattern
                }
                insights.append(insight)
    
            elif pattern['type'] == 'correlation':
                insight = {
                    'title': f"{pattern['metric_1']} Correlated with {pattern['metric_2']}",
                    'description': await generate_correlation_insight(pattern, context),
                    'type': 'correlation',
                    'priority': calculate_priority(pattern),
                    'confidence': pattern['confidence'],
                    'data': pattern
                }
                insights.append(insight)
    
        return insights
    
    async def generate_trend_insight(pattern: dict, context: dict):
        """Generate natural language insight for trend"""
    
        metric = pattern['metric'].replace('_', ' ')
        direction = pattern['direction']
        magnitude = pattern['magnitude']
    
        prompt = f"""
        Generate a concise insight about this trend:
        - Metric: {metric}
        - Direction: {direction}
        - Magnitude: {magnitude:.2f}
        - Context: {json.dumps(context, indent=2)}
    
        Explain what this means for the business and potential causes.
        Keep it under 100 words.
        """
    
        insight = await call_claude(prompt)
        return insight
    

    Report Assembly

    Section Generation

    Organize insights into report sections:

    def assemble_report(
        insights: list,
        queries: list,
        visualizations: list,
        context: dict
    ) -> dict:
        """Assemble final report"""
    
        report = {
            'title': context.get('title', 'Growth Analysis Report'),
            'created_at': datetime.now().isoformat(),
            'summary': generate_executive_summary(insights),
            'sections': []
        }
    
        # Group insights by category
        insights_by_category = group_insights_by_category(insights)
    
        for category, category_insights in insights_by_category.items():
            section = {
                'title': category.replace('_', ' ').title(),
                'insights': category_insights,
                'visualizations': filter_visualizations(visualizations, category),
                'queries': filter_queries(queries, category)
            }
            report['sections'].append(section)
    
        # Add recommendations
        report['recommendations'] = generate_recommendations(insights, context)
    
        return report
    
    def generate_executive_summary(insights: list) -> str:
        """Generate executive summary"""
    
        high_priority = [i for i in insights if i.get('priority') == 'high']
    
        summary = f"Analysis identified {len(insights)} insights, "
        summary += f"including {len(high_priority)} high-priority findings.\n\n"
    
        # Top 3 insights
        top_insights = sorted(
            insights,
            key=lambda x: (x.get('priority') == 'high', x.get('confidence', 0)),
            reverse=True
        )[:3]
    
        summary += "Key Findings:\n"
        for i, insight in enumerate(top_insights, 1):
            summary += f"{i}. {insight['title']}\n"
    
        return summary
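
    The sort key above orders insights by priority tier first, then by confidence within each tier. A quick check of that behavior:

    ```python
    insights = [
        {'title': 'A', 'priority': 'medium', 'confidence': 0.95},
        {'title': 'B', 'priority': 'high', 'confidence': 0.70},
        {'title': 'C', 'priority': 'high', 'confidence': 0.90},
    ]

    # Same key as generate_executive_summary: high-priority tuples sort
    # as (True, confidence), so with reverse=True they come first,
    # ordered by confidence within the tier
    top = sorted(
        insights,
        key=lambda x: (x.get('priority') == 'high', x.get('confidence', 0)),
        reverse=True,
    )

    titles = [i['title'] for i in top]
    ```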
    

    Natural Language Generation

    Generate human-readable report text:

    async def generate_report_narrative(report: dict):
        """Generate natural language report narrative"""
    
        narrative_prompt = f"""
        Generate a comprehensive analytical narrative based on these findings:
    
        {json.dumps(report, indent=2)}
    
        Requirements:
        - Clear, executive-friendly language
        - Specific metrics and percentages
        - Logical flow between sections
        - Actionable insights
        - 500-1000 words
    
        Structure:
        1. Executive Summary
        2. Key Findings
        3. Detailed Analysis
        4. Recommendations
        """
    
        narrative = await call_claude(narrative_prompt)
    
        return narrative
    

    Streaming Implementation

    Server-Sent Events

    Stream report generation progress:

    async def stream_report_generation(request_id: str, user_prompt: str, warehouse_id: str):
        """Stream report generation with SSE"""
        from fastapi.responses import StreamingResponse

        async def event_generator():
            context = ConversationContext(warehouse_id)
    
            # Yield status updates
            yield sse_event('status', {'message': 'Analyzing request...'})
    
            # Intent analysis
            intent = await analyze_user_intent(user_prompt, context)
            yield sse_event('status', {'message': 'Planning queries...'})
    
            # Execute queries
            for query_plan in intent['query_sequence']:
                yield sse_event('query', {'status': 'executing', 'query': query_plan})
    
                results = await execute_query(query_plan)
    
                yield sse_event('result', {
                    'rows': len(results),
                    'columns': list(results[0].keys()) if results else []
                })
    
                # Analyze results
                patterns = detect_patterns(results)
                insights = await generate_insights(patterns, context)
    
                for insight in insights:
                    yield sse_event('insight', insight)
    
                # Generate visualization
                if should_visualize(results):
                    viz = await create_visualization(results, ...)
                    yield sse_event('visualization', viz)
    
            # Complete
            report = assemble_report(context.insights, context.query_history, ...)
            yield sse_event('completed', report)
    
        return StreamingResponse(event_generator(), media_type='text/event-stream')
    
    def sse_event(event_type: str, data: dict) -> str:
        """Format SSE event"""
        return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
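
    Each event is a standard `text/event-stream` frame: an `event:` line, a `data:` line, and a blank-line terminator. The formatter is reproduced here as a self-contained check:

    ```python
    import json

    def sse_event(event_type: str, data: dict) -> str:
        """Same formatter as above: event line, data line, blank-line terminator."""
        return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"

    frame = sse_event('status', {'message': 'Planning queries...'})
    # A browser EventSource dispatches this frame as a 'status' event
    # whose data field is the JSON payload
    ```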
    

    Performance Optimization

    Query Caching

    Cache frequently executed queries:

    from functools import lru_cache
    import hashlib
    import json
    
    @lru_cache(maxsize=1000)
    def cached_query_results(query_hash: str):
        """Cache query results"""
        # Fetch from cache or execute
        pass
    
    def hash_query(query: str, params: dict) -> str:
        """Generate cache key"""
        cache_key = f"{query}_{json.dumps(params, sort_keys=True)}"
        return hashlib.sha256(cache_key.encode()).hexdigest()
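
    One caveat: `lru_cache` never expires entries within a process, which is risky for analytics data that refreshes daily. A simple time-based alternative (the expiry default is an assumption; tune it to your data-refresh cadence):

    ```python
    import time

    class TTLCache:
        """Tiny time-based cache: entries expire after ttl_seconds."""

        def __init__(self, ttl_seconds: float = 3600):
            self.ttl = ttl_seconds
            self._store = {}

        def get(self, key):
            entry = self._store.get(key)
            if entry is None:
                return None
            value, stored_at = entry
            if time.monotonic() - stored_at > self.ttl:
                del self._store[key]  # expired: evict and miss
                return None
            return value

        def set(self, key, value):
            self._store[key] = (value, time.monotonic())


    cache = TTLCache(ttl_seconds=0.05)
    cache.set('q1', [{'rows': 42}])
    fresh = cache.get('q1')   # hit: within TTL
    time.sleep(0.1)
    stale = cache.get('q1')   # expired: returns None
    ```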
    

    Parallel Query Execution

    Execute independent queries in parallel:

    import asyncio
    
    async def execute_queries_parallel(queries: list):
        """Execute multiple queries in parallel"""
        tasks = [execute_query(q) for q in queries]
        results = await asyncio.gather(*tasks)
        return results
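
    The same pattern is easy to verify with stand-in coroutines. Note that `asyncio.gather` preserves input order regardless of completion order, and one failed query cancels the batch unless `return_exceptions=True` is passed:

    ```python
    import asyncio

    async def fake_query(name: str, delay: float):
        """Stand-in for execute_query, for demonstration only."""
        await asyncio.sleep(delay)
        return {'query': name, 'rows': 10}

    async def run_batch():
        # q1 finishes last but still comes first in the results list
        return await asyncio.gather(
            fake_query('q1', 0.02),
            fake_query('q2', 0.01),
        )

    results = asyncio.run(run_batch())
    ```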
    
