FraudDetectionJob.java

package com.hsbc.fraud;

import com.hsbc.fraud.model.FraudAlert;
import com.hsbc.fraud.model.Transaction;
import com.hsbc.fraud.processor.FraudDetectionProcessor;
import com.hsbc.fraud.processor.VelocityCheckProcessor;
import com.hsbc.fraud.sink.AlertSink;
import com.hsbc.fraud.source.TransactionSource;
import com.hsbc.fraud.source.TransactionSource.SourceType;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Locale;

/**
 * Main Flink job for real-time fraud detection.
 *
 * <p>Reads a stream of {@link Transaction}s, runs rule-based and velocity-based detection over it,
 * and writes the resulting {@link FraudAlert}s to the sinks of the selected pipeline.
 *
 * <p>All configuration comes from environment variables; an unset or empty variable falls back to
 * the corresponding default constant declared in this class:
 * <ul>
 *   <li>{@code SOURCE_TYPE}: {@code PUBSUB} (default), {@code KAFKA} or {@code DEMO}. Matching is
 *       case-insensitive; an unrecognised value is logged and treated as {@code PUBSUB}.</li>
 *   <li>{@code ALERT_THRESHOLD}: threshold handed to {@link FraudDetectionProcessor}
 *       (default {@code 0.4}).</li>
 *   <li>{@code PARALLELISM}: job parallelism (default {@code 1}).</li>
 *   <li>{@code CHECKPOINT_INTERVAL_MS}: checkpoint interval in milliseconds
 *       (default {@code 60000}).</li>
 *   <li>Pub/Sub source: {@code GOOGLE_CLOUD_PROJECT}, {@code PUBSUB_SUBSCRIPTION},
 *       {@code PUBSUB_ALERT_TOPIC}.</li>
 *   <li>Kafka source: {@code KAFKA_BOOTSTRAP_SERVERS}, {@code INPUT_TOPIC},
 *       {@code ALERT_TOPIC}.</li>
 *   <li>Demo source: simulated transactions, no further settings.</li>
 * </ul>
 *
 * <p>Numeric settings that cannot be parsed make startup fail with an
 * {@link IllegalArgumentException} naming the offending variable.
 */
public class FraudDetectionJob {

    private static final Logger LOG = LoggerFactory.getLogger(FraudDetectionJob.class);

    // Default configuration values.
    // NOTE(review): the GCP project and Kafka address below are deployment-specific; consider
    // requiring them to be set explicitly instead of baking them into the artifact.
    private static final String DEFAULT_SOURCE_TYPE = "PUBSUB";
    private static final String DEFAULT_GCP_PROJECT = "hsbc-484505";
    private static final String DEFAULT_PUBSUB_SUBSCRIPTION = "transactions-sub";
    private static final String DEFAULT_KAFKA_SERVERS = "10.113.192.45:9092";
    private static final String DEFAULT_INPUT_TOPIC = "transactions";
    private static final String DEFAULT_ALERT_TOPIC = "fraud-alerts";
    private static final String DEFAULT_ALERT_THRESHOLD = "0.4";
    private static final String DEFAULT_PARALLELISM = "1";
    private static final String DEFAULT_CHECKPOINT_INTERVAL_MS = "60000";

    /**
     * Entry point: reads configuration from the environment, builds the pipeline for the selected
     * source type and executes it.
     *
     * @param args not used; all configuration comes from environment variables
     * @throws IllegalArgumentException if a numeric setting is malformed
     * @throws Exception if Flink fails to execute the job
     */
    public static void main(String[] args) throws Exception {
        LOG.info("Starting Fraud Detection Job...");

        SourceType sourceType = resolveSourceType(getEnvOrDefault("SOURCE_TYPE", DEFAULT_SOURCE_TYPE));
        double alertThreshold = getFiniteDoubleEnvOrDefault("ALERT_THRESHOLD", DEFAULT_ALERT_THRESHOLD);

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        configureEnvironment(env);

        FraudDetectionJob job = new FraudDetectionJob();
        LOG.info("Using source type: {}", sourceType);

        switch (sourceType) {
            case PUBSUB: {
                String gcpProject = getEnvOrDefault("GOOGLE_CLOUD_PROJECT", DEFAULT_GCP_PROJECT);
                String subscription = getEnvOrDefault("PUBSUB_SUBSCRIPTION", DEFAULT_PUBSUB_SUBSCRIPTION);
                // Share the alert-topic default with the Kafka path instead of duplicating the literal.
                String alertTopic = getEnvOrDefault("PUBSUB_ALERT_TOPIC", DEFAULT_ALERT_TOPIC);
                job.buildPubSubPipeline(env, gcpProject, subscription, alertTopic, alertThreshold);
                break;
            }
            case KAFKA: {
                String kafkaServers = getEnvOrDefault("KAFKA_BOOTSTRAP_SERVERS", DEFAULT_KAFKA_SERVERS);
                String inputTopic = getEnvOrDefault("INPUT_TOPIC", DEFAULT_INPUT_TOPIC);
                String kafkaAlertTopic = getEnvOrDefault("ALERT_TOPIC", DEFAULT_ALERT_TOPIC);
                job.buildKafkaPipeline(env, kafkaServers, inputTopic, kafkaAlertTopic, alertThreshold);
                break;
            }
            case DEMO:
            default:
                job.buildDemoPipeline(env, alertThreshold);
                break;
        }

        env.execute("Real-Time Fraud Detection System");
    }

