Growth Tickets System Architecture
Technical architecture of Cogny's automated growth ticket generation system, including recommendation algorithms, prioritization logic, and integration patterns.
Overview
Growth Tickets are AI-generated, actionable recommendations automatically derived from data analysis. The system continuously monitors analytics data, identifies opportunities, and generates prioritized action items for growth teams.
Key Features:
- Automated opportunity detection
- Impact estimation
- Priority scoring
- Action item generation
- Project management integration
System Architecture
┌────────────────────────────────────────────────────────────┐
│                        Data Sources                        │
├─────────────┬──────────────┬─────────────┬─────────────────┤
│  Analytics  │   Reports    │  ICP Data   │   Historical    │
└──────┬──────┴──────┬───────┴──────┬──────┴─────────┬───────┘
       │             │              │                │
       └─────────────┴────────┬─────┴────────────────┘
                              │
                 ┌────────────▼────────────┐
                 │  Opportunity Detection  │
                 │  - Pattern recognition  │
                 │  - Anomaly detection    │
                 │  - Trend analysis       │
                 └────────────┬────────────┘
                              │
                 ┌────────────▼────────────┐
                 │  Recommendation Engine  │
                 │  - Solution matching    │
                 │  - Impact estimation    │
                 │  - Effort calculation   │
                 └────────────┬────────────┘
                              │
                 ┌────────────▼────────────┐
                 │  Priority Scoring       │
                 │  - ICE framework        │
                 │  - Business context     │
                 │  - Resource availability│
                 └────────────┬────────────┘
                              │
                 ┌────────────▼────────────┐
                 │  Action Item Generator  │
                 │  - Task breakdown       │
                 │  - Time estimation      │
                 │  - Dependency mapping   │
                 └────────────┬────────────┘
                              │
                 ┌────────────▼────────────┐
                 │  Ticket Management      │
                 │  - Deduplication        │
                 │  - Status tracking      │
                 │  - API exposure         │
                 └─────────────────────────┘
Opportunity Detection
Pattern Recognition
Identify growth opportunities from data patterns:
class OpportunityDetector:
    """Collect growth opportunities for one warehouse from a fixed analyzer set."""

    def __init__(self, warehouse_id: str):
        """Store the target warehouse and instantiate the analyzers.

        Args:
            warehouse_id: Identifier passed to every analyzer's ``analyze`` call.
        """
        self.warehouse_id = warehouse_id
        # Analyzers run in list order; their findings are concatenated in
        # that same order.
        self.analyzers = [
            ConversionFunnelAnalyzer(),
            TrafficSourceAnalyzer(),
            UserSegmentAnalyzer(),
            ProductPerformanceAnalyzer(),
            TechnicalPerformanceAnalyzer()
        ]

    async def detect_opportunities(self) -> list:
        """Run each analyzer in turn and return the combined findings.

        An analyzer that raises is logged and skipped, so one failure does
        not discard what the other analyzers found.

        Returns:
            All opportunities reported by the analyzers that succeeded.
        """
        collected = []
        for analyzer in self.analyzers:
            try:
                collected += await analyzer.analyze(self.warehouse_id)
            except Exception as e:
                logger.error(f"Analyzer {analyzer.__class__.__name__} failed: {e}")
        return collected
