// PubSubTransactionProducer.java

package com.hsbc.fraud.producer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import com.hsbc.fraud.model.Transaction;
import com.hsbc.fraud.model.TransactionType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.Locale;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Transaction Producer that generates simulated transactions and sends to Google Cloud Pub/Sub.
 * 
 * Prerequisites:
 *   1. Set up Google Cloud authentication:
 *      - Set GOOGLE_APPLICATION_CREDENTIALS environment variable to your service account key file
 *      - Or run on GCP with appropriate IAM permissions
 *   2. Create the Pub/Sub topic in your GCP project
 * 
 * Usage:
 *   java -cp fraud-detection-system-1.0.0.jar com.hsbc.fraud.producer.PubSubTransactionProducer
 * 
 * Environment variables:
 *   GOOGLE_CLOUD_PROJECT - GCP project ID (required)
 *   PUBSUB_TOPIC - Pub/Sub topic name (default: transactions)
 *   TPS - Transactions per second (default: 10)
 *   FRAUD_RATE - Percentage of fraudulent transactions (default: 15)
 *   GOOGLE_APPLICATION_CREDENTIALS - Path to service account key JSON file
 */
public class PubSubTransactionProducer {

    private static final Logger LOG = LoggerFactory.getLogger(PubSubTransactionProducer.class);

    // Configuration defaults
    private static final String DEFAULT_TOPIC = "transactions";
    private static final int DEFAULT_TPS = 10;
    private static final int DEFAULT_FRAUD_RATE = 15;

    // Account pools
    private static final String[] NORMAL_ACCOUNTS = {
            "ACC-001", "ACC-002", "ACC-003", "ACC-004", "ACC-005",
            "ACC-006", "ACC-007", "ACC-008", "ACC-009", "ACC-010",
            "ACC-011", "ACC-012", "ACC-013", "ACC-014", "ACC-015"
    };

    private static final String[] SUSPICIOUS_ACCOUNTS = {
            "SUSP-001", "SUSP-002", "WATCH-001"
    };

    private static final String[] BLACKLISTED_ACCOUNTS = {
            "BLACK-001", "BLACK-002", "FRAUD-001"
    };

    private static final String[] NORMAL_COUNTRIES = {
            "US", "UK", "DE", "FR", "JP", "CA", "AU", "SG", "HK", "CN"
    };

    private static final String[] HIGH_RISK_COUNTRIES = {
            "XX", "YY"
    };

    private static final String[] BLOCKED_COUNTRIES = {
            "ZZ"
    };

    private static final String[] CHANNELS = {"ONLINE", "ATM", "POS", "MOBILE"};
    private static final String[] CURRENCIES = {"USD", "EUR", "GBP", "CNY", "JPY"};
    private static final TransactionType[] TX_TYPES = TransactionType.values();

    private final Publisher publisher;
    private final ObjectMapper objectMapper;
    private final String projectId;
    private final String topicId;
    private final int tps;
    private final int fraudRate;
    private final Random random;
    private final AtomicBoolean running;
    private final AtomicLong totalSent;
    private final AtomicLong fraudSent;
    private final AtomicLong failedCount;

    public PubSubTransactionProducer(String projectId, String topicId, int tps, int fraudRate) throws IOException {
        this.projectId = projectId;
        this.topicId = topicId;
        this.tps = tps;
        this.fraudRate = fraudRate;
        this.random = new Random();
        this.running = new AtomicBoolean(true);
        this.totalSent = new AtomicLong(0);
        this.fraudSent = new AtomicLong(0);
        this.failedCount = new AtomicLong(0);

        // Configure JSON serializer
        this.objectMapper = new ObjectMapper();
        this.objectMapper.registerModule(new JavaTimeModule());
        this.objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);

        // Create Pub/Sub publisher
        TopicName topicName = TopicName.of(projectId, topicId);
        LOG.info("Initializing Pub/Sub publisher for topic: {}", topicName);
        
        this.publisher = Publisher.newBuilder(topicName)
                .build();