    /**
     * Maps the raw {@code SOURCE_TYPE} value onto a {@link SourceType}.
     *
     * <p>Surrounding whitespace is ignored and case is normalised with {@link Locale#ROOT} so the
     * result does not depend on the JVM's default locale. Unknown values are logged and mapped to
     * {@link SourceType#PUBSUB}.
     *
     * @param sourceTypeStr raw configuration value (non-null)
     * @return the matching source type, or {@code PUBSUB} when no constant matches
     */
    private static SourceType resolveSourceType(String sourceTypeStr) {
        try {
            return SourceType.valueOf(sourceTypeStr.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            LOG.warn("Invalid SOURCE_TYPE '{}', defaulting to PUBSUB", sourceTypeStr);
            return SourceType.PUBSUB;
        }
    }

    /**
     * Configures checkpointing and parallelism of the Flink execution environment.
     *
     * @param env the environment to configure
     * @throws IllegalArgumentException if {@code PARALLELISM} or {@code CHECKPOINT_INTERVAL_MS}
     *     is not an integer (Flink itself may also reject out-of-range values)
     */
    private static void configureEnvironment(StreamExecutionEnvironment env) {
        // Periodic checkpoints give the stateful operators (e.g. the velocity check) fault tolerance.
        long checkpointIntervalMs =
                getLongEnvOrDefault("CHECKPOINT_INTERVAL_MS", DEFAULT_CHECKPOINT_INTERVAL_MS);
        env.enableCheckpointing(checkpointIntervalMs);

        int parallelism = getIntEnvOrDefault("PARALLELISM", DEFAULT_PARALLELISM);
        env.setParallelism(parallelism);

        LOG.info("Environment configured with parallelism: {}, checkpoint interval: {} ms",
                parallelism, checkpointIntervalMs);
    }

    /**
     * Builds the fraud detection pipeline with a Google Cloud Pub/Sub source.
     * This is the default pipeline configuration.
     *
     * <p>Alerts are published to the given Pub/Sub topic, logged, and printed to stdout.
     *
     * @param env            execution environment to attach the pipeline to
     * @param projectId      GCP project holding both the subscription and the alert topic
     * @param subscriptionId Pub/Sub subscription transactions are read from
     * @param alertTopic     Pub/Sub topic alerts are published to
     * @param alertThreshold threshold passed to the rule-based detector
     */
    public void buildPubSubPipeline(StreamExecutionEnvironment env,
                                    String projectId,
                                    String subscriptionId,
                                    String alertTopic,
                                    double alertThreshold) {

        LOG.info("Building Pub/Sub pipeline: project={}, subscription={}, alertTopic={}",
                projectId, subscriptionId, alertTopic);

        DataStream<Transaction> transactions = TransactionSource.createPubSubSource(
                env, projectId, subscriptionId);

        DataStream<FraudAlert> alerts = processTransactions(transactions, alertThreshold);

        // Primary output: publish alerts back to Pub/Sub.
        alerts.addSink(new AlertSink.PubSubSink(projectId, alertTopic))
              .name("Pub/Sub Alert Sink");

        // Also log every alert for monitoring.
        alerts.addSink(new AlertSink.LoggingSink())
              .name("Logging Sink");

        // Print to stdout for monitoring.
        alerts.print().name("Console Output");
    }

    /**
     * Builds the fraud detection pipeline with a Kafka source and sink.
     *
     * <p>Alerts are written to the given Kafka topic and logged.
     *
     * @param env              execution environment to attach the pipeline to
     * @param bootstrapServers Kafka bootstrap servers, used for both source and sink
     * @param inputTopic       topic transactions are read from
     * @param alertTopic       topic alerts are written to
     * @param alertThreshold   threshold passed to the rule-based detector
     */
    public void buildKafkaPipeline(StreamExecutionEnvironment env,
                                   String bootstrapServers,
                                   String inputTopic,
                                   String alertTopic,
                                   double alertThreshold) {

        LOG.info("Building Kafka pipeline: input={}, alerts={}", inputTopic, alertTopic);

        DataStream<Transaction> transactions = TransactionSource.createKafkaSource(
                env, bootstrapServers, inputTopic);

        DataStream<FraudAlert> alerts = processTransactions(transactions, alertThreshold);

        // Primary output: write alerts to Kafka.
        alerts.sinkTo(AlertSink.createKafkaSink(bootstrapServers, alertTopic))
              .name("Kafka Alert Sink");

        // Also log every alert for monitoring.
        alerts.addSink(new AlertSink.LoggingSink())
              .name("Logging Sink");
    }

    /**
     * Builds a demo pipeline fed by simulated transactions.
     *
     * <p>Event-time watermarks tolerate up to five seconds of out-of-orderness. A transaction
     * without a timestamp is stamped with the wall-clock time at which it is processed.
     * Alerts are logged and printed to stdout.
     *
     * @param env            execution environment to attach the pipeline to
     * @param alertThreshold threshold passed to the rule-based detector
     */
    public void buildDemoPipeline(StreamExecutionEnvironment env, double alertThreshold) {
        LOG.info("Building demo pipeline with simulated transactions");

        DataStream<Transaction> transactions = env
                .addSource(new TransactionSource.DemoTransactionSource())
                .name("Demo Transaction Source");

        transactions = transactions
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                .withTimestampAssigner((tx, timestamp) ->
                                        tx.getTimestamp() != null
                                                ? tx.getTimestamp().toEpochMilli()
                                                : System.currentTimeMillis())
                );

        DataStream<FraudAlert> alerts = processTransactions(transactions, alertThreshold);

        alerts.addSink(new AlertSink.LoggingSink())
              .name("Logging Sink");

        alerts.print().name("Console Output");
    }

