// VelocityCheckProcessor.java
package com.hsbc.fraud.processor;
import com.hsbc.fraud.model.*;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
/**
 * Stateful processor that detects velocity-based fraud patterns.
 *
 * <p>For every key of the stream (the alerts and log messages report
 * {@code transaction.getAccountId()}; presumably the stream is keyed by account id —
 * confirm against the job graph) the processor keeps the epoch-millisecond timestamps of
 * the transactions seen inside a sliding window and emits two kinds of alerts:
 * <ul>
 *   <li>{@code RAPID_SUCCESSIVE_TRANSACTIONS} when a transaction is less than
 *       {@value #RAPID_TRANSACTION_THRESHOLD_MS} ms away from the latest transaction seen
 *       and at least two earlier transactions are still inside the window;</li>
 *   <li>{@code VELOCITY_BREACH} when the window already holds
 *       {@code maxTransactionsPerWindow} or more transactions.</li>
 * </ul>
 *
 * <p>Timestamps are taken from {@code Transaction#getTimestamp()}; an event-time timer is
 * registered per transaction to prune and eventually clear the keyed state.
 */
public class VelocityCheckProcessor extends KeyedProcessFunction<String, Transaction, FraudAlert> {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(VelocityCheckProcessor.class);

    /** Two transactions closer together than this many milliseconds count as "rapid". */
    private static final long RAPID_TRANSACTION_THRESHOLD_MS = 1000L;

    /** Extra delay (ms) added after the window end before the cleanup timer fires. */
    private static final long CLEANUP_GRACE_MS = 1000L;

    private final int maxTransactionsPerWindow;
    private final Duration windowDuration;

    // State to track recent transaction timestamps (epoch millis) inside the window
    private transient ValueState<List<Long>> recentTransactionsState;

    // State to track the latest transaction timestamp (epoch millis) seen for the key
    private transient ValueState<Long> lastTransactionState;

    /**
     * Creates a processor allowing 5 transactions per 5-minute window.
     */
    public VelocityCheckProcessor() {
        this(5, Duration.ofMinutes(5));
    }

    /**
     * Creates a processor with an explicit velocity limit.
     *
     * @param maxTransactionsPerWindow number of transactions tolerated inside one window;
     *                                 must be positive
     * @param windowDuration           length of the sliding window; must be non-null and positive
     * @throws NullPointerException     if {@code windowDuration} is {@code null}
     * @throws IllegalArgumentException if {@code maxTransactionsPerWindow} is not positive or
     *                                  {@code windowDuration} is zero or negative
     */
    public VelocityCheckProcessor(int maxTransactionsPerWindow, Duration windowDuration) {
        // Fail at job-construction time instead of with an NPE inside open() on the cluster.
        if (windowDuration == null) {
            throw new NullPointerException("windowDuration must not be null");
        }
        if (windowDuration.isZero() || windowDuration.isNegative()) {
            throw new IllegalArgumentException(
                    "windowDuration must be positive, got: " + windowDuration);
        }
        if (maxTransactionsPerWindow <= 0) {
            throw new IllegalArgumentException(
                    "maxTransactionsPerWindow must be positive, got: " + maxTransactionsPerWindow);
        }
        this.maxTransactionsPerWindow = maxTransactionsPerWindow;
        this.windowDuration = windowDuration;
    }

    /**
     * Registers the two keyed state handles used by this function.
     *
     * @param parameters configuration passed through to the superclass
     * @throws Exception if the superclass initialisation or state registration fails
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        ValueStateDescriptor<List<Long>> recentTxDescriptor = new ValueStateDescriptor<>(
                "recentTransactions",
                Types.LIST(Types.LONG)
        );
        recentTransactionsState = getRuntimeContext().getState(recentTxDescriptor);

        ValueStateDescriptor<Long> lastTxDescriptor = new ValueStateDescriptor<>(
                "lastTransaction",
                Types.LONG
        );
        lastTransactionState = getRuntimeContext().getState(lastTxDescriptor);

        LOG.info("Velocity check processor initialized: max {} transactions per {} seconds",
                maxTransactionsPerWindow, windowDuration.getSeconds());
    }

    /**
     * Evaluates one transaction against the rapid-succession and velocity rules, emits any
     * resulting alerts, records the transaction in keyed state and schedules state cleanup.
     *
     * <p>Transactions that are {@code null} or carry no timestamp are silently ignored.
     *
     * @param transaction the incoming transaction
     * @param ctx         context giving access to the timer service
     * @param out         collector receiving the generated alerts
     * @throws Exception if state access fails
     */
    @Override
    public void processElement(Transaction transaction, Context ctx, Collector<FraudAlert> out) throws Exception {
        if (transaction == null || transaction.getTimestamp() == null) {
            return;
        }

        long currentTimestamp = transaction.getTimestamp().toEpochMilli();
        long windowMillis = windowDuration.toMillis();
        long windowStart = currentTimestamp - windowMillis;

        // Get recent transactions
        List<Long> recentTransactions = recentTransactionsState.value();
        if (recentTransactions == null) {
            recentTransactions = new ArrayList<>();
        }

        // Remove old transactions outside the window
        recentTransactions.removeIf(ts -> ts < windowStart);

        // Check for rapid successive transactions
        Long lastTransaction = lastTransactionState.value();
        if (lastTransaction != null) {
            // Absolute distance: an out-of-order event has a timestamp older than the stored
            // one, and a raw (negative) difference would always pass the "< threshold" test.
            long timeSinceLastTx = Math.abs(currentTimestamp - lastTransaction);
            if (timeSinceLastTx < RAPID_TRANSACTION_THRESHOLD_MS && recentTransactions.size() >= 2) {
                FraudAlert alert = createRapidTransactionAlert(transaction, timeSinceLastTx);
                out.collect(alert);
                LOG.warn("Rapid successive transaction detected for account: {}",
                        transaction.getAccountId());
            }
        }

        // Check velocity breach: the window is already full before counting this transaction
        if (recentTransactions.size() >= maxTransactionsPerWindow) {
            int countIncludingCurrent = recentTransactions.size() + 1;
            FraudAlert alert = createVelocityBreachAlert(transaction, countIncludingCurrent);
            out.collect(alert);
            LOG.warn("Velocity breach detected for account: {} - {} transactions in window",
                    transaction.getAccountId(), countIncludingCurrent);
        }

        // Update state
        recentTransactions.add(currentTimestamp);
        recentTransactionsState.update(recentTransactions);
        // Never move the "last transaction" marker backwards when a late event arrives.
        long latestTimestamp = lastTransaction == null
                ? currentTimestamp
                : Math.max(lastTransaction, currentTimestamp);
        lastTransactionState.update(latestTimestamp);

        // Set timer to clean up old state once this transaction has left the window
        ctx.timerService().registerEventTimeTimer(currentTimestamp + windowMillis + CLEANUP_GRACE_MS);
    }