Funnel Opportunity Detection
Identify conversion funnel issues:
class ConversionFunnelAnalyzer:
    """Detect conversion-funnel problems: steep step drop-offs and weak devices."""

    async def analyze(self, warehouse_id: str) -> list:
        """Analyze conversion funnel for opportunities.

        Two checks are run:

        1. Each pair of consecutive funnel steps is reported as
           ``funnel_optimization`` when more than 50% of the users entering
           the first step do not reach the second.
        2. Each device whose end-to-end conversion rate is below 70% of the
           average is reported as ``device_optimization``.

        Steps and device funnels with zero entering users are skipped: no
        rate can be computed for them (dividing by them would raise
        ``ZeroDivisionError``).

        Args:
            warehouse_id: Warehouse whose funnel data is queried.

        Returns:
            A list of opportunity dicts with ``type``, ``title``,
            ``description``, ``impact_data`` and ``confidence`` keys.
        """
        opportunities = []

        # --- Step-to-step drop-off -------------------------------------
        funnel_data = await self.get_funnel_data(warehouse_id)
        for current_step, next_step in zip(funnel_data, funnel_data[1:]):
            entered = current_step['users']
            if not entered:
                # Nobody reached this step, so its drop-off rate is undefined.
                continue

            drop_off_rate = 1 - (next_step['users'] / entered)
            if drop_off_rate > 0.5:
                opportunities.append({
                    'type': 'funnel_optimization',
                    'title': f"High drop-off at {next_step['name']} step",
                    'description': (
                        f"{drop_off_rate:.1%} of users drop off between "
                        f"{current_step['name']} and {next_step['name']}"
                    ),
                    'impact_data': {
                        'current_drop_off': drop_off_rate,
                        'users_affected': entered - next_step['users'],
                        'step_from': current_step['name'],
                        'step_to': next_step['name']
                    },
                    'confidence': self.calculate_confidence(entered)
                })

        # --- Per-device conversion -------------------------------------
        device_funnel = await self.get_funnel_by_device(warehouse_id)
        if not device_funnel:
            return opportunities

        # The average depends only on the full mapping, so compute it once
        # instead of once per device.
        avg_conversion = self.get_average_conversion_rate(device_funnel)
        for device, funnel in device_funnel.items():
            if not funnel or not funnel[0]['users']:
                # Empty funnel or no entering users: no conversion rate.
                continue

            conversion_rate = funnel[-1]['users'] / funnel[0]['users']
            # Rates are non-negative, so this can only be true when
            # avg_conversion > 0; the division below is therefore safe.
            if conversion_rate < avg_conversion * 0.7:
                opportunities.append({
                    'type': 'device_optimization',
                    'title': f"Low {device} conversion rate",
                    'description': (
                        f"{device.capitalize()} conversion rate ({conversion_rate:.2%}) "
                        f"is {(1 - conversion_rate/avg_conversion):.0%} below average"
                    ),
                    'impact_data': {
                        'device': device,
                        'current_rate': conversion_rate,
                        'average_rate': avg_conversion,
                        'users_affected': funnel[0]['users']
                    },
                    'confidence': 0.85
                })

        return opportunities

    async def get_funnel_data(self, warehouse_id: str) -> list:
        """Get funnel step data.

        The query counts, over the last 30 full days, the distinct users
        who fired each of five events (page_view, view_item, add_to_cart,
        begin_checkout, purchase) and returns one row per step ordered by
        ``step_number``.

        Args:
            warehouse_id: Warehouse the query is executed against.

        Returns:
            Whatever ``execute_query`` returns for the query; ``analyze``
            reads ``name`` and ``users`` from each row.
        """
        # NOTE(review): `{project}` and `{dataset}` are left as literal
        # placeholders here (the string is neither an f-string nor
        # formatted). Presumably execute_query substitutes them - confirm.
        query = """
WITH funnel_steps AS (
SELECT
user_pseudo_id,
COUNTIF(event_name = 'page_view') as step_1,
COUNTIF(event_name = 'view_item') as step_2,
COUNTIF(event_name = 'add_to_cart') as step_3,
COUNTIF(event_name = 'begin_checkout') as step_4,
COUNTIF(event_name = 'purchase') as step_5
FROM `{project}.{dataset}.events_*`
WHERE _TABLE_SUFFIX BETWEEN FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY))
AND FORMAT_DATE('%Y%m%d', CURRENT_DATE() - 1)
GROUP BY user_pseudo_id
)
SELECT
'Landing' as name,
1 as step_number,
COUNT(DISTINCT CASE WHEN step_1 > 0 THEN user_pseudo_id END) as users
FROM funnel_steps
UNION ALL
SELECT 'Product View', 2, COUNT(DISTINCT CASE WHEN step_2 > 0 THEN user_pseudo_id END)
FROM funnel_steps
UNION ALL
SELECT 'Add to Cart', 3, COUNT(DISTINCT CASE WHEN step_3 > 0 THEN user_pseudo_id END)
FROM funnel_steps
UNION ALL
SELECT 'Checkout', 4, COUNT(DISTINCT CASE WHEN step_4 > 0 THEN user_pseudo_id END)
FROM funnel_steps
UNION ALL
SELECT 'Purchase', 5, COUNT(DISTINCT CASE WHEN step_5 > 0 THEN user_pseudo_id END)
FROM funnel_steps
ORDER BY step_number
"""
        results = await execute_query(warehouse_id, query)
        return results
