Predictive Analytics in AdTech#

Note: The code examples below are complete implementations you can run. For interactive experimentation, consider creating Jupyter notebooks based on these examples.

Using data to predict the future is what predictive analytics does. You don’t use crystal balls; instead, you use regression models, time series forecasting, and a lot of patterns from the past. The goal is to guess what will happen next so you can do something about it before it happens.

In AdTech, this means predicting conversions, churn, lifetime value, budget pacing, and about a dozen other metrics that tell you if your campaigns make money or lose it.

Why Prediction Is Important#

It costs a lot to do reactive advertising. You start a campaign, wait for results, and then make changes. Once you realize something isn’t working, you’ve already spent too much money.

Predictive analytics turns this around: it lets you predict performance before you launch, find problems before they ruin your results, and make things better while there’s still time to fix them.

I’ve seen campaigns save 30–40% of their budget by catching performance trends that are going down early instead of waiting until the end of the month to look at them. That is real money.

Core Predictive Models in AdTech#

1. Conversion Probability Prediction#

The most fundamental: given a user’s characteristics, what’s the chance they convert?

import pandas as pd
import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, roc_auc_score

class ConversionPredictor:
    """
    Predict likelihood of conversion for targeting and bidding
    """
    
    def __init__(self):
        self.model = GradientBoostingClassifier(
            n_estimators=100,
            learning_rate=0.1,
            max_depth=5,
            random_state=42
        )
        self.feature_names = None
    
    def prepare_features(self, user_data):
        """
        Feature engineering for conversion prediction
        """
        
        features = pd.DataFrame()
        
        # Behavioral features
        features['sessions_last_30d'] = user_data['sessions_last_30d']
        features['pages_per_session'] = user_data['total_pages'] / user_data['sessions_last_30d'].clip(lower=1)
        features['avg_session_duration'] = user_data['total_time'] / user_data['sessions_last_30d'].clip(lower=1)
        
        # Engagement features
        features['email_open_rate'] = user_data['emails_opened'] / user_data['emails_sent'].clip(lower=1)
        features['product_views'] = user_data['product_views']
        features['cart_adds'] = user_data['cart_adds']
        features['checkout_starts'] = user_data['checkout_starts']
        
        # Recency features (days since last action)
        features['days_since_visit'] = user_data['days_since_visit']
        features['days_since_product_view'] = user_data['days_since_product_view']
        
        # Derived features
        features['cart_add_rate'] = user_data['cart_adds'] / user_data['product_views'].clip(lower=1)
        features['checkout_conversion'] = user_data['checkout_starts'] / user_data['cart_adds'].clip(lower=1)
        
        # Device/channel features
        features['is_mobile'] = (user_data['device_type'] == 'mobile').astype(int)
        features['came_from_paid'] = (user_data['acquisition_channel'] == 'paid').astype(int)
        
        self.feature_names = features.columns.tolist()
        
        return features
    
    def train(self, user_data, conversions):
        """
        Train conversion prediction model
        """
        
        X = self.prepare_features(user_data)
        y = conversions
        
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )
        
        # Train model
        self.model.fit(X_train, y_train)
        
        # Evaluate
        y_pred = self.model.predict(X_test)
        y_pred_proba = self.model.predict_proba(X_test)[:, 1]
        
        print("Model Performance:")
        print(classification_report(y_test, y_pred))
        print(f"\nROC-AUC Score: {roc_auc_score(y_test, y_pred_proba):.3f}")
        
        # Feature importance
        importance = pd.DataFrame({
            'feature': self.feature_names,
            'importance': self.model.feature_importances_
        }).sort_values('importance', ascending=False)
        
        print("\nTop Conversion Predictors:")
        print(importance.head(10))
        
        return self.model
    
    def predict_proba(self, user_data):
        """
        Predict conversion probability for new users
        """
        
        X = self.prepare_features(user_data)
        probabilities = self.model.predict_proba(X)[:, 1]
        
        return probabilities
    
    def score_users(self, user_data):
        """
        Score users from 0-100 based on conversion likelihood
        """
        
        probabilities = self.predict_proba(user_data)
        scores = (probabilities * 100).round().astype(int)
        
        return scores

