ICP Analysis Technical Overview
Deep dive into Cogny's Ideal Customer Profile analysis system, including machine learning algorithms, data processing pipelines, and implementation architecture.
Overview
Cogny's ICP (Ideal Customer Profile) Analysis uses machine learning to identify patterns in your highest-value customers, enabling data-driven targeting and acquisition strategies.
Key Features:
- Automated customer segmentation
- Behavioral pattern recognition
- Predictive lifetime value modeling
- Channel effectiveness analysis
- Geographic and demographic profiling
Architecture
System Components
┌─────────────────────────────────────────────────────────────┐
│ Data Sources │
├─────────────────┬──────────────┬─────────────┬──────────────┤
│ GA4 Events │ E-commerce │ User Props │ Ad Platforms│
└────────┬────────┴──────┬───────┴──────┬──────┴──────┬───────┘
│ │ │ │
└────────────────┴──────────────┴─────────────┘
│
┌─────────▼──────────┐
│ Data Aggregation │
│ ETL Pipeline │
└─────────┬──────────┘
│
┌────────────────┴────────────────┐
│ │
┌────▼────────┐ ┌─────────▼────────┐
│ Feature │ │ Data Cleaning │
│ Engineering │ │ Normalization │
└────┬────────┘ └─────────┬────────┘
│ │
└────────────────┬─────────────────┘
│
┌─────────▼──────────┐
│ ML Pipeline │
│ - Clustering │
│ - Classification │
│ - LTV Prediction │
└─────────┬──────────┘
│
┌────────────────┴────────────────┐
│ │
┌────▼────────┐ ┌─────────▼────────┐
│ Segment │ │ ICP Profiles │
│ Generation │ │ Scoring │
└────┬────────┘ └─────────┬────────┘
│ │
└────────────────┬─────────────────┘
│
┌─────────▼──────────┐
│ Results Storage │
│ & API Layer │
└────────────────────┘
Data Collection
User Events
GA4 events are aggregated into per-user behavioral features:
-- Aggregate user behavior features
WITH user_behavior AS (
  SELECT
    user_pseudo_id,
    COUNT(DISTINCT DATE(TIMESTAMP_MICROS(event_timestamp))) as days_active,
    -- CONCAT requires strings, so the INT64 session id must be cast
    COUNT(DISTINCT CONCAT(user_pseudo_id, '-', CAST(
      (SELECT value.int_value FROM UNNEST(event_params) WHERE key = 'ga_session_id') AS STRING))) as total_sessions,
    COUNT(DISTINCT CASE WHEN event_name = 'page_view' THEN event_timestamp END) as page_views,
    COUNT(DISTINCT CASE WHEN event_name = 'purchase' THEN event_timestamp END) as purchases,
    SUM(CASE WHEN event_name = 'purchase' THEN ecommerce.purchase_revenue_in_usd END) as total_revenue,
    AVG((SELECT value.int_value FROM UNNEST(event_params) WHERE key = 'engagement_time_msec')) / 1000 as avg_engagement_seconds,
    MAX(TIMESTAMP_MICROS(event_timestamp)) as last_activity,
    MIN(TIMESTAMP_MICROS(event_timestamp)) as first_activity
  FROM `project.analytics_123456789.events_*`
  WHERE _TABLE_SUFFIX BETWEEN FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY))
    AND FORMAT_DATE('%Y%m%d', CURRENT_DATE() - 1)
  GROUP BY user_pseudo_id
)
SELECT
  *,
  TIMESTAMP_DIFF(last_activity, first_activity, DAY) as customer_age_days,
  SAFE_DIVIDE(total_revenue, purchases) as avg_order_value,
  SAFE_DIVIDE(page_views, total_sessions) as pages_per_session
FROM user_behavior
WHERE purchases > 0 -- Focus on converted users
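The SAFE_DIVIDE wrappers matter downstream as well: once this data reaches pandas, a row with zero sessions would otherwise divide by zero. A minimal sketch of the equivalent guard in pandas (column names follow the query above; the values are made up):

```python
import numpy as np
import pandas as pd

# Toy rows mirroring the query output; values are illustrative only
df = pd.DataFrame({
    'total_revenue': [450.0, 0.0],
    'purchases': [3, 0],
    'page_views': [40, 5],
    'total_sessions': [8, 0],
})

# Pandas analogue of BigQuery's SAFE_DIVIDE: NaN instead of an error on /0
df['avg_order_value'] = df['total_revenue'] / df['purchases'].replace(0, np.nan)
df['pages_per_session'] = df['page_views'] / df['total_sessions'].replace(0, np.nan)
```

The first row yields an average order value of 150.0; the zero-purchase row comes out as NaN rather than raising or producing infinity.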
User Attributes
Extract demographic and technical attributes:
-- Collect user attributes
SELECT
  user_pseudo_id,
  -- Device attributes (SAFE_OFFSET returns NULL instead of erroring on empty arrays)
  ARRAY_AGG(device.category IGNORE NULLS ORDER BY event_timestamp DESC LIMIT 1)[SAFE_OFFSET(0)] as primary_device,
  ARRAY_AGG(device.operating_system IGNORE NULLS ORDER BY event_timestamp DESC LIMIT 1)[SAFE_OFFSET(0)] as operating_system,
  ARRAY_AGG(device.browser IGNORE NULLS ORDER BY event_timestamp DESC LIMIT 1)[SAFE_OFFSET(0)] as browser,
  -- Geographic attributes
  ARRAY_AGG(geo.country IGNORE NULLS ORDER BY event_timestamp DESC LIMIT 1)[SAFE_OFFSET(0)] as country,
  ARRAY_AGG(geo.region IGNORE NULLS ORDER BY event_timestamp DESC LIMIT 1)[SAFE_OFFSET(0)] as region,
  ARRAY_AGG(geo.city IGNORE NULLS ORDER BY event_timestamp DESC LIMIT 1)[SAFE_OFFSET(0)] as city,
  -- Acquisition attributes (first touch: ascending timestamp order)
  ARRAY_AGG(traffic_source.source IGNORE NULLS ORDER BY event_timestamp LIMIT 1)[SAFE_OFFSET(0)] as first_source,
  ARRAY_AGG(traffic_source.medium IGNORE NULLS ORDER BY event_timestamp LIMIT 1)[SAFE_OFFSET(0)] as first_medium,
  -- Custom user properties
  ARRAY_AGG((SELECT value.string_value FROM UNNEST(user_properties) WHERE key = 'user_type') IGNORE NULLS LIMIT 1)[SAFE_OFFSET(0)] as user_type,
  ARRAY_AGG((SELECT value.string_value FROM UNNEST(user_properties) WHERE key = 'plan_level') IGNORE NULLS LIMIT 1)[SAFE_OFFSET(0)] as plan_level
FROM `project.analytics_123456789.events_*`
WHERE _TABLE_SUFFIX BETWEEN FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY))
  AND FORMAT_DATE('%Y%m%d', CURRENT_DATE() - 1)
GROUP BY user_pseudo_id
Product Interactions
Track product engagement patterns:
-- Product interaction patterns
SELECT
  user_pseudo_id,
  ARRAY_AGG(DISTINCT item.item_category IGNORE NULLS) as categories_viewed,
  COUNT(DISTINCT item.item_id) as unique_products_viewed,
  COUNT(DISTINCT CASE WHEN event_name = 'add_to_cart' THEN item.item_id END) as products_added_to_cart,
  COUNT(DISTINCT CASE WHEN event_name = 'purchase' THEN item.item_id END) as products_purchased,
  AVG(CASE WHEN event_name = 'purchase' THEN item.price_in_usd END) as avg_product_price
FROM `project.analytics_123456789.events_*`,
  UNNEST(items) as item
WHERE _TABLE_SUFFIX BETWEEN FORMAT_DATE('%Y%m%d', DATE_SUB(CURRENT_DATE(), INTERVAL 90 DAY))
  AND FORMAT_DATE('%Y%m%d', CURRENT_DATE() - 1)
GROUP BY user_pseudo_id
Feature Engineering
Behavioral Features
Transform raw events into ML-ready features:
import pandas as pd
import numpy as np

def engineer_behavioral_features(df):
    """Create behavioral features from user data"""
    features = pd.DataFrame()

    # RFM inputs
    features['recency_days'] = (pd.Timestamp.now() - df['last_activity']).dt.days
    features['frequency_score'] = np.log1p(df['total_sessions'])
    features['monetary_value'] = np.log1p(df['total_revenue'])

    # RFM score (Recency, Frequency, Monetary)
    features['rfm_score'] = (
        normalize_score(features['recency_days'], reverse=True) +
        normalize_score(features['frequency_score']) +
        normalize_score(features['monetary_value'])
    )

    # Engagement metrics
    features['engagement_intensity'] = df['page_views'] / df['days_active']
    features['purchase_conversion_rate'] = df['purchases'] / df['total_sessions']
    features['session_frequency'] = df['total_sessions'] / df['customer_age_days']

    # Value metrics
    features['customer_lifetime_value'] = df['total_revenue']
    features['avg_order_value'] = df['total_revenue'] / df['purchases']
    features['revenue_per_session'] = df['total_revenue'] / df['total_sessions']

    return features
def normalize_score(series, reverse=False):
    """Normalize to 0-100 scale (min-max)"""
    min_val = series.min()
    max_val = series.max()
    if max_val == min_val:
        return pd.Series(50.0, index=series.index)  # degenerate case: all values equal
    if reverse:
        return 100 * (max_val - series) / (max_val - min_val)
    return 100 * (series - min_val) / (max_val - min_val)
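To make the RFM arithmetic concrete, here is a small self-contained run of the same normalization on three made-up users. The most recent, most frequent, highest-spending user tops out at 300 because of `reverse=True` on recency:

```python
import numpy as np
import pandas as pd

def normalize_score(series, reverse=False):
    """Min-max scale to 0-100; reverse=True makes low raw values score high."""
    min_val, max_val = series.min(), series.max()
    if reverse:
        return 100 * (max_val - series) / (max_val - min_val)
    return 100 * (series - min_val) / (max_val - min_val)

recency = pd.Series([1, 10, 30])                       # days since last activity
frequency = np.log1p(pd.Series([50, 10, 2]))           # log-scaled session counts
monetary = np.log1p(pd.Series([2000.0, 300.0, 20.0]))  # log-scaled revenue

rfm = (normalize_score(recency, reverse=True)
       + normalize_score(frequency)
       + normalize_score(monetary))
```

User 0 (active yesterday, 50 sessions, $2,000) scores the maximum 300; user 2 (inactive 30 days, 2 sessions, $20) scores 0.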
Demographic Features
Encode categorical attributes:
def engineer_demographic_features(df):
    """Create demographic features"""
    features = pd.DataFrame()

    # Device features
    features['is_mobile'] = (df['primary_device'] == 'mobile').astype(int)
    features['is_desktop'] = (df['primary_device'] == 'desktop').astype(int)

    # Geographic features (one-hot encoding for top countries)
    top_countries = df['country'].value_counts().head(10).index
    for country in top_countries:
        features[f'country_{country.lower()}'] = (df['country'] == country).astype(int)

    # Acquisition channel features
    acquisition_mapping = {
        'organic': 'organic',
        'cpc': 'paid',
        'social': 'social',
        'referral': 'referral',
        'email': 'email',
        '(none)': 'direct',
        '(direct)': 'direct'
    }
    df['acquisition_channel'] = df['first_medium'].map(acquisition_mapping).fillna('other')

    # One-hot encode acquisition channels (pd.get_dummies, so no sklearn encoders needed)
    channel_dummies = pd.get_dummies(df['acquisition_channel'], prefix='channel')
    features = pd.concat([features, channel_dummies], axis=1)

    return features
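A quick run of the channel mapping shows the `fillna('other')` fallback in action (the `podcast` medium is an invented example of an unmapped value):

```python
import pandas as pd

mediums = pd.Series(['organic', 'cpc', '(none)', 'podcast'])
acquisition_mapping = {
    'organic': 'organic', 'cpc': 'paid', 'social': 'social',
    'referral': 'referral', 'email': 'email',
    '(none)': 'direct', '(direct)': 'direct',
}

# Unmapped mediums fall through to 'other' instead of becoming NaN
channels = mediums.map(acquisition_mapping).fillna('other')
dummies = pd.get_dummies(channels, prefix='channel')
```

The four inputs map to `organic`, `paid`, `direct`, and `other`, producing one dummy column per channel actually observed.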
Time-Based Features
Capture temporal patterns:
def engineer_temporal_features(df):
    """Create time-based features"""
    features = pd.DataFrame()

    # Customer lifecycle stage
    features['customer_age_weeks'] = df['customer_age_days'] / 7
    features['is_new_customer'] = (df['customer_age_days'] <= 30).astype(int)
    features['is_returning_customer'] = (df['purchases'] > 1).astype(int)

    # Activity patterns (assumes a last_purchase_date column joined in upstream;
    # the behavior query above does not produce it)
    features['days_since_last_purchase'] = (
        pd.Timestamp.now() - df['last_purchase_date']
    ).dt.days
    features['purchase_frequency_weeks'] = (
        df['purchases'] / features['customer_age_weeks']
    )

    # Churn risk indicators
    features['days_inactive'] = (pd.Timestamp.now() - df['last_activity']).dt.days
    features['is_at_risk'] = (features['days_inactive'] > 30).astype(int)

    return features
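The churn-risk flag reduces to a date difference and a 30-day threshold. A tiny deterministic example (the reference date is pinned so the arithmetic is reproducible, whereas the function above uses the current time):

```python
import pandas as pd

now = pd.Timestamp('2025-02-17')  # pinned instead of pd.Timestamp.now()
last_activity = pd.Series(pd.to_datetime(['2025-02-10', '2024-12-01']))

days_inactive = (now - last_activity).dt.days
is_at_risk = (days_inactive > 30).astype(int)  # same 30-day threshold as above
```

The first user has been inactive 7 days (not at risk); the second, 78 days (flagged).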
Machine Learning Pipeline
Clustering Algorithm
K-means clustering to identify customer segments:
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

class ICPClustering:
    def __init__(self, n_clusters=5):
        self.n_clusters = n_clusters
        self.scaler = StandardScaler()
        self.pca = PCA(n_components=0.95)  # Retain 95% variance
        self.kmeans = KMeans(
            n_clusters=n_clusters,
            init='k-means++',
            n_init=10,
            max_iter=300,
            random_state=42
        )

    def fit(self, X):
        """Fit clustering model"""
        # Standardize features
        X_scaled = self.scaler.fit_transform(X)
        # Dimensionality reduction
        X_pca = self.pca.fit_transform(X_scaled)
        # Cluster
        self.kmeans.fit(X_pca)
        return self

    def predict(self, X):
        """Predict cluster membership"""
        X_scaled = self.scaler.transform(X)
        X_pca = self.pca.transform(X_scaled)
        return self.kmeans.predict(X_pca)

    def get_cluster_centers(self):
        """Get cluster centers in original feature space"""
        centers_pca = self.kmeans.cluster_centers_
        centers_scaled = self.pca.inverse_transform(centers_pca)
        centers_original = self.scaler.inverse_transform(centers_scaled)
        return centers_original
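The same scale → PCA → k-means flow can be expressed as a scikit-learn `Pipeline`. The snippet below runs it end to end on synthetic blob data standing in for the engineered feature matrix:

```python
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs
from sklearn.decomposition import PCA
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic stand-in for the engineered feature matrix: 300 users, 8 features
X, _ = make_blobs(n_samples=300, centers=4, n_features=8, random_state=42)

pipeline = make_pipeline(
    StandardScaler(),
    PCA(n_components=0.95),  # keep 95% of variance, as in ICPClustering
    KMeans(n_clusters=4, init='k-means++', n_init=10, random_state=42),
)
labels = pipeline.fit_predict(X)  # one cluster label per user
```

On well-separated synthetic blobs, the four planted clusters are recovered; real behavioral data is noisier, which is why the cluster-selection step below matters.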
Optimal Cluster Selection
Use the elbow method and silhouette scores to choose the number of clusters:
from sklearn.metrics import silhouette_score

def find_optimal_clusters(X, max_clusters=10):
    """Find optimal number of clusters"""
    inertias = []
    silhouette_scores = []
    for k in range(2, max_clusters + 1):
        model = ICPClustering(n_clusters=k)
        model.fit(X)
        labels = model.predict(X)
        inertias.append(model.kmeans.inertia_)
        silhouette_scores.append(silhouette_score(X, labels))
    # find_elbow already returns k itself (its x-axis starts at 2)
    optimal_k = find_elbow(inertias)
    return optimal_k, inertias, silhouette_scores

def find_elbow(inertias):
    """Find elbow point using knee detection"""
    from kneed import KneeLocator
    kl = KneeLocator(
        range(2, len(inertias) + 2),
        inertias,
        curve='convex',
        direction='decreasing'
    )
    return kl.elbow if kl.elbow else 5  # Default to 5
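`kneed` is an extra dependency; when it is unavailable, the silhouette scores already computed above can pick k directly (take the k with the highest score). A self-contained sketch on synthetic data:

```python
from sklearn.cluster import KMeans
from sklearn.datasets import make_blobs
from sklearn.metrics import silhouette_score

# Synthetic data with 3 planted clusters
X, _ = make_blobs(n_samples=300, centers=3, n_features=5, random_state=0)

scores = {}
for k in range(2, 7):
    labels = KMeans(n_clusters=k, n_init=10, random_state=0).fit_predict(X)
    scores[k] = silhouette_score(X, labels)

best_k = max(scores, key=scores.get)  # silhouette peaks at the planted k here
```

On this toy data the silhouette score peaks at k=3. In practice the elbow and silhouette criteria can disagree; inspecting both before fixing k is the safer habit.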
Lifetime Value Prediction
Predict future customer value:
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import cross_val_score

class LTVPredictor:
    def __init__(self):
        self.model = GradientBoostingRegressor(
            n_estimators=100,
            learning_rate=0.1,
            max_depth=5,
            random_state=42
        )

    def train(self, X, y):
        """Train LTV prediction model"""
        # Log-transform target for better prediction
        y_log = np.log1p(y)
        # Cross-validation (note: this RMSE is in log space, not dollars)
        cv_scores = cross_val_score(
            self.model, X, y_log,
            cv=5,
            scoring='neg_mean_squared_error'
        )
        print(f"CV RMSE (log space): {np.sqrt(-cv_scores.mean()):.2f}")
        # Train final model
        self.model.fit(X, y_log)
        return self

    def predict(self, X):
        """Predict LTV"""
        y_log_pred = self.model.predict(X)
        return np.expm1(y_log_pred)  # Inverse log transform

    def feature_importance(self, feature_names):
        """Get feature importance"""
        importance = pd.DataFrame({
            'feature': feature_names,
            'importance': self.model.feature_importances_
        }).sort_values('importance', ascending=False)
        return importance
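The log-transform round trip is the easy part to get wrong. The sketch below trains the same style of gradient-boosted regressor on synthetic, heavy-tailed revenue and inverts the transform at prediction time; the data-generating formula is invented purely for illustration:

```python
import numpy as np
from sklearn.ensemble import GradientBoostingRegressor

rng = np.random.default_rng(42)
X = rng.normal(size=(500, 4))
# Synthetic positive, heavy-tailed "revenue" driven by the first two features
y = np.exp(1.0 + 0.8 * X[:, 0] + 0.5 * X[:, 1] + rng.normal(scale=0.1, size=500))

model = GradientBoostingRegressor(n_estimators=100, learning_rate=0.1,
                                  max_depth=5, random_state=42)
model.fit(X, np.log1p(y))          # train in log space, as LTVPredictor does
pred = np.expm1(model.predict(X))  # invert the transform for dollar values
```

Training in log space keeps the few very-high-LTV customers from dominating the squared-error loss; `expm1` undoes `log1p` exactly, so predictions come back in currency units.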
Segment Analysis
Profile Generation
Generate human-readable segment profiles:
def generate_segment_profiles(df, cluster_labels):
    """Generate descriptive profiles for each segment"""
    df['segment'] = cluster_labels
    profiles = []
    for segment_id in sorted(df['segment'].unique()):
        segment_data = df[df['segment'] == segment_id]
        profile = {
            'segment_id': segment_id,
            'size': len(segment_data),
            'percentage': len(segment_data) / len(df) * 100,
            # Value metrics
            'avg_ltv': segment_data['total_revenue'].mean(),
            'avg_orders': segment_data['purchases'].mean(),
            'avg_order_value': (segment_data['total_revenue'] / segment_data['purchases']).mean(),
            # Engagement metrics
            'avg_sessions': segment_data['total_sessions'].mean(),
            'avg_engagement_time': segment_data['avg_engagement_seconds'].mean(),
            'conversion_rate': (segment_data['purchases'] / segment_data['total_sessions']).mean(),
            # Demographics
            'top_countries': segment_data['country'].value_counts().head(3).to_dict(),
            'device_breakdown': segment_data['primary_device'].value_counts(normalize=True).to_dict(),
            'top_channels': segment_data['first_medium'].value_counts().head(3).to_dict(),
            # Behavioral patterns
            'avg_customer_age_days': segment_data['customer_age_days'].mean(),
            'avg_recency_days': segment_data['recency_days'].mean(),
        }
        # Assign segment name based on characteristics
        profile['name'] = classify_segment(profile)
        profile['description'] = generate_segment_description(profile)
        profiles.append(profile)
    return profiles

def classify_segment(profile):
    """Assign human-readable name to segment"""
    if profile['avg_ltv'] > 1000 and profile['avg_orders'] > 5:
        return "VIP Customers"
    elif profile['avg_ltv'] > 500 and profile['conversion_rate'] > 0.1:
        return "High-Value Regulars"
    elif profile['avg_orders'] > 3 and profile['avg_order_value'] < 100:
        return "Frequent Bargain Hunters"
    elif profile['avg_customer_age_days'] < 30:
        return "New Customers"
    elif profile['avg_recency_days'] > 60:
        return "At-Risk Customers"
    else:
        return "Occasional Shoppers"

def generate_segment_description(profile):
    """Generate natural language description"""
    ltv = profile['avg_ltv']
    orders = profile['avg_orders']
    sessions = profile['avg_sessions']
    device = max(profile['device_breakdown'], key=profile['device_breakdown'].get)
    description = (
        f"This segment represents {profile['percentage']:.1f}% of customers. "
        f"They have an average lifetime value of ${ltv:.2f} across {orders:.1f} orders. "
        f"Typical engagement includes {sessions:.1f} sessions. "
        f"Primarily {device} users."
    )
    return description
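Fed a hand-built profile dict (values chosen to match the VIP example later on this page), the description template yields prose like this:

```python
# Hypothetical profile dict with only the fields the template reads
profile = {
    'percentage': 8.3,
    'avg_ltv': 2450.0,
    'avg_orders': 8.5,
    'avg_sessions': 24.0,
    'device_breakdown': {'desktop': 0.65, 'mobile': 0.35},
}

# Dominant device = key with the highest share
device = max(profile['device_breakdown'], key=profile['device_breakdown'].get)
description = (
    f"This segment represents {profile['percentage']:.1f}% of customers. "
    f"They have an average lifetime value of ${profile['avg_ltv']:.2f} "
    f"across {profile['avg_orders']:.1f} orders. "
    f"Typical engagement includes {profile['avg_sessions']:.1f} sessions. "
    f"Primarily {device} users."
)
```

The result reads: "This segment represents 8.3% of customers. They have an average lifetime value of $2450.00 across 8.5 orders. …"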
ICP Scoring
Score leads based on similarity to best customers:
from sklearn.metrics.pairwise import cosine_similarity

class ICPScorer:
    def __init__(self, ideal_segment_features):
        self.ideal_features = ideal_segment_features
        self.scaler = StandardScaler()
        self.scaler.fit(ideal_segment_features)

    def score(self, lead_features):
        """Score lead based on similarity to ICP"""
        # Standardize features
        ideal_scaled = self.scaler.transform(self.ideal_features)
        lead_scaled = self.scaler.transform(lead_features.reshape(1, -1))
        # Calculate cosine similarity
        similarity = cosine_similarity(lead_scaled, ideal_scaled)[0]
        # Convert to 0-100 score (cosine similarity lies in [-1, 1], so clip negatives)
        icp_score = float(np.clip(np.mean(similarity), 0, 1)) * 100
        return {
            'score': icp_score,
            'tier': self._classify_tier(icp_score),
            'match_confidence': self._calculate_confidence(similarity)
        }

    def _classify_tier(self, score):
        """Classify lead tier"""
        if score >= 80:
            return 'A - Excellent Fit'
        elif score >= 60:
            return 'B - Good Fit'
        elif score >= 40:
            return 'C - Moderate Fit'
        else:
            return 'D - Poor Fit'

    def _calculate_confidence(self, similarity_scores):
        """Calculate confidence in score"""
        std = np.std(similarity_scores)
        if std < 0.1:
            return 'high'
        elif std < 0.2:
            return 'medium'
        else:
            return 'low'
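A minimal end-to-end scoring run with a made-up three-customer ideal segment (columns might be sessions, revenue, and conversion rate; all numbers are invented):

```python
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity
from sklearn.preprocessing import StandardScaler

# Hypothetical ideal-segment matrix: rows = top customers,
# columns = (sessions, revenue, conversion rate)
ideal = np.array([[10.0, 500.0, 0.90],
                  [12.0, 620.0, 0.80],
                  [ 9.0, 480.0, 0.85]])
lead = np.array([11.0, 550.0, 0.82])

# Standardize on the ideal segment, then compare the lead against each row
scaler = StandardScaler().fit(ideal)
sim = cosine_similarity(scaler.transform(lead.reshape(1, -1)),
                        scaler.transform(ideal))[0]
icp_score = float(np.mean(sim)) * 100  # ICPScorer maps this onto 0-100
```

One similarity value is produced per ideal customer; their mean drives the score, and their spread drives the confidence label.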
API Integration
Get ICP Analysis
Retrieve ICP analysis results:
curl -X GET https://api.cogny.com/v1/warehouses/wh_123abc/icp-analysis \
-H "Authorization: Bearer sk_live_abc123xyz789"
Response:
{
"success": true,
"data": {
"analysis_id": "icp_xyz789",
"warehouse_id": "wh_123abc",
"created_at": "2025-02-17T10:30:00Z",
"segments": [
{
"segment_id": 0,
"name": "VIP Customers",
"size": 1247,
"percentage": 8.3,
"avg_ltv": 2450.00,
"avg_orders": 8.5,
"avg_order_value": 288.24,
"top_countries": ["United States", "United Kingdom", "Canada"],
"device_breakdown": {"desktop": 0.65, "mobile": 0.35},
"top_channels": ["organic", "cpc", "email"],
"description": "High-value customers with strong engagement..."
}
],
"recommendations": [
{
"segment": "VIP Customers",
"priority": "high",
"action": "Create lookalike audiences for paid acquisition",
"expected_impact": "25-35% increase in high-value customer acquisition"
}
]
}
}
Score a Lead
Score a potential customer against ICP:
curl -X POST https://api.cogny.com/v1/warehouses/wh_123abc/icp-score \
-H "Authorization: Bearer sk_live_abc123xyz789" \
-H "Content-Type: application/json" \
-d '{
"features": {
"country": "United States",
"device": "desktop",
"acquisition_channel": "organic",
"session_count": 5,
"page_views": 25,
"engagement_time_seconds": 450
}
}'
Response:
{
"success": true,
"data": {
"score": 87.5,
"tier": "A - Excellent Fit",
"match_confidence": "high",
"matched_segment": "VIP Customers",
"probability_high_value": 0.78,
"predicted_ltv": 1850.00,
"recommendations": [
"Prioritize for sales outreach",
"Offer premium onboarding",
"Target with high-value product recommendations"
]
}
}
Performance Optimization
Query Optimization
Optimize data aggregation queries:
-- Restrict the wildcard range and filter early
SELECT *
FROM `project.analytics_123456789.events_*`
WHERE _TABLE_SUFFIX BETWEEN '20250101' AND '20250217' -- Prunes daily table shards before the scan
AND event_name IN ('purchase', 'add_to_cart') -- Cuts rows read early in the scan
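The `_TABLE_SUFFIX` bounds repeated throughout this page can be generated once in Python rather than re-derived in each query string (the helper name here is ours, not part of any API):

```python
from datetime import date, timedelta

def suffix_range(days_back: int, end_offset: int = 1) -> tuple:
    """Return (start, end) _TABLE_SUFFIX bounds for a trailing window
    ending `end_offset` days before today (default: yesterday)."""
    end = date.today() - timedelta(days=end_offset)
    start = end - timedelta(days=days_back)
    return start.strftime('%Y%m%d'), end.strftime('%Y%m%d')
```

The 90-day window used by the collection queries above becomes `suffix_range(90)`, yielding two `YYYYMMDD` strings ready to interpolate into the `BETWEEN` clause.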
Caching Strategy
Cache ICP analysis results:
from functools import lru_cache
import hashlib

@lru_cache(maxsize=100)
def get_icp_analysis(warehouse_id, date_range_hash):
    """Cached ICP analysis retrieval"""
    # Fetch from database or recompute
    pass

# Use a hash of the date range for the cache key
# (start_date / end_date are strings supplied by the caller)
date_range_hash = hashlib.md5(f"{start_date}_{end_date}".encode()).hexdigest()
analysis = get_icp_analysis(warehouse_id, date_range_hash)
Next Steps
- AI Report Generation - How AI analyzes data
- Growth Tickets - Automated recommendations
- GA4 Schema Reference - Understand data structure
Resources
- Scikit-learn Documentation: scikit-learn.org
- K-means Clustering: wikipedia.org/wiki/K-means_clustering
- RFM Analysis: wikipedia.org/wiki/RFM_(market_research)