    /**
     * Prunes timestamps that have fallen out of the window and clears both state handles once
     * no timestamps remain for the key.
     *
     * @param timestamp the timestamp of the firing timer
     * @param ctx       timer context (unused)
     * @param out       collector (unused; cleanup never emits alerts)
     * @throws Exception if state access fails
     */
    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<FraudAlert> out) throws Exception {
        // Clean up old transactions from state
        List<Long> recentTransactions = recentTransactionsState.value();
        if (recentTransactions != null) {
            // Timers are registered at (tx timestamp + window + grace), so this cutoff equals
            // the registering transaction's timestamp plus the grace period.
            long windowStart = timestamp - windowDuration.toMillis();
            recentTransactions.removeIf(ts -> ts < windowStart);

            if (recentTransactions.isEmpty()) {
                recentTransactionsState.clear();
                lastTransactionState.clear();
            } else {
                recentTransactionsState.update(recentTransactions);
            }
        }
    }

    /**
     * Builds a {@code VELOCITY_BREACH} alert.
     *
     * <p>The risk score starts at 0.6 and grows by 0.1 for every transaction above the
     * configured maximum, capped at 1.0.
     *
     * @param transaction      the transaction that triggered the breach
     * @param transactionCount number of transactions in the window including this one
     * @return the populated alert with status {@code NEW}
     */
    private FraudAlert createVelocityBreachAlert(Transaction transaction, int transactionCount) {
        double riskScore = Math.min(1.0, 0.6 + (transactionCount - maxTransactionsPerWindow) * 0.1);

        return FraudAlert.builder()
                .transaction(transaction)
                .alertType(AlertType.VELOCITY_BREACH)
                .severity(AlertSeverity.fromRiskScore(riskScore))
                .riskScore(riskScore)
                .addTriggeredRule("VELOCITY_CHECK")
                .description(String.format(
                        "Account %s has %d transactions in %d seconds (max allowed: %d)",
                        transaction.getAccountId(),
                        transactionCount,
                        windowDuration.getSeconds(),
                        maxTransactionsPerWindow))
                .status(AlertStatus.NEW)
                .build();
    }

    /**
     * Builds a {@code RAPID_SUCCESSIVE_TRANSACTIONS} alert with fixed {@code HIGH} severity and
     * a risk score of 0.8.
     *
     * @param transaction     the transaction that triggered the alert
     * @param timeSinceLastTx distance in milliseconds to the latest transaction seen for the key
     * @return the populated alert with status {@code NEW}
     */
    private FraudAlert createRapidTransactionAlert(Transaction transaction, long timeSinceLastTx) {
        return FraudAlert.builder()
                .transaction(transaction)
                .alertType(AlertType.RAPID_SUCCESSIVE_TRANSACTIONS)
                .severity(AlertSeverity.HIGH)
                .riskScore(0.8)
                .addTriggeredRule("RAPID_TRANSACTION_CHECK")
                .description(String.format(
                        "Rapid successive transaction detected for account %s - %d ms since last transaction",
                        transaction.getAccountId(),
                        timeSinceLastTx))
                .status(AlertStatus.NEW)
                .build();
    }

    /**
     * Returns the configured velocity limit.
     *
     * @return the number of transactions tolerated inside one window
     */
    public int getMaxTransactionsPerWindow() {
        return maxTransactionsPerWindow;
    }

    /**
     * Returns the configured window length.
     *
     * @return the length of the sliding window
     */
    public Duration getWindowDuration() {
        return windowDuration;
    }
}