Traffic Source Analysis
Identify channel optimization opportunities:
class TrafficSourceAnalyzer:
    """Detect traffic channels that under-convert or deserve more volume."""

    async def analyze(self, warehouse_id: str) -> list:
        """Analyze traffic sources for opportunities.

        Each channel is compared against the unweighted mean conversion
        rate of all channels:

        * more than 1000 sessions and a rate below half the mean is
          reported as ``channel_optimization``;
        * fewer than 500 sessions and a rate above 1.5x the mean is
          reported as ``acquisition_scaling``.

        Args:
            warehouse_id: Warehouse whose channel performance is queried.

        Returns:
            A list of opportunity dicts; empty when there is no channel
            data (no mean can be computed from zero channels).
        """
        opportunities = []

        channel_data = await self.get_channel_performance(warehouse_id)
        if not channel_data:
            # Avoid dividing by len(channel_data) == 0 below.
            return opportunities

        avg_conversion = (
            sum(c['conversion_rate'] for c in channel_data) / len(channel_data)
        )

        for channel in channel_data:
            # High traffic, low conversion
            if (channel['sessions'] > 1000 and
                    channel['conversion_rate'] < avg_conversion * 0.5):
                opportunities.append({
                    'type': 'channel_optimization',
                    'title': f"Low conversion rate from {channel['source']} traffic",
                    'description': (
                        f"{channel['source']} drives {channel['sessions']:,} sessions "
                        f"but only {channel['conversion_rate']:.2%} convert, "
                        f"significantly below the average of {avg_conversion:.2%}"
                    ),
                    'impact_data': {
                        'source': channel['source'],
                        'medium': channel['medium'],
                        'sessions': channel['sessions'],
                        'current_conversion': channel['conversion_rate'],
                        'target_conversion': avg_conversion
                    },
                    'confidence': 0.9
                })

            # High conversion, low traffic
            if (channel['conversion_rate'] > avg_conversion * 1.5 and
                    channel['sessions'] < 500):
                opportunities.append({
                    'type': 'acquisition_scaling',
                    'title': f"Scale high-performing {channel['source']} channel",
                    'description': (
                        f"{channel['source']} has {channel['conversion_rate']:.2%} conversion rate "
                        f"but only {channel['sessions']:,} sessions. Opportunity to scale."
                    ),
                    'impact_data': {
                        'source': channel['source'],
                        'medium': channel['medium'],
                        'sessions': channel['sessions'],
                        'conversion_rate': channel['conversion_rate']
                    },
                    'confidence': 0.85
                })

        return opportunities
Recommendation Engine
Solution Matching
Match opportunities to proven solutions:
class RecommendationEngine:
    """Turn a detected opportunity into a recommendation built on the best-ranked solution."""

    def __init__(self):
        """Create the engine with its own ``SolutionDatabase`` instance."""
        self.solution_database = SolutionDatabase()

    async def generate_recommendations(
        self,
        opportunity: dict
    ) -> dict:
        """Generate recommendation from opportunity.

        Looks up candidate solutions for the opportunity's type, ranks
        them, and builds the recommendation from the top-ranked one.

        Args:
            opportunity: Opportunity dict; ``type``, ``impact_data``,
                ``title``, ``description`` and ``confidence`` are read.

        Returns:
            A dict with ``title``, ``description``, ``category``,
            ``solution``, ``action_items``, ``estimated_impact`` and
            ``confidence``.

        Raises:
            IndexError: If no solution matches the opportunity (the ranked
                list is empty).
        """
        # Match to solution patterns
        solutions = self.solution_database.find_solutions(
            opportunity_type=opportunity['type'],
            context=opportunity['impact_data']
        )

        # Rank solutions by expected impact and keep the best one
        ranked_solutions = self.rank_solutions(solutions, opportunity)
        best_solution = ranked_solutions[0]

        # NOTE(review): generate_action_items / estimate_impact are called
        # synchronously on self and are not defined in this class here;
        # confirm they are not coroutines that need awaiting.
        action_items = self.generate_action_items(best_solution, opportunity)
        impact_estimate = self.estimate_impact(opportunity, best_solution)

        recommendation = {
            'title': opportunity['title'],
            'description': opportunity['description'],
            'category': opportunity['type'],
            'solution': best_solution,
            'action_items': action_items,
            'estimated_impact': impact_estimate,
            'confidence': opportunity['confidence']
        }
        return recommendation

    def rank_solutions(self, solutions: list, opportunity: dict) -> list:
        """Rank solutions by expected effectiveness.

        Score = relevance * success_rate * (1 - difficulty penalty), with
        ``success_rate`` defaulting to 0.5 and ``difficulty`` to
        ``'medium'``. A difficulty label other than easy/medium/hard gets
        the medium penalty instead of raising ``KeyError``.

        Args:
            solutions: Candidate solution dicts.
            opportunity: Opportunity forwarded to ``calculate_relevance``.

        Returns:
            The same solution dicts, highest score first. Equal scores
            keep their input order.
        """
        difficulty_penalties = {'easy': 0, 'medium': 0.1, 'hard': 0.2}
        default_penalty = difficulty_penalties['medium']

        scored_solutions = []
        for solution in solutions:
            relevance = self.calculate_relevance(solution, opportunity)
            success_rate = solution.get('success_rate', 0.5)
            difficulty = solution.get('difficulty', 'medium')
            difficulty_penalty = difficulty_penalties.get(difficulty, default_penalty)
            score = relevance * success_rate * (1 - difficulty_penalty)
            scored_solutions.append({
                'solution': solution,
                'score': score
            })

        # list.sort is stable, also with reverse=True
        scored_solutions.sort(key=lambda x: x['score'], reverse=True)
        return [s['solution'] for s in scored_solutions]