# Example usage
# users = pd.read_csv('user_data.csv')
# conversions = users['converted']
# 
# predictor = ConversionPredictor()
# predictor.train(users, conversions)
# 
# # Score new users for targeting
# new_users = pd.read_csv('new_users.csv')
# scores = predictor.score_users(new_users)
# 
# # Target users with high conversion probability
# high_value_users = new_users[scores > 70]

This lets you focus ad spend on users most likely to convert instead of spraying budget everywhere.

2. Churn Prediction#

Identifying customers about to leave so you can win them back before they’re gone.

from sklearn.ensemble import RandomForestClassifier
from datetime import datetime, timedelta

class ChurnPredictor:
    """
    Predict which customers are likely to churn
    """
    
    def __init__(self, churn_definition_days=60):
        self.churn_definition_days = churn_definition_days
        self.model = RandomForestClassifier(n_estimators=100, random_state=42)
    
    def calculate_churn_labels(self, customer_data, reference_date):
        """
        Define churn: no activity in last N days
        """
        
        churn_threshold = reference_date - timedelta(days=self.churn_definition_days)
        last_activity = pd.to_datetime(customer_data['last_activity_date'])
        
        churned = (last_activity < churn_threshold).astype(int)
        
        return churned
    
    def engineer_churn_features(self, customer_data):
        """
        Create features that indicate churn risk
        """
        
        features = pd.DataFrame()
        
        # Recency features (strong churn indicators)
        features['days_since_purchase'] = customer_data['days_since_purchase']
        features['days_since_login'] = customer_data['days_since_login']
        features['days_since_email_open'] = customer_data['days_since_email_open']
        
        # Frequency features
        features['purchases_last_90d'] = customer_data['purchases_last_90d']
        features['logins_last_30d'] = customer_data['logins_last_30d']
        features['sessions_last_30d'] = customer_data['sessions_last_30d']
        
        # Trend features (declining engagement = churn risk)
        features['purchase_trend'] = (
            customer_data['purchases_last_30d'] - customer_data['purchases_30_60d']
        )
        features['engagement_trend'] = (
            customer_data['sessions_last_30d'] - customer_data['sessions_30_60d']
        )
        
        # Monetary features
        features['ltv'] = customer_data['lifetime_value']
        features['avg_order_value'] = customer_data['avg_order_value']
        features['days_as_customer'] = customer_data['days_as_customer']
        
        # Support interactions (more support = potential issues)
        features['support_tickets_last_30d'] = customer_data['support_tickets_last_30d']
        
        return features
    
    def train(self, customer_data, reference_date=None):
        """
        Train churn prediction model
        """
        
        if reference_date is None:
            reference_date = datetime.now()
        
        # Create labels
        y = self.calculate_churn_labels(customer_data, reference_date)
        
        # Create features
        X = self.engineer_churn_features(customer_data)
        
        # Split
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )
        
        # Train
        self.model.fit(X_train, y_train)
        
        # Evaluate
        y_pred = self.model.predict(X_test)
        y_proba = self.model.predict_proba(X_test)[:, 1]
        
        print("Churn Prediction Performance:")
        print(classification_report(y_test, y_pred))
        print(f"ROC-AUC: {roc_auc_score(y_test, y_proba):.3f}")
        
        # Identify key churn indicators
        importance = pd.DataFrame({
            'feature': X.columns,
            'importance': self.model.feature_importances_
        }).sort_values('importance', ascending=False)
        
        print("\nTop Churn Indicators:")
        print(importance.head(10))
        
        return self.model
    
    def identify_at_risk_customers(self, customer_data, risk_threshold=0.7):
        """
        Find customers at high risk of churning
        """
        
        X = self.engineer_churn_features(customer_data)
        churn_proba = self.model.predict_proba(X)[:, 1]
        
        at_risk = pd.DataFrame({
            'customer_id': customer_data['customer_id'],
            'churn_probability': churn_proba,
            'ltv': customer_data['lifetime_value'],
            'days_since_purchase': customer_data['days_since_purchase']
        })
        
        # Focus on high-value customers at risk
        at_risk = at_risk[churn_proba > risk_threshold].sort_values(
            'ltv', ascending=False
        )
        
        return at_risk

