Back to Documentation
    Documentation · Features · Feb 21, 2025

    Growth Tickets System Architecture

    Technical architecture of Cogny's automated growth ticket generation system, including recommendation algorithms, prioritization logic, and integration patterns.

    Overview

    Growth Tickets are AI-generated, actionable recommendations automatically derived from data analysis. The system continuously monitors analytics data, identifies opportunities, and generates prioritized action items for growth teams.

    Key Features:

    • Automated opportunity detection
    • Impact estimation
    • Priority scoring
    • Action item generation
    • Project management integration

    System Architecture

    ┌─────────────────────────────────────────────────────────────┐
    │                        Data Sources                         │
    ├─────────────┬──────────────┬─────────────┬──────────────────┤
    │  Analytics  │   Reports    │  ICP Data   │    Historical    │
    └──────┬──────┴──────┬───────┴──────┬──────┴─────────┬────────┘
           │             │              │                │
           └─────────────┴─────────┬────┴────────────────┘
                      ┌────────────▼────────────┐
                      │ Opportunity Detection   │
                      │ - Pattern recognition   │
                      │ - Anomaly detection     │
                      │ - Trend analysis        │
                      └────────────┬────────────┘
                      ┌────────────▼────────────┐
                      │ Recommendation Engine   │
                      │ - Solution matching     │
                      │ - Impact estimation     │
                      │ - Effort calculation    │
                      └────────────┬────────────┘
                      ┌────────────▼────────────┐
                      │ Priority Scoring        │
                      │ - ICE framework         │
                      │ - Business context      │
                      │ - Resource availability │
                      └────────────┬────────────┘
                      ┌────────────▼────────────┐
                      │ Action Item Generator   │
                      │ - Task breakdown        │
                      │ - Time estimation       │
                      │ - Dependency mapping    │
                      └────────────┬────────────┘
                      ┌────────────▼────────────┐
                      │ Ticket Management       │
                      │ - Deduplication         │
                      │ - Status tracking       │
                      │ - API exposure          │
                      └─────────────────────────┘

    Opportunity Detection

    Pattern Recognition

    Identify growth opportunities from data patterns:

    class OpportunityDetector:
        """Fan one warehouse out to every registered analyzer and pool the findings.

        A failing analyzer never aborts the run: its error is logged and the
        remaining analyzers still execute.
        """

        def __init__(self, warehouse_id: str):
            self.warehouse_id = warehouse_id
            # Analyzers run in list order; their results are concatenated in
            # that same order.
            self.analyzers = [
                ConversionFunnelAnalyzer(),
                TrafficSourceAnalyzer(),
                UserSegmentAnalyzer(),
                ProductPerformanceAnalyzer(),
                TechnicalPerformanceAnalyzer()
            ]

        async def detect_opportunities(self) -> list:
            """Return the opportunities reported by all analyzers, in analyzer order."""
            collected = []
            for analyzer in self.analyzers:
                await self._collect_from(analyzer, collected)
            return collected

        async def _collect_from(self, analyzer, sink: list) -> None:
            """Append one analyzer's findings to ``sink``; log instead of raising."""
            try:
                sink.extend(await analyzer.analyze(self.warehouse_id))
            except Exception as e:
                logger.error(f"Analyzer {analyzer.__class__.__name__} failed: {e}")

    Funnel Opportunity Detection

    Identify conversion funnel issues:

    class ConversionFunnelAnalyzer:
        """Finds conversion-funnel opportunities: steep step drop-offs and weak devices."""

        async def analyze(self, warehouse_id: str) -> list:
            """Analyze conversion funnel for opportunities.

            Two checks are run:

            * consecutive funnel steps where more than 50% of users drop off;
            * devices whose end-to-end conversion rate is below 70% of the
              cross-device average.

            Steps and devices with zero entering users are skipped, because a
            rate cannot be computed for them.

            Returns:
                A list of opportunity dicts with ``type``, ``title``,
                ``description``, ``impact_data`` and ``confidence`` keys.
            """
            opportunities = []

            # Get funnel data
            funnel_data = await self.get_funnel_data(warehouse_id)

            # Check drop-off rates between each pair of consecutive steps
            for current_step, next_step in zip(funnel_data, funnel_data[1:]):
                entered = current_step['users']
                if not entered:
                    # Nobody reached this step, so its drop-off rate is undefined.
                    continue

                drop_off_rate = 1 - (next_step['users'] / entered)

                # Significant drop-off
                if drop_off_rate > 0.5:
                    opportunities.append({
                        'type': 'funnel_optimization',
                        'title': f"High drop-off at {next_step['name']} step",
                        'description': (
                            f"{drop_off_rate:.1%} of users drop off between "
                            f"{current_step['name']} and {next_step['name']}"
                        ),
                        'impact_data': {
                            'current_drop_off': drop_off_rate,
                            'users_affected': entered - next_step['users'],
                            'step_from': current_step['name'],
                            'step_to': next_step['name']
                        },
                        'confidence': self.calculate_confidence(entered)
                    })

            # Check device-specific performance
            device_funnel = await self.get_funnel_by_device(warehouse_id)
            if not device_funnel:
                return opportunities

            # The cross-device average is the same for every device: compute it once.
            avg_conversion = self.get_average_conversion_rate(device_funnel)

            for device, funnel in device_funnel.items():
                # No steps, or nobody entered the funnel on this device.
                if not funnel or not funnel[0]['users']:
                    continue

                conversion_rate = funnel[-1]['users'] / funnel[0]['users']

                # Underperforming device. avg_conversion is necessarily > 0
                # inside this branch, so the division below is safe.
                if conversion_rate < avg_conversion * 0.7:
                    opportunities.append({
                        'type': 'device_optimization',
                        'title': f"Low {device} conversion rate",
                        'description': (
                            f"{device.capitalize()} conversion rate ({conversion_rate:.2%}) "
                            f"is {(1 - conversion_rate/avg_conversion):.0%} below average"
                        ),
                        'impact_data': {
                            'device': device,
                            'current_rate': conversion_rate,
                            'average_rate': avg_conversion,
                            'users_affected': funnel[0]['users']
                        },
                        'confidence': 0.85
                    })

            return opportunities

        async def get_funnel_data(self, warehouse_id: str) -> list:
            """Get funnel step data.

            Runs one query that counts distinct users per funnel step over the
            last 30 days and returns the rows ordered by ``step_number``.
            """
            # NOTE(review): `{project}` / `{dataset}` are not substituted in this
            # method — presumably `execute_query` fills them in; confirm.
            # The SQL text is kept exactly as it was, whitespace included.
            query = """
    WITH funnel_steps AS (
    SELECT
    user_pseudo_id,
    COUNTIF(event_name = 'page_view') as step_1,
    COUNTIF(event_name = 'view_item') as step_2,
    COUNTIF(event_name = 'add_to_cart') as step_3,
    COUNTIF(event_name = 'begin_checkout') as step_4,
    COUNTIF(event_name = 'purchase') as step_5
    FROM `{project}.{dataset}.events_*`
    WHERE _TABLE_SUFFIX BETWEEN FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY))
    AND FORMAT_DATE('%Y%m%d', CURRENT_DATE() - 1)
    GROUP BY user_pseudo_id
    )

    SELECT
    'Landing' as name,
    1 as step_number,
    COUNT(DISTINCT CASE WHEN step_1 > 0 THEN user_pseudo_id END) as users
    FROM funnel_steps
    UNION ALL
    SELECT 'Product View', 2, COUNT(DISTINCT CASE WHEN step_2 > 0 THEN user_pseudo_id END)
    FROM funnel_steps
    UNION ALL
    SELECT 'Add to Cart', 3, COUNT(DISTINCT CASE WHEN step_3 > 0 THEN user_pseudo_id END)
    FROM funnel_steps
    UNION ALL
    SELECT 'Checkout', 4, COUNT(DISTINCT CASE WHEN step_4 > 0 THEN user_pseudo_id END)
    FROM funnel_steps
    UNION ALL
    SELECT 'Purchase', 5, COUNT(DISTINCT CASE WHEN step_5 > 0 THEN user_pseudo_id END)
    FROM funnel_steps
    ORDER BY step_number
    """

            return await execute_query(warehouse_id, query)

    Traffic Source Analysis

    Identify channel optimization opportunities:

    class TrafficSourceAnalyzer:
        """Finds acquisition channels worth fixing (busy but weak) or scaling (strong but small)."""

        async def analyze(self, warehouse_id: str) -> list:
            """Analyze traffic sources for opportunities.

            A channel is reported as:

            * ``channel_optimization`` when it has more than 1000 sessions and
              converts at less than half the average rate;
            * ``acquisition_scaling`` when it converts at more than 1.5x the
              average rate but has fewer than 500 sessions.

            Returns:
                A list of opportunity dicts; empty when there is no channel data.
            """
            opportunities = []

            # Get channel performance
            channel_data = await self.get_channel_performance(warehouse_id)
            if not channel_data:
                # Without channels there is no average to compare against.
                return opportunities

            # Unweighted mean of the per-channel conversion rates
            avg_conversion = (
                sum(c['conversion_rate'] for c in channel_data) / len(channel_data)
            )

            for channel in channel_data:
                source = channel['source']
                sessions = channel['sessions']
                rate = channel['conversion_rate']

                # High traffic, low conversion
                if sessions > 1000 and rate < avg_conversion * 0.5:
                    opportunities.append({
                        'type': 'channel_optimization',
                        'title': f"Low conversion rate from {source} traffic",
                        'description': (
                            f"{source} drives {sessions:,} sessions "
                            f"but only {rate:.2%} convert, "
                            f"significantly below the average of {avg_conversion:.2%}"
                        ),
                        'impact_data': {
                            'source': source,
                            'medium': channel['medium'],
                            'sessions': sessions,
                            'current_conversion': rate,
                            'target_conversion': avg_conversion
                        },
                        'confidence': 0.9
                    })

                # High conversion, low traffic
                if rate > avg_conversion * 1.5 and sessions < 500:
                    opportunities.append({
                        'type': 'acquisition_scaling',
                        'title': f"Scale high-performing {source} channel",
                        'description': (
                            f"{source} has {rate:.2%} conversion rate "
                            f"but only {sessions:,} sessions. Opportunity to scale."
                        ),
                        'impact_data': {
                            'source': source,
                            'medium': channel['medium'],
                            'sessions': sessions,
                            'conversion_rate': rate
                        },
                        'confidence': 0.85
                    })

            return opportunities

    Recommendation Engine

    Solution Matching

    Match opportunities to proven solutions:

    class RecommendationEngine:
        """Matches an opportunity to the best known solution and packages a recommendation."""

        # Fraction subtracted from a solution's score for each difficulty label.
        DIFFICULTY_PENALTIES = {'easy': 0, 'medium': 0.1, 'hard': 0.2}
        # Label assumed when a solution has no (or an unrecognized) difficulty.
        DEFAULT_DIFFICULTY = 'medium'

        def __init__(self):
            self.solution_database = SolutionDatabase()

        async def generate_recommendations(
            self,
            opportunity: dict
        ) -> dict:
            """Generate recommendation from opportunity.

            Args:
                opportunity: Opportunity dict; ``type``, ``title``,
                    ``description``, ``impact_data`` and ``confidence`` are read.

            Returns:
                A recommendation dict carrying the chosen solution, its action
                items and the estimated impact.

            Raises:
                IndexError: If no solution matches the opportunity.
            """
            # Match to solution patterns
            solutions = self.solution_database.find_solutions(
                opportunity_type=opportunity['type'],
                context=opportunity['impact_data']
            )

            # Rank solutions by expected impact
            ranked_solutions = self.rank_solutions(solutions, opportunity)
            if not ranked_solutions:
                # Same exception type an empty list lookup would raise, but
                # with a message that says what went wrong.
                raise IndexError(
                    f"No solutions found for opportunity type {opportunity['type']!r}"
                )

            # Select best solution
            best_solution = ranked_solutions[0]

            # NOTE(review): neither call below is awaited. The ActionItemGenerator
            # and ImpactEstimator methods of the same names are coroutines — confirm
            # which implementations this class is expected to provide.
            action_items = self.generate_action_items(best_solution, opportunity)
            impact_estimate = self.estimate_impact(opportunity, best_solution)

            return {
                'title': opportunity['title'],
                'description': opportunity['description'],
                'category': opportunity['type'],
                'solution': best_solution,
                'action_items': action_items,
                'estimated_impact': impact_estimate,
                'confidence': opportunity['confidence']
            }

        def rank_solutions(self, solutions: list, opportunity: dict) -> list:
            """Rank solutions by expected effectiveness, best first.

            Score = relevance * historical success rate * (1 - difficulty
            penalty). A missing success rate defaults to 0.5. A missing or
            unrecognized difficulty label is treated as ``'medium'`` rather
            than raising ``KeyError``. Solutions with equal scores keep their
            input order.
            """
            fallback_penalty = self.DIFFICULTY_PENALTIES[self.DEFAULT_DIFFICULTY]

            def score(solution: dict) -> float:
                relevance = self.calculate_relevance(solution, opportunity)
                success_rate = solution.get('success_rate', 0.5)
                difficulty = solution.get('difficulty', self.DEFAULT_DIFFICULTY)
                penalty = self.DIFFICULTY_PENALTIES.get(difficulty, fallback_penalty)
                return relevance * success_rate * (1 - penalty)

            return sorted(solutions, key=score, reverse=True)

    Impact Estimation

    Estimate potential impact of recommendations:

    class ImpactEstimator:
        """Projects the measurable effect of applying a solution to an opportunity."""

        @staticmethod
        def _percent_increase(current: float, projected: float):
            """Return the relative increase in percent, or None for a zero baseline.

            A relative increase is undefined when the baseline is zero, so None
            is returned — the same "unknown" value ``estimate_impact`` already
            reports for opportunity types it does not model.
            """
            if current == 0:
                return None
            return (projected / current - 1) * 100

        async def estimate_impact(
            self,
            opportunity: dict,
            solution: dict
        ) -> dict:
            """Estimate impact of implementing solution.

            Only ``funnel_optimization`` and ``channel_optimization``
            opportunities are modelled; for any other type every field of the
            returned dict stays None.

            Returns:
                A dict with ``metric``, ``current_value``, ``projected_value``,
                ``increase_percentage`` and ``confidence``, plus
                ``additional_conversions`` for the modelled types.
                ``increase_percentage`` is None when the current value is zero.
            """
            impact = {
                'metric': None,
                'current_value': None,
                'projected_value': None,
                'increase_percentage': None,
                'confidence': None
            }
            data = opportunity['impact_data']

            if opportunity['type'] == 'funnel_optimization':
                # Estimate conversion rate improvement
                current_drop_off = data['current_drop_off']
                users_affected = data['users_affected']

                # Share of the drop-off the solution is expected to remove
                # (30% unless the solution says otherwise)
                improvement_factor = solution.get('expected_improvement', 0.3)
                new_drop_off = current_drop_off * (1 - improvement_factor)

                current_rate = 1 - current_drop_off
                projected_rate = 1 - new_drop_off

                impact.update({
                    'metric': 'conversion_rate',
                    'current_value': current_rate,
                    'projected_value': projected_rate,
                    'increase_percentage': self._percent_increase(
                        current_rate, projected_rate
                    ),
                    'additional_conversions': int(
                        users_affected * improvement_factor * (1 - current_drop_off)
                    ),
                    'confidence': 0.7
                })

            elif opportunity['type'] == 'channel_optimization':
                # Estimate conversion rate improvement for channel
                current_conversion = data['current_conversion']
                target_conversion = data['target_conversion']
                sessions = data['sessions']

                # Assume reaching 70% of target conversion
                projected_conversion = (
                    current_conversion + (target_conversion - current_conversion) * 0.7
                )

                impact.update({
                    'metric': 'conversion_rate',
                    'current_value': current_conversion,
                    'projected_value': projected_conversion,
                    'increase_percentage': self._percent_increase(
                        current_conversion, projected_conversion
                    ),
                    'additional_conversions': int(
                        sessions * (projected_conversion - current_conversion)
                    ),
                    'confidence': 0.65
                })

            return impact

    Priority Scoring

    ICE Framework

    Use Impact, Confidence, Ease framework:

    class PriorityScorer:
        """Scores recommendations with the ICE (Impact, Confidence, Ease) framework."""

        # (minimum percentage increase, score), highest band first.
        IMPACT_BANDS = ((50, 10.0), (30, 8.0), (20, 6.0), (10, 4.0))
        # (maximum total hours, score), easiest band first.
        EASE_BANDS = ((4, 10.0), (8, 8.0), (16, 6.0), (40, 4.0))

        def calculate_priority(
            self,
            recommendation: dict,
            context: dict = None
        ) -> dict:
            """Calculate priority score using ICE framework.

            Args:
                recommendation: Dict providing ``estimated_impact``,
                    ``confidence`` (0-1) and ``action_items``.
                context: Accepted for interface compatibility; not used by the
                    scoring below.

            Returns:
                Dict with the ``priority`` tier (``critical`` / ``high`` /
                ``medium`` / ``low``), the combined ``ice_score`` and the three
                component scores.
            """
            # Impact (1-10)
            impact_score = self.calculate_impact_score(
                recommendation['estimated_impact']
            )

            # Confidence: 0-1 value rescaled to the same 10-point scale
            confidence_score = recommendation['confidence'] * 10

            # Ease (1-10)
            ease_score = self.calculate_ease_score(
                recommendation['action_items']
            )

            # Dividing the product by 100 brings the result back to a 0-10 range
            ice_score = (impact_score * confidence_score * ease_score) / 100

            # Priority tier
            if ice_score >= 7:
                priority = 'critical'
            elif ice_score >= 5:
                priority = 'high'
            elif ice_score >= 3:
                priority = 'medium'
            else:
                priority = 'low'

            return {
                'priority': priority,
                'ice_score': ice_score,
                'impact_score': impact_score,
                'confidence_score': confidence_score,
                'ease_score': ease_score
            }

        def calculate_impact_score(self, impact: dict) -> float:
            """Calculate impact score (1-10).

            A missing or empty estimate scores a neutral 5.0. An estimate whose
            ``increase_percentage`` is absent or None is treated as 0, because
            estimates for unmodelled opportunity types carry an explicit None.
            """
            if not impact:
                return 5.0

            # `.get(key, 0)` would still return None when the key is present
            # with a None value, and `None >= 50` raises TypeError.
            increase_pct = impact.get('increase_percentage') or 0

            # Map percentage increase to 1-10 scale
            for floor, score in self.IMPACT_BANDS:
                if increase_pct >= floor:
                    return score
            return 2.0

        def calculate_ease_score(self, action_items: list) -> float:
            """Calculate ease score (1-10); fewer total estimated hours is easier.

            No action items scores a neutral 5.0. Items with a missing or None
            ``estimated_hours`` count as 0 hours.
            """
            if not action_items:
                return 5.0

            total_hours = sum(
                item.get('estimated_hours') or 0 for item in action_items
            )

            # Map estimated hours to ease score (inverse)
            for ceiling, score in self.EASE_BANDS:
                if total_hours <= ceiling:
                    return score
            return 2.0

    Action Item Generation

    Task Breakdown

    Break recommendations into actionable tasks:

    class ActionItemGenerator:
        """Turns a chosen solution into a list of concrete, trackable tasks."""

        async def generate_action_items(
            self,
            solution: dict,
            opportunity: dict
        ) -> list:
            """Ask the model for an ordered task list and prepare it for tracking.

            The model's reply is parsed as JSON. Every task is then marked as
            not completed and given a fresh id, and tasks for which
            ``requires_code`` is true also receive a generated ``code_example``.
            """
            # The prompt text, including its whitespace, is part of what is sent
            # to the model, so it is kept flush with its original layout.
            prompt = f"""
    Generate specific, actionable tasks to implement this solution:

    Problem: {opportunity['description']}
    Solution: {solution['description']}
    Context: {json.dumps(opportunity['impact_data'], indent=2)}

    For each action item, provide:
    1. Clear title (what to do)
    2. Detailed description (how to do it)
    3. Estimated hours to complete
    4. Required skills/resources
    5. Dependencies (if any)

    Generate 3-7 action items in order of execution.
    Return as JSON array.
    """

            raw_reply = await call_claude(prompt)
            tasks = json.loads(raw_reply)

            for task in tasks:
                # Tracking fields
                task['completed'] = False
                task['id'] = generate_id()

                # Technical tasks additionally get a code sample
                needs_code = self.requires_code(task)
                if needs_code:
                    task['code_example'] = await self.generate_code_example(task)

            return tasks

        async def generate_code_example(self, action_item: dict) -> str:
            """Return model-generated example code for one task."""
            prompt = f"""
    Generate a code example for this task:

    Task: {action_item['title']}
    Description: {action_item['description']}

    Provide practical, production-ready code with comments.
    """

            return await call_claude(prompt)

    Deduplication

    Similarity Detection

    Prevent duplicate tickets:

    class TicketDeduplicator:
        """Detects near-duplicate tickets by description similarity and merges them."""

        def __init__(self):
            self.vectorizer = TfidfVectorizer()
            # Cosine similarity at or above this value counts as a duplicate.
            self.similarity_threshold = 0.8

        async def find_duplicates(
            self,
            new_ticket: dict,
            existing_tickets: list
        ) -> list:
            """Find potential duplicate tickets.

            Returns:
                A list of ``{'ticket': ..., 'similarity': ...}`` dicts, one per
                existing ticket whose description is at least
                ``similarity_threshold`` similar to the new ticket's. Empty when
                there are no existing tickets.
            """
            if not existing_tickets:
                return []

            # Row 0 is the new ticket; rows 1.. are the existing tickets, in order.
            all_descriptions = [new_ticket['description']] + [
                t['description'] for t in existing_tickets
            ]

            # The vectorizer is re-fitted on every call.
            vectors = self.vectorizer.fit_transform(all_descriptions)

            # Calculate cosine similarity
            from sklearn.metrics.pairwise import cosine_similarity

            similarities = cosine_similarity(vectors[0:1], vectors[1:]).flatten()

            # Find similar tickets
            return [
                {'ticket': existing_tickets[idx], 'similarity': similarity}
                for idx, similarity in enumerate(similarities)
                if similarity >= self.similarity_threshold
            ]

        async def merge_similar_tickets(
            self,
            ticket1: dict,
            ticket2: dict
        ) -> dict:
            """Merge similar tickets.

            The merged ticket keeps ``ticket1``'s title, the combined and
            de-duplicated action items of both tickets, and the impact estimate
            with the larger ``increase_percentage``. On a tie ``ticket2``'s
            estimate is used. An estimate whose ``increase_percentage`` is None
            (unknown) ranks below any numeric value instead of raising
            ``TypeError``.
            """
            def magnitude(ticket: dict) -> float:
                value = ticket['estimated_impact']['increase_percentage']
                return float('-inf') if value is None else value

            # Combine, then deduplicate, the action items
            all_actions = ticket1['action_items'] + ticket2['action_items']
            unique_actions = self.deduplicate_actions(all_actions)

            # Take higher impact estimate
            if magnitude(ticket1) > magnitude(ticket2):
                impact = ticket1['estimated_impact']
            else:
                impact = ticket2['estimated_impact']

            return {
                'title': ticket1['title'],
                'description': self.merge_descriptions(
                    ticket1['description'],
                    ticket2['description']
                ),
                'action_items': unique_actions,
                'estimated_impact': impact,
                'merged_from': [ticket1['id'], ticket2['id']]
            }

    Storage and Retrieval

    Database Schema

    Store tickets in structured format:

    -- Growth tickets: one row per generated recommendation for a warehouse.
    CREATE TABLE growth_tickets (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        warehouse_id UUID NOT NULL REFERENCES warehouses(id),
        title VARCHAR(255) NOT NULL,
        description TEXT NOT NULL,
        category VARCHAR(50) NOT NULL,
        priority VARCHAR(20) NOT NULL,
        status VARCHAR(20) DEFAULT 'open',

        -- Impact estimation
        estimated_impact JSONB NOT NULL,
        confidence FLOAT NOT NULL,

        -- Priority scoring
        ice_score FLOAT NOT NULL,
        impact_score FLOAT NOT NULL,
        confidence_score FLOAT NOT NULL,
        ease_score FLOAT NOT NULL,

        -- Action items
        action_items JSONB NOT NULL,

        -- Supporting data
        supporting_data JSONB,
        related_report_id UUID REFERENCES reports(id),

        -- Metadata
        assignee UUID REFERENCES users(id),
        notes TEXT,
        dismissed_at TIMESTAMP,
        dismiss_reason VARCHAR(50),

        -- Timestamps
        created_at TIMESTAMP DEFAULT NOW(),
        updated_at TIMESTAMP DEFAULT NOW(),
        completed_at TIMESTAMP
    );

    -- Indexes
    -- PostgreSQL (which gen_random_uuid() and JSONB imply) has no inline
    -- INDEX clause in CREATE TABLE, so each index is its own statement.
    CREATE INDEX idx_warehouse_status ON growth_tickets (warehouse_id, status);
    CREATE INDEX idx_priority ON growth_tickets (priority);
    CREATE INDEX idx_category ON growth_tickets (category);
    CREATE INDEX idx_ice_score ON growth_tickets (ice_score DESC);

    Query Optimization

    Efficient ticket retrieval:

    async def get_tickets(
        warehouse_id: str,
        filters: dict = None,
        sort: str = '-ice_score',
        limit: int = 50,
        cursor: str = None
    ) -> dict:
        """Get tickets with filtering and pagination.

        Args:
            warehouse_id: Warehouse whose tickets are listed.
            filters: Optional exact-match filters; the ``status``, ``priority``
                and ``category`` keys are honoured when truthy.
            sort: Column to sort by, prefixed with ``-`` for descending order.
                Must be one of the sortable columns listed below.
            limit: Maximum number of rows to return.
            cursor: ``id`` of the last ticket of the previous page (the
                ``next_cursor`` of the previous call), or None for the first page.

        Returns:
            ``{'data': rows, 'pagination': {'has_more': bool, 'next_cursor': id}}``.

        Raises:
            ValueError: If ``sort`` names a column that is not sortable.
        """
        # Column names cannot be bound as query parameters, so the sort column
        # is interpolated into the SQL text and must come from a fixed allow-list.
        # Columns that are NULL for most rows (completed_at, dismissed_at) are
        # left out: a NULL sort value would make the cursor comparison below
        # yield NULL and silently end pagination.
        sortable_columns = {
            'ice_score', 'impact_score', 'confidence_score', 'ease_score',
            'confidence', 'priority', 'category', 'title',
            'created_at', 'updated_at',
        }

        sort_field = sort.lstrip('-')
        descending = sort.startswith('-')
        if sort_field not in sortable_columns:
            raise ValueError(f"Unsupported sort field: {sort_field!r}")
        direction = 'DESC' if descending else 'ASC'

        conditions = ["warehouse_id = $1"]
        params = [warehouse_id]

        # Apply filters
        for column in ('status', 'priority', 'category'):
            value = (filters or {}).get(column)
            if value:
                params.append(value)
                conditions.append(f"{column} = ${len(params)}")

        # Apply pagination. The predicate belongs in WHERE (before ORDER BY)
        # and must follow the sort order: rows are compared on
        # (sort column, id) against the cursor row, with id as the tie-breaker.
        # If the cursor row no longer exists the subquery is empty and the
        # page comes back empty.
        if cursor:
            params.append(cursor)
            comparator = '<' if descending else '>'
            conditions.append(
                f"({sort_field}, id) {comparator} "
                f"(SELECT {sort_field}, id FROM growth_tickets"
                f" WHERE id = ${len(params)})"
            )

        params.append(limit)
        query = (
            "SELECT * FROM growth_tickets"
            f" WHERE {' AND '.join(conditions)}"
            f" ORDER BY {sort_field} {direction}, id {direction}"
            f" LIMIT ${len(params)}"
        )

        results = await db.fetch(query, *params)

        return {
            'data': results,
            'pagination': {
                # A full page means there may be more rows after it.
                'has_more': len(results) == limit,
                'next_cursor': results[-1]['id'] if results else None
            }
        }

    Integration Patterns

    Webhook Notifications

    Notify external systems of new tickets:

    async def notify_ticket_created(ticket: dict):
        """Post a signed ``ticket.created`` event to the warehouse's webhook.

        Does nothing when the warehouse has no webhook URL. Delivery is
        best-effort: a failed request is logged and not re-raised.
        """
        destination = await get_webhook_url(ticket['warehouse_id'])
        if not destination:
            return

        ticket_id = ticket['id']
        event_data = {
            'ticket_id': ticket_id,
            'title': ticket['title'],
            'priority': ticket['priority'],
            'category': ticket['category'],
            'estimated_impact': ticket['estimated_impact'],
            'url': f"https://app.cogny.com/tickets/{ticket_id}"
        }
        payload = {
            'event': 'ticket.created',
            'data': event_data,
            'timestamp': datetime.now().isoformat()
        }

        # NOTE(review): `webhook_secret` is not defined in this function —
        # presumably a module-level value; confirm where it comes from.
        request_headers = {
            'X-Cogny-Signature': sign_payload(payload, webhook_secret),
            'Content-Type': 'application/json'
        }

        try:
            async with httpx.AsyncClient() as client:
                reply = await client.post(
                    destination,
                    json=payload,
                    headers=request_headers,
                    timeout=10.0
                )
                # Treat 4xx/5xx replies as delivery failures too
                reply.raise_for_status()
        except Exception as e:
            logger.error(f"Webhook delivery failed: {e}")

    JIRA Synchronization

    Sync tickets to JIRA:

    from jira import JIRA

    class JIRAIntegration:
        """Mirrors Cogny tickets into a JIRA project as a task plus sub-tasks."""

        def __init__(self, jira_url: str, credentials: dict):
            self.jira = JIRA(
                server=jira_url,
                basic_auth=(credentials['email'], credentials['api_token'])
            )

        async def sync_ticket(self, ticket: dict, project_key: str):
            """Sync Cogny ticket to JIRA.

            Creates one ``Task`` issue for the ticket and one ``Sub-task`` per
            action item, in order.

            Returns:
                The key of the parent JIRA issue.
            """
            import asyncio
            import functools

            loop = asyncio.get_running_loop()

            async def create_issue(fields: dict):
                # The client call is a blocking HTTP request; run it in the
                # default executor so the event loop keeps serving other tasks.
                return await loop.run_in_executor(
                    None, functools.partial(self.jira.create_issue, fields=fields)
                )

            # Create JIRA issue
            jira_issue = await create_issue({
                'project': {'key': project_key},
                'summary': ticket['title'],
                'description': self.format_description(ticket),
                'issuetype': {'name': 'Task'},
                'priority': {'name': self.map_priority(ticket['priority'])},
                'labels': [ticket['category'], 'cogny-generated']
            })

            # Create subtasks for action items
            for action_item in ticket['action_items']:
                await create_issue({
                    'project': {'key': project_key},
                    'summary': action_item['title'],
                    'description': action_item['description'],
                    'issuetype': {'name': 'Sub-task'},
                    'parent': {'key': jira_issue.key}
                })

            return jira_issue.key

        def format_description(self, ticket: dict) -> str:
            """Format ticket description for JIRA.

            Unknown (None) improvement or confidence values are rendered as
            ``n/a``. The improvement always carries its own sign, so a negative
            value reads ``-5.0%`` rather than ``+-5.0%``. When the ticket has no
            ``url`` the link is built from its ``id``.
            """
            impact = ticket['estimated_impact']

            increase = impact['increase_percentage']
            improvement = 'n/a' if increase is None else f"{increase:+.1f}%"

            confidence = impact['confidence']
            confidence_text = 'n/a' if confidence is None else f"{confidence:.0%}"

            # Same URL scheme the webhook notification uses for tickets
            url = ticket.get('url') or f"https://app.cogny.com/tickets/{ticket['id']}"

            lines = [
                ticket['description'],
                "",
                f"*Category:* {ticket['category']}",
                f"*Priority:* {ticket['priority']}",
                f"*ICE Score:* {ticket['ice_score']:.1f}",
                "",
                "*Estimated Impact:*",
                f"- Metric: {impact['metric']}",
                f"- Improvement: {improvement}",
                f"- Confidence: {confidence_text}",
                "",
                f"[View in Cogny|{url}]",
            ]
            return "\n".join(lines)

    Performance Monitoring

    Ticket Metrics

    Track ticket system performance:

    -- Tickets created per day over the last 30 days, newest day first,
    -- with the average ICE score of each day's tickets
    SELECT
        DATE_TRUNC('day', created_at) AS date,
        COUNT(*) AS tickets_created,
        AVG(ice_score) AS avg_ice_score
    FROM growth_tickets
    WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
    GROUP BY DATE_TRUNC('day', created_at)
    ORDER BY 1 DESC;

    -- Per priority, last 90 days: tickets created, tickets completed, and
    -- mean days from creation to completion (open tickets have a NULL
    -- completed_at and are ignored by AVG)
    SELECT
        priority,
        COUNT(*) AS total,
        COUNT(*) FILTER (WHERE status = 'completed') AS completed,
        AVG(EXTRACT(EPOCH FROM (completed_at - created_at)) / 86400) AS avg_days_to_complete
    FROM growth_tickets
    WHERE created_at >= CURRENT_DATE - INTERVAL '90 days'
    GROUP BY priority;

    -- Per category, last 30 days: volume, average ICE score and share of
    -- tickets completed, largest category first
    SELECT
        category,
        COUNT(*) AS count,
        AVG(ice_score) AS avg_priority,
        CAST(COUNT(*) FILTER (WHERE status = 'completed') AS FLOAT) / COUNT(*) AS completion_rate
    FROM growth_tickets
    WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
    GROUP BY category
    ORDER BY count DESC;

    Next Steps

    Resources

    Need Implementation Help?

    Talk to Our Technical Team

    Schedule a technical consultation to discuss your integration requirements and implementation strategy.

    Schedule Demo