Impact Estimation
Estimate potential impact of recommendations:
class ImpactEstimator:
    """Project the metric change expected from applying a solution to an opportunity."""

    @staticmethod
    def _percent_increase(current_value, projected_value):
        """Return the relative increase in percent, or None when the baseline is 0."""
        if not current_value:
            # A relative change from a zero baseline is undefined.
            return None
        return (projected_value / current_value - 1) * 100

    async def estimate_impact(
        self,
        opportunity: dict,
        solution: dict
    ) -> dict:
        """Estimate impact of implementing solution.

        Only ``funnel_optimization`` and ``channel_optimization``
        opportunities are estimated; for any other type every field of the
        returned dict stays ``None``.

        Args:
            opportunity: Opportunity dict; ``type`` and ``impact_data``
                are read.
            solution: Solution dict; ``expected_improvement`` is read for
                funnel opportunities and defaults to 0.3.

        Returns:
            A dict with ``metric``, ``current_value``, ``projected_value``,
            ``increase_percentage`` and ``confidence``, plus
            ``additional_conversions`` for the two handled types.
            ``increase_percentage`` is ``None`` when the current value is
            0, because a relative increase cannot be computed.
        """
        impact = {
            'metric': None,
            'current_value': None,
            'projected_value': None,
            'increase_percentage': None,
            'confidence': None
        }

        if opportunity['type'] == 'funnel_optimization':
            current_drop_off = opportunity['impact_data']['current_drop_off']
            users_affected = opportunity['impact_data']['users_affected']

            # Relative reduction of the drop-off rate; 30% unless the
            # solution specifies its own expected improvement.
            improvement_factor = solution.get('expected_improvement', 0.3)
            new_drop_off = current_drop_off * (1 - improvement_factor)

            current_rate = 1 - current_drop_off
            projected_rate = 1 - new_drop_off
            impact.update({
                'metric': 'conversion_rate',
                'current_value': current_rate,
                'projected_value': projected_rate,
                'increase_percentage': self._percent_increase(
                    current_rate, projected_rate
                ),
                # NOTE(review): the recovered users are scaled by the
                # current pass-through rate as well; confirm this extra
                # factor is intended.
                'additional_conversions': int(
                    users_affected * improvement_factor * (1 - current_drop_off)
                ),
                'confidence': 0.7
            })

        elif opportunity['type'] == 'channel_optimization':
            current_conversion = opportunity['impact_data']['current_conversion']
            target_conversion = opportunity['impact_data']['target_conversion']
            sessions = opportunity['impact_data']['sessions']

            # Assume the channel closes 70% of the gap to the target rate
            projected_conversion = (
                current_conversion + (target_conversion - current_conversion) * 0.7
            )
            impact.update({
                'metric': 'conversion_rate',
                'current_value': current_conversion,
                'projected_value': projected_conversion,
                'increase_percentage': self._percent_increase(
                    current_conversion, projected_conversion
                ),
                'additional_conversions': int(
                    sessions * (projected_conversion - current_conversion)
                ),
                'confidence': 0.65
            })

        return impact