# Usage: Build a retargeting campaign for at-risk customers
# churn_model = ChurnPredictor(churn_definition_days=60)
# churn_model.train(customers)
# 
# at_risk = churn_model.identify_at_risk_customers(customers, risk_threshold=0.7)
# 
# # Target these users with win-back campaigns
# print(f"Found {len(at_risk)} high-risk customers")
# print(f"Combined LTV at risk: ${at_risk['ltv'].sum():,.2f}")

Use this to trigger win-back campaigns automatically when a customer’s churn risk crosses a threshold. Way more effective than generic “We miss you!” emails sent randomly.

3. Budget Pacing Prediction#

Forecast whether you’ll spend your entire budget or run out before the campaign ends.

from sklearn.linear_model import LinearRegression
import numpy as np

class BudgetPacingPredictor:
    """
    Predict campaign spend trajectory and recommend pacing adjustments
    """
    
    def __init__(self):
        self.model = LinearRegression()
    
    def forecast_spend(self, daily_spend_history, days_remaining):
        """
        Forecast remaining spend based on historical trend
        """
        
        # Prepare data
        days = np.arange(len(daily_spend_history)).reshape(-1, 1)
        spend = np.array(daily_spend_history)
        
        # Fit trend
        self.model.fit(days, spend)
        
        # Forecast future days
        future_days = np.arange(
            len(daily_spend_history),
            len(daily_spend_history) + days_remaining
        ).reshape(-1, 1)
        
        forecasted_spend = self.model.predict(future_days)
        
        return forecasted_spend
    
    def analyze_pacing(self, campaign_stats):
        """
        Analyze campaign pacing and recommend adjustments
        """
        
        total_budget = campaign_stats['total_budget']
        spent_so_far = campaign_stats['spent_so_far']
        days_elapsed = campaign_stats['days_elapsed']
        days_remaining = campaign_stats['days_remaining']
        daily_spend = campaign_stats['daily_spend_history']
        
        # Forecast remaining spend
        forecasted_daily_spend = self.forecast_spend(daily_spend, days_remaining)
        forecasted_total_spend = spent_so_far + forecasted_daily_spend.sum()
        
        # Calculate pacing metrics
        total_days = days_elapsed + days_remaining
        expected_spend_pct = days_elapsed / total_days
        actual_spend_pct = spent_so_far / total_budget
        
        pacing_ratio = actual_spend_pct / expected_spend_pct
        
        # Forecast vs budget
        spend_gap = forecasted_total_spend - total_budget
        spend_gap_pct = (spend_gap / total_budget) * 100
        
        # Determine pacing status
        if pacing_ratio < 0.85:
            status = "UNDER-PACING"
            recommendation = "Increase bids or expand targeting to spend remaining budget"
        elif pacing_ratio > 1.15:
            status = "OVER-PACING"
            recommendation = "Reduce bids or tighten targeting to avoid budget exhaustion"
        else:
            status = "ON-PACE"
            recommendation = "Maintain current strategy"
        
        # Calculate recommended daily budget adjustment
        if days_remaining > 0:
            remaining_budget = total_budget - spent_so_far
            recommended_daily = remaining_budget / days_remaining
            current_avg_daily = spent_so_far / days_elapsed
            adjustment_pct = ((recommended_daily / current_avg_daily) - 1) * 100
        else:
            recommended_daily = 0
            adjustment_pct = 0
        
        return {
            'status': status,
            'pacing_ratio': pacing_ratio,
            'spent_pct': actual_spend_pct * 100,
            'forecasted_total_spend': forecasted_total_spend,
            'spend_gap': spend_gap,
            'spend_gap_pct': spend_gap_pct,
            'recommended_daily_budget': recommended_daily,
            'adjustment_needed_pct': adjustment_pct,
            'recommendation': recommendation
        }

