    Documentation · Features · Feb 17, 2025

    ICP Analysis Technical Overview

    Deep dive into Cogny's Ideal Customer Profile analysis system, including machine learning algorithms, data processing pipelines, and implementation architecture.

    Overview

    Cogny's ICP (Ideal Customer Profile) Analysis uses machine learning to identify patterns in your highest-value customers, enabling data-driven targeting and acquisition strategies.

    Key Features:

    • Automated customer segmentation
    • Behavioral pattern recognition
    • Predictive lifetime value modeling
    • Channel effectiveness analysis
    • Geographic and demographic profiling

    Architecture

    System Components

    Data Sources (GA4 Events · E-commerce · User Props · Ad Platforms)
            │
            ▼
    Data Aggregation / ETL Pipeline
            │
            ▼
    Feature Engineering  +  Data Cleaning & Normalization
            │
            ▼
    ML Pipeline (Clustering · Classification · LTV Prediction)
            │
            ▼
    Segment Generation  +  ICP Profiles & Scoring
            │
            ▼
    Results Storage & API Layer
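
    The ML stage of this flow (standardization, dimensionality reduction, clustering) mirrors the clustering class described later in this document and can be expressed as a single scikit-learn Pipeline. The sketch below is illustrative only; the feature matrix is synthetic, purely to show the shape of the stage:

    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import StandardScaler
    from sklearn.decomposition import PCA
    from sklearn.cluster import KMeans
    import numpy as np

    # Synthetic stand-in for the engineered customer feature matrix
    rng = np.random.default_rng(42)
    X = rng.normal(size=(500, 12))

    # Standardize -> reduce dimensionality -> cluster, as in the ML Pipeline stage
    ml_stage = Pipeline([
        ("scale", StandardScaler()),
        ("pca", PCA(n_components=0.95)),
        ("cluster", KMeans(n_clusters=5, n_init=10, random_state=42)),
    ])
    segment_labels = ml_stage.fit_predict(X)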

    Data Collection

    User Events

    GA4 events aggregated for ICP analysis:

    -- Aggregate user behavior features
    WITH user_behavior AS (
      SELECT
        user_pseudo_id,
        COUNT(DISTINCT DATE(TIMESTAMP_MICROS(event_timestamp))) as days_active,
        COUNT(DISTINCT CONCAT(user_pseudo_id,
          (SELECT value.int_value FROM UNNEST(event_params) WHERE key = 'ga_session_id'))) as total_sessions,
        COUNT(DISTINCT CASE WHEN event_name = 'page_view' THEN event_timestamp END) as page_views,
        COUNT(DISTINCT CASE WHEN event_name = 'purchase' THEN event_timestamp END) as purchases,
        SUM(CASE WHEN event_name = 'purchase' THEN ecommerce.purchase_revenue_in_usd END) as total_revenue,
        AVG((SELECT value.int_value FROM UNNEST(event_params) WHERE key = 'engagement_time_msec')) / 1000 as avg_engagement_seconds,
        MAX(TIMESTAMP_MICROS(event_timestamp)) as last_activity,
        MIN(TIMESTAMP_MICROS(event_timestamp)) as first_activity
      FROM `project.analytics_123456789.events_*`
      WHERE _TABLE_SUFFIX BETWEEN FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY))
        AND FORMAT_DATE('%Y%m%d', CURRENT_DATE() - 1)
      GROUP BY user_pseudo_id
    )

    SELECT
      *,
      TIMESTAMP_DIFF(last_activity, first_activity, DAY) as customer_age_days,
      SAFE_DIVIDE(total_revenue, purchases) as avg_order_value,
      SAFE_DIVIDE(page_views, total_sessions) as pages_per_session
    FROM user_behavior
    WHERE purchases > 0  -- Focus on converted users

    User Attributes

    Extract demographic and technical attributes:

    -- Collect user attributes
    SELECT
      user_pseudo_id,

      -- Device attributes
      ARRAY_AGG(device.category IGNORE NULLS ORDER BY event_timestamp DESC LIMIT 1)[OFFSET(0)] as primary_device,
      ARRAY_AGG(device.operating_system IGNORE NULLS ORDER BY event_timestamp DESC LIMIT 1)[OFFSET(0)] as operating_system,
      ARRAY_AGG(device.browser IGNORE NULLS ORDER BY event_timestamp DESC LIMIT 1)[OFFSET(0)] as browser,

      -- Geographic attributes
      ARRAY_AGG(geo.country IGNORE NULLS ORDER BY event_timestamp DESC LIMIT 1)[OFFSET(0)] as country,
      ARRAY_AGG(geo.region IGNORE NULLS ORDER BY event_timestamp DESC LIMIT 1)[OFFSET(0)] as region,
      ARRAY_AGG(geo.city IGNORE NULLS ORDER BY event_timestamp DESC LIMIT 1)[OFFSET(0)] as city,

      -- Acquisition attributes
      ARRAY_AGG(traffic_source.source IGNORE NULLS ORDER BY event_timestamp LIMIT 1)[OFFSET(0)] as first_source,
      ARRAY_AGG(traffic_source.medium IGNORE NULLS ORDER BY event_timestamp LIMIT 1)[OFFSET(0)] as first_medium,

      -- Custom user properties
      ARRAY_AGG((SELECT value.string_value FROM UNNEST(user_properties) WHERE key = 'user_type') IGNORE NULLS LIMIT 1)[OFFSET(0)] as user_type,
      ARRAY_AGG((SELECT value.string_value FROM UNNEST(user_properties) WHERE key = 'plan_level') IGNORE NULLS LIMIT 1)[OFFSET(0)] as plan_level

    FROM `project.analytics_123456789.events_*`
    WHERE _TABLE_SUFFIX BETWEEN FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY))
      AND FORMAT_DATE('%Y%m%d', CURRENT_DATE() - 1)
    GROUP BY user_pseudo_id

    Product Interactions

    Track product engagement patterns:

    -- Product interaction patterns
    SELECT
      user_pseudo_id,
      ARRAY_AGG(DISTINCT item.item_category IGNORE NULLS) as categories_viewed,
      COUNT(DISTINCT item.item_id) as unique_products_viewed,
      COUNT(DISTINCT CASE WHEN event_name = 'add_to_cart' THEN item.item_id END) as products_added_to_cart,
      COUNT(DISTINCT CASE WHEN event_name = 'purchase' THEN item.item_id END) as products_purchased,
      AVG(CASE WHEN event_name = 'purchase' THEN item.price_in_usd END) as avg_product_price
    FROM `project.analytics_123456789.events_*`,
      UNNEST(items) as item
    WHERE _TABLE_SUFFIX BETWEEN FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY))
      AND FORMAT_DATE('%Y%m%d', CURRENT_DATE() - 1)
    GROUP BY user_pseudo_id
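
    Downstream, these three result sets are joined on user_pseudo_id into a single user-level table before feature engineering. A minimal pandas sketch, assuming each query's output has already been loaded into a DataFrame (the loading step itself is not shown):

    import pandas as pd

    def combine_user_tables(behavior: pd.DataFrame,
                            attributes: pd.DataFrame,
                            interactions: pd.DataFrame) -> pd.DataFrame:
        """Join the behavior, attribute, and product-interaction query results
        into one row per user. Left joins keep every converted user from the
        behavior query even if the other queries returned no row for them."""
        combined = (
            behavior
            .merge(attributes, on="user_pseudo_id", how="left")
            .merge(interactions, on="user_pseudo_id", how="left")
        )
        return combined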

    Feature Engineering

    Behavioral Features

    Transform raw events into ML-ready features:

    import pandas as pd
    import numpy as np

    def engineer_behavioral_features(df):
        """Create behavioral features from user data"""

        features = pd.DataFrame()

        # Core RFM inputs (recency, frequency, monetary)
        features['recency_days'] = (pd.Timestamp.now() - df['last_activity']).dt.days
        features['frequency_score'] = np.log1p(df['total_sessions'])
        features['monetary_value'] = np.log1p(df['total_revenue'])

        # Combined RFM score
        features['rfm_score'] = (
            normalize_score(features['recency_days'], reverse=True) +
            normalize_score(features['frequency_score']) +
            normalize_score(features['monetary_value'])
        )

        # Engagement metrics
        features['engagement_intensity'] = df['page_views'] / df['days_active']
        features['purchase_conversion_rate'] = df['purchases'] / df['total_sessions']
        features['session_frequency'] = df['total_sessions'] / df['customer_age_days']

        # Value metrics
        features['customer_lifetime_value'] = df['total_revenue']
        features['avg_order_value'] = df['total_revenue'] / df['purchases']
        features['revenue_per_session'] = df['total_revenue'] / df['total_sessions']

        return features

    def normalize_score(series, reverse=False):
        """Normalize to 0-100 scale"""
        min_val = series.min()
        max_val = series.max()

        if reverse:
            return 100 * (max_val - series) / (max_val - min_val)
        else:
            return 100 * (series - min_val) / (max_val - min_val)

    Demographic Features

    Encode categorical attributes:

    from sklearn.preprocessing import LabelEncoder, OneHotEncoder

    def engineer_demographic_features(df):
        """Create demographic features"""

        features = pd.DataFrame()

        # Device features
        features['is_mobile'] = (df['primary_device'] == 'mobile').astype(int)
        features['is_desktop'] = (df['primary_device'] == 'desktop').astype(int)

        # Geographic features (one-hot encoding for top countries)
        top_countries = df['country'].value_counts().head(10).index
        for country in top_countries:
            features[f'country_{country.lower()}'] = (df['country'] == country).astype(int)

        # Acquisition channel features
        acquisition_mapping = {
            'organic': 'organic',
            'cpc': 'paid',
            'social': 'social',
            'referral': 'referral',
            'email': 'email',
            '(none)': 'direct',
            '(direct)': 'direct'
        }
        df['acquisition_channel'] = df['first_medium'].map(acquisition_mapping).fillna('other')

        # One-hot encode acquisition channels
        channel_dummies = pd.get_dummies(df['acquisition_channel'], prefix='channel')
        features = pd.concat([features, channel_dummies], axis=1)

        return features

    Time-Based Features

    Capture temporal patterns:

    def engineer_temporal_features(df):
        """Create time-based features"""

        features = pd.DataFrame()

        # Customer lifecycle stage
        features['customer_age_weeks'] = df['customer_age_days'] / 7
        features['is_new_customer'] = (df['customer_age_days'] <= 30).astype(int)
        features['is_returning_customer'] = (df['purchases'] > 1).astype(int)

        # Activity patterns
        features['days_since_last_purchase'] = (
            pd.Timestamp.now() - df['last_purchase_date']
        ).dt.days
        features['purchase_frequency_weeks'] = (
            df['purchases'] / features['customer_age_weeks']
        )

        # Churn risk indicators
        features['days_inactive'] = (pd.Timestamp.now() - df['last_activity']).dt.days
        features['is_at_risk'] = (features['days_inactive'] > 30).astype(int)

        return features
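
    Downstream, the three feature groups are concatenated column-wise into one model-ready matrix. A minimal sketch reusing the functions above; the fillna(0) imputation is an illustrative assumption, not a documented default:

    def build_feature_matrix(df):
        """Assemble behavioral, demographic, and temporal features into one
        model-ready matrix, aligned on the same user index."""
        feature_sets = [
            engineer_behavioral_features(df),
            engineer_demographic_features(df),
            engineer_temporal_features(df),
        ]
        X = pd.concat(feature_sets, axis=1)
        # Simple placeholder imputation; replace with whatever policy you use
        return X.fillna(0)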

    Machine Learning Pipeline

    Clustering Algorithm

    K-means clustering to identify customer segments:

    from sklearn.cluster import KMeans
    from sklearn.preprocessing import StandardScaler
    from sklearn.decomposition import PCA

    class ICPClustering:
        def __init__(self, n_clusters=5):
            self.n_clusters = n_clusters
            self.scaler = StandardScaler()
            self.pca = PCA(n_components=0.95)  # Retain 95% variance
            self.kmeans = KMeans(
                n_clusters=n_clusters,
                init='k-means++',
                n_init=10,
                max_iter=300,
                random_state=42
            )

        def fit(self, X):
            """Fit clustering model"""
            # Standardize features
            X_scaled = self.scaler.fit_transform(X)

            # Dimensionality reduction
            X_pca = self.pca.fit_transform(X_scaled)

            # Cluster
            self.kmeans.fit(X_pca)

            return self

        def predict(self, X):
            """Predict cluster membership"""
            X_scaled = self.scaler.transform(X)
            X_pca = self.pca.transform(X_scaled)
            return self.kmeans.predict(X_pca)

        def get_cluster_centers(self):
            """Get cluster centers in original feature space"""
            centers_pca = self.kmeans.cluster_centers_
            centers_scaled = self.pca.inverse_transform(centers_pca)
            centers_original = self.scaler.inverse_transform(centers_scaled)
            return centers_original
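
    A minimal usage sketch, assuming X is the engineered feature matrix built in the previous section:

    # Illustrative usage; X is the model-ready feature matrix from above
    clustering = ICPClustering(n_clusters=5)
    clustering.fit(X)
    segment_labels = clustering.predict(X)
    centers = clustering.get_cluster_centers()  # Centers mapped back to original feature units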

    Optimal Cluster Selection

    Use the elbow method and silhouette score to choose the number of clusters:

    from sklearn.metrics import silhouette_score
    import matplotlib.pyplot as plt

    def find_optimal_clusters(X, max_clusters=10):
        """Find optimal number of clusters"""

        inertias = []
        silhouette_scores = []

        for k in range(2, max_clusters + 1):
            model = ICPClustering(n_clusters=k)
            model.fit(X)

            labels = model.predict(X)

            inertias.append(model.kmeans.inertia_)
            silhouette_scores.append(silhouette_score(X, labels))

        # Find elbow point (find_elbow already returns k itself, since its x-axis starts at 2)
        optimal_k = find_elbow(inertias)

        return optimal_k, inertias, silhouette_scores

    def find_elbow(inertias):
        """Find elbow point using knee detection"""
        from kneed import KneeLocator

        kl = KneeLocator(
            range(2, len(inertias) + 2),
            inertias,
            curve='convex',
            direction='decreasing'
        )

        return kl.elbow if kl.elbow else 5  # Default to 5

    Lifetime Value Prediction

    Predict future customer value:

    from sklearn.ensemble import GradientBoostingRegressor
    from sklearn.model_selection import cross_val_score

    class LTVPredictor:
        def __init__(self):
            self.model = GradientBoostingRegressor(
                n_estimators=100,
                learning_rate=0.1,
                max_depth=5,
                random_state=42
            )

        def train(self, X, y):
            """Train LTV prediction model"""
            # Log-transform target for better prediction
            y_log = np.log1p(y)

            # Cross-validation
            cv_scores = cross_val_score(
                self.model, X, y_log,
                cv=5,
                scoring='neg_mean_squared_error'
            )
            print(f"CV RMSE (log scale): {np.sqrt(-cv_scores.mean()):.2f}")

            # Train final model
            self.model.fit(X, y_log)

            return self

        def predict(self, X):
            """Predict LTV"""
            y_log_pred = self.model.predict(X)
            return np.expm1(y_log_pred)  # Inverse log transform

        def feature_importance(self, feature_names):
            """Get feature importance"""
            importance = pd.DataFrame({
                'feature': feature_names,
                'importance': self.model.feature_importances_
            }).sort_values('importance', ascending=False)

            return importance
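
    A minimal usage sketch, assuming X is the feature matrix and the observed total_revenue column is used as the training target:

    # Illustrative usage; feature names should match the columns of X
    ltv_model = LTVPredictor().train(X, df['total_revenue'])
    predicted_ltv = ltv_model.predict(X)
    print(ltv_model.feature_importance(list(X.columns)).head(10))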

    Segment Analysis

    Profile Generation

    Generate human-readable segment profiles:

    def generate_segment_profiles(df, cluster_labels):
        """Generate descriptive profiles for each segment"""

        df['segment'] = cluster_labels

        profiles = []

        for segment_id in sorted(df['segment'].unique()):
            segment_data = df[df['segment'] == segment_id]

            profile = {
                'segment_id': segment_id,
                'size': len(segment_data),
                'percentage': len(segment_data) / len(df) * 100,

                # Value metrics
                'avg_ltv': segment_data['total_revenue'].mean(),
                'avg_orders': segment_data['purchases'].mean(),
                'avg_order_value': (segment_data['total_revenue'] / segment_data['purchases']).mean(),

                # Engagement metrics
                'avg_sessions': segment_data['total_sessions'].mean(),
                'avg_engagement_time': segment_data['avg_engagement_seconds'].mean(),
                'conversion_rate': (segment_data['purchases'] / segment_data['total_sessions']).mean(),

                # Demographics
                'top_countries': segment_data['country'].value_counts().head(3).to_dict(),
                'device_breakdown': segment_data['primary_device'].value_counts(normalize=True).to_dict(),
                'top_channels': segment_data['first_medium'].value_counts().head(3).to_dict(),

                # Behavioral patterns
                'avg_customer_age_days': segment_data['customer_age_days'].mean(),
                'avg_recency_days': segment_data['recency_days'].mean(),
            }

            # Assign segment name based on characteristics
            profile['name'] = classify_segment(profile)
            profile['description'] = generate_segment_description(profile)

            profiles.append(profile)

        return profiles

    def classify_segment(profile):
        """Assign human-readable name to segment"""

        if profile['avg_ltv'] > 1000 and profile['avg_orders'] > 5:
            return "VIP Customers"
        elif profile['avg_ltv'] > 500 and profile['conversion_rate'] > 0.1:
            return "High-Value Regulars"
        elif profile['avg_orders'] > 3 and profile['avg_order_value'] < 100:
            return "Frequent Bargain Hunters"
        elif profile['avg_customer_age_days'] < 30:
            return "New Customers"
        elif profile['avg_recency_days'] > 60:
            return "At-Risk Customers"
        else:
            return "Occasional Shoppers"

    def generate_segment_description(profile):
        """Generate natural language description"""

        ltv = profile['avg_ltv']
        orders = profile['avg_orders']
        sessions = profile['avg_sessions']
        device = max(profile['device_breakdown'], key=profile['device_breakdown'].get)

        description = (
            f"This segment represents {profile['percentage']:.1f}% of customers. "
            f"They have an average lifetime value of ${ltv:.2f} across {orders:.1f} orders. "
            f"Typical engagement includes {sessions:.1f} sessions. "
            f"Primarily {device} users."
        )

        return description

    ICP Scoring

    Score leads based on similarity to best customers:

    class ICPScorer:
        def __init__(self, ideal_segment_features):
            self.ideal_features = ideal_segment_features
            self.scaler = StandardScaler()
            self.scaler.fit(ideal_segment_features)

        def score(self, lead_features):
            """Score lead based on similarity to ICP"""

            # Standardize features
            ideal_scaled = self.scaler.transform(self.ideal_features)
            lead_scaled = self.scaler.transform(lead_features.reshape(1, -1))

            # Calculate cosine similarity
            from sklearn.metrics.pairwise import cosine_similarity

            similarity = cosine_similarity(lead_scaled, ideal_scaled)[0]

            # Convert to 0-100 score
            icp_score = np.mean(similarity) * 100

            return {
                'score': icp_score,
                'tier': self._classify_tier(icp_score),
                'match_confidence': self._calculate_confidence(similarity)
            }

        def _classify_tier(self, score):
            """Classify lead tier"""
            if score >= 80:
                return 'A - Excellent Fit'
            elif score >= 60:
                return 'B - Good Fit'
            elif score >= 40:
                return 'C - Moderate Fit'
            else:
                return 'D - Poor Fit'

        def _calculate_confidence(self, similarity_scores):
            """Calculate confidence in score"""
            std = np.std(similarity_scores)
            if std < 0.1:
                return 'high'
            elif std < 0.2:
                return 'medium'
            else:
                return 'low'
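
    A minimal usage sketch; ideal_features (a 2-D array of rows from your best segment) and lead_vector (a 1-D array in the same feature order) are illustrative names, not part of the API:

    # Illustrative inputs: rows from the ICP segment and a single lead, same column order
    icp_scorer = ICPScorer(ideal_features)
    result = icp_scorer.score(lead_vector)
    print(result['score'], result['tier'], result['match_confidence'])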

    API Integration

    Get ICP Analysis

    Retrieve ICP analysis results:

    curl -X GET https://api.cogny.com/v1/warehouses/wh_123abc/icp-analysis \
    -H "Authorization: Bearer sk_live_abc123xyz789"

    Response:

    {
      "success": true,
      "data": {
        "analysis_id": "icp_xyz789",
        "warehouse_id": "wh_123abc",
        "created_at": "2025-02-17T10:30:00Z",
        "segments": [
          {
            "segment_id": 0,
            "name": "VIP Customers",
            "size": 1247,
            "percentage": 8.3,
            "avg_ltv": 2450.00,
            "avg_orders": 8.5,
            "avg_order_value": 288.24,
            "top_countries": ["United States", "United Kingdom", "Canada"],
            "device_breakdown": {"desktop": 0.65, "mobile": 0.35},
            "top_channels": ["organic", "cpc", "email"],
            "description": "High-value customers with strong engagement..."
          }
        ],
        "recommendations": [
          {
            "segment": "VIP Customers",
            "priority": "high",
            "action": "Create lookalike audiences for paid acquisition",
            "expected_impact": "25-35% increase in high-value customer acquisition"
          }
        ]
      }
    }

    Score a Lead

    Score a potential customer against ICP:

    curl -X POST https://api.cogny.com/v1/warehouses/wh_123abc/icp-score \
      -H "Authorization: Bearer sk_live_abc123xyz789" \
      -d '{
        "features": {
          "country": "United States",
          "device": "desktop",
          "acquisition_channel": "organic",
          "session_count": 5,
          "page_views": 25,
          "engagement_time_seconds": 450
        }
      }'

    Response:

    {
      "success": true,
      "data": {
        "score": 87.5,
        "tier": "A - Excellent Fit",
        "match_confidence": "high",
        "matched_segment": "VIP Customers",
        "probability_high_value": 0.78,
        "predicted_ltv": 1850.00,
        "recommendations": [
          "Prioritize for sales outreach",
          "Offer premium onboarding",
          "Target with high-value product recommendations"
        ]
      }
    }
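
    The same request can be issued from Python with the requests library; this sketch mirrors the curl example above and omits error handling:

    import requests

    API_KEY = "sk_live_abc123xyz789"  # Example key from the docs; use your own
    url = "https://api.cogny.com/v1/warehouses/wh_123abc/icp-score"

    payload = {
        "features": {
            "country": "United States",
            "device": "desktop",
            "acquisition_channel": "organic",
            "session_count": 5,
            "page_views": 25,
            "engagement_time_seconds": 450,
        }
    }

    response = requests.post(
        url,
        headers={"Authorization": f"Bearer {API_KEY}"},
        json=payload,
        timeout=30,
    )
    result = response.json()["data"]
    print(result["score"], result["tier"])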

    Performance Optimization

    Query Optimization

    Optimize data aggregation queries:

    -- Use partitioning and clustering
    SELECT *
    FROM `project.analytics_123456789.events_*`
    WHERE _TABLE_SUFFIX BETWEEN '20250101' AND '20250217' -- Partition pruning
    AND event_name IN ('purchase', 'add_to_cart') -- Cluster pruning

    Caching Strategy

    Cache ICP analysis results:

    from functools import lru_cache
    import hashlib

    @lru_cache(maxsize=100)
    def get_icp_analysis(warehouse_id, date_range_hash):
        """Cached ICP analysis retrieval"""
        # Fetch from database or recompute
        pass

    # Use hash of date range for cache key
    date_range_hash = hashlib.md5(
        f"{start_date}_{end_date}".encode()
    ).hexdigest()

    analysis = get_icp_analysis(warehouse_id, date_range_hash)
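
    Note that lru_cache never expires entries on its own. One simple way to bound staleness, sketched below, is to fold a coarse time bucket into the cache key; the one-hour TTL is an illustrative choice, not a documented default:

    import time

    def cache_bucket(ttl_seconds=3600):
        """Coarse time bucket; a new bucket value effectively invalidates old entries."""
        return int(time.time() // ttl_seconds)

    # Include the bucket in the cache key so cached results refresh roughly hourly
    analysis = get_icp_analysis(warehouse_id, f"{date_range_hash}_{cache_bucket()}")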