Priority Scoring
ICE Framework
Use Impact, Confidence, Ease framework:
class PriorityScorer:
    """Score recommendations with the ICE (Impact, Confidence, Ease) framework."""

    def calculate_priority(
        self,
        recommendation: dict,
        context: dict = None
    ) -> dict:
        """Calculate priority score using ICE framework.

        The ICE score is ``impact * confidence * ease / 100`` and is
        mapped to a tier: >= 7 critical, >= 5 high, >= 3 medium,
        otherwise low.

        Args:
            recommendation: Recommendation dict; ``estimated_impact``,
                ``confidence`` and ``action_items`` are read.
            context: Accepted for interface compatibility; not used by
                the scoring.

        Returns:
            A dict with ``priority``, ``ice_score``, ``impact_score``,
            ``confidence_score`` and ``ease_score``.
        """
        impact_score = self.calculate_impact_score(
            recommendation['estimated_impact']
        )
        # presumably confidence is a 0-1 fraction, giving a 0-10 score;
        # verify against the producers of `recommendation`
        confidence_score = recommendation['confidence'] * 10
        ease_score = self.calculate_ease_score(
            recommendation['action_items']
        )

        ice_score = (impact_score * confidence_score * ease_score) / 100

        if ice_score >= 7:
            priority = 'critical'
        elif ice_score >= 5:
            priority = 'high'
        elif ice_score >= 3:
            priority = 'medium'
        else:
            priority = 'low'

        return {
            'priority': priority,
            'ice_score': ice_score,
            'impact_score': impact_score,
            'confidence_score': confidence_score,
            'ease_score': ease_score
        }

    def calculate_impact_score(self, impact: dict) -> float:
        """Calculate impact score (2-10) from the projected percentage increase.

        Args:
            impact: Impact estimate; ``increase_percentage`` is read.

        Returns:
            5.0 when ``impact`` is empty or ``None``; otherwise 10/8/6/4
            for increases of at least 50/30/20/10 percent and 2.0 below
            that. A missing or ``None`` ``increase_percentage`` is treated
            as 0, so an estimate without a computable increase scores 2.0
            instead of raising ``TypeError``.
        """
        if not impact:
            return 5.0

        increase_pct = impact.get('increase_percentage')
        if increase_pct is None:
            increase_pct = 0

        if increase_pct >= 50:
            return 10.0
        elif increase_pct >= 30:
            return 8.0
        elif increase_pct >= 20:
            return 6.0
        elif increase_pct >= 10:
            return 4.0
        else:
            return 2.0

    def calculate_ease_score(self, action_items: list) -> float:
        """Calculate ease score (2-10); less estimated work scores higher.

        Args:
            action_items: Action item dicts; ``estimated_hours`` is summed.
                Missing or ``None`` hours count as 0.

        Returns:
            5.0 when there are no action items; otherwise 10/8/6/4 for
            totals up to 4/8/16/40 hours and 2.0 above that.
        """
        if not action_items:
            return 5.0

        total_hours = sum(
            item.get('estimated_hours') or 0 for item in action_items
        )

        if total_hours <= 4:
            return 10.0
        elif total_hours <= 8:
            return 8.0
        elif total_hours <= 16:
            return 6.0
        elif total_hours <= 40:
            return 4.0
        else:
            return 2.0
Action Item Generation
Task Breakdown
Break recommendations into actionable tasks:
class ActionItemGenerator:
    """Ask Claude to break a chosen solution into concrete, ordered tasks."""

    async def generate_action_items(
        self,
        solution: dict,
        opportunity: dict
    ) -> list:
        """Generate specific action items.

        Sends the problem, solution and impact data to Claude, parses the
        reply as JSON, then marks every item as not completed, assigns it
        an id and attaches a generated code example to items for which
        ``requires_code`` is true.

        Args:
            solution: Solution dict; ``description`` is read.
            opportunity: Opportunity dict; ``description`` and
                ``impact_data`` are read.

        Returns:
            The parsed action items with ``completed``, ``id`` and
            (where applicable) ``code_example`` added.
        """
        prompt = f"""
Generate specific, actionable tasks to implement this solution:
Problem: {opportunity['description']}
Solution: {solution['description']}
Context: {json.dumps(opportunity['impact_data'], indent=2)}
For each action item, provide:
1. Clear title (what to do)
2. Detailed description (how to do it)
3. Estimated hours to complete
4. Required skills/resources
5. Dependencies (if any)
Generate 3-7 action items in order of execution.
Return as JSON array.
"""
        raw_reply = await call_claude(prompt)
        tasks = json.loads(raw_reply)

        for task in tasks:
            task['completed'] = False
            task['id'] = generate_id()
            if not self.requires_code(task):
                continue
            # Technical task: attach an illustrative code example.
            task['code_example'] = await self.generate_code_example(task)

        return tasks

    async def generate_code_example(self, action_item: dict) -> str:
        """Generate code example for technical tasks.

        Args:
            action_item: Action item dict; ``title`` and ``description``
                are placed into the prompt.

        Returns:
            Claude's reply, unmodified.
        """
        prompt = f"""
Generate a code example for this task:
Task: {action_item['title']}
Description: {action_item['description']}
Provide practical, production-ready code with comments.
"""
        return await call_claude(prompt)
