    Documentation · Features · Feb 21, 2025

    Growth Tickets System Architecture

    Technical architecture of Cogny's automated growth ticket generation system, including recommendation algorithms, prioritization logic, and integration patterns.

    Overview

    Growth Tickets are AI-generated, actionable recommendations automatically derived from data analysis. The system continuously monitors analytics data, identifies opportunities, and generates prioritized action items for growth teams.

    Key Features:

    • Automated opportunity detection
    • Impact estimation
    • Priority scoring
    • Action item generation
    • Project management integration

    System Architecture

    ┌─────────────────────────────────────────────────────────────┐
    │                     Data Sources                             │
    ├─────────────┬──────────────┬─────────────┬─────────────────┤
    │  Analytics  │   Reports    │  ICP Data   │  Historical     │
    └──────┬──────┴──────┬───────┴──────┬──────┴─────────┬───────┘
           │              │              │                │
           └──────────────┴──────────────┴────────────────┘
                          │
             ┌────────────▼────────────┐
             │  Opportunity Detection  │
             │  - Pattern recognition  │
             │  - Anomaly detection    │
             │  - Trend analysis       │
             └────────────┬────────────┘
                          │
             ┌────────────▼────────────┐
             │  Recommendation Engine  │
             │  - Solution matching    │
             │  - Impact estimation    │
             │  - Effort calculation   │
             └────────────┬────────────┘
                          │
             ┌────────────▼────────────┐
             │  Priority Scoring       │
             │  - ICE framework        │
             │  - Business context     │
             │  - Resource availability│
             └────────────┬────────────┘
                          │
             ┌────────────▼────────────┐
             │  Action Item Generator  │
             │  - Task breakdown       │
             │  - Time estimation      │
             │  - Dependency mapping   │
             └────────────┬────────────┘
                          │
             ┌────────────▼────────────┐
             │  Ticket Management      │
             │  - Deduplication        │
             │  - Status tracking      │
             │  - API exposure         │
             └─────────────────────────┘
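    Tying the stages together, a minimal orchestration sketch. Class and function names here are illustrative stand-ins for the boxes in the diagram, not actual Cogny internals.

```python
# Hypothetical end-to-end orchestration of the pipeline stages above.
# Only the Opportunity Detection stage is stubbed in; downstream stages
# (recommendation, scoring, action generation, dedup) would be injected
# the same way.
from dataclasses import dataclass, field


@dataclass
class GrowthTicketPipeline:
    detectors: list = field(default_factory=list)  # Opportunity Detection stage

    def run(self, warehouse_id: str) -> list:
        tickets = []
        for detect in self.detectors:
            for opp in detect(warehouse_id):
                tickets.append({
                    'title': opp['title'],
                    'category': opp['type'],
                    'confidence': opp['confidence'],
                })
        return tickets


def demo_detector(warehouse_id: str) -> list:
    # Stand-in for ConversionFunnelAnalyzer and friends
    return [{'title': 'High drop-off at Checkout',
             'type': 'funnel_optimization',
             'confidence': 0.8}]


pipeline = GrowthTicketPipeline(detectors=[demo_detector])
tickets = pipeline.run('wh_123')
print(tickets[0]['category'])  # → funnel_optimization
```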
    

    Opportunity Detection

    Pattern Recognition

    Identify growth opportunities from data patterns:

    import logging

    logger = logging.getLogger(__name__)

    class OpportunityDetector:
        def __init__(self, warehouse_id: str):
            self.warehouse_id = warehouse_id
            self.analyzers = [
                ConversionFunnelAnalyzer(),
                TrafficSourceAnalyzer(),
                UserSegmentAnalyzer(),
                ProductPerformanceAnalyzer(),
                TechnicalPerformanceAnalyzer()
            ]
    
        async def detect_opportunities(self) -> list:
            """Detect all growth opportunities"""
            opportunities = []
    
            for analyzer in self.analyzers:
                try:
                    detected = await analyzer.analyze(self.warehouse_id)
                    opportunities.extend(detected)
                except Exception as e:
                    logger.error(f"Analyzer {analyzer.__class__.__name__} failed: {e}")
    
            return opportunities
    

    Funnel Opportunity Detection

    Identify conversion funnel issues:

    class ConversionFunnelAnalyzer:
        async def analyze(self, warehouse_id: str) -> list:
            """Analyze conversion funnel for opportunities"""
            opportunities = []
    
            # Get funnel data
            funnel_data = await self.get_funnel_data(warehouse_id)
    
            # Check drop-off rates
            for i in range(len(funnel_data) - 1):
                current_step = funnel_data[i]
                next_step = funnel_data[i + 1]
    
                if current_step['users'] == 0:
                    continue  # avoid division by zero on empty steps

                drop_off_rate = 1 - (next_step['users'] / current_step['users'])
    
                # Significant drop-off
                if drop_off_rate > 0.5:
                    opportunity = {
                        'type': 'funnel_optimization',
                        'title': f"High drop-off at {next_step['name']} step",
                        'description': (
                            f"{drop_off_rate:.1%} of users drop off between "
                            f"{current_step['name']} and {next_step['name']}"
                        ),
                        'impact_data': {
                            'current_drop_off': drop_off_rate,
                            'users_affected': current_step['users'] - next_step['users'],
                            'step_from': current_step['name'],
                            'step_to': next_step['name']
                        },
                        'confidence': self.calculate_confidence(current_step['users'])
                    }
                    opportunities.append(opportunity)
    
            # Check device-specific performance
            device_funnel = await self.get_funnel_by_device(warehouse_id)
    
            for device, funnel in device_funnel.items():
                if not funnel or funnel[0]['users'] == 0:
                    continue  # skip devices with no recorded traffic

                conversion_rate = funnel[-1]['users'] / funnel[0]['users']
                avg_conversion = self.get_average_conversion_rate(device_funnel)
    
                # Underperforming device
                if conversion_rate < avg_conversion * 0.7:
                    opportunity = {
                        'type': 'device_optimization',
                        'title': f"Low {device} conversion rate",
                        'description': (
                            f"{device.capitalize()} conversion rate ({conversion_rate:.2%}) "
                            f"is {(1 - conversion_rate/avg_conversion):.0%} below average"
                        ),
                        'impact_data': {
                            'device': device,
                            'current_rate': conversion_rate,
                            'average_rate': avg_conversion,
                            'users_affected': funnel[0]['users']
                        },
                        'confidence': 0.85
                    }
                    opportunities.append(opportunity)
    
            return opportunities
    
        async def get_funnel_data(self, warehouse_id: str) -> list:
            """Get funnel step data"""
            query = """
            WITH funnel_steps AS (
              SELECT
                user_pseudo_id,
                COUNTIF(event_name = 'page_view') as step_1,
                COUNTIF(event_name = 'view_item') as step_2,
                COUNTIF(event_name = 'add_to_cart') as step_3,
                COUNTIF(event_name = 'begin_checkout') as step_4,
                COUNTIF(event_name = 'purchase') as step_5
              FROM `{project}.{dataset}.events_*`
              WHERE _TABLE_SUFFIX BETWEEN FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY))
                                      AND FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY))
              GROUP BY user_pseudo_id
            )
    
            SELECT
              'Landing' as name,
              1 as step_number,
              COUNT(DISTINCT CASE WHEN step_1 > 0 THEN user_pseudo_id END) as users
            FROM funnel_steps
            UNION ALL
            SELECT 'Product View', 2, COUNT(DISTINCT CASE WHEN step_2 > 0 THEN user_pseudo_id END)
            FROM funnel_steps
            UNION ALL
            SELECT 'Add to Cart', 3, COUNT(DISTINCT CASE WHEN step_3 > 0 THEN user_pseudo_id END)
            FROM funnel_steps
            UNION ALL
            SELECT 'Checkout', 4, COUNT(DISTINCT CASE WHEN step_4 > 0 THEN user_pseudo_id END)
            FROM funnel_steps
            UNION ALL
            SELECT 'Purchase', 5, COUNT(DISTINCT CASE WHEN step_5 > 0 THEN user_pseudo_id END)
            FROM funnel_steps
            ORDER BY step_number
            """
    
            results = await execute_query(warehouse_id, query)
            return results
    

    Traffic Source Analysis

    Identify channel optimization opportunities:

    class TrafficSourceAnalyzer:
        async def analyze(self, warehouse_id: str) -> list:
            """Analyze traffic sources for opportunities"""
            opportunities = []
    
            # Get channel performance
            channel_data = await self.get_channel_performance(warehouse_id)
    
            # Identify underperforming channels
            if not channel_data:
                return opportunities

            avg_conversion = sum(c['conversion_rate'] for c in channel_data) / len(channel_data)
    
            for channel in channel_data:
                # High traffic, low conversion
                if (channel['sessions'] > 1000 and
                    channel['conversion_rate'] < avg_conversion * 0.5):
    
                    opportunity = {
                        'type': 'channel_optimization',
                        'title': f"Low conversion rate from {channel['source']} traffic",
                        'description': (
                            f"{channel['source']} drives {channel['sessions']:,} sessions "
                            f"but only {channel['conversion_rate']:.2%} convert, "
                            f"significantly below the average of {avg_conversion:.2%}"
                        ),
                        'impact_data': {
                            'source': channel['source'],
                            'medium': channel['medium'],
                            'sessions': channel['sessions'],
                            'current_conversion': channel['conversion_rate'],
                            'target_conversion': avg_conversion
                        },
                        'confidence': 0.9
                    }
                    opportunities.append(opportunity)
    
                # High conversion, low traffic
                if (channel['conversion_rate'] > avg_conversion * 1.5 and
                    channel['sessions'] < 500):
    
                    opportunity = {
                        'type': 'acquisition_scaling',
                        'title': f"Scale high-performing {channel['source']} channel",
                        'description': (
                            f"{channel['source']} has {channel['conversion_rate']:.2%} conversion rate "
                            f"but only {channel['sessions']:,} sessions. Opportunity to scale."
                        ),
                        'impact_data': {
                            'source': channel['source'],
                            'medium': channel['medium'],
                            'sessions': channel['sessions'],
                            'conversion_rate': channel['conversion_rate']
                        },
                        'confidence': 0.85
                    }
                    opportunities.append(opportunity)
    
            return opportunities
    

    Recommendation Engine

    Solution Matching

    Match opportunities to proven solutions:

    class RecommendationEngine:
        def __init__(self):
            self.solution_database = SolutionDatabase()
    
        async def generate_recommendations(
            self,
            opportunity: dict
        ) -> dict:
            """Generate recommendation from opportunity"""
    
            # Match to solution patterns
            solutions = self.solution_database.find_solutions(
                opportunity_type=opportunity['type'],
                context=opportunity['impact_data']
            )
    
            # Rank solutions by expected impact
            ranked_solutions = self.rank_solutions(solutions, opportunity)
    
            # Select best solution (bail out if no pattern matched)
            if not ranked_solutions:
                return None
            best_solution = ranked_solutions[0]
    
            # Generate action items
            action_items = self.generate_action_items(best_solution, opportunity)
    
            # Estimate impact
            impact_estimate = self.estimate_impact(opportunity, best_solution)
    
            recommendation = {
                'title': opportunity['title'],
                'description': opportunity['description'],
                'category': opportunity['type'],
                'solution': best_solution,
                'action_items': action_items,
                'estimated_impact': impact_estimate,
                'confidence': opportunity['confidence']
            }
    
            return recommendation
    
        def rank_solutions(self, solutions: list, opportunity: dict) -> list:
            """Rank solutions by expected effectiveness"""
    
            scored_solutions = []
    
            for solution in solutions:
                # Calculate relevance score
                relevance = self.calculate_relevance(solution, opportunity)
    
                # Historical success rate
                success_rate = solution.get('success_rate', 0.5)
    
                # Implementation difficulty
                difficulty = solution.get('difficulty', 'medium')
                difficulty_penalty = {'easy': 0, 'medium': 0.1, 'hard': 0.2}.get(difficulty, 0.1)
    
                # Combined score
                score = relevance * success_rate * (1 - difficulty_penalty)
    
                scored_solutions.append({
                    'solution': solution,
                    'score': score
                })
    
            # Sort by score
            scored_solutions.sort(key=lambda x: x['score'], reverse=True)
    
            return [s['solution'] for s in scored_solutions]
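    A quick worked example of this scoring: with equal relevance, a harder solution can still outrank an easy one if its historical success rate is high enough. All values below are illustrative.

```python
# Worked solution-ranking example using the same score formula:
# score = relevance * success_rate * (1 - difficulty_penalty)
penalty = {'easy': 0, 'medium': 0.1, 'hard': 0.2}

solutions = [
    {'name': 'copy tweak', 'relevance': 1.0, 'success_rate': 0.6, 'difficulty': 'easy'},
    {'name': 'checkout redesign', 'relevance': 1.0, 'success_rate': 0.8, 'difficulty': 'hard'},
]

scored = sorted(
    solutions,
    key=lambda s: s['relevance'] * s['success_rate'] * (1 - penalty[s['difficulty']]),
    reverse=True,
)

# 'checkout redesign' scores 0.8 * 0.8 = 0.64, beating 0.6 * 1.0 = 0.6
print([s['name'] for s in scored])
# → ['checkout redesign', 'copy tweak']
```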
    

    Impact Estimation

    Estimate potential impact of recommendations:

    class ImpactEstimator:
        async def estimate_impact(
            self,
            opportunity: dict,
            solution: dict
        ) -> dict:
            """Estimate impact of implementing solution"""
    
            impact = {
                'metric': None,
                'current_value': None,
                'projected_value': None,
                'increase_percentage': None,
                'confidence': None
            }
    
            if opportunity['type'] == 'funnel_optimization':
                # Estimate conversion rate improvement
                current_drop_off = opportunity['impact_data']['current_drop_off']
                users_affected = opportunity['impact_data']['users_affected']
    
                # Assume 20-40% improvement in drop-off rate
                improvement_factor = solution.get('expected_improvement', 0.3)
                new_drop_off = current_drop_off * (1 - improvement_factor)
    
                impact.update({
                    'metric': 'conversion_rate',
                    'current_value': 1 - current_drop_off,
                    'projected_value': 1 - new_drop_off,
                    'increase_percentage': (
                        ((1 - new_drop_off) / (1 - current_drop_off) - 1) * 100
                    ),
                    'additional_conversions': int(
                        users_affected * improvement_factor * (1 - current_drop_off)
                    ),
                    'confidence': 0.7
                })
    
            elif opportunity['type'] == 'channel_optimization':
                # Estimate conversion rate improvement for channel
                current_conversion = opportunity['impact_data']['current_conversion']
                target_conversion = opportunity['impact_data']['target_conversion']
                sessions = opportunity['impact_data']['sessions']
    
                # Assume reaching 70% of target conversion
                projected_conversion = (
                    current_conversion + (target_conversion - current_conversion) * 0.7
                )
    
                impact.update({
                    'metric': 'conversion_rate',
                    'current_value': current_conversion,
                    'projected_value': projected_conversion,
                    'increase_percentage': (
                        (projected_conversion / current_conversion - 1) * 100
                    ),
                    'additional_conversions': int(
                        sessions * (projected_conversion - current_conversion)
                    ),
                    'confidence': 0.65
                })
    
            return impact
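    Plugging illustrative numbers into the funnel_optimization branch above makes the arithmetic concrete:

```python
# Worked example of the funnel_optimization impact math
current_drop_off = 0.6     # 60% of users lost at this step
users_affected = 10_000
improvement_factor = 0.3   # solution's expected_improvement

new_drop_off = current_drop_off * (1 - improvement_factor)
increase_pct = ((1 - new_drop_off) / (1 - current_drop_off) - 1) * 100
additional_conversions = round(users_affected * improvement_factor * (1 - current_drop_off))

# Rounded for display to sidestep float noise
print(round(new_drop_off, 2), round(increase_pct, 1), additional_conversions)
# → 0.42 45.0 1200
```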
    

    Priority Scoring

    ICE Framework

    Score each recommendation with the Impact, Confidence, and Ease (ICE) framework:

    class PriorityScorer:
        def calculate_priority(
            self,
            recommendation: dict,
            context: dict = None
        ) -> dict:
            """Calculate priority score using ICE framework"""
    
            # Impact (1-10)
            impact_score = self.calculate_impact_score(
                recommendation['estimated_impact']
            )
    
            # Confidence (1-10)
            confidence_score = recommendation['confidence'] * 10
    
            # Ease (1-10)
            ease_score = self.calculate_ease_score(
                recommendation['action_items']
            )
    
            # ICE score
            ice_score = (impact_score * confidence_score * ease_score) / 100
    
            # Priority tier
            if ice_score >= 7:
                priority = 'critical'
            elif ice_score >= 5:
                priority = 'high'
            elif ice_score >= 3:
                priority = 'medium'
            else:
                priority = 'low'
    
            return {
                'priority': priority,
                'ice_score': ice_score,
                'impact_score': impact_score,
                'confidence_score': confidence_score,
                'ease_score': ease_score
            }
    
        def calculate_impact_score(self, impact: dict) -> float:
            """Calculate impact score (1-10)"""
    
            if not impact:
                return 5.0
    
            increase_pct = impact.get('increase_percentage', 0)
    
            # Map percentage increase to 1-10 scale
            if increase_pct >= 50:
                return 10.0
            elif increase_pct >= 30:
                return 8.0
            elif increase_pct >= 20:
                return 6.0
            elif increase_pct >= 10:
                return 4.0
            else:
                return 2.0
    
        def calculate_ease_score(self, action_items: list) -> float:
            """Calculate ease score (1-10)"""
    
            if not action_items:
                return 5.0
    
            total_hours = sum(item.get('estimated_hours', 0) for item in action_items)
    
            # Map estimated hours to ease score (inverse)
            if total_hours <= 4:
                return 10.0
            elif total_hours <= 8:
                return 8.0
            elif total_hours <= 16:
                return 6.0
            elif total_hours <= 40:
                return 4.0
            else:
                return 2.0
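    A worked example of the ICE arithmetic and tiering, with illustrative inputs:

```python
# ICE example: a 30%+ projected lift (impact 8), confidence 0.7,
# and roughly 6 hours of work (ease 8)
impact_score = 8.0
confidence_score = 0.7 * 10  # 7.0
ease_score = 8.0

ice_score = (impact_score * confidence_score * ease_score) / 100  # 4.48

if ice_score >= 7:
    priority = 'critical'
elif ice_score >= 5:
    priority = 'high'
elif ice_score >= 3:
    priority = 'medium'
else:
    priority = 'low'

print(round(ice_score, 2), priority)
# → 4.48 medium
```

    Note that because the three factors are multiplied, a single low score drags the whole ticket down: the same recommendation with confidence 0.3 would land in the 'low' tier.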
    

    Action Item Generation

    Task Breakdown

    Break recommendations into actionable tasks:

    class ActionItemGenerator:
        async def generate_action_items(
            self,
            solution: dict,
            opportunity: dict
        ) -> list:
            """Generate specific action items"""
    
            # Use Claude to generate detailed action items
            prompt = f"""
            Generate specific, actionable tasks to implement this solution:
    
            Problem: {opportunity['description']}
            Solution: {solution['description']}
            Context: {json.dumps(opportunity['impact_data'], indent=2)}
    
            For each action item, provide:
            1. Clear title (what to do)
            2. Detailed description (how to do it)
            3. Estimated hours to complete
            4. Required skills/resources
            5. Dependencies (if any)
    
            Generate 3-7 action items in order of execution.
            Return as JSON array.
            """
    
            response = await call_claude(prompt)
            try:
                action_items = json.loads(response)
            except json.JSONDecodeError:
                logger.error("Model returned non-JSON action items")
                return []
    
            # Enhance with technical details
            for item in action_items:
                item['completed'] = False
                item['id'] = generate_id()
    
                # Add code examples if relevant
                if self.requires_code(item):
                    item['code_example'] = await self.generate_code_example(item)
    
            return action_items
    
        async def generate_code_example(self, action_item: dict) -> str:
            """Generate code example for technical tasks"""
    
            prompt = f"""
            Generate a code example for this task:
    
            Task: {action_item['title']}
            Description: {action_item['description']}
    
            Provide practical, production-ready code with comments.
            """
    
            code = await call_claude(prompt)
            return code
    

    Deduplication

    Similarity Detection

    Prevent duplicate tickets:

    from sklearn.feature_extraction.text import TfidfVectorizer

    class TicketDeduplicator:
        def __init__(self):
            self.vectorizer = TfidfVectorizer()
            self.similarity_threshold = 0.8
    
        async def find_duplicates(
            self,
            new_ticket: dict,
            existing_tickets: list
        ) -> list:
            """Find potential duplicate tickets"""
    
            if not existing_tickets:
                return []
    
            # Vectorize ticket descriptions
            all_descriptions = [new_ticket['description']] + [
                t['description'] for t in existing_tickets
            ]
    
            vectors = self.vectorizer.fit_transform(all_descriptions)
    
            # Calculate cosine similarity
            from sklearn.metrics.pairwise import cosine_similarity
    
            similarities = cosine_similarity(vectors[0:1], vectors[1:]).flatten()
    
            # Find similar tickets
            duplicates = []
            for idx, similarity in enumerate(similarities):
                if similarity >= self.similarity_threshold:
                    duplicates.append({
                        'ticket': existing_tickets[idx],
                        'similarity': similarity
                    })
    
            return duplicates
    
        async def merge_similar_tickets(
            self,
            ticket1: dict,
            ticket2: dict
        ) -> dict:
            """Merge similar tickets"""
    
            # Combine action items
            all_actions = ticket1['action_items'] + ticket2['action_items']
    
            # Deduplicate actions
            unique_actions = self.deduplicate_actions(all_actions)
    
            # Take higher impact estimate
            impact = (
                ticket1['estimated_impact']
                if ticket1['estimated_impact']['increase_percentage'] >
                   ticket2['estimated_impact']['increase_percentage']
                else ticket2['estimated_impact']
            )
    
            merged = {
                'title': ticket1['title'],
                'description': self.merge_descriptions(
                    ticket1['description'],
                    ticket2['description']
                ),
                'action_items': unique_actions,
                'estimated_impact': impact,
                'merged_from': [ticket1['id'], ticket2['id']]
            }
    
            return merged
    

    Storage and Retrieval

    Database Schema

    Store tickets in structured format:

    CREATE TABLE growth_tickets (
      id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
      warehouse_id UUID NOT NULL REFERENCES warehouses(id),
      title VARCHAR(255) NOT NULL,
      description TEXT NOT NULL,
      category VARCHAR(50) NOT NULL,
      priority VARCHAR(20) NOT NULL,
      status VARCHAR(20) DEFAULT 'open',
    
      -- Impact estimation
      estimated_impact JSONB NOT NULL,
      confidence FLOAT NOT NULL,
    
      -- Priority scoring
      ice_score FLOAT NOT NULL,
      impact_score FLOAT NOT NULL,
      confidence_score FLOAT NOT NULL,
      ease_score FLOAT NOT NULL,
    
      -- Action items
      action_items JSONB NOT NULL,
    
      -- Supporting data
      supporting_data JSONB,
      related_report_id UUID REFERENCES reports(id),
    
      -- Metadata
      assignee UUID REFERENCES users(id),
      notes TEXT,
      dismissed_at TIMESTAMP,
      dismiss_reason VARCHAR(50),
    
      -- Timestamps
      created_at TIMESTAMP DEFAULT NOW(),
      updated_at TIMESTAMP DEFAULT NOW(),
      completed_at TIMESTAMP
    );

    -- PostgreSQL does not support inline INDEX clauses; create them separately
    CREATE INDEX idx_warehouse_status ON growth_tickets (warehouse_id, status);
    CREATE INDEX idx_priority ON growth_tickets (priority);
    CREATE INDEX idx_category ON growth_tickets (category);
    CREATE INDEX idx_ice_score ON growth_tickets (ice_score DESC);
    

    Query Optimization

    Efficient ticket retrieval:

    async def get_tickets(
        warehouse_id: str,
        filters: dict = None,
        sort: str = '-ice_score',
        limit: int = 50,
        cursor: str = None
    ) -> dict:
        """Get tickets with filtering and pagination"""
    
        query = """
        SELECT *
        FROM growth_tickets
        WHERE warehouse_id = $1
        """
    
        params = [warehouse_id]
        param_count = 1
    
        # Apply filters
        if filters:
            if filters.get('status'):
                param_count += 1
                query += f" AND status = ${param_count}"
                params.append(filters['status'])
    
            if filters.get('priority'):
                param_count += 1
                query += f" AND priority = ${param_count}"
                params.append(filters['priority'])
    
            if filters.get('category'):
                param_count += 1
                query += f" AND category = ${param_count}"
                params.append(filters['category'])
    
        # Apply cursor-based pagination (must be added before ORDER BY)
        if cursor:
            param_count += 1
            query += f" AND id > ${param_count}"
            params.append(cursor)

        # Apply sorting; whitelist fields to avoid SQL injection
        sort_field = sort.lstrip('-')
        if sort_field not in {'ice_score', 'created_at', 'priority', 'status'}:
            sort_field = 'ice_score'
        sort_direction = 'DESC' if sort.startswith('-') else 'ASC'
        query += f" ORDER BY {sort_field} {sort_direction}"

        param_count += 1
        query += f" LIMIT ${param_count}"
        params.append(limit)
    
        results = await db.fetch(query, *params)
    
        return {
            'data': results,
            'pagination': {
                'has_more': len(results) == limit,
                'next_cursor': results[-1]['id'] if results else None
            }
        }
    

    Integration Patterns

    Webhook Notifications

    Notify external systems of new tickets:

    async def notify_ticket_created(ticket: dict):
        """Send webhook notification for new ticket"""
    
        webhook_url = await get_webhook_url(ticket['warehouse_id'])
    
        if not webhook_url:
            return
    
        payload = {
            'event': 'ticket.created',
            'data': {
                'ticket_id': ticket['id'],
                'title': ticket['title'],
                'priority': ticket['priority'],
                'category': ticket['category'],
                'estimated_impact': ticket['estimated_impact'],
                'url': f"https://app.cogny.com/tickets/{ticket['id']}"
            },
            'timestamp': datetime.now().isoformat()
        }
    
        # Sign payload; get_webhook_secret is assumed to mirror get_webhook_url
        webhook_secret = await get_webhook_secret(ticket['warehouse_id'])
        signature = sign_payload(payload, webhook_secret)
    
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    webhook_url,
                    json=payload,
                    headers={
                        'X-Cogny-Signature': signature,
                        'Content-Type': 'application/json'
                    },
                    timeout=10.0
                )
                response.raise_for_status()
        except Exception as e:
            logger.error(f"Webhook delivery failed: {e}")
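    The `sign_payload` helper is not specified above. A common convention, assumed here for illustration only, is HMAC-SHA256 over the serialized JSON body, which receivers verify with a constant-time comparison:

```python
# Illustrative signing scheme (an assumption, not Cogny's documented one):
# HMAC-SHA256 over canonical JSON, hex-encoded
import hashlib
import hmac
import json


def sign_payload(payload: dict, secret: str) -> str:
    # Canonical serialization so sender and receiver hash identical bytes
    body = json.dumps(payload, separators=(',', ':'), sort_keys=True)
    return hmac.new(secret.encode(), body.encode(), hashlib.sha256).hexdigest()


def verify_signature(payload: dict, secret: str, signature: str) -> bool:
    # compare_digest avoids timing side channels
    return hmac.compare_digest(sign_payload(payload, secret), signature)


payload = {'event': 'ticket.created', 'data': {'ticket_id': 't_1'}}
sig = sign_payload(payload, 'whsec_demo')
print(verify_signature(payload, 'whsec_demo', sig))   # → True
print(verify_signature(payload, 'wrong_secret', sig))  # → False
```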
    

    JIRA Synchronization

    Sync tickets to JIRA:

    from jira import JIRA
    
    class JIRAIntegration:
        def __init__(self, jira_url: str, credentials: dict):
            self.jira = JIRA(
                server=jira_url,
                basic_auth=(credentials['email'], credentials['api_token'])
            )
    
        async def sync_ticket(self, ticket: dict, project_key: str):
            """Sync Cogny ticket to JIRA"""
    
            # Create JIRA issue
            issue_dict = {
                'project': {'key': project_key},
                'summary': ticket['title'],
                'description': self.format_description(ticket),
                'issuetype': {'name': 'Task'},
                'priority': {'name': self.map_priority(ticket['priority'])},
                'labels': [ticket['category'], 'cogny-generated']
            }
    
            jira_issue = self.jira.create_issue(fields=issue_dict)
    
            # Create subtasks for action items
            for action_item in ticket['action_items']:
                subtask_dict = {
                    'project': {'key': project_key},
                    'summary': action_item['title'],
                    'description': action_item['description'],
                    'issuetype': {'name': 'Sub-task'},
                    'parent': {'key': jira_issue.key}
                }
                self.jira.create_issue(fields=subtask_dict)
    
            return jira_issue.key
    
        def format_description(self, ticket: dict) -> str:
            """Format ticket description for JIRA"""
    
            description = f"{ticket['description']}\n\n"
            description += f"*Category:* {ticket['category']}\n"
            description += f"*Priority:* {ticket['priority']}\n"
            description += f"*ICE Score:* {ticket['ice_score']:.1f}\n\n"
    
            impact = ticket['estimated_impact']
            description += "*Estimated Impact:*\n"
            description += f"- Metric: {impact['metric']}\n"
            description += f"- Improvement: +{impact['increase_percentage']:.1f}%\n"
            description += f"- Confidence: {impact['confidence']:.0%}\n\n"
    
            description += f"[View in Cogny|{ticket['url']}]"
    
            return description
    

    Performance Monitoring

    Ticket Metrics

    Track ticket system performance:

    -- Ticket creation rate
    SELECT
      DATE_TRUNC('day', created_at) as date,
      COUNT(*) as tickets_created,
      AVG(ice_score) as avg_ice_score
    FROM growth_tickets
    WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
    GROUP BY 1
    ORDER BY 1 DESC;
    
    -- Completion rate by priority
    SELECT
      priority,
      COUNT(*) as total,
      COUNT(CASE WHEN status = 'completed' THEN 1 END) as completed,
      AVG(EXTRACT(EPOCH FROM (completed_at - created_at)) / 86400) as avg_days_to_complete
    FROM growth_tickets
    WHERE created_at >= CURRENT_DATE - INTERVAL '90 days'
    GROUP BY 1;
    
    -- Category distribution
    SELECT
      category,
      COUNT(*) as count,
      AVG(ice_score) as avg_priority,
      COUNT(CASE WHEN status = 'completed' THEN 1 END)::FLOAT / COUNT(*) as completion_rate
    FROM growth_tickets
    WHERE created_at >= CURRENT_DATE - INTERVAL '30 days'
    GROUP BY 1
    ORDER BY count DESC;
    
