AI Report Generation: How It Works
Technical deep dive into Cogny's AI-powered report generation system, including Claude integration, query planning, iterative analysis, and natural language generation.
Overview
Cogny's AI Report Builder uses advanced language models (Claude 3.5 Sonnet) combined with intelligent query planning to generate comprehensive growth reports from your analytics data.
Key capabilities:
- Natural language understanding of analysis requests
- Intelligent SQL query generation
- Iterative data exploration
- Pattern recognition and insight extraction
- Natural language report generation
System Architecture
┌─────────────────────────────────────────────────────────────┐
│                         User Input                          │
│           "Analyze conversion funnel for Q1 2025"           │
└──────────────────────────────┬──────────────────────────────┘
                               │
                    ┌──────────▼───────────┐
                    │   Prompt Analysis    │
                    │ - Intent parsing     │
                    │ - Context extraction │
                    └──────────┬───────────┘
                               │
                    ┌──────────▼───────────┐
                    │    Query Planner     │
                    │     (Claude AI)      │
                    └──────────┬───────────┘
                               │
          ┌────────────────────┼────────────────────┐
          │                    │                    │
     ┌────▼─────┐         ┌────▼─────┐         ┌────▼─────┐
     │ Dataset  │         │  Schema  │         │ Context  │
     │ Lookup   │         │Inspection│         │ Memory   │
     └────┬─────┘         └────┬─────┘         └────┬─────┘
          │                    │                    │
          └────────────────────┴────────────────────┘
                               │
                    ┌──────────▼───────────┐
                    │    SQL Generation    │
                    │ - Query synthesis    │
                    │ - Validation         │
                    └──────────┬───────────┘
                               │
                    ┌──────────▼───────────┐
                    │   BigQuery Execute   │
                    └──────────┬───────────┘
                               │
                    ┌──────────▼───────────┐
                    │   Result Analysis    │
                    │     (Claude AI)      │
                    │ - Pattern detection  │
                    │ - Insight extraction │
                    └──────────┬───────────┘
                               │
          ┌────────────────────┴────────────────────┐
          │                                         │
    ┌─────▼──────┐                        ┌─────────▼───────┐
    │ Visualize  │                        │ More Questions? │
    │ Results    │                        └─────────┬───────┘
    └─────┬──────┘                                  │
          │                                         │ Yes
          │                              ┌──────────▼───────┐
          │                              │ Iterative Query  │
          │                              │  (max 50 loops)  │
          │                              └──────────┬───────┘
          │                                         │
          └────────────────────┬────────────────────┘
                               │ No (Complete)
                    ┌──────────▼───────────┐
                    │   Report Assembly    │
                    │ - Sections           │
                    │ - Insights           │
                    │ - Recommendations    │
                    └──────────────────────┘
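In code, this control flow reduces to a bounded tool-use loop. A simplified sketch follows; the helpers it calls are the implementations shown in later sections, while the function name, the empty context argument, and the error handling (omitted) are illustrative:

MAX_ITERATIONS = 50  # hard cap on iterative query loops

async def run_report(project_id: str, user_prompt: str, context):
    intent = await analyze_user_intent(user_prompt, context)
    for _ in range(MAX_ITERATIONS):
        # Ask the planner for the next query; None means analysis complete
        query = await generate_adaptive_query(intent, context.query_history, context)
        if query is None:
            break
        result = await execute_query(project_id, query)
        context.add_query(query, result['rows'])
        # Turn raw rows into patterns and insights as they arrive
        patterns = detect_patterns(result['rows'])
        for insight in await generate_insights(patterns, {'intent': intent}):
            context.add_insight(insight)
    return assemble_report(context.insights, context.query_history, [], {'title': user_prompt})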
Claude AI Integration
Model Selection
Primary Model: Claude 3.5 Sonnet
- Context Window: 200K tokens
- Reasoning: Advanced analytical capabilities
- Tool Use: Native function calling
- Streaming: Real-time response streaming
System Prompt
The system prompt defines Claude's role and capabilities:
SYSTEM_PROMPT = """
You are an expert growth analyst with deep knowledge of:
- Google Analytics 4 data structure and schema
- BigQuery SQL and query optimization
- Conversion funnel analysis
- User behavior patterns
- Statistical analysis and A/B testing
- Growth marketing strategies
Your role is to:
1. Understand user's analytical questions
2. Plan and execute SQL queries against GA4 BigQuery data
3. Analyze results to identify patterns and insights
4. Provide actionable recommendations
Available tools:
- list_datasets: List available BigQuery datasets
- list_tables: List tables in a dataset
- inspect_schema: View table schema and sample data
- execute_query: Run SQL query against BigQuery
- create_visualization: Generate charts from query results
Guidelines:
- Always inspect schema before querying unfamiliar tables
- Use efficient SQL with proper partitioning (_TABLE_SUFFIX)
- Explain your analytical approach step-by-step
- Provide confidence levels for insights
- Include specific metrics and percentages in findings
- Suggest follow-up analyses when relevant
"""
Context Management
Maintain conversation context for iterative analysis:
from datetime import datetime

class ConversationContext:
def __init__(self, warehouse_id, max_messages=50):
self.warehouse_id = warehouse_id
self.messages = []
self.max_messages = max_messages
self.query_history = []
self.insights = []
def add_message(self, role, content):
"""Add message to context"""
self.messages.append({
'role': role,
'content': content,
'timestamp': datetime.now().isoformat()
})
# Trim old messages if exceeding limit
if len(self.messages) > self.max_messages:
# Keep system prompt + recent messages
self.messages = [self.messages[0]] + self.messages[-(self.max_messages-1):]
def add_query(self, query, results):
"""Track executed queries"""
self.query_history.append({
'query': query,
'row_count': len(results),
'timestamp': datetime.now().isoformat()
})
def add_insight(self, insight):
"""Track discovered insights"""
self.insights.append(insight)
def get_context_summary(self):
"""Generate context summary for Claude"""
return {
'queries_executed': len(self.query_history),
'insights_found': len(self.insights),
'conversation_length': len(self.messages)
}
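Usage is straightforward; the warehouse ID and query below are illustrative:

context = ConversationContext(warehouse_id='wh_demo')
context.add_message('user', 'Analyze conversion funnel for Q1 2025')
context.add_query('SELECT 1', results=[{'ok': 1}])
print(context.get_context_summary())
# {'queries_executed': 1, 'insights_found': 0, 'conversation_length': 1}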
Tool Implementations
list_datasets
Discover available BigQuery datasets:
async def list_datasets(project_id: str):
"""List BigQuery datasets"""
from google.cloud import bigquery
client = bigquery.Client(project=project_id)
datasets = []
    for item in client.list_datasets():
        # list_datasets() returns lightweight DatasetListItem objects,
        # which don't carry a location; fetch the full dataset for it
        dataset = client.get_dataset(item.reference)
        datasets.append({
            'dataset_id': dataset.dataset_id,
            'project': dataset.project,
            'location': dataset.location,
            'full_id': f"{dataset.project}.{dataset.dataset_id}"
        })
return {
'datasets': datasets,
'count': len(datasets)
}
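The list_tables tool referenced in the system prompt follows the same pattern; a minimal sketch:

async def list_tables(project_id: str, dataset_id: str):
    """List tables in a dataset (GA4 exports contain daily events_YYYYMMDD shards)"""
    from google.cloud import bigquery

    client = bigquery.Client(project=project_id)
    tables = [
        {'table_id': t.table_id, 'type': t.table_type}
        for t in client.list_tables(dataset_id)
    ]
    return {'tables': tables, 'count': len(tables)}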
inspect_schema
Examine table structure and sample data:
async def inspect_schema(project_id: str, dataset_id: str, table_id: str):
"""Inspect table schema and get sample data"""
from google.cloud import bigquery
client = bigquery.Client(project=project_id)
# Get schema
table_ref = f"{project_id}.{dataset_id}.{table_id}"
table = client.get_table(table_ref)
schema = []
for field in table.schema:
schema.append({
'name': field.name,
'type': field.field_type,
'mode': field.mode,
'description': field.description
})
    # Get sample data. The _TABLE_SUFFIX filter assumes a sharded GA4
    # export queried via a wildcard table_id (e.g. 'events_*'); drop it
    # for non-sharded tables.
query = f"""
SELECT *
FROM `{table_ref}`
WHERE _TABLE_SUFFIX = FORMAT_DATE('%Y%m%d', CURRENT_DATE() - 1)
LIMIT 5
"""
sample_results = client.query(query).result()
sample_data = [dict(row) for row in sample_results]
    # Get table stats (user_pseudo_id is specific to the GA4 schema)
stats_query = f"""
SELECT
COUNT(*) as row_count,
COUNT(DISTINCT user_pseudo_id) as unique_users
FROM `{table_ref}`
WHERE _TABLE_SUFFIX = FORMAT_DATE('%Y%m%d', CURRENT_DATE() - 1)
"""
stats = list(client.query(stats_query).result())[0]
return {
'schema': schema,
'sample_data': sample_data,
'stats': dict(stats),
'table_info': {
'num_rows': table.num_rows,
'num_bytes': table.num_bytes,
'created': table.created.isoformat(),
'modified': table.modified.isoformat()
}
}
execute_query
Execute SQL query with safety checks:
async def execute_query(
project_id: str,
query: str,
max_results: int = 1000,
dry_run: bool = False
):
"""Execute BigQuery SQL query"""
from google.cloud import bigquery
import sqlparse
client = bigquery.Client(project=project_id)
# Validate query
if not validate_query_safety(query):
raise ValueError("Query contains potentially unsafe operations")
# Format query
formatted_query = sqlparse.format(
query,
reindent=True,
keyword_case='upper'
)
# Configure job
job_config = bigquery.QueryJobConfig()
if dry_run:
job_config.dry_run = True
job_config.use_query_cache = False
# Execute
query_job = client.query(formatted_query, job_config=job_config)
if dry_run:
return {
'bytes_processed': query_job.total_bytes_processed,
'estimated_cost': calculate_query_cost(query_job.total_bytes_processed)
}
# Get results
results = query_job.result(max_results=max_results)
rows = []
for row in results:
rows.append(dict(row))
return {
'rows': rows,
'total_rows': results.total_rows,
'schema': [
{'name': field.name, 'type': field.field_type}
for field in results.schema
],
'job_stats': {
'bytes_processed': query_job.total_bytes_processed,
'bytes_billed': query_job.total_bytes_billed,
'slot_millis': query_job.slot_millis,
'execution_time_ms': (
query_job.ended - query_job.started
).total_seconds() * 1000
}
}
import re

def validate_query_safety(query: str) -> bool:
    """Validate query doesn't contain unsafe operations"""
    # Disallow DML/DDL statements. Match whole words so column names
    # like `created_at` or `updated_at` don't trigger false positives.
    unsafe_keywords = [
        'DROP', 'DELETE', 'TRUNCATE', 'UPDATE',
        'INSERT', 'CREATE', 'ALTER', 'GRANT', 'REVOKE'
    ]
    pattern = r'\b(' + '|'.join(unsafe_keywords) + r')\b'
    return re.search(pattern, query, re.IGNORECASE) is None
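The dry-run path of execute_query references a calculate_query_cost helper that is not shown above. A minimal sketch, assuming BigQuery on-demand pricing of roughly $6.25 per TiB scanned (check current pricing for your region and edition):

def calculate_query_cost(bytes_processed: int, price_per_tib: float = 6.25) -> float:
    """Estimate on-demand query cost in USD from bytes scanned."""
    tib = bytes_processed / (1024 ** 4)
    return round(tib * price_per_tib, 4)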
create_visualization
Generate charts from query results:
async def create_visualization(
data: list,
chart_type: str,
title: str,
x_axis: str,
y_axis: str,
options: dict = None
):
"""Create visualization from data"""
import plotly.graph_objects as go
import plotly.express as px
import pandas as pd
df = pd.DataFrame(data)
if chart_type == 'line':
fig = px.line(df, x=x_axis, y=y_axis, title=title)
elif chart_type == 'bar':
fig = px.bar(df, x=x_axis, y=y_axis, title=title)
elif chart_type == 'funnel':
fig = go.Figure(go.Funnel(
y=df[x_axis],
x=df[y_axis],
textposition="inside",
textinfo="value+percent initial"
))
fig.update_layout(title=title)
elif chart_type == 'pie':
fig = px.pie(df, names=x_axis, values=y_axis, title=title)
    elif chart_type == 'scatter':
        fig = px.scatter(df, x=x_axis, y=y_axis, title=title)
    else:
        raise ValueError(f"Unsupported chart type: {chart_type}")
# Apply custom options
if options:
fig.update_layout(**options)
    # Export to various formats (PNG export requires the kaleido package)
chart_html = fig.to_html()
chart_json = fig.to_json()
chart_image = fig.to_image(format='png')
return {
'html': chart_html,
'json': chart_json,
'image_base64': base64.b64encode(chart_image).decode(),
'chart_type': chart_type
}
Query Planning
Intent Recognition
Parse user intent from natural language:
async def analyze_user_intent(user_prompt: str, context: ConversationContext):
"""Analyze user's analytical intent"""
# Use Claude to parse intent
intent_prompt = f"""
Analyze this analytical request and extract key information:
Request: "{user_prompt}"
Extract:
1. Analysis type (funnel, cohort, trend, comparison, etc.)
2. Metrics of interest
3. Dimensions for breakdown
4. Time period
5. Specific questions to answer
Return as JSON.
"""
    response = await call_claude(intent_prompt, context)
    intent = json.loads(response)  # assumes Claude returns bare JSON
return {
'analysis_type': intent.get('analysis_type'),
'metrics': intent.get('metrics', []),
'dimensions': intent.get('dimensions', []),
'time_period': intent.get('time_period'),
'questions': intent.get('questions', [])
}
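For the sample prompt in the architecture diagram, the extracted intent might look like this (illustrative values):

# Intent extracted for "Analyze conversion funnel for Q1 2025"
{
    'analysis_type': 'funnel',
    'metrics': ['conversion_rate', 'drop_off_rate'],
    'dimensions': ['device_category', 'traffic_source'],
    'time_period': {'start': '2025-01-01', 'end': '2025-03-31'},
    'questions': ['Where do users drop off between add_to_cart and purchase?']
}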
Query Strategy
Plan optimal query approach:
async def plan_query_strategy(intent: dict, schema: dict):
"""Plan query execution strategy"""
strategy = {
'queries': [],
'sequence': []
}
# Determine query sequence based on analysis type
if intent['analysis_type'] == 'funnel':
strategy['queries'] = [
'identify_funnel_events',
'calculate_step_conversions',
'analyze_drop_offs',
'segment_by_dimensions'
]
elif intent['analysis_type'] == 'cohort':
strategy['queries'] = [
'identify_cohort_dates',
'calculate_retention_by_cohort',
'analyze_cohort_behavior'
]
elif intent['analysis_type'] == 'trend':
strategy['queries'] = [
'aggregate_by_time_period',
'calculate_moving_averages',
'identify_anomalies'
]
# Add dimension breakdowns
for dimension in intent['dimensions']:
strategy['queries'].append(f'breakdown_by_{dimension}')
return strategy
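As a concrete example, a calculate_step_conversions step for that prompt might emit SQL along these lines; this is a sketch against a GA4 export, and the project, dataset, and event names are placeholders:

-- Funnel step sizes for Q1 2025 (GA4 BigQuery export)
SELECT
  event_name,
  COUNT(DISTINCT user_pseudo_id) AS users
FROM `my-project.analytics_123456.events_*`
WHERE _TABLE_SUFFIX BETWEEN '20250101' AND '20250331'
  AND event_name IN ('page_view', 'add_to_cart', 'begin_checkout', 'purchase')
GROUP BY event_name
ORDER BY users DESC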
Adaptive Query Generation
Generate queries that adapt based on results:
async def generate_adaptive_query(
intent: dict,
previous_results: list,
context: ConversationContext
):
"""Generate next query based on previous results"""
# Analyze previous results
insights = analyze_results(previous_results)
# Determine next question
if insights.get('unusual_patterns'):
# Investigate unusual patterns
next_query_type = 'investigate_anomaly'
elif insights.get('incomplete_coverage'):
# Fill data gaps
next_query_type = 'expand_analysis'
elif insights.get('interesting_segments'):
# Deep dive into segments
next_query_type = 'segment_deep_dive'
else:
# Analysis complete
return None
# Generate query
query_prompt = f"""
Based on these findings:
{json.dumps(insights, indent=2)}
Generate a SQL query to: {next_query_type}
Consider:
- Previous queries: {len(context.query_history)}
- Time period: {intent['time_period']}
- Focus areas: {', '.join(intent['metrics'])}
"""
query = await call_claude(query_prompt, context)
return query
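analyze_results is referenced above but not shown. A minimal sketch of its shape, assuming execute_query's result dictionaries and the detect_patterns helper from the next section; the row-count heuristic is illustrative:

def analyze_results(previous_results: list) -> dict:
    """Summarize prior results into the flags the adaptive planner branches on."""
    flags = {
        'unusual_patterns': [],
        'incomplete_coverage': [],
        'interesting_segments': [],
    }
    for result in previous_results:
        patterns = detect_patterns(result.get('rows', []))
        flags['unusual_patterns'].extend(
            p for p in patterns if p['type'] == 'anomaly'
        )
        # Very few rows may mean the query missed part of the data
        if result.get('total_rows', 0) < 10:
            flags['incomplete_coverage'].append(result)
    # Only report flags that actually fired
    return {k: v for k, v in flags.items() if v}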
Insight Extraction
Pattern Recognition
Identify patterns in query results:
def detect_patterns(data: list) -> list:
"""Detect patterns in data"""
import pandas as pd
import numpy as np
df = pd.DataFrame(data)
patterns = []
    # Trend detection (least-squares slope per numeric column)
    for column in df.select_dtypes(include=[np.number]).columns:
        if len(df) < 2:
            break  # need at least two points to fit a trend
        x = np.arange(len(df))
        y = df[column].values
        slope, _ = np.polyfit(x, y, 1)
        if abs(slope) > 0.1:  # heuristic, scale-dependent threshold
direction = 'increasing' if slope > 0 else 'decreasing'
patterns.append({
'type': 'trend',
'metric': column,
'direction': direction,
'magnitude': abs(slope),
'confidence': 0.85
})
    # Anomaly detection (z-score method)
    for column in df.select_dtypes(include=[np.number]).columns:
        mean = df[column].mean()
        std = df[column].std()
        if not std or np.isnan(std):
            continue  # constant column: nothing to flag
        z_scores = (df[column] - mean) / std
        anomalies = df[abs(z_scores) > 2]  # beyond 2 standard deviations
if len(anomalies) > 0:
patterns.append({
'type': 'anomaly',
'metric': column,
'values': anomalies[column].tolist(),
'indices': anomalies.index.tolist(),
'confidence': 0.75
})
# Correlation detection
numeric_cols = df.select_dtypes(include=[np.number]).columns
if len(numeric_cols) >= 2:
corr_matrix = df[numeric_cols].corr()
for i in range(len(numeric_cols)):
for j in range(i + 1, len(numeric_cols)):
corr = corr_matrix.iloc[i, j]
if abs(corr) > 0.7: # Strong correlation
patterns.append({
'type': 'correlation',
'metric_1': numeric_cols[i],
'metric_2': numeric_cols[j],
'correlation': corr,
'confidence': 0.9
})
return patterns
Insight Generation
Transform patterns into actionable insights:
async def generate_insights(patterns: list, context: dict):
"""Generate insights from detected patterns"""
insights = []
for pattern in patterns:
if pattern['type'] == 'trend':
insight = {
'title': f"{pattern['metric'].replace('_', ' ').title()} {pattern['direction'].title()}",
'description': await generate_trend_insight(pattern, context),
'type': 'trend',
'priority': calculate_priority(pattern),
'confidence': pattern['confidence'],
'data': pattern
}
insights.append(insight)
elif pattern['type'] == 'anomaly':
insight = {
'title': f"Unusual {pattern['metric'].replace('_', ' ').title()} Detected",
'description': await generate_anomaly_insight(pattern, context),
'type': 'anomaly',
'priority': 'high',
'confidence': pattern['confidence'],
'data': pattern
}
insights.append(insight)
elif pattern['type'] == 'correlation':
insight = {
'title': f"{pattern['metric_1']} Correlated with {pattern['metric_2']}",
'description': await generate_correlation_insight(pattern, context),
'type': 'correlation',
'priority': calculate_priority(pattern),
'confidence': pattern['confidence'],
'data': pattern
}
insights.append(insight)
return insights
async def generate_trend_insight(pattern: dict, context: dict):
"""Generate natural language insight for trend"""
metric = pattern['metric'].replace('_', ' ')
direction = pattern['direction']
magnitude = pattern['magnitude']
prompt = f"""
Generate a concise insight about this trend:
- Metric: {metric}
- Direction: {direction}
- Magnitude: {magnitude:.2f}
- Context: {json.dumps(context, indent=2)}
Explain what this means for the business and potential causes.
Keep it under 100 words.
"""
insight = await call_claude(prompt)
return insight
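calculate_priority, used by generate_insights above, is not shown either; a minimal sketch with illustrative thresholds:

def calculate_priority(pattern: dict) -> str:
    """Map a detected pattern to a coarse priority bucket."""
    if pattern['type'] == 'anomaly':
        return 'high'
    confidence = pattern.get('confidence', 0)
    if confidence >= 0.85:
        return 'high'
    if confidence >= 0.7:
        return 'medium'
    return 'low'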
Report Assembly
Section Generation
Organize insights into report sections:
def assemble_report(
insights: list,
queries: list,
visualizations: list,
context: dict
) -> dict:
"""Assemble final report"""
report = {
'title': context.get('title', 'Growth Analysis Report'),
'created_at': datetime.now().isoformat(),
'summary': generate_executive_summary(insights),
'sections': []
}
# Group insights by category
insights_by_category = group_insights_by_category(insights)
for category, category_insights in insights_by_category.items():
section = {
'title': category.replace('_', ' ').title(),
'insights': category_insights,
'visualizations': filter_visualizations(visualizations, category),
'queries': filter_queries(queries, category)
}
report['sections'].append(section)
# Add recommendations
report['recommendations'] = generate_recommendations(insights, context)
return report
def generate_executive_summary(insights: list) -> str:
"""Generate executive summary"""
high_priority = [i for i in insights if i.get('priority') == 'high']
summary = f"Analysis identified {len(insights)} insights, "
summary += f"including {len(high_priority)} high-priority findings.\n\n"
# Top 3 insights
top_insights = sorted(
insights,
key=lambda x: (x.get('priority') == 'high', x.get('confidence', 0)),
reverse=True
)[:3]
summary += "Key Findings:\n"
for i, insight in enumerate(top_insights, 1):
summary += f"{i}. {insight['title']}\n"
return summary
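assemble_report also relies on a group_insights_by_category helper; a minimal sketch that buckets insights by their type field:

from collections import defaultdict

def group_insights_by_category(insights: list) -> dict:
    """Bucket insights by type ('trend', 'anomaly', 'correlation', ...)."""
    grouped = defaultdict(list)
    for insight in insights:
        grouped[insight.get('type', 'general')].append(insight)
    return dict(grouped)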
Natural Language Generation
Generate human-readable report text:
async def generate_report_narrative(report: dict):
"""Generate natural language report narrative"""
narrative_prompt = f"""
Generate a comprehensive analytical narrative based on these findings:
{json.dumps(report, indent=2)}
Requirements:
- Clear, executive-friendly language
- Specific metrics and percentages
- Logical flow between sections
- Actionable insights
- 500-1000 words
Structure:
1. Executive Summary
2. Key Findings
3. Detailed Analysis
4. Recommendations
"""
narrative = await call_claude(narrative_prompt)
return narrative
Streaming Implementation
Server-Sent Events
Stream report generation progress:
async def stream_report_generation(
    request_id: str,
    warehouse_id: str,
    project_id: str,
    user_prompt: str
):
    """Stream report generation with SSE"""
    async def event_generator():
        context = ConversationContext(warehouse_id)
        # Yield status updates
        yield sse_event('status', {'message': 'Analyzing request...'})
        # Intent analysis
        intent = await analyze_user_intent(user_prompt, context)
        yield sse_event('status', {'message': 'Planning queries...'})
        # Execute the planned queries
        strategy = await plan_query_strategy(intent, schema={})
        for query_plan in strategy['queries']:
            yield sse_event('query', {'status': 'executing', 'query': query_plan})
            result = await execute_query(project_id, query_plan)
            rows = result['rows']
            yield sse_event('result', {
                'rows': len(rows),
                'columns': list(rows[0].keys()) if rows else []
            })
            # Analyze results
            patterns = detect_patterns(rows)
            insights = await generate_insights(patterns, context)
            for insight in insights:
                context.add_insight(insight)
                yield sse_event('insight', insight)
            # Generate visualization
            if should_visualize(rows):
                viz = await create_visualization(rows, ...)
                yield sse_event('visualization', viz)
        # Complete
        report = assemble_report(context.insights, context.query_history, ...)
        yield sse_event('completed', report)
    return StreamingResponse(event_generator(), media_type='text/event-stream')
def sse_event(event_type: str, data: dict) -> str:
"""Format SSE event"""
return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
Performance Optimization
Query Caching
Cache frequently executed queries:
import hashlib
import json

# In-process cache keyed by query hash; a production deployment might
# use Redis or rely on BigQuery's built-in result cache instead
_query_cache: dict = {}

def hash_query(query: str, params: dict) -> str:
    """Generate cache key"""
    cache_key = f"{query}_{json.dumps(params, sort_keys=True)}"
    return hashlib.sha256(cache_key.encode()).hexdigest()

async def cached_query_results(project_id: str, query: str, params: dict = None):
    """Return cached results when available, otherwise execute and cache"""
    key = hash_query(query, params or {})
    if key not in _query_cache:
        _query_cache[key] = await execute_query(project_id, query)
    return _query_cache[key]
Parallel Query Execution
Execute independent queries in parallel:
import asyncio

async def execute_queries_parallel(project_id: str, queries: list):
    """Execute multiple independent queries in parallel"""
    tasks = [execute_query(project_id, q) for q in queries]
    # return_exceptions=True keeps one failed query from cancelling the rest
    return await asyncio.gather(*tasks, return_exceptions=True)
Next Steps
- Growth Tickets Architecture - Automated recommendation system
- ICP Analysis - Customer profiling
- Report Builder API - API integration
Resources
- Claude API Documentation: anthropic.com/docs
- BigQuery Best Practices: cloud.google.com/bigquery/docs/best-practices
- Plotly Documentation: plotly.com/python
Talk to Our Technical Team
Schedule a technical consultation to discuss your integration requirements and implementation strategy.