# Example usage
campaign = {
    'total_budget': 10000,
    'spent_so_far': 3000,
    'days_elapsed': 10,
    'days_remaining': 20,
    'daily_spend_history': [250, 280, 310, 290, 300, 320, 295, 305, 330, 320]
}

pacer = BudgetPacingPredictor()
analysis = pacer.analyze_pacing(campaign)

print(f"Status: {analysis['status']}")
print(f"Pacing Ratio: {analysis['pacing_ratio']:.2f}")
print(f"Spent: {analysis['spent_pct']:.1f}% of budget")
print(f"Forecasted Total: ${analysis['forecasted_total_spend']:,.2f}")
print(f"Gap: ${analysis['spend_gap']:,.2f} ({analysis['spend_gap_pct']:.1f}%)")
print(f"\nRecommended Daily Budget: ${analysis['recommended_daily_budget']:.2f}")
print(f"Adjustment Needed: {analysis['adjustment_needed_pct']:+.1f}%")
print(f"\n{analysis['recommendation']}")

This prevents the classic mistake of spending your entire monthly budget in the first two weeks, then sitting idle for the rest of the month.

Lookalike Modeling#

Find new customers that resemble your best existing customers.

from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import StandardScaler

class LookalikeModeler:
    """
    Find users similar to your high-value customers
    """
    
    def __init__(self, n_neighbors=100):
        self.n_neighbors = n_neighbors
        self.model = NearestNeighbors(n_neighbors=n_neighbors, metric='cosine')
        self.scaler = StandardScaler()
    
    def fit_seed_audience(self, seed_users, feature_columns):
        """
        Define your seed audience (high-value customers)
        """
        
        self.feature_columns = feature_columns
        X = seed_users[feature_columns]
        
        # Normalize features
        X_scaled = self.scaler.fit_transform(X)
        
        # Fit model
        self.model.fit(X_scaled)
        
        return self
    
    def find_lookalikes(self, potential_users):
        """
        Find users similar to seed audience from potential user pool
        """
        
        X = potential_users[self.feature_columns]
        X_scaled = self.scaler.transform(X)
        
        # Find nearest neighbors
        distances, indices = self.model.kneighbors(X_scaled)
        
        # Calculate similarity scores (inverse of distance)
        similarity_scores = 1 / (1 + distances.mean(axis=1))
        
        lookalikes = potential_users.copy()
        lookalikes['similarity_score'] = similarity_scores
        lookalikes = lookalikes.sort_values('similarity_score', ascending=False)
        
        return lookalikes

# Example usage
# high_value_customers = customers[customers['ltv'] > 1000]
# 
# modeler = LookalikeModeler(n_neighbors=50)
# modeler.fit_seed_audience(
#     high_value_customers,
#     feature_columns=['sessions', 'pages_viewed', 'email_opens', 'product_categories']
# )
# 
# # Find similar users in prospect database
# lookalikes = modeler.find_lookalikes(prospects)
# 
# # Target top 10% most similar users
# top_lookalikes = lookalikes.head(int(len(lookalikes) * 0.1))

Facebook’s Lookalike Audiences use this concept—upload your customer list, they find similar users to target. You can build your own version.

Key Takeaways#

Predictive analytics in AdTech is about seeing around corners—forecasting conversions, identifying churn before it happens, optimizing budget pacing, finding high-value prospects.

The models aren’t complex—mostly regression, classification, and time series. The value is in applying them systematically: predict churn every week, adjust bids based on conversion probability daily, forecast pacing in real-time.

The code above gives you working implementations. In production, you’d add more sophisticated feature engineering, ensemble methods, and automated retraining. But the core logic stays the same: use past patterns to predict future outcomes, then act on those predictions.

The companies winning in AdTech aren’t the ones with the fanciest models—they’re the ones consistently using predictions to make better decisions faster than the competition.