Deduplication
Similarity Detection
Prevent duplicate tickets:
class TicketDeduplicator:
    """Find near-duplicate tickets by description similarity and merge them."""

    def __init__(self):
        """Create the TF-IDF vectorizer and set the duplicate threshold (0.8)."""
        self.vectorizer = TfidfVectorizer()
        self.similarity_threshold = 0.8

    async def find_duplicates(
        self,
        new_ticket: dict,
        existing_tickets: list
    ) -> list:
        """Find potential duplicate tickets.

        The vectorizer is re-fitted on every call over the new ticket's
        description plus all existing descriptions, and the new ticket is
        compared against each existing one by cosine similarity.

        Args:
            new_ticket: Ticket dict; ``description`` is read.
            existing_tickets: Ticket dicts to compare against.

        Returns:
            ``{'ticket': ..., 'similarity': ...}`` dicts for every existing
            ticket whose similarity reaches ``similarity_threshold``, in
            the order of ``existing_tickets``. Empty when there are no
            existing tickets.
        """
        if not existing_tickets:
            return []

        # Row 0 is the new ticket; rows 1.. line up with existing_tickets.
        all_descriptions = [new_ticket['description']] + [
            t['description'] for t in existing_tickets
        ]
        vectors = self.vectorizer.fit_transform(all_descriptions)

        from sklearn.metrics.pairwise import cosine_similarity
        similarities = cosine_similarity(vectors[0:1], vectors[1:]).flatten()

        return [
            {'ticket': ticket, 'similarity': similarity}
            for ticket, similarity in zip(existing_tickets, similarities)
            if similarity >= self.similarity_threshold
        ]

    @staticmethod
    def _impact_magnitude(ticket: dict) -> float:
        """Return the ticket's increase_percentage, ranking an unknown value lowest."""
        value = ticket['estimated_impact'].get('increase_percentage')
        return float('-inf') if value is None else value

    async def merge_similar_tickets(
        self,
        ticket1: dict,
        ticket2: dict
    ) -> dict:
        """Merge similar tickets.

        Args:
            ticket1: First ticket; supplies the merged title.
            ticket2: Second ticket.

        Returns:
            A dict with ``title``, merged ``description``, de-duplicated
            ``action_items``, the ``estimated_impact`` of whichever ticket
            has the larger ``increase_percentage`` (``ticket2`` on a tie),
            and ``merged_from`` listing both ids. A missing or ``None``
            ``increase_percentage`` ranks below any number instead of
            raising ``TypeError``.
        """
        all_actions = ticket1['action_items'] + ticket2['action_items']
        unique_actions = self.deduplicate_actions(all_actions)

        # Strictly greater: ticket2 wins ties.
        if self._impact_magnitude(ticket1) > self._impact_magnitude(ticket2):
            impact = ticket1['estimated_impact']
        else:
            impact = ticket2['estimated_impact']

        merged = {
            'title': ticket1['title'],
            'description': self.merge_descriptions(
                ticket1['description'],
                ticket2['description']
            ),
            'action_items': unique_actions,
            'estimated_impact': impact,
            'merged_from': [ticket1['id'], ticket2['id']]
        }
        return merged
Storage and Retrieval
Database Schema
Store tickets in structured format:
-- One row per generated growth ticket.
CREATE TABLE growth_tickets (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    warehouse_id UUID NOT NULL REFERENCES warehouses(id),
    title VARCHAR(255) NOT NULL,
    description TEXT NOT NULL,
    category VARCHAR(50) NOT NULL,
    priority VARCHAR(20) NOT NULL,
    status VARCHAR(20) DEFAULT 'open',

    -- Impact estimation
    estimated_impact JSONB NOT NULL,
    confidence FLOAT NOT NULL,

    -- Priority scoring
    ice_score FLOAT NOT NULL,
    impact_score FLOAT NOT NULL,
    confidence_score FLOAT NOT NULL,
    ease_score FLOAT NOT NULL,

    -- Action items
    action_items JSONB NOT NULL,

    -- Supporting data
    supporting_data JSONB,
    related_report_id UUID REFERENCES reports(id),

    -- Metadata
    assignee UUID REFERENCES users(id),
    notes TEXT,
    dismissed_at TIMESTAMP,
    dismiss_reason VARCHAR(50),

    -- Timestamps
    created_at TIMESTAMP DEFAULT NOW(),
    updated_at TIMESTAMP DEFAULT NOW(),
    completed_at TIMESTAMP
);

