Scaling Transaction Tagging: From Rules to ML at 10M+ Transactions¶
Published: December 2024 • 10 min read
Introduction¶
Building intelligent financial systems requires solving complex data challenges at scale. This article explores the architecture and implementation of an automated transaction tagging system that processes over 10 million transactions, transitioning from rule-based approaches to sophisticated ML pipelines.
Based on work done at Fold - read the original blog post for additional context.
System Architecture¶
The transaction tagging pipeline implements a three-stage approach designed for scalability and accuracy:
graph TD
    A[Raw Transaction Data] --> B[Lexical Rule-based System]
    B --> C[Intelligent F1 Tagging]
    C --> D[ML-based Categorization]
    D --> E[Tagged Transactions]
    subgraph "Stage 1: Preprocessing"
        B1[Text Normalization] --> B2[Merchant Extraction]
        B2 --> B3[Rule Application]
    end
    subgraph "Stage 2: Learning System"
        C1[User Behavior Analysis] --> C2[Pattern Recognition]
        C2 --> C3[F1 Score Optimization]
    end
    subgraph "Stage 3: ML Pipeline"
        D1[DistilBERT Embeddings] --> D2[Vector Similarity]
        D2 --> D3[Category Prediction]
    end
Stage 1: Lexical Rule-based Preprocessing¶
Data Normalization¶
The first stage handles the chaotic nature of transaction data:
def preprocess_transaction(transaction):
    # Normalize merchant names
    merchant = normalize_merchant_name(transaction.description)
    # Extract key features
    features = {
        'amount': transaction.amount,
        'timestamp': transaction.timestamp,
        'narration': clean_narration(transaction.description),
        'merchant': merchant,
        'location': extract_location(transaction.description)
    }
    return features
Rule-based Classification¶
Initial categorization using deterministic rules:
class LexicalRuleEngine:
    def __init__(self):
        self.rules = {
            'grocery': ['walmart', 'target', 'whole foods'],
            'gas': ['shell', 'exxon', 'chevron'],
            'dining': ['restaurant', 'cafe', 'pizza']
        }
    def apply_rules(self, transaction):
        merchant_lower = transaction['merchant'].lower()
        for category, keywords in self.rules.items():
            if any(keyword in merchant_lower for keyword in keywords):
                return category
        return 'unknown'
Stage 2: Intelligent F1 Tagging¶
Learning from User Behavior¶
The F1 tagging system learns from past user tagging patterns:
sequenceDiagram
    participant U as User
    participant S as System
    participant DB as Vector DB
    U->>S: Tags Transaction
    S->>S: Generate Embedding
    S->>DB: Store Tagged Example
    DB-->>S: Success
    Note over S: Learning Phase Complete
    U->>S: New Transaction
    S->>DB: Query Similar Transactions
    DB-->>S: Top-K Matches
    S->>S: Calculate F1 Score
    S->>U: Suggested Tag
Pattern Recognition Algorithm¶
class F1TaggingEngine:
    def __init__(self, vector_db):
        self.vector_db = vector_db
        self.embedding_model = load_embedding_model()
    def learn_from_tagging(self, transaction, user_tag):
        # Generate embedding for transaction
        embedding = self.embedding_model.encode(
            f"{transaction['merchant']} {transaction['narration']}"
        )
        # Store in vector database
        self.vector_db.upsert(
            vectors=[{
                'id': transaction['id'],
                'values': embedding,
                'metadata': {
                    'category': user_tag,
                    'merchant': transaction['merchant'],
                    'user_id': transaction['user_id']
                }
            }]
        )
    def suggest_tag(self, transaction, k=5):
        # Query similar transactions
        embedding = self.embedding_model.encode(
            f"{transaction['merchant']} {transaction['narration']}"
        )
        similar_transactions = self.vector_db.query(
            vector=embedding,
            top_k=k,
            filter={'user_id': transaction['user_id']}
        )
        return self.calculate_f1_optimal_tag(similar_transactions)
