Scaling Transaction Tagging: From Rules to ML at 10M+ Transactions

Published: December 2024 • 10 min read

Introduction

Building intelligent financial systems means solving messy data problems at scale. This article walks through the architecture and implementation of an automated transaction tagging system that processes more than 10 million transactions a day, and its evolution from rule-based heuristics to a full ML pipeline.

Based on work done at Fold; read the original blog post for additional context.

System Architecture

The transaction tagging pipeline implements a three-stage approach designed for scalability and accuracy:

graph TD
    A[Raw Transaction Data] --> B[Lexical Rule-based System]
    B --> C[Intelligent F1 Tagging]
    C --> D[ML-based Categorization]
    D --> E[Tagged Transactions]

    subgraph "Stage 1: Preprocessing"
        B1[Text Normalization] --> B2[Merchant Extraction]
        B2 --> B3[Rule Application]
    end

    subgraph "Stage 2: Learning System"
        C1[User Behavior Analysis] --> C2[Pattern Recognition]
        C2 --> C3[F1 Score Optimization]
    end

    subgraph "Stage 3: ML Pipeline"
        D1[DistilBERT Embeddings] --> D2[Vector Similarity]
        D2 --> D3[Category Prediction]
    end
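
End-to-end, the stages form a fallback chain: cheap deterministic rules run first, learned per-user suggestions second, and the ML model last. The sketch below shows how the pieces could be wired together, assuming the class and function names introduced in the sections that follow; the fallback order and the decision to trust any rule hit outright are illustrative assumptions, not the production configuration.

def tag_transaction(raw_transaction, rule_engine, f1_engine, categorizer):
    features = preprocess_transaction(raw_transaction)
    features['user_id'] = raw_transaction.user_id  # needed for per-user lookups

    # Stage 1: cheap deterministic rules
    category = rule_engine.apply_rules(features)
    if category != 'unknown':
        return category

    # Stage 2: suggestions learned from this user's tagging history
    suggestion = f1_engine.suggest_tag(features)
    if suggestion is not None:
        return suggestion

    # Stage 3: fall back to the fine-tuned DistilBERT model
    return categorizer.categorize(features)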

Stage 1: Lexical Rule-based Preprocessing

Data Normalization

The first stage tames the noisy, inconsistent text that raw transaction data arrives in:

def preprocess_transaction(transaction):
    # Normalize merchant names
    merchant = normalize_merchant_name(transaction.description)

    # Extract key features
    features = {
        'amount': transaction.amount,
        'timestamp': transaction.timestamp,
        'narration': clean_narration(transaction.description),
        'merchant': merchant,
        'location': extract_location(transaction.description)
    }

    return features
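
The helpers clean_narration and extract_location are referenced above but not shown in the original post; the versions below are illustrative regex-based stand-ins.

import re

def clean_narration(description):
    # Drop long reference numbers and collapse runs of whitespace
    cleaned = re.sub(r'\b\d{6,}\b', '', description)
    return re.sub(r'\s+', ' ', cleaned).strip()

def extract_location(description):
    # Illustrative: pull a trailing two-letter state code, optionally
    # followed by a ZIP code, e.g. "... AUSTIN TX 78701" -> "TX"
    match = re.search(r'\b([A-Z]{2})(?:\s+\d{5})?\s*$', description)
    return match.group(1) if match else None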

Rule-based Classification

Initial categorization using deterministic rules:

class LexicalRuleEngine:
    def __init__(self):
        self.rules = {
            'grocery': ['walmart', 'target', 'whole foods'],
            'gas': ['shell', 'exxon', 'chevron'],
            'dining': ['restaurant', 'cafe', 'pizza']
        }

    def apply_rules(self, transaction):
        merchant_lower = transaction['merchant'].lower()

        for category, keywords in self.rules.items():
            if any(keyword in merchant_lower for keyword in keywords):
                return category

        return 'unknown'
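
A quick usage check:

engine = LexicalRuleEngine()
engine.apply_rules({'merchant': 'Whole Foods Market #123'})  # -> 'grocery'
engine.apply_rules({'merchant': 'ACME Hardware'})            # -> 'unknown'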

Stage 2: Intelligent F1 Tagging

Learning from User Behavior

The F1 tagging system learns from past user tagging patterns:

sequenceDiagram
    participant U as User
    participant S as System
    participant DB as Vector DB

    U->>S: Tags Transaction
    S->>S: Generate Embedding
    S->>DB: Store Tagged Example
    DB-->>S: Success

    Note over S: Learning Phase Complete

    U->>S: New Transaction
    S->>DB: Query Similar Transactions
    DB-->>S: Top-K Matches
    S->>S: Calculate F1 Score
    S->>U: Suggested Tag

Pattern Recognition Algorithm

class F1TaggingEngine:
    def __init__(self, vector_db):
        # vector_db is assumed to be a Pinecone-style index exposing
        # upsert/query; load_embedding_model returns a sentence-embedding model
        self.vector_db = vector_db
        self.embedding_model = load_embedding_model()

    def learn_from_tagging(self, transaction, user_tag):
        # Generate embedding for transaction
        embedding = self.embedding_model.encode(
            f"{transaction['merchant']} {transaction['narration']}"
        )

        # Store in vector database
        self.vector_db.upsert(
            vectors=[{
                'id': transaction['id'],
                'values': embedding,
                'metadata': {
                    'category': user_tag,
                    'merchant': transaction['merchant'],
                    'user_id': transaction['user_id']
                }
            }]
        )

    def suggest_tag(self, transaction, k=5):
        # Query similar transactions
        embedding = self.embedding_model.encode(
            f"{transaction['merchant']} {transaction['narration']}"
        )

        similar_transactions = self.vector_db.query(
            vector=embedding,
            top_k=k,
            filter={'user_id': transaction['user_id']}
        )

        return self.calculate_f1_optimal_tag(similar_transactions)
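
calculate_f1_optimal_tag is not shown in the original post. One plausible completion, an assumption rather than Fold's actual scoring, weights each neighbor's category by its similarity score and only returns a suggestion once support clears a threshold:

    # Hypothetical completion of F1TaggingEngine. Assumes the query
    # response is a dict with a 'matches' list of {score, metadata}
    # entries; the 0.5 support threshold is an illustrative choice.
    def calculate_f1_optimal_tag(self, similar_transactions, min_support=0.5):
        support = {}
        for match in similar_transactions.get('matches', []):
            category = match['metadata']['category']
            support[category] = support.get(category, 0.0) + match['score']

        if not support:
            return None

        best_tag, best_score = max(support.items(), key=lambda kv: kv[1])
        return best_tag if best_score >= min_support else None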

Stage 3: ML-based Categorization

DistilBERT Architecture

The final stage uses a fine-tuned DistilBERT model for semantic understanding:

import torch
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification

class TransactionCategorizer:
    def __init__(self):
        self.tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')
        self.model = DistilBertForSequenceClassification.from_pretrained(
            './fine-tuned-transaction-model'
        )

    def categorize(self, transaction):
        # Prepare input text
        input_text = f"{transaction['merchant']} {transaction['narration']}"

        # Tokenize and predict
        inputs = self.tokenizer(
            input_text,
            return_tensors='pt',
            max_length=128,
            truncation=True,
            padding=True
        )

        with torch.no_grad():
            outputs = self.model(**inputs)
            predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)

        return self.decode_prediction(predictions)
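
decode_prediction is also left undefined above; a minimal sketch, assuming the fine-tuned checkpoint stores an id2label mapping in its config (a standard transformers convention):

    def decode_prediction(self, predictions):
        # Map the highest-probability class index back to its label
        predicted_id = predictions.argmax(dim=-1).item()
        return self.model.config.id2label[predicted_id]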

Model Training Pipeline

graph LR
    A[Transaction Data] --> B[Data Preprocessing]
    B --> C[Feature Engineering]
    C --> D[Train/Val Split]
    D --> E[DistilBERT Fine-tuning]
    E --> F[Model Evaluation]
    F --> G[Production Deployment]

    subgraph "Training Process"
        E1[Tokenization] --> E2[Attention Mechanism]
        E2 --> E3[Classification Head]
    end
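
The fine-tuning step itself maps naturally onto the Hugging Face Trainer API. A minimal sketch, assuming train_dataset and val_dataset are pre-tokenized datasets with a labels column; the hyperparameters and label count are placeholders, not the production values.

from transformers import (
    DistilBertForSequenceClassification,
    Trainer,
    TrainingArguments,
)

model = DistilBertForSequenceClassification.from_pretrained(
    'distilbert-base-uncased',
    num_labels=20,  # hypothetical number of spending categories
)

training_args = TrainingArguments(
    output_dir='./fine-tuned-transaction-model',
    num_train_epochs=3,
    per_device_train_batch_size=32,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=val_dataset,
)
trainer.train()
trainer.save_model('./fine-tuned-transaction-model')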

Scaling to 10M+ Transactions

Performance Optimizations

from concurrent.futures import ThreadPoolExecutor

class ScalableTransactionProcessor:
    def __init__(self):
        self.batch_size = 1000
        self.max_workers = 8

    def process_transactions_batch(self, transactions):
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Process in parallel batches
            futures = []
            for batch in self.chunk_transactions(transactions, self.batch_size):
                future = executor.submit(self.process_batch, batch)
                futures.append(future)

            results = [future.result() for future in futures]

        return self.merge_results(results)

    def chunk_transactions(self, transactions, chunk_size):
        for i in range(0, len(transactions), chunk_size):
            yield transactions[i:i + chunk_size]
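
ThreadPoolExecutor is a reasonable fit here if each batch is dominated by I/O, such as vector-database queries and calls to a model server; for CPU-bound preprocessing, swapping in a ProcessPoolExecutor would avoid contention on Python's GIL.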

Infrastructure Architecture

graph TB
    A[Transaction Stream] --> B[Load Balancer]
    B --> C[Processing Nodes]
    C --> D[Vector Database]
    C --> E[ML Model Cluster]

    subgraph "Scaling Components"
        F[Redis Cache] --> G[Result Storage]
        H[Monitoring] --> I[Auto-scaling]
    end

    D --> F
    E --> F
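
The Redis layer in the diagram can short-circuit repeat lookups for merchants the system has already categorized. A minimal cache-aside sketch using redis-py; the key scheme and 24-hour TTL are assumptions, not the deployed configuration.

import redis

cache = redis.Redis(host='localhost', port=6379, decode_responses=True)

def categorize_with_cache(transaction, categorizer):
    key = f"category:{normalize_merchant_name(transaction['merchant'])}"

    cached = cache.get(key)
    if cached is not None:
        return cached

    category = categorizer.categorize(transaction)
    cache.setex(key, 24 * 60 * 60, category)  # expire after one day
    return category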

Data Preprocessing Challenges

Handling Merchant Name Variations

import re

def normalize_merchant_name(description):
    # Remove common processor prefixes (POS, ATM, DDA, DEBIT)
    cleaned = re.sub(r'^(POS|ATM|DDA|DEBIT)\s+', '', description)

    # Strip trailing location information ("ST 12345" suffixes)
    cleaned = re.sub(r'\s+[A-Z]{2}\s+\d{5}$', '', cleaned)

    # Remove transaction IDs
    cleaned = re.sub(r'#\d+', '', cleaned)

    # Collapse any leftover runs of whitespace
    cleaned = re.sub(r'\s+', ' ', cleaned)

    return cleaned.strip().upper()
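
For example:

normalize_merchant_name('POS WALMART #1234 AUSTIN TX 78701')
# -> 'WALMART AUSTIN'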

Feature Engineering

import re
import numpy as np

def extract_transaction_features(transaction):
    features = {
        # Temporal features
        'hour': transaction.timestamp.hour,
        'day_of_week': transaction.timestamp.weekday(),
        'is_weekend': transaction.timestamp.weekday() >= 5,

        # Amount features
        'amount_log': np.log1p(abs(transaction.amount)),
        'is_large_amount': abs(transaction.amount) > 100,

        # Text features
        'description_length': len(transaction.description),
        'has_numbers': bool(re.search(r'\d', transaction.description)),
        'merchant_category': get_merchant_category(transaction.merchant)
    }

    return features
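
The log1p transform compresses the heavy-tailed distribution of transaction amounts, while coarse binary flags such as is_weekend and is_large_amount hand the model signals it would otherwise have to learn from raw values.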

Performance Metrics and Results

System Performance

  • Throughput: 10M+ transactions processed daily
  • Latency: Sub-100ms categorization per transaction
  • Accuracy: 89% automatic categorization accuracy
  • F1 Score: 0.87 for learning-based suggestions

Model Evaluation

from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score

def evaluate_model_performance(predictions, ground_truth):
    metrics = {
        'accuracy': accuracy_score(ground_truth, predictions),
        'precision': precision_score(ground_truth, predictions, average='weighted'),
        'recall': recall_score(ground_truth, predictions, average='weighted'),
        'f1': f1_score(ground_truth, predictions, average='weighted')
    }

    return metrics
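
Using average='weighted' scores each category in proportion to its support, which gives a more representative headline number than a macro average when the tag distribution is as imbalanced as real spending data tends to be.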

This article details the technical implementation of Fold's transaction tagging system. Read the original blog post for more business context.