-- Indexes
-- PostgreSQL has no inline INDEX clause in CREATE TABLE (that is MySQL
-- syntax), so each index is created with its own statement.
CREATE INDEX idx_warehouse_status ON growth_tickets (warehouse_id, status);
CREATE INDEX idx_priority ON growth_tickets (priority);
CREATE INDEX idx_category ON growth_tickets (category);
CREATE INDEX idx_ice_score ON growth_tickets (ice_score DESC);
Query Optimization
Efficient ticket retrieval:
async def get_tickets(
    warehouse_id: str,
    filters: dict = None,
    sort: str = '-ice_score',
    limit: int = 50,
    cursor: str = None
) -> dict:
    """Get tickets with filtering and pagination.

    Args:
        warehouse_id: Only tickets of this warehouse are returned.
        filters: Optional exact-match filters; the ``status``,
            ``priority`` and ``category`` keys are honoured when truthy.
        sort: Column to sort by; a leading ``-`` means descending. Must
            name a sortable ``growth_tickets`` column.
        limit: Maximum number of rows to return.
        cursor: Optional ticket id; only rows with a greater id are
            returned.

    Returns:
        ``{'data': rows, 'pagination': {'has_more': ..., 'next_cursor': ...}}``
        where ``has_more`` is true when a full page was returned and
        ``next_cursor`` is the id of the last row (``None`` for an empty
        page).

    Raises:
        ValueError: If ``sort`` names a column that is not sortable. The
            column name is interpolated into the SQL text, so it is
            checked against a fixed allow-list first.
    """
    sortable_columns = {
        'id', 'title', 'category', 'priority', 'status', 'confidence',
        'ice_score', 'impact_score', 'confidence_score', 'ease_score',
        'created_at', 'updated_at', 'completed_at',
    }
    sort_field = sort.lstrip('-')
    if sort_field not in sortable_columns:
        raise ValueError(f"Unsupported sort field: {sort_field!r}")
    sort_direction = 'DESC' if sort.startswith('-') else 'ASC'

    # Collect every predicate first so that all of them land in the WHERE
    # clause, i.e. before ORDER BY / LIMIT.
    conditions = ["warehouse_id = $1"]
    params = [warehouse_id]

    active_filters = filters or {}
    for column in ('status', 'priority', 'category'):
        value = active_filters.get(column)
        if value:
            params.append(value)
            conditions.append(f"{column} = ${len(params)}")

    if cursor:
        # NOTE(review): an id-based cursor only walks the result set
        # consistently when the sort key is id; with another sort column
        # pages can skip or repeat rows. Confirm the intended cursor design.
        params.append(cursor)
        conditions.append(f"id > ${len(params)}")

    where_clause = " AND ".join(conditions)
    order_clause = f"{sort_field} {sort_direction}"
    if sort_field != 'id':
        # Tie-breaker so rows with equal sort values come back in a
        # deterministic order.
        order_clause += ", id ASC"

    params.append(limit)
    query = (
        "SELECT * FROM growth_tickets"
        f" WHERE {where_clause}"
        f" ORDER BY {order_clause}"
        f" LIMIT ${len(params)}"
    )

    results = await db.fetch(query, *params)

    return {
        'data': results,
        'pagination': {
            'has_more': len(results) == limit,
            'next_cursor': results[-1]['id'] if results else None
        }
    }
Integration Patterns
Webhook Notifications
Notify external systems of new tickets:
async def notify_ticket_created(ticket: dict):
    """Send webhook notification for new ticket.

    Looks up the webhook URL for the ticket's warehouse and, if one is
    configured, POSTs a signed ``ticket.created`` event to it with a
    10 second timeout. Delivery failures are logged, not raised.

    Args:
        ticket: Ticket dict; ``warehouse_id``, ``id``, ``title``,
            ``priority``, ``category`` and ``estimated_impact`` are read.
    """
    webhook_url = await get_webhook_url(ticket['warehouse_id'])
    if not webhook_url:
        # No endpoint configured for this warehouse: nothing to deliver.
        return

    event_data = {
        'ticket_id': ticket['id'],
        'title': ticket['title'],
        'priority': ticket['priority'],
        'category': ticket['category'],
        'estimated_impact': ticket['estimated_impact'],
        'url': f"https://app.cogny.com/tickets/{ticket['id']}"
    }
    payload = {
        'event': 'ticket.created',
        'data': event_data,
        'timestamp': datetime.now().isoformat()
    }

    # NOTE(review): webhook_secret is not defined inside this function;
    # presumably it is a module-level value - confirm.
    request_headers = {
        'X-Cogny-Signature': sign_payload(payload, webhook_secret),
        'Content-Type': 'application/json'
    }

    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                webhook_url,
                json=payload,
                headers=request_headers,
                timeout=10.0
            )
            response.raise_for_status()
    except Exception as e:
        logger.error(f"Webhook delivery failed: {e}")
