    Documentation · Features · Feb 17, 2025

    ICP Analysis Technical Overview

    Deep dive into Cogny's Ideal Customer Profile analysis system, including machine learning algorithms, data processing pipelines, and implementation architecture.

    Overview

    Cogny's ICP (Ideal Customer Profile) Analysis uses machine learning to identify patterns in your highest-value customers, enabling data-driven targeting and acquisition strategies.

    Key Features:

    • Automated customer segmentation
    • Behavioral pattern recognition
    • Predictive lifetime value modeling
    • Channel effectiveness analysis
    • Geographic and demographic profiling

    Architecture

    System Components

    ┌─────────────────────────────────────────────────────────────┐
    │                        Data Sources                          │
    ├─────────────────┬──────────────┬─────────────┬──────────────┤
    │   GA4 Events    │  E-commerce  │  User Props │  Ad Platforms│
    └────────┬────────┴──────┬───────┴──────┬──────┴──────┬───────┘
             │                │              │             │
             └────────────────┴──────────────┴─────────────┘
                              │
                    ┌─────────▼──────────┐
                    │  Data Aggregation  │
                    │   ETL Pipeline     │
                    └─────────┬──────────┘
                              │
             ┌────────────────┴────────────────┐
             │                                  │
        ┌────▼────────┐              ┌─────────▼────────┐
        │  Feature     │              │   Data Cleaning  │
        │  Engineering │              │   Normalization  │
        └────┬────────┘              └─────────┬────────┘
             │                                  │
             └────────────────┬─────────────────┘
                              │
                    ┌─────────▼──────────┐
                    │   ML Pipeline      │
                    │  - Clustering      │
                    │  - Classification  │
                    │  - LTV Prediction  │
                    └─────────┬──────────┘
                              │
             ┌────────────────┴────────────────┐
             │                                  │
        ┌────▼────────┐              ┌─────────▼────────┐
        │  Segment     │              │  ICP Profiles    │
        │  Generation  │              │  Scoring         │
        └────┬────────┘              └─────────┬────────┘
             │                                  │
             └────────────────┬─────────────────┘
                              │
                    ┌─────────▼──────────┐
                    │  Results Storage   │
                    │  & API Layer       │
                    └────────────────────┘
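
    The stages in the diagram can be sketched as a simple sequential pipeline. The following is a minimal toy illustration of the flow only; the stage functions, column names, and segmentation rule are hypothetical stand-ins, not Cogny's actual implementation:

```python
import pandas as pd

def run_icp_pipeline(raw_events: pd.DataFrame) -> dict:
    """Toy end-to-end sketch of the pipeline stages in the diagram."""
    # Data aggregation / ETL: collapse events to one row per user
    aggregated = raw_events.groupby("user_id", as_index=False).agg(
        sessions=("session_id", "nunique"),
        revenue=("revenue", "sum"),
    )

    # Data cleaning / normalization: drop users with no activity
    cleaned = aggregated[aggregated["sessions"] > 0].copy()

    # Feature engineering: derive a revenue-per-session feature
    cleaned["revenue_per_session"] = cleaned["revenue"] / cleaned["sessions"]

    # ML pipeline stand-in: naive two-way split on median revenue
    median = cleaned["revenue"].median()
    cleaned["segment"] = (cleaned["revenue"] > median).astype(int)

    # Results layer: segment-level summary
    return cleaned.groupby("segment")["revenue"].mean().to_dict()

events = pd.DataFrame({
    "user_id": ["a", "a", "b", "c"],
    "session_id": [1, 2, 3, 4],
    "revenue": [10.0, 30.0, 5.0, 100.0],
})
print(run_icp_pipeline(events))
```

    In the real system the "ML pipeline" stage is the clustering, classification, and LTV models described below rather than a median split.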
    

    Data Collection

    User Events

    GA4 events aggregated for ICP analysis:

    -- Aggregate user behavior features
    WITH user_behavior AS (
      SELECT
        user_pseudo_id,
        COUNT(DISTINCT DATE(TIMESTAMP_MICROS(event_timestamp))) as days_active,
        COUNT(DISTINCT CONCAT(user_pseudo_id, '-',
          CAST((SELECT value.int_value FROM UNNEST(event_params) WHERE key = 'ga_session_id') AS STRING))) as total_sessions,
        COUNT(DISTINCT CASE WHEN event_name = 'page_view' THEN event_timestamp END) as page_views,
        COUNT(DISTINCT CASE WHEN event_name = 'purchase' THEN event_timestamp END) as purchases,
        SUM(CASE WHEN event_name = 'purchase' THEN ecommerce.purchase_revenue_in_usd END) as total_revenue,
        AVG((SELECT value.int_value FROM UNNEST(event_params) WHERE key = 'engagement_time_msec')) / 1000 as avg_engagement_seconds,
        MAX(TIMESTAMP_MICROS(event_timestamp)) as last_activity,
        MIN(TIMESTAMP_MICROS(event_timestamp)) as first_activity
      FROM `project.analytics_123456789.events_*`
      WHERE _TABLE_SUFFIX BETWEEN FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY))
                              AND FORMAT_DATE('%Y%m%d', CURRENT_DATE() - 1)
      GROUP BY user_pseudo_id
    )
    
    SELECT
      *,
      TIMESTAMP_DIFF(last_activity, first_activity, DAY) as customer_age_days,
      SAFE_DIVIDE(total_revenue, purchases) as avg_order_value,
      SAFE_DIVIDE(page_views, total_sessions) as pages_per_session
    FROM user_behavior
    WHERE purchases > 0  -- Focus on converted users
    

    User Attributes

    Extract demographic and technical attributes:

    -- Collect user attributes
    SELECT
      user_pseudo_id,
      -- Device attributes
      ARRAY_AGG(device.category IGNORE NULLS ORDER BY event_timestamp DESC LIMIT 1)[OFFSET(0)] as primary_device,
      ARRAY_AGG(device.operating_system IGNORE NULLS ORDER BY event_timestamp DESC LIMIT 1)[OFFSET(0)] as operating_system,
      ARRAY_AGG(device.browser IGNORE NULLS ORDER BY event_timestamp DESC LIMIT 1)[OFFSET(0)] as browser,
    
      -- Geographic attributes
      ARRAY_AGG(geo.country IGNORE NULLS ORDER BY event_timestamp DESC LIMIT 1)[OFFSET(0)] as country,
      ARRAY_AGG(geo.region IGNORE NULLS ORDER BY event_timestamp DESC LIMIT 1)[OFFSET(0)] as region,
      ARRAY_AGG(geo.city IGNORE NULLS ORDER BY event_timestamp DESC LIMIT 1)[OFFSET(0)] as city,
    
      -- Acquisition attributes
      ARRAY_AGG(traffic_source.source IGNORE NULLS ORDER BY event_timestamp LIMIT 1)[OFFSET(0)] as first_source,
      ARRAY_AGG(traffic_source.medium IGNORE NULLS ORDER BY event_timestamp LIMIT 1)[OFFSET(0)] as first_medium,
    
      -- Custom user properties
      ARRAY_AGG((SELECT value.string_value FROM UNNEST(user_properties) WHERE key = 'user_type') IGNORE NULLS LIMIT 1)[OFFSET(0)] as user_type,
      ARRAY_AGG((SELECT value.string_value FROM UNNEST(user_properties) WHERE key = 'plan_level') IGNORE NULLS LIMIT 1)[OFFSET(0)] as plan_level
    
    FROM `project.analytics_123456789.events_*`
    WHERE _TABLE_SUFFIX BETWEEN FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY))
                            AND FORMAT_DATE('%Y%m%d', CURRENT_DATE() - 1)
    GROUP BY user_pseudo_id
    

    Product Interactions

    Track product engagement patterns:

    -- Product interaction patterns
    SELECT
      user_pseudo_id,
      ARRAY_AGG(DISTINCT item.item_category IGNORE NULLS) as categories_viewed,
      COUNT(DISTINCT item.item_id) as unique_products_viewed,
      COUNT(DISTINCT CASE WHEN event_name = 'add_to_cart' THEN item.item_id END) as products_added_to_cart,
      COUNT(DISTINCT CASE WHEN event_name = 'purchase' THEN item.item_id END) as products_purchased,
      AVG(CASE WHEN event_name = 'purchase' THEN item.price_in_usd END) as avg_product_price
    FROM `project.analytics_123456789.events_*`,
      UNNEST(items) as item
    WHERE _TABLE_SUFFIX BETWEEN FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY))
                            AND FORMAT_DATE('%Y%m%d', CURRENT_DATE() - 1)
    GROUP BY user_pseudo_id
    

    Feature Engineering

    Behavioral Features

    Transform raw events into ML-ready features:

    import pandas as pd
    import numpy as np
    
    def engineer_behavioral_features(df):
        """Create behavioral features from user data"""
    
        features = pd.DataFrame()
    
        # Engagement metrics
        features['recency_days'] = (pd.Timestamp.now() - df['last_activity']).dt.days
        features['frequency_score'] = np.log1p(df['total_sessions'])
        features['monetary_value'] = np.log1p(df['total_revenue'])
    
        # RFM score (Recency, Frequency, Monetary)
        features['rfm_score'] = (
            normalize_score(features['recency_days'], reverse=True) +
            normalize_score(features['frequency_score']) +
            normalize_score(features['monetary_value'])
        )
    
        # Engagement metrics
        features['engagement_intensity'] = df['page_views'] / df['days_active']
        features['purchase_conversion_rate'] = df['purchases'] / df['total_sessions']
        features['session_frequency'] = df['total_sessions'] / df['customer_age_days']
    
        # Value metrics
        features['customer_lifetime_value'] = df['total_revenue']
        features['avg_order_value'] = df['total_revenue'] / df['purchases']
        features['revenue_per_session'] = df['total_revenue'] / df['total_sessions']
    
        return features
    
    def normalize_score(series, reverse=False):
        """Normalize to 0-100 scale"""
        min_val = series.min()
        max_val = series.max()

        if max_val == min_val:
            # Constant column: return a neutral score to avoid division by zero
            return pd.Series(50.0, index=series.index)

        if reverse:
            return 100 * (max_val - series) / (max_val - min_val)
        else:
            return 100 * (series - min_val) / (max_val - min_val)
    

    Demographic Features

    Encode categorical attributes:

    from sklearn.preprocessing import LabelEncoder, OneHotEncoder
    
    def engineer_demographic_features(df):
        """Create demographic features"""
    
        features = pd.DataFrame()
    
        # Device features
        features['is_mobile'] = (df['primary_device'] == 'mobile').astype(int)
        features['is_desktop'] = (df['primary_device'] == 'desktop').astype(int)
    
        # Geographic features (one-hot encoding for top countries)
        top_countries = df['country'].value_counts().head(10).index
        for country in top_countries:
            features[f'country_{country.lower()}'] = (df['country'] == country).astype(int)
    
        # Acquisition channel features
        acquisition_mapping = {
            'organic': 'organic',
            'cpc': 'paid',
            'social': 'social',
            'referral': 'referral',
            'email': 'email',
            '(none)': 'direct',
            '(direct)': 'direct'
        }
        df['acquisition_channel'] = df['first_medium'].map(acquisition_mapping).fillna('other')
    
        # One-hot encode acquisition channels
        channel_dummies = pd.get_dummies(df['acquisition_channel'], prefix='channel')
        features = pd.concat([features, channel_dummies], axis=1)
    
        return features
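
    The medium-to-channel mapping and fallback to 'other' can be exercised on their own. A small standalone check (the sample mediums below are illustrative):

```python
import pandas as pd

# Map GA4 mediums to coarse acquisition channels; unknowns fall back to 'other'
acquisition_mapping = {
    'organic': 'organic', 'cpc': 'paid', 'social': 'social',
    'referral': 'referral', 'email': 'email',
    '(none)': 'direct', '(direct)': 'direct',
}

df = pd.DataFrame({'first_medium': ['cpc', 'organic', '(none)', 'affiliate']})
df['acquisition_channel'] = df['first_medium'].map(acquisition_mapping).fillna('other')

# One column per channel actually present in the data
dummies = pd.get_dummies(df['acquisition_channel'], prefix='channel')
print(sorted(dummies.columns))
```

    Note that `get_dummies` only emits columns for channels present in the sample, so production feature sets should be reindexed against a fixed channel list to keep model inputs stable.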
    

    Time-Based Features

    Capture temporal patterns:

    def engineer_temporal_features(df):
        """Create time-based features"""
    
        features = pd.DataFrame()
    
        # Customer lifecycle stage
        features['customer_age_weeks'] = df['customer_age_days'] / 7
        features['is_new_customer'] = (df['customer_age_days'] <= 30).astype(int)
        features['is_returning_customer'] = (df['purchases'] > 1).astype(int)
    
        # Activity patterns (assumes df carries a last_purchase_date column,
        # e.g. the max purchase timestamp per user from the events query)
        features['days_since_last_purchase'] = (
            pd.Timestamp.now() - df['last_purchase_date']
        ).dt.days
        features['purchase_frequency_weeks'] = (
            df['purchases'] / features['customer_age_weeks']
        )
    
        # Churn risk indicators
        features['days_inactive'] = (pd.Timestamp.now() - df['last_activity']).dt.days
        features['is_at_risk'] = (features['days_inactive'] > 30).astype(int)
    
        return features
    

    Machine Learning Pipeline

    Clustering Algorithm

    K-means clustering to identify customer segments:

    from sklearn.cluster import KMeans
    from sklearn.preprocessing import StandardScaler
    from sklearn.decomposition import PCA
    
    class ICPClustering:
        def __init__(self, n_clusters=5):
            self.n_clusters = n_clusters
            self.scaler = StandardScaler()
            self.pca = PCA(n_components=0.95)  # Retain 95% variance
            self.kmeans = KMeans(
                n_clusters=n_clusters,
                init='k-means++',
                n_init=10,
                max_iter=300,
                random_state=42
            )
    
        def fit(self, X):
            """Fit clustering model"""
            # Standardize features
            X_scaled = self.scaler.fit_transform(X)
    
            # Dimensionality reduction
            X_pca = self.pca.fit_transform(X_scaled)
    
            # Cluster
            self.kmeans.fit(X_pca)
    
            return self
    
        def predict(self, X):
            """Predict cluster membership"""
            X_scaled = self.scaler.transform(X)
            X_pca = self.pca.transform(X_scaled)
            return self.kmeans.predict(X_pca)
    
        def get_cluster_centers(self):
            """Get cluster centers in original feature space"""
            centers_pca = self.kmeans.cluster_centers_
            centers_scaled = self.pca.inverse_transform(centers_pca)
            centers_original = self.scaler.inverse_transform(centers_scaled)
            return centers_original
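
    The scale → reduce → cluster flow inside ICPClustering can be verified end-to-end on synthetic data. A self-contained sketch with two well-separated artificial customer groups (the feature values are made up for illustration):

```python
import numpy as np
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

rng = np.random.default_rng(42)

# Two synthetic groups: low-value and high-value behavior (orders, revenue)
low = rng.normal(loc=[2, 50], scale=[0.5, 10], size=(100, 2))
high = rng.normal(loc=[10, 500], scale=[1.0, 50], size=(100, 2))
X = np.vstack([low, high])

# Same standardize -> PCA -> KMeans flow as ICPClustering
X_scaled = StandardScaler().fit_transform(X)
X_pca = PCA(n_components=0.95).fit_transform(X_scaled)
labels = KMeans(n_clusters=2, n_init=10, random_state=42).fit_predict(X_pca)

print(np.bincount(labels))  # roughly a 100 / 100 split
```

    With clearly separated groups the clustering recovers the two populations; on real customer data, the optimal-cluster search below decides how many segments to keep.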
    

    Optimal Cluster Selection

    Use elbow method and silhouette score:

    from sklearn.metrics import silhouette_score
    import matplotlib.pyplot as plt
    
    def find_optimal_clusters(X, max_clusters=10):
        """Find optimal number of clusters"""
    
        inertias = []
        silhouette_scores = []
    
        for k in range(2, max_clusters + 1):
            model = ICPClustering(n_clusters=k)
            model.fit(X)
    
            labels = model.predict(X)
    
            inertias.append(model.kmeans.inertia_)
            silhouette_scores.append(silhouette_score(X, labels))
    
        # find_elbow already returns the k value (its x-axis starts at 2)
        optimal_k = find_elbow(inertias)
    
        return optimal_k, inertias, silhouette_scores
    
    def find_elbow(inertias):
        """Find elbow point using knee detection"""
        from kneed import KneeLocator
    
        kl = KneeLocator(
            range(2, len(inertias) + 2),
            inertias,
            curve='convex',
            direction='decreasing'
        )
    
        return kl.elbow if kl.elbow else 5  # Default to 5
    

    Lifetime Value Prediction

    Predict future customer value:

    from sklearn.ensemble import GradientBoostingRegressor
    from sklearn.model_selection import cross_val_score
    
    class LTVPredictor:
        def __init__(self):
            self.model = GradientBoostingRegressor(
                n_estimators=100,
                learning_rate=0.1,
                max_depth=5,
                random_state=42
            )
    
        def train(self, X, y):
            """Train LTV prediction model"""
            # Log-transform target for better prediction
            y_log = np.log1p(y)
    
        # Cross-validate on the log-transformed target
        cv_scores = cross_val_score(
            self.model, X, y_log,
            cv=5,
            scoring='neg_mean_squared_error'
        )
        print(f"CV RMSE (log space): {np.sqrt(-cv_scores.mean()):.2f}")
    
            # Train final model
            self.model.fit(X, y_log)
    
            return self
    
        def predict(self, X):
            """Predict LTV"""
            y_log_pred = self.model.predict(X)
            return np.expm1(y_log_pred)  # Inverse log transform
    
        def feature_importance(self, feature_names):
            """Get feature importance"""
            importance = pd.DataFrame({
                'feature': feature_names,
                'importance': self.model.feature_importances_
            }).sort_values('importance', ascending=False)
    
            return importance
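
    The train-in-log-space, invert-with-expm1 pattern can be sanity-checked on synthetic data. A minimal standalone version (the feature/target construction is invented for the demo, not a real LTV dataset):

```python
import numpy as np
from sklearn.ensemble import GradientBoostingRegressor

rng = np.random.default_rng(0)

# Synthetic features and a heavy-tailed revenue-like target
X = rng.uniform(0, 10, size=(500, 3))
y = np.expm1(0.3 * X[:, 0] + 0.1 * X[:, 1] + rng.normal(0, 0.05, 500))

model = GradientBoostingRegressor(n_estimators=100, max_depth=3, random_state=0)
model.fit(X, np.log1p(y))          # train in log space
pred = np.expm1(model.predict(X))  # invert back to the original scale

# Predictions should track the target closely on training data
print(float(np.corrcoef(y, pred)[0, 1]))
```

    The log transform compresses the long tail of high-value customers so the squared-error loss is not dominated by a few outliers; expm1 maps predictions back to dollars.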
    

    Segment Analysis

    Profile Generation

    Generate human-readable segment profiles:

    def generate_segment_profiles(df, cluster_labels):
        """Generate descriptive profiles for each segment"""
    
        df['segment'] = cluster_labels
    
        profiles = []
    
        for segment_id in sorted(df['segment'].unique()):
            segment_data = df[df['segment'] == segment_id]
    
            profile = {
                'segment_id': segment_id,
                'size': len(segment_data),
                'percentage': len(segment_data) / len(df) * 100,
    
                # Value metrics
                'avg_ltv': segment_data['total_revenue'].mean(),
                'avg_orders': segment_data['purchases'].mean(),
                'avg_order_value': (segment_data['total_revenue'] / segment_data['purchases']).mean(),
    
                # Engagement metrics
                'avg_sessions': segment_data['total_sessions'].mean(),
                'avg_engagement_time': segment_data['avg_engagement_seconds'].mean(),
                'conversion_rate': (segment_data['purchases'] / segment_data['total_sessions']).mean(),
    
                # Demographics
                'top_countries': segment_data['country'].value_counts().head(3).to_dict(),
                'device_breakdown': segment_data['primary_device'].value_counts(normalize=True).to_dict(),
                'top_channels': segment_data['first_medium'].value_counts().head(3).to_dict(),
    
                # Behavioral patterns
                'avg_customer_age_days': segment_data['customer_age_days'].mean(),
                'avg_recency_days': segment_data['recency_days'].mean(),
            }
    
            # Assign segment name based on characteristics
            profile['name'] = classify_segment(profile)
            profile['description'] = generate_segment_description(profile)
    
            profiles.append(profile)
    
        return profiles
    
    def classify_segment(profile):
        """Assign human-readable name to segment"""
    
        if profile['avg_ltv'] > 1000 and profile['avg_orders'] > 5:
            return "VIP Customers"
        elif profile['avg_ltv'] > 500 and profile['conversion_rate'] > 0.1:
            return "High-Value Regulars"
        elif profile['avg_orders'] > 3 and profile['avg_order_value'] < 100:
            return "Frequent Bargain Hunters"
        elif profile['avg_customer_age_days'] < 30:
            return "New Customers"
        elif profile['avg_recency_days'] > 60:
            return "At-Risk Customers"
        else:
            return "Occasional Shoppers"
    
    def generate_segment_description(profile):
        """Generate natural language description"""
    
        ltv = profile['avg_ltv']
        orders = profile['avg_orders']
        sessions = profile['avg_sessions']
        device = max(profile['device_breakdown'], key=profile['device_breakdown'].get)
    
        description = (
            f"This segment represents {profile['percentage']:.1f}% of customers. "
            f"They have an average lifetime value of ${ltv:.2f} across {orders:.1f} orders. "
            f"Typical engagement includes {sessions:.1f} sessions. "
            f"Primarily {device} users."
        )
    
        return description
    

    ICP Scoring

    Score leads based on similarity to best customers:

    class ICPScorer:
        def __init__(self, ideal_segment_features):
            self.ideal_features = ideal_segment_features
            self.scaler = StandardScaler()
            self.scaler.fit(ideal_segment_features)
    
        def score(self, lead_features):
            """Score lead based on similarity to ICP"""
    
            # Standardize features
            ideal_scaled = self.scaler.transform(self.ideal_features)
            lead_scaled = self.scaler.transform(lead_features.reshape(1, -1))
    
            # Calculate cosine similarity
            from sklearn.metrics.pairwise import cosine_similarity
    
            similarity = cosine_similarity(lead_scaled, ideal_scaled)[0]
    
        # Convert to a 0-100 score (cosine similarity can be negative; clip at 0)
        icp_score = np.clip(np.mean(similarity), 0, 1) * 100
    
            return {
                'score': icp_score,
                'tier': self._classify_tier(icp_score),
                'match_confidence': self._calculate_confidence(similarity)
            }
    
        def _classify_tier(self, score):
            """Classify lead tier"""
            if score >= 80:
                return 'A - Excellent Fit'
            elif score >= 60:
                return 'B - Good Fit'
            elif score >= 40:
                return 'C - Moderate Fit'
            else:
                return 'D - Poor Fit'
    
        def _calculate_confidence(self, similarity_scores):
            """Calculate confidence in score"""
            std = np.std(similarity_scores)
            if std < 0.1:
                return 'high'
            elif std < 0.2:
                return 'medium'
            else:
                return 'low'
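
    The scoring math reduces to mean cosine similarity against the ideal segment's scaled feature rows. A standalone toy version of that core step is below; note one deliberate deviation, flagged as an assumption: the scaler here is fit on a broader population sample rather than on the ideal segment alone, so the ideal rows keep a common direction after scaling. All numbers are invented for illustration:

```python
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import StandardScaler

# Feature columns: sessions, revenue, conversion rate.
# A broader customer sample sets the feature scale (assumption for this demo).
population = np.array([
    [1.0,   20.0, 0.01],
    [2.0,   50.0, 0.02],
    [8.0,  400.0, 0.20],
    [10.0, 500.0, 0.25],
    [9.0,  450.0, 0.22],
])
ideal = population[2:]  # the high-value rows act as the "ideal" segment

scaler = StandardScaler().fit(population)
ideal_scaled = scaler.transform(ideal)

def icp_score(lead):
    """Mean cosine similarity to the ideal rows, mapped to a 0-100 score."""
    lead_scaled = scaler.transform(np.asarray(lead, dtype=float).reshape(1, -1))
    sim = cosine_similarity(lead_scaled, ideal_scaled)[0]
    return float(np.clip(sim.mean(), 0, 1) * 100)

print(icp_score([9.5, 470.0, 0.23]))  # high score: resembles the ideal segment
print(icp_score([1.0, 20.0, 0.01]))   # low score: does not
```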
    

    API Integration

    Get ICP Analysis

    Retrieve ICP analysis results:

    curl -X GET https://api.cogny.com/v1/warehouses/wh_123abc/icp-analysis \
      -H "Authorization: Bearer sk_live_abc123xyz789"
    

    Response:

    {
      "success": true,
      "data": {
        "analysis_id": "icp_xyz789",
        "warehouse_id": "wh_123abc",
        "created_at": "2025-02-17T10:30:00Z",
        "segments": [
          {
            "segment_id": 0,
            "name": "VIP Customers",
            "size": 1247,
            "percentage": 8.3,
            "avg_ltv": 2450.00,
            "avg_orders": 8.5,
            "avg_order_value": 288.24,
            "top_countries": ["United States", "United Kingdom", "Canada"],
            "device_breakdown": {"desktop": 0.65, "mobile": 0.35},
            "top_channels": ["organic", "cpc", "email"],
            "description": "High-value customers with strong engagement..."
          }
        ],
        "recommendations": [
          {
            "segment": "VIP Customers",
            "priority": "high",
            "action": "Create lookalike audiences for paid acquisition",
            "expected_impact": "25-35% increase in high-value customer acquisition"
          }
        ]
      }
    }
    

    Score a Lead

    Score a potential customer against ICP:

    curl -X POST https://api.cogny.com/v1/warehouses/wh_123abc/icp-score \
      -H "Authorization: Bearer sk_live_abc123xyz789" \
      -H "Content-Type: application/json" \
      -d '{
        "features": {
          "country": "United States",
          "device": "desktop",
          "acquisition_channel": "organic",
          "session_count": 5,
          "page_views": 25,
          "engagement_time_seconds": 450
        }
      }'
    

    Response:

    {
      "success": true,
      "data": {
        "score": 87.5,
        "tier": "A - Excellent Fit",
        "match_confidence": "high",
        "matched_segment": "VIP Customers",
        "probability_high_value": 0.78,
        "predicted_ltv": 1850.00,
        "recommendations": [
          "Prioritize for sales outreach",
          "Offer premium onboarding",
          "Target with high-value product recommendations"
        ]
      }
    }
    

    Performance Optimization

    Query Optimization

    Optimize data aggregation queries:

    -- Limit scanned shards and filter rows early
    SELECT *
    FROM `project.analytics_123456789.events_*`
    WHERE _TABLE_SUFFIX BETWEEN '20250101' AND '20250217'  -- Shard pruning (GA4 export tables are date-sharded)
      AND event_name IN ('purchase', 'add_to_cart')  -- Reduce rows scanned
    

    Caching Strategy

    Cache ICP analysis results:

    from functools import lru_cache
    import hashlib
    
    @lru_cache(maxsize=100)
    def get_icp_analysis(warehouse_id, date_range_hash):
        """Cached ICP analysis retrieval"""
        # Fetch from database or recompute
        pass
    
    # Use hash of date range for cache key
    date_range_hash = hashlib.md5(
        f"{start_date}_{end_date}".encode()
    ).hexdigest()
    
    analysis = get_icp_analysis(warehouse_id, date_range_hash)
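
    One caveat: `lru_cache` evicts only by size, never by age, so a cached analysis can be served indefinitely even after fresh data lands. A time-based cache is one alternative; the class below is a minimal sketch (names and TTL values are illustrative, not Cogny's implementation):

```python
import time

class TTLCache:
    """Tiny dict-backed cache whose entries expire after ttl_seconds."""

    def __init__(self, ttl_seconds=3600):
        self.ttl = ttl_seconds
        self._store = {}

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, stored_at = entry
        if time.monotonic() - stored_at > self.ttl:
            del self._store[key]  # Expired: evict and report a miss
            return None
        return value

    def set(self, key, value):
        self._store[key] = (value, time.monotonic())

cache = TTLCache(ttl_seconds=0.05)
cache.set("wh_123abc:last90d", {"segments": 5})
print(cache.get("wh_123abc:last90d"))  # cache hit
time.sleep(0.1)
print(cache.get("wh_123abc:last90d"))  # None: entry expired
```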
    


    Need Implementation Help?

    Talk to Our Technical Team

    Schedule a technical consultation to discuss your integration requirements and implementation strategy.

    Schedule Demo