Stage 3: ML-based Categorization¶
DistilBERT Architecture¶
The final stage uses a fine-tuned DistilBERT model for semantic understanding:
from transformers import DistilBertTokenizer, DistilBertForSequenceClassification
class TransactionCategorizer:
    def __init__(self):
        self.tokenizer = DistilBertTokenizer.from_pretrained('distilbert-base-uncased')
        self.model = DistilBertForSequenceClassification.from_pretrained(
            './fine-tuned-transaction-model'
        )
    def categorize(self, transaction):
        # Prepare input text
        input_text = f"{transaction['merchant']} {transaction['narration']}"
        # Tokenize and predict
        inputs = self.tokenizer(
            input_text,
            return_tensors='pt',
            max_length=128,
            truncation=True,
            padding=True
        )
        with torch.no_grad():
            outputs = self.model(**inputs)
            predictions = torch.nn.functional.softmax(outputs.logits, dim=-1)
        return self.decode_prediction(predictions)
Model Training Pipeline¶
graph LR
    A[Transaction Data] --> B[Data Preprocessing]
    B --> C[Feature Engineering]
    C --> D[Train/Val Split]
    D --> E[DistilBERT Fine-tuning]
    E --> F[Model Evaluation]
    F --> G[Production Deployment]
    subgraph "Training Process"
        E1[Tokenization] --> E2[Attention Mechanism]
        E2 --> E3[Classification Head]
    end
Scaling to 10M+ Transactions¶
Performance Optimizations¶
class ScalableTransactionProcessor:
    def __init__(self):
        self.batch_size = 1000
        self.max_workers = 8
    def process_transactions_batch(self, transactions):
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # Process in parallel batches
            futures = []
            for batch in self.chunk_transactions(transactions, self.batch_size):
                future = executor.submit(self.process_batch, batch)
                futures.append(future)
            results = [future.result() for future in futures]
        return self.merge_results(results)
    def chunk_transactions(self, transactions, chunk_size):
        for i in range(0, len(transactions), chunk_size):
            yield transactions[i:i + chunk_size]
Infrastructure Architecture¶
graph TB
    A[Transaction Stream] --> B[Load Balancer]
    B --> C[Processing Nodes]
    C --> D[Vector Database]
    C --> E[ML Model Cluster]
    subgraph "Scaling Components"
        F[Redis Cache] --> G[Result Storage]
        H[Monitoring] --> I[Auto-scaling]
    end
    D --> F
    E --> F
Data Preprocessing Challenges¶
Handling Merchant Name Variations¶
def normalize_merchant_name(description):
    # Remove common prefixes/suffixes
    cleaned = re.sub(r'^(POS|ATM|DDA|DEBIT)\s+', '', description)
    # Standardize location information
    cleaned = re.sub(r'\s+[A-Z]{2}\s+\d{5}$', '', cleaned)
    # Remove transaction IDs
    cleaned = re.sub(r'#\d+', '', cleaned)
    return cleaned.strip().upper()
Feature Engineering¶
def extract_transaction_features(transaction):
    features = {
        # Temporal features
        'hour': transaction.timestamp.hour,
        'day_of_week': transaction.timestamp.weekday(),
        'is_weekend': transaction.timestamp.weekday() >= 5,
        # Amount features
        'amount_log': np.log1p(abs(transaction.amount)),
        'is_large_amount': abs(transaction.amount) > 100,
        # Text features
        'description_length': len(transaction.description),
        'has_numbers': bool(re.search(r'\d', transaction.description)),
        'merchant_category': get_merchant_category(transaction.merchant)
    }
    return features
Performance Metrics and Results¶
System Performance¶
- Throughput: 10M+ transactions processed daily
 - Latency: Sub-100ms categorization per transaction
 - Accuracy: 89% automatic categorization accuracy
 - F1 Score: 0.87 for learning-based suggestions
 
Model Evaluation¶
def evaluate_model_performance(predictions, ground_truth):
    metrics = {
        'accuracy': accuracy_score(ground_truth, predictions),
        'precision': precision_score(ground_truth, predictions, average='weighted'),
        'recall': recall_score(ground_truth, predictions, average='weighted'),
        'f1': f1_score(ground_truth, predictions, average='weighted')
    }
    return metrics
This article details the technical implementation of Fold's transaction tagging system. Read the original blog post for more business context.