JIRA Synchronization
Sync tickets to JIRA:
from jira import JIRA
class JIRAIntegration:
    """Mirror Cogny growth tickets into JIRA as a Task with one Sub-task per action item."""

    def __init__(self, jira_url: str, credentials: dict):
        """Open a JIRA client using basic auth.

        Args:
            jira_url: Base URL of the JIRA server.
            credentials: Dict with ``email`` and ``api_token``.
        """
        self.jira = JIRA(
            server=jira_url,
            basic_auth=(credentials['email'], credentials['api_token'])
        )

    async def sync_ticket(self, ticket: dict, project_key: str):
        """Sync Cogny ticket to JIRA.

        Creates a Task for the ticket, then a Sub-task under it for every
        action item.

        Args:
            ticket: Ticket dict; ``title``, ``category``, ``priority``,
                ``action_items`` and the fields used by
                ``format_description`` are read.
            project_key: Key of the JIRA project to create the issues in.

        Returns:
            The key of the created parent issue.
        """
        # NOTE(review): create_issue is called without await inside this
        # coroutine; if the client call blocks, it blocks the event loop
        # for the duration of each request - confirm this is acceptable.
        issue_dict = {
            'project': {'key': project_key},
            'summary': ticket['title'],
            'description': self.format_description(ticket),
            'issuetype': {'name': 'Task'},
            'priority': {'name': self.map_priority(ticket['priority'])},
            'labels': [ticket['category'], 'cogny-generated']
        }
        jira_issue = self.jira.create_issue(fields=issue_dict)

        # Create subtasks for action items
        for action_item in ticket['action_items']:
            subtask_dict = {
                'project': {'key': project_key},
                'summary': action_item['title'],
                'description': action_item['description'],
                'issuetype': {'name': 'Sub-task'},
                'parent': {'key': jira_issue.key}
            }
            self.jira.create_issue(fields=subtask_dict)

        return jira_issue.key

    def format_description(self, ticket: dict) -> str:
        """Format ticket description for JIRA.

        Impact fields that are missing or ``None`` are rendered as
        ``n/a`` rather than failing on number formatting. When the ticket
        has no ``url``, the link is built from the ticket id using the
        same ``https://app.cogny.com/tickets/<id>`` pattern as the webhook
        payload.

        Args:
            ticket: Ticket dict; ``description``, ``category``,
                ``priority``, ``ice_score``, ``estimated_impact`` and
                ``url`` (or ``id``) are read.

        Returns:
            The description text in JIRA wiki markup.
        """
        impact = ticket['estimated_impact']
        metric = impact.get('metric')
        increase = impact.get('increase_percentage')
        confidence = impact.get('confidence')

        metric_text = 'n/a' if metric is None else metric
        increase_text = 'n/a' if increase is None else f"+{increase:.1f}%"
        confidence_text = 'n/a' if confidence is None else f"{confidence:.0%}"

        description = f"{ticket['description']}\n\n"
        description += f"*Category:* {ticket['category']}\n"
        description += f"*Priority:* {ticket['priority']}\n"
        description += f"*ICE Score:* {ticket['ice_score']:.1f}\n\n"

        description += "*Estimated Impact:*\n"
        description += f"- Metric: {metric_text}\n"
        description += f"- Improvement: {increase_text}\n"
        description += f"- Confidence: {confidence_text}\n\n"

        url = ticket.get('url') or f"https://app.cogny.com/tickets/{ticket['id']}"
        description += f"[View in Cogny|{url}]"
        return description
Performance Monitoring
Ticket Metrics
Track ticket system performance:
-- Ticket creation rate: tickets created per day and their mean ICE score
-- over the last 30 days, newest day first
SELECT
    DATE_TRUNC('day', created_at) AS date,
    COUNT(*) AS tickets_created,
    AVG(ice_score) AS avg_ice_score
FROM growth_tickets
WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY DATE_TRUNC('day', created_at)
ORDER BY 1 DESC;

-- Completion rate by priority: totals, completed count and mean days from
-- creation to completion over the last 90 days. AVG skips tickets whose
-- completed_at is NULL.
SELECT
    priority,
    COUNT(*) AS total,
    COUNT(*) FILTER (WHERE status = 'completed') AS completed,
    AVG(EXTRACT(EPOCH FROM (completed_at - created_at)) / 86400) AS avg_days_to_complete
FROM growth_tickets
WHERE created_at >= CURRENT_DATE - INTERVAL '90 days'
GROUP BY priority;

-- Category distribution: volume, mean ICE score and completed share per
-- category over the last 30 days, largest category first
SELECT
    category,
    COUNT(*) AS count,
    AVG(ice_score) AS avg_priority,
    CAST(COUNT(*) FILTER (WHERE status = 'completed') AS FLOAT) / COUNT(*) AS completion_rate
FROM growth_tickets
WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
GROUP BY category
ORDER BY count DESC;
Next Steps
- Growth Tickets API - API integration
- ICP Analysis - Customer profiling
- AI Report Generation - Report system
Resources
- ICE Prioritization: hygger.io/blog/ice-scoring-model
- JIRA API: developer.atlassian.com/cloud/jira
- sklearn Documentation: scikit-learn.org
Talk to Our Technical Team
Schedule a technical consultation to discuss your integration requirements and implementation strategy.
Schedule Demo