    /**
     * Runs transactions through the fraud detection operators.
     * This is the core processing logic shared by all pipeline configurations.
     *
     * <p>The same input stream feeds two independent detectors whose outputs are merged:
     * a rule-based {@link FraudDetectionProcessor} and a per-account, stateful
     * {@link VelocityCheckProcessor}.
     *
     * @param transactions   input transaction stream
     * @param alertThreshold threshold passed to the rule-based detector
     * @return union of the alerts produced by both detectors
     */
    public DataStream<FraudAlert> processTransactions(DataStream<Transaction> transactions,
                                                      double alertThreshold) {
        SingleOutputStreamOperator<FraudAlert> ruleBasedAlerts = transactions
                .flatMap(new FraudDetectionProcessor(alertThreshold))
                .name("Rule-Based Fraud Detection");

        // NOTE(review): presumably every transaction carries a non-null account id; Flink cannot
        // partition on a null key, so confirm the sources reject such records upstream.
        SingleOutputStreamOperator<FraudAlert> velocityAlerts = transactions
                .keyBy(Transaction::getAccountId)
                .process(new VelocityCheckProcessor())
                .name("Velocity Check");

        return ruleBasedAlerts.union(velocityAlerts);
    }

    /**
     * Reads an integer environment variable.
     *
     * @param name         environment variable name
     * @param defaultValue value used when the variable is unset or empty
     * @return the parsed value
     * @throws IllegalArgumentException if the value is not a valid {@code int}
     */
    private static int getIntEnvOrDefault(String name, String defaultValue) {
        String raw = getEnvOrDefault(name, defaultValue);
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(name + " must be an integer, got '" + raw + "'", e);
        }
    }

    /**
     * Reads a {@code long} environment variable.
     *
     * @param name         environment variable name
     * @param defaultValue value used when the variable is unset or empty
     * @return the parsed value
     * @throws IllegalArgumentException if the value is not a valid {@code long}
     */
    private static long getLongEnvOrDefault(String name, String defaultValue) {
        String raw = getEnvOrDefault(name, defaultValue);
        try {
            return Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(name + " must be an integer, got '" + raw + "'", e);
        }
    }

    /**
     * Reads a finite {@code double} environment variable.
     *
     * @param name         environment variable name
     * @param defaultValue value used when the variable is unset or empty
     * @return the parsed value
     * @throws IllegalArgumentException if the value is not a number, or is NaN or infinite
     */
    private static double getFiniteDoubleEnvOrDefault(String name, String defaultValue) {
        String raw = getEnvOrDefault(name, defaultValue);
        double value;
        try {
            value = Double.parseDouble(raw);
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(name + " must be a number, got '" + raw + "'", e);
        }
        // parseDouble accepts "NaN" and "Infinity"; reject them, since NaN compares false
        // against every value and would quietly change detection behaviour.
        if (!Double.isFinite(value)) {
            throw new IllegalArgumentException(name + " must be a finite number, got '" + raw + "'");
        }
        return value;
    }

    /**
     * Returns the value of an environment variable, or {@code defaultValue} when the variable is
     * unset or set to the empty string.
     *
     * @param name         environment variable name
     * @param defaultValue fallback value
     * @return the variable's value, or {@code defaultValue}
     */
    private static String getEnvOrDefault(String name, String defaultValue) {
        String value = System.getenv(name);
        return value != null && !value.isEmpty() ? value : defaultValue;
    }
}