        LOG.info("PubSubTransactionProducer initialized - Project: {}, Topic: {}, TPS: {}, FraudRate: {}%",
                projectId, topicId, tps, fraudRate);
    }

    /**
     * Start producing transactions at the configured rate.
     */
    public void start() {
        LOG.info("Starting Pub/Sub transaction producer...");

        long intervalMs = 1000 / tps;
        long lastLogTime = System.currentTimeMillis();

        // Add shutdown hook
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            LOG.info("Shutdown signal received, stopping producer...");
            running.set(false);
        }));

        while (running.get()) {
            try {
                // Decide if this should be a fraudulent transaction
                boolean isFraud = random.nextInt(100) < fraudRate;

                Transaction transaction = isFraud ? generateFraudulentTransaction() : generateNormalTransaction();
                String json = objectMapper.writeValueAsString(transaction);

                // Create Pub/Sub message
                ByteString data = ByteString.copyFromUtf8(json);
                PubsubMessage pubsubMessage = PubsubMessage.newBuilder()
                        .setData(data)
                        .putAttributes("account_id", transaction.getAccountId())
                        .putAttributes("transaction_type", transaction.getTransactionType().name())
                        .putAttributes("country_code", transaction.getCountryCode())
                        .putAttributes("is_suspicious", String.valueOf(isFraud))
                        .build();

                // Publish asynchronously
                ApiFuture<String> future = publisher.publish(pubsubMessage);
                
                final boolean finalIsFraud = isFraud;
                ApiFutures.addCallback(future, new ApiFutureCallback<String>() {
                    @Override
                    public void onSuccess(String messageId) {
                        totalSent.incrementAndGet();
                        if (finalIsFraud) {
                            fraudSent.incrementAndGet();
                        }
                        LOG.debug("Published message ID: {}", messageId);
                    }

                    @Override
                    public void onFailure(Throwable t) {
                        failedCount.incrementAndGet();
                        if (t instanceof ApiException) {
                            ApiException apiException = (ApiException) t;
                            LOG.error("Failed to publish message. Status: {}, Error: {}", 
                                    apiException.getStatusCode().getCode(), 
                                    apiException.getMessage());
                        } else {
                            LOG.error("Failed to publish message: {}", t.getMessage());
                        }
                    }
                }, MoreExecutors.directExecutor());

                // Log stats every 10 seconds
                long now = System.currentTimeMillis();
                if (now - lastLogTime >= 10000) {
                    LOG.info("Stats - Total sent: {}, Fraudulent: {} ({}%), Failed: {}, Rate: {} tx/s",
                            totalSent.get(),
                            fraudSent.get(),
                            totalSent.get() > 0 ? (fraudSent.get() * 100 / totalSent.get()) : 0,
                            failedCount.get(),
                            tps);
                    lastLogTime = now;
                }

                // Sleep to maintain the target TPS
                Thread.sleep(intervalMs);

            } catch (InterruptedException e) {
                LOG.info("Producer interrupted");
                running.set(false);
            } catch (Exception e) {
                LOG.error("Error producing transaction: {}", e.getMessage(), e);
            }
        }

        close();
    }

    /**
     * Generate a normal (non-fraudulent) transaction.
     */
    private Transaction generateNormalTransaction() {
        String accountId = NORMAL_ACCOUNTS[random.nextInt(NORMAL_ACCOUNTS.length)];
        String targetAccountId = "ACC-" + String.format("%03d", random.nextInt(1000));
        BigDecimal amount = new BigDecimal(random.nextInt(5000) + 10);
        String country = NORMAL_COUNTRIES[random.nextInt(NORMAL_COUNTRIES.length)];

        return buildTransaction(accountId, targetAccountId, amount, country);
    }

    /**
     * Generate a fraudulent transaction with various fraud patterns.
     */
    private Transaction generateFraudulentTransaction() {
        int fraudType = random.nextInt(5);

        switch (fraudType) {
            case 0:
                return generateHighValueTransaction();
            case 1:
                return generateSuspiciousAccountTransaction();
            case 2:
                return generateBlacklistedAccountTransaction();
            case 3:
                return generateHighRiskCountryTransaction();
            case 4:
                return generateBlockedCountryTransaction();
            default:
                return generateHighValueTransaction();
        }
    }

    private Transaction generateHighValueTransaction() {
        String accountId = NORMAL_ACCOUNTS[random.nextInt(NORMAL_ACCOUNTS.length)];
        String targetAccountId = "ACC-" + String.format("%03d", random.nextInt(1000));
        BigDecimal amount = new BigDecimal(random.nextInt(190000) + 10000);
        String country = NORMAL_COUNTRIES[random.nextInt(NORMAL_COUNTRIES.length)];

        Transaction tx = buildTransaction(accountId, targetAccountId, amount, country);
        LOG.debug("Generated HIGH VALUE transaction: {} - Amount: {}", tx.getTransactionId(), amount);
        return tx;
    }

    private Transaction generateSuspiciousAccountTransaction() {
        String accountId = SUSPICIOUS_ACCOUNTS[random.nextInt(SUSPICIOUS_ACCOUNTS.length)];
        String targetAccountId = "ACC-" + String.format("%03d", random.nextInt(1000));
        BigDecimal amount = new BigDecimal(random.nextInt(10000) + 100);
        String country = NORMAL_COUNTRIES[random.nextInt(NORMAL_COUNTRIES.length)];

        Transaction tx = buildTransaction(accountId, targetAccountId, amount, country);
        LOG.debug("Generated SUSPICIOUS ACCOUNT transaction: {} - Account: {}", tx.getTransactionId(), accountId);
        return tx;
    }

    private Transaction generateBlacklistedAccountTransaction() {
        String accountId = BLACKLISTED_ACCOUNTS[random.nextInt(BLACKLISTED_ACCOUNTS.length)];
        String targetAccountId = "ACC-" + String.format("%03d", random.nextInt(1000));
        BigDecimal amount = new BigDecimal(random.nextInt(50000) + 100);
        String country = NORMAL_COUNTRIES[random.nextInt(NORMAL_COUNTRIES.length)];

        Transaction tx = buildTransaction(accountId, targetAccountId, amount, country);
        LOG.debug("Generated BLACKLISTED ACCOUNT transaction: {} - Account: {}", tx.getTransactionId(), accountId);
        return tx;
    }

    private Transaction generateHighRiskCountryTransaction() {
        String accountId = NORMAL_ACCOUNTS[random.nextInt(NORMAL_ACCOUNTS.length)];
        String targetAccountId = "ACC-" + String.format("%03d", random.nextInt(1000));
        BigDecimal amount = new BigDecimal(random.nextInt(20000) + 500);
        String country = HIGH_RISK_COUNTRIES[random.nextInt(HIGH_RISK_COUNTRIES.length)];

        Transaction tx = buildTransaction(accountId, targetAccountId, amount, country);
        LOG.debug("Generated HIGH-RISK COUNTRY transaction: {} - Country: {}", tx.getTransactionId(), country);
        return tx;
    }

    private Transaction generateBlockedCountryTransaction() {
        String accountId = NORMAL_ACCOUNTS[random.nextInt(NORMAL_ACCOUNTS.length)];
        String targetAccountId = "ACC-" + String.format("%03d", random.nextInt(1000));
        BigDecimal amount = new BigDecimal(random.nextInt(30000) + 1000);
        String country = BLOCKED_COUNTRIES[random.nextInt(BLOCKED_COUNTRIES.length)];

        Transaction tx = buildTransaction(accountId, targetAccountId, amount, country);
        LOG.debug("Generated BLOCKED COUNTRY transaction: {} - Country: {}", tx.getTransactionId(), country);
        return tx;
    }

    private Transaction buildTransaction(String accountId, String targetAccountId,
                                         BigDecimal amount, String countryCode) {
        return Transaction.builder()
                .transactionId("TX-" + UUID.randomUUID().toString().substring(0, 8).toUpperCase())
                .accountId(accountId)
                .targetAccountId(targetAccountId)
                .amount(amount)
                .currency(CURRENCIES[random.nextInt(CURRENCIES.length)])
                .transactionType(TX_TYPES[random.nextInt(TX_TYPES.length)])
                .timestamp(Instant.now())
                .merchantId("MERCH-" + String.format("%04d", random.nextInt(10000)))
                .location("City-" + random.nextInt(100))
                .countryCode(countryCode)
                .ipAddress(generateRandomIp())
                .deviceId("DEV-" + String.format("%06d", random.nextInt(1000000)))
                .channel(CHANNELS[random.nextInt(CHANNELS.length)])
                .build();
    }

    private String generateRandomIp() {
        return random.nextInt(256) + "." +
               random.nextInt(256) + "." +
               random.nextInt(256) + "." +
               random.nextInt(256);
    }

    public void close() {
        LOG.info("Closing Pub/Sub producer... Total sent: {}, Fraudulent: {}, Failed: {}",
                totalSent.get(), fraudSent.get(), failedCount.get());
        
        if (publisher != null) {
            try {
                publisher.shutdown();
                publisher.awaitTermination(30, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                LOG.warn("Publisher shutdown interrupted");
            }
        }
    }

    public static void main(String[] args) {
        // Get configuration from environment variables or command line
        String projectId = getConfig("GOOGLE_CLOUD_PROJECT", null, args, 0);
        String topicId = getConfig("PUBSUB_TOPIC", DEFAULT_TOPIC, args, 1);
        int tps = Integer.parseInt(getConfig("TPS", String.valueOf(DEFAULT_TPS), args, 2));
        int fraudRate = Integer.parseInt(getConfig("FRAUD_RATE", String.valueOf(DEFAULT_FRAUD_RATE), args, 3));

        // Validate required configuration
        if (projectId == null || projectId.isEmpty()) {
            System.err.println("ERROR: GOOGLE_CLOUD_PROJECT environment variable is required!");
            System.err.println();
            System.err.println("Usage:");
            System.err.println("  Set environment variables:");
            System.err.println("    GOOGLE_CLOUD_PROJECT=your-gcp-project-id");
            System.err.println("    GOOGLE_APPLICATION_CREDENTIALS=/path/to/service-account-key.json");
            System.err.println("    PUBSUB_TOPIC=transactions (optional, default: transactions)");
            System.err.println("    TPS=10 (optional, default: 10)");
            System.err.println("    FRAUD_RATE=15 (optional, default: 15)");
            System.err.println();
            System.err.println("  Or pass as command line arguments:");
            System.err.println("    java -cp ... PubSubTransactionProducer <project-id> <topic> <tps> <fraud-rate>");
            System.exit(1);
        }

        System.out.println("==============================================");
        System.out.println("   Pub/Sub Transaction Producer Starting      ");
        System.out.println("==============================================");
        System.out.println("GCP Project: " + projectId);
        System.out.println("Pub/Sub Topic: " + topicId);
        System.out.println("Target TPS: " + tps);
        System.out.println("Fraud Rate: " + fraudRate + "%");
        System.out.println("==============================================");
        System.out.println("Press Ctrl+C to stop");
        System.out.println();

        try {
            PubSubTransactionProducer producer = new PubSubTransactionProducer(projectId, topicId, tps, fraudRate);
            producer.start();
        } catch (IOException e) {
            System.err.println("Failed to initialize Pub/Sub producer: " + e.getMessage());
            e.printStackTrace();
            System.exit(1);
        }
    }

    private static String getConfig(String envName, String defaultValue, String[] args, int argIndex) {
        // Priority: command line args > environment variable > default
        if (args != null && args.length > argIndex && args[argIndex] != null && !args[argIndex].isEmpty()) {
            return args[argIndex];
        }
        String envValue = System.getenv(envName);
        if (envValue != null && !envValue.isEmpty()) {
            return envValue;
        }
        return defaultValue;
    }
}