FraudDetectionProcessor.java

package com.hsbc.fraud.processor;

import com.hsbc.fraud.engine.FraudDetectionEngine;
import com.hsbc.fraud.model.FraudAlert;
import com.hsbc.fraud.model.Transaction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;

/**
 * Flink processor function that applies fraud detection logic to transactions.
 * Uses RichFlatMapFunction for initialization and lifecycle management.
 */
public class FraudDetectionProcessor extends RichFlatMapFunction<Transaction, FraudAlert> {

    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(FraudDetectionProcessor.class);

    private transient FraudDetectionEngine engine;
    private final double alertThreshold;

    public FraudDetectionProcessor() {
        this(0.4);
    }

    public FraudDetectionProcessor(double alertThreshold) {
        this.alertThreshold = alertThreshold;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        this.engine = new FraudDetectionEngine(alertThreshold);
        LOG.info("Fraud detection processor initialized with threshold: {}", alertThreshold);
    }

    @Override
    public void flatMap(Transaction transaction, Collector<FraudAlert> out) throws Exception {
        if (transaction == null) {
            return;
        }

        long startTime = System.nanoTime();

        try {
            Optional<FraudAlert> alert = engine.evaluate(transaction);

            if (alert.isPresent()) {
                out.collect(alert.get());
                LOG.info("Fraud alert generated for transaction: {}", transaction.getTransactionId());
            }
        } catch (Exception e) {
            LOG.error("Error processing transaction {}: {}", 
                    transaction.getTransactionId(), e.getMessage(), e);
        } finally {
            long processingTime = (System.nanoTime() - startTime) / 1_000_000;
            if (processingTime > 100) {
                LOG.warn("Slow processing detected: {} ms for transaction {}", 
                        processingTime, transaction.getTransactionId());
            }
        }
    }

    @Override
    public void close() throws Exception {
        super.close();
        LOG.info("Fraud detection processor closed");
    }
}