FraudPatternDetector.java

package com.hsbc.fraud.cep;

import com.hsbc.fraud.model.*;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.util.List;
import java.util.Map;

/**
 * Complex Event Processing (CEP) based fraud pattern detector.
 * Uses Flink CEP to detect complex fraud patterns across multiple transactions.
 */
public class FraudPatternDetector {

    private static final Logger LOG = LoggerFactory.getLogger(FraudPatternDetector.class);

    /**
     * Detects rapid small transactions followed by a large transaction.
     * This pattern is common in testing stolen cards before making large purchases.
     */
    public static DataStream<FraudAlert> detectSmallThenLargePattern(
            DataStream<Transaction> transactions) {

        Pattern<Transaction, ?> pattern = Pattern.<Transaction>begin("small")
                .where(new SimpleCondition<Transaction>() {
                    @Override
                    public boolean filter(Transaction tx) {
                        return tx.getAmount() != null && 
                               tx.getAmount().compareTo(new BigDecimal("50")) < 0;
                    }
                })
                .timesOrMore(3)
                .within(Time.minutes(5))
                .followedBy("large")
                .where(new SimpleCondition<Transaction>() {
                    @Override
                    public boolean filter(Transaction tx) {
                        return tx.getAmount() != null && 
                               tx.getAmount().compareTo(new BigDecimal("1000")) >= 0;
                    }
                });

        PatternStream<Transaction> patternStream = CEP.pattern(
                transactions.keyBy(Transaction::getAccountId), 
                pattern
        );

        return patternStream.select(new PatternSelectFunction<Transaction, FraudAlert>() {
            @Override
            public FraudAlert select(Map<String, List<Transaction>> pattern) {
                List<Transaction> smallTransactions = pattern.get("small");
                Transaction largeTransaction = pattern.get("large").get(0);

                return FraudAlert.builder()
                        .transaction(largeTransaction)
                        .alertType(AlertType.UNUSUAL_PATTERN)
                        .severity(AlertSeverity.HIGH)
                        .riskScore(0.85)
                        .addTriggeredRule("SMALL_THEN_LARGE_PATTERN")
                        .description(String.format(
                                "Pattern detected: %d small transactions followed by large transaction of %s %s",
                                smallTransactions.size(),
                                largeTransaction.getAmount(),
                                largeTransaction.getCurrency()))
                        .status(AlertStatus.NEW)
                        .build();
            }
        });
    }

    /**
     * Detects transactions from multiple countries in a short time period.
     * This is physically impossible and indicates compromised credentials.
     */
    public static DataStream<FraudAlert> detectMultiCountryPattern(
            DataStream<Transaction> transactions) {

        Pattern<Transaction, ?> pattern = Pattern.<Transaction>begin("first")
                .where(new SimpleCondition<Transaction>() {
                    @Override
                    public boolean filter(Transaction tx) {
                        return tx.getCountryCode() != null;
                    }
                })
                .followedBy("second")
                .where(new SimpleCondition<Transaction>() {
                    @Override
                    public boolean filter(Transaction tx) {
                        return tx.getCountryCode() != null;
                    }
                })
                .within(Time.minutes(10));

        PatternStream<Transaction> patternStream = CEP.pattern(
                transactions.keyBy(Transaction::getAccountId),
                pattern
        );

        return patternStream.select(new PatternSelectFunction<Transaction, FraudAlert>() {
            @Override
            public FraudAlert select(Map<String, List<Transaction>> pattern) {
                Transaction first = pattern.get("first").get(0);
                Transaction second = pattern.get("second").get(0);

                // Only alert if countries are different
                if (first.getCountryCode().equals(second.getCountryCode())) {
                    return null;
                }

                return FraudAlert.builder()
                        .transaction(second)
                        .alertType(AlertType.GEOGRAPHIC_ANOMALY)
                        .severity(AlertSeverity.CRITICAL)
                        .riskScore(0.95)
                        .addTriggeredRule("MULTI_COUNTRY_PATTERN")
                        .description(String.format(
                                "Impossible travel detected: transaction from %s followed by %s within 10 minutes",
                                first.getCountryCode(),
                                second.getCountryCode()))
                        .status(AlertStatus.NEW)
                        .build();
            }
        }).filter(alert -> alert != null);
    }

    /**
     * Detects multiple failed transactions followed by a successful one.
     * Common pattern when fraudsters are guessing card details.
     */
    public static DataStream<FraudAlert> detectRepeatedAmountPattern(
            DataStream<Transaction> transactions) {

        Pattern<Transaction, ?> pattern = Pattern.<Transaction>begin("repeated")
                .where(new SimpleCondition<Transaction>() {
                    @Override
                    public boolean filter(Transaction tx) {
                        return tx.getAmount() != null;
                    }
                })
                .timesOrMore(3)
                .consecutive()
                .within(Time.minutes(2));

        PatternStream<Transaction> patternStream = CEP.pattern(
                transactions.keyBy(Transaction::getAccountId),
                pattern
        );

        return patternStream.select(new PatternSelectFunction<Transaction, FraudAlert>() {
            @Override
            public FraudAlert select(Map<String, List<Transaction>> pattern) {
                List<Transaction> repeatedTx = pattern.get("repeated");
                
                // Check if amounts are all the same (suspicious pattern)
                BigDecimal firstAmount = repeatedTx.get(0).getAmount();
                boolean allSameAmount = repeatedTx.stream()
                        .allMatch(tx -> tx.getAmount().compareTo(firstAmount) == 0);

                if (!allSameAmount) {
                    return null;
                }

                Transaction lastTx = repeatedTx.get(repeatedTx.size() - 1);

                return FraudAlert.builder()
                        .transaction(lastTx)
                        .alertType(AlertType.UNUSUAL_PATTERN)
                        .severity(AlertSeverity.MEDIUM)
                        .riskScore(0.65)
                        .addTriggeredRule("REPEATED_AMOUNT_PATTERN")
                        .description(String.format(
                                "Repeated same amount transactions detected: %d transactions of %s %s",
                                repeatedTx.size(),
                                firstAmount,
                                lastTx.getCurrency()))
                        .status(AlertStatus.NEW)
                        .build();
            }
        }).filter(alert -> alert != null);
    }

    /**
     * Detects gradual increase in transaction amounts.
     * Pattern used to test account limits.
     */
    public static DataStream<FraudAlert> detectIncreasingAmountPattern(
            DataStream<Transaction> transactions) {

        Pattern<Transaction, ?> pattern = Pattern.<Transaction>begin("increasing")
                .where(new SimpleCondition<Transaction>() {
                    @Override
                    public boolean filter(Transaction tx) {
                        return tx.getAmount() != null;
                    }
                })
                .timesOrMore(4)
                .within(Time.hours(1));

        PatternStream<Transaction> patternStream = CEP.pattern(
                transactions.keyBy(Transaction::getAccountId),
                pattern
        );

        return patternStream.select(new PatternSelectFunction<Transaction, FraudAlert>() {
            @Override
            public FraudAlert select(Map<String, List<Transaction>> pattern) {
                List<Transaction> txList = pattern.get("increasing");

                // Check if amounts are consistently increasing
                boolean isIncreasing = true;
                for (int i = 1; i < txList.size(); i++) {
                    if (txList.get(i).getAmount().compareTo(txList.get(i - 1).getAmount()) <= 0) {
                        isIncreasing = false;
                        break;
                    }
                }

                if (!isIncreasing) {
                    return null;
                }

                Transaction lastTx = txList.get(txList.size() - 1);
                BigDecimal totalAmount = txList.stream()
                        .map(Transaction::getAmount)
                        .reduce(BigDecimal.ZERO, BigDecimal::add);

                return FraudAlert.builder()
                        .transaction(lastTx)
                        .alertType(AlertType.UNUSUAL_PATTERN)
                        .severity(AlertSeverity.MEDIUM)
                        .riskScore(0.6)
                        .addTriggeredRule("INCREASING_AMOUNT_PATTERN")
                        .description(String.format(
                                "Increasing amount pattern detected: %d transactions totaling %s %s",
                                txList.size(),
                                totalAmount,
                                lastTx.getCurrency()))
                        .status(AlertStatus.NEW)
                        .build();
            }
        }).filter(alert -> alert != null);
    }
}