// TransactionSource.java
package com.hsbc.fraud.source;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.hsbc.fraud.model.Transaction;
import com.hsbc.fraud.model.TransactionType;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.time.Duration;
import java.time.Instant;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Transaction data sources for the fraud detection system.
* Provides Pub/Sub, Kafka, and demo (simulated) data sources.
*/
public class TransactionSource {
private static final Logger LOG = LoggerFactory.getLogger(TransactionSource.class);
/**
* Supported source types for the fraud detection system.
*
* <p>Each constant names one kind of transaction input. This enum only declares the
* options; the code that maps a constant to a concrete source is not part of this enum.
*/
public enum SourceType {
// Google Cloud Pub/Sub. Described as the default by the original author;
// NOTE(review): the selection logic is not visible here - confirm against the caller.
PUBSUB, // Google Cloud Pub/Sub (default)
// Apache Kafka topic.
KAFKA, // Apache Kafka
// Locally generated, simulated transactions intended for testing.
DEMO // Simulated data for testing
}
/**
 * Builds a transaction stream that is fed by a Google Cloud Pub/Sub subscription.
 *
 * <p>The stream is produced by a {@link PubSubTransactionSource} and is given event-time
 * timestamps and watermarks that tolerate up to five seconds of out-of-order arrival.
 * A transaction without a timestamp is stamped with the current wall-clock time.
 *
 * @param env Flink execution environment the source is registered with
 * @param projectId GCP project ID that owns the subscription
 * @param subscriptionId Pub/Sub subscription ID to read from
 * @return DataStream of transactions with timestamps and watermarks assigned
 */
public static DataStream<Transaction> createPubSubSource(
        StreamExecutionEnvironment env,
        String projectId,
        String subscriptionId) {
    LOG.info("Creating Pub/Sub source - Project: {}, Subscription: {}", projectId, subscriptionId);

    // Event-time strategy: bounded disorder of 5s, event timestamp taken from the record
    // itself and replaced by "now" when the record carries none.
    WatermarkStrategy<Transaction> eventTimeStrategy = WatermarkStrategy
            .<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((tx, recordTimestamp) ->
                    tx.getTimestamp() == null
                            ? System.currentTimeMillis()
                            : tx.getTimestamp().toEpochMilli());

    return env
            .addSource(new PubSubTransactionSource(projectId, subscriptionId))
            .name("Pub/Sub Transaction Source")
            .assignTimestampsAndWatermarks(eventTimeStrategy);
}
/**
 * Builds a transaction stream that is fed by an Apache Kafka topic.
 *
 * <p>Records are read as value-only JSON via {@link TransactionDeserializer}, using the
 * consumer group {@code fraud-detection-group} and starting from the latest offsets.
 * Event-time timestamps and watermarks tolerate up to five seconds of out-of-order
 * arrival; a transaction without a timestamp is stamped with the current wall-clock time.
 *
 * @param env Flink execution environment the source is registered with
 * @param bootstrapServers Kafka bootstrap server list
 * @param topic Kafka topic to subscribe to
 * @return DataStream of transactions with timestamps and watermarks assigned
 */
public static DataStream<Transaction> createKafkaSource(
        StreamExecutionEnvironment env,
        String bootstrapServers,
        String topic) {
    // Event-time strategy: bounded disorder of 5s, event timestamp taken from the record
    // itself and replaced by "now" when the record carries none.
    WatermarkStrategy<Transaction> eventTimeStrategy = WatermarkStrategy
            .<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner((tx, recordTimestamp) ->
                    tx.getTimestamp() == null
                            ? System.currentTimeMillis()
                            : tx.getTimestamp().toEpochMilli());

    KafkaSource<Transaction> source = KafkaSource.<Transaction>builder()
            .setBootstrapServers(bootstrapServers)
            .setTopics(topic)
            .setGroupId("fraud-detection-group")
            .setStartingOffsets(OffsetsInitializer.latest())
            .setValueOnlyDeserializer(new TransactionDeserializer())
            .build();

    return env.fromSource(source, eventTimeStrategy, "Kafka Transaction Source");
}
/**
* JSON deserializer for Transaction objects.
*/
public static class TransactionDeserializer extends AbstractDeserializationSchema<Transaction> {
private static final long serialVersionUID = 1L;
private transient ObjectMapper objectMapper;
@Override
public void open(InitializationContext context) throws Exception {
super.open(context);
objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
}
@Override
public Transaction deserialize(byte[] message) {
try {
if (objectMapper == null) {
objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
}
return objectMapper.readValue(message, Transaction.class);
} catch (Exception e) {
LOG.error("Failed to deserialize transaction: {}", new String(message), e);
return null;
}
}
}
/**
 * Demo source that emits randomly generated transactions for testing.
 *
 * <p>The output mixes ordinary traffic with patterns meant to look suspicious:
 * high-value amounts, back-dated timestamps, short bursts of rapid transactions, and
 * accounts/countries flagged in the constant tables below.
 */
public static class DemoTransactionSource extends RichSourceFunction<Transaction> {
    private static final long serialVersionUID = 1L;

    private static final String[] ACCOUNT_IDS = {
        "ACC-001", "ACC-002", "ACC-003", "ACC-004", "ACC-005",
        "SUSP-001", "SUSP-002", "BLACK-001" // Suspicious/blacklisted accounts
    };
    private static final String[] COUNTRIES = {
        "US", "UK", "DE", "FR", "JP", "XX", "YY", "ZZ" // XX, YY are high-risk, ZZ is blocked
    };
    private static final String[] CHANNELS = {
        "ONLINE", "ATM", "POS", "MOBILE"
    };
    private static final TransactionType[] TX_TYPES = TransactionType.values();

    // Flipped by cancel() from another thread, hence volatile.
    private volatile boolean running = true;
    // Created in open() because it is not shipped with the serialized function.
    private transient Random random;

    /**
     * Creates the random generator used for all simulated values.
     *
     * @param parameters Flink configuration passed through to the parent
     * @throws Exception if the parent initialization fails
     */
    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        super.open(parameters);
        random = new Random();
    }

    /**
     * Emits one generated transaction per iteration until {@link #cancel()} is called,
     * pausing a random interval between emissions.
     *
     * @param ctx source context used to emit records under the checkpoint lock
     * @throws Exception if the thread is interrupted while pausing
     */
    @Override
    public void run(SourceContext<Transaction> ctx) throws Exception {
        LOG.info("Starting demo transaction source");
        for (int sequence = 0; running; sequence++) {
            Transaction next = generateTransaction(sequence);
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect(next);
            }
            Thread.sleep(nextPauseMillis());
        }
    }

    /**
     * Picks the pause before the next emission: normally 100-1099 ms, but with a 5%
     * chance a short 10-109 ms pause that simulates a burst of rapid transactions.
     */
    private int nextPauseMillis() {
        int pause = random.nextInt(1000) + 100;
        if (random.nextInt(100) < 5) {
            pause = random.nextInt(100) + 10;
        }
        return pause;
    }

    /**
     * Builds one simulated transaction.
     *
     * @param counter running index of the transaction; currently not used in the output
     * @return a fully populated transaction with randomized fields
     */
    private Transaction generateTransaction(int counter) {
        String sourceAccount = pick(ACCOUNT_IDS);
        String destinationAccount = "ACC-" + String.format("%03d", random.nextInt(1000));
        BigDecimal value = nextAmount();
        Instant eventTime = nextTimestamp();

        return Transaction.builder()
                .transactionId("TX-" + UUID.randomUUID().toString().substring(0, 8).toUpperCase())
                .accountId(sourceAccount)
                .targetAccountId(destinationAccount)
                .amount(value)
                .currency("USD")
                .transactionType(TX_TYPES[random.nextInt(TX_TYPES.length)])
                .timestamp(eventTime)
                .merchantId("MERCH-" + String.format("%04d", random.nextInt(10000)))
                .location("City-" + random.nextInt(100))
                .countryCode(pick(COUNTRIES))
                .ipAddress(generateIpAddress())
                .deviceId("DEV-" + String.format("%06d", random.nextInt(1000000)))
                .channel(pick(CHANNELS))
                .build();
    }

    /** Returns a uniformly chosen element of the given table. */
    private String pick(String[] options) {
        return options[random.nextInt(options.length)];
    }

    /**
     * Draws a transaction amount: 10% of draws are high-value (10,000-109,999); of the
     * remaining draws 5% are very high value (50,000-549,999), i.e. about 4.5% overall;
     * everything else falls in 10-5,009.
     */
    private BigDecimal nextAmount() {
        if (random.nextInt(100) < 10) {
            return new BigDecimal(random.nextInt(100000) + 10000);
        }
        if (random.nextInt(100) < 5) {
            return new BigDecimal(random.nextInt(500000) + 50000);
        }
        return new BigDecimal(random.nextInt(5000) + 10);
    }

    /**
     * Returns the current instant, or with a 10% chance an instant back-dated by
     * between two and four hours to simulate a transaction at an unusual time.
     */
    private Instant nextTimestamp() {
        Instant now = Instant.now();
        if (random.nextInt(100) >= 10) {
            return now;
        }
        return now.minusSeconds(random.nextInt(7200) + 7200);
    }

    /** Produces a random dotted-quad IPv4 string with each octet in 0-255. */
    private String generateIpAddress() {
        StringBuilder address = new StringBuilder();
        for (int octet = 0; octet < 4; octet++) {
            if (octet > 0) {
                address.append(".");
            }
            address.append(random.nextInt(256));
        }
        return address.toString();
    }

    /** Requests the emission loop in {@link #run} to stop after its current iteration. */
    @Override
    public void cancel() {
        LOG.info("Stopping demo transaction source");
        running = false;
    }
}
/**
 * Source that reads transactions from Google Cloud Pub/Sub.
 *
 * <p>A {@link Subscriber} delivers messages on its own threads; each message is parsed
 * from JSON and handed to the Flink task thread through a bounded in-memory queue.
 *
 * <p>Delivery notes:
 * <ul>
 *   <li>A message is acknowledged only after it has been placed on the queue. If the
 *       queue stays full for {@value #ENQUEUE_TIMEOUT_MS} ms the message is nacked so
 *       Pub/Sub redelivers it; nothing already acknowledged is discarded.</li>
 *   <li>Messages that fail to parse are nacked.
 *       NOTE(review): without a dead-letter policy on the subscription such a message
 *       is redelivered repeatedly - confirm the subscription configuration.</li>
 *   <li>The queue is plain in-memory state, so records that were acknowledged but not
 *       yet emitted are not recovered after a task failure.</li>
 * </ul>
 */
public static class PubSubTransactionSource extends RichSourceFunction<Transaction> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(PubSubTransactionSource.class);

    /** Maximum number of parsed transactions buffered between Pub/Sub and Flink. */
    private static final int QUEUE_CAPACITY = 10000;
    /** How long a receiver thread waits for queue space before nacking the message. */
    private static final long ENQUEUE_TIMEOUT_MS = 5000L;
    /** How long the emit loop waits for a record before re-checking the running flag. */
    private static final long POLL_TIMEOUT_MS = 100L;
    /** Upper bound on the wait for the subscriber to terminate during shutdown. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 30L;

    private final String projectId;
    private final String subscriptionId;

    // Written by cancel()/close() and read by the emit loop and receiver threads.
    private volatile boolean running = true;
    private transient Subscriber subscriber;
    private transient BlockingQueue<Transaction> messageQueue;
    private transient ObjectMapper objectMapper;

    /**
     * @param projectId GCP project ID that owns the subscription
     * @param subscriptionId Pub/Sub subscription ID to read from
     */
    public PubSubTransactionSource(String projectId, String subscriptionId) {
        this.projectId = projectId;
        this.subscriptionId = subscriptionId;
    }

    /**
     * Creates the JSON mapper and the hand-off queue, then starts the subscriber and
     * waits until it is running.
     *
     * @param parameters Flink configuration passed through to the parent
     * @throws Exception if the parent initialization or the subscriber start-up fails
     */
    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
        super.open(parameters);
        LOG.info("Opening Pub/Sub source - Project: {}, Subscription: {}", projectId, subscriptionId);

        objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        messageQueue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);

        ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId);
        MessageReceiver receiver = this::handleMessage;

        subscriber = Subscriber.newBuilder(subscriptionName, receiver)
                .build();
        subscriber.startAsync().awaitRunning();
        LOG.info("Pub/Sub subscriber started successfully");
    }

    /**
     * Parses one Pub/Sub message and queues it for emission, acknowledging it only once
     * it is safely on the queue.
     *
     * @param message the received Pub/Sub message whose data is a JSON transaction
     * @param consumer callback used to ack or nack the message
     */
    private void handleMessage(PubsubMessage message, AckReplyConsumer consumer) {
        // After cancellation nobody drains the queue any more; hand the message back.
        if (!running) {
            consumer.nack();
            return;
        }
        try {
            String json = message.getData().toStringUtf8();
            Transaction transaction = objectMapper.readValue(json, Transaction.class);
            if (transaction != null
                    && !messageQueue.offer(transaction, ENQUEUE_TIMEOUT_MS, TimeUnit.MILLISECONDS)) {
                // Waiting applies backpressure; if the queue is still full, let Pub/Sub
                // redeliver instead of discarding a transaction that was already acked.
                LOG.warn("Message queue full, message returned to Pub/Sub for redelivery");
                consumer.nack();
                return;
            }
            consumer.ack();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            LOG.warn("Interrupted while queueing Pub/Sub message, message returned for redelivery");
            consumer.nack();
        } catch (Exception e) {
            LOG.error("Failed to process Pub/Sub message: {}", e.getMessage(), e);
            // Negative acknowledgment - message will be redelivered
            consumer.nack();
        }
    }

    /**
     * Moves queued transactions into the Flink stream until the source is cancelled or
     * the thread is interrupted.
     *
     * @param ctx source context used to emit records under the checkpoint lock
     * @throws Exception declared by the Flink source contract
     */
    @Override
    public void run(SourceContext<Transaction> ctx) throws Exception {
        LOG.info("Starting Pub/Sub transaction source");
        while (running) {
            try {
                // Timed poll so the loop notices cancellation even when no data arrives.
                Transaction transaction = messageQueue.poll(POLL_TIMEOUT_MS, TimeUnit.MILLISECONDS);
                if (transaction != null) {
                    synchronized (ctx.getCheckpointLock()) {
                        ctx.collect(transaction);
                    }
                }
            } catch (InterruptedException e) {
                LOG.info("Pub/Sub source interrupted");
                Thread.currentThread().interrupt();
                running = false;
            }
        }
    }

    /** Stops the emit loop and shuts the subscriber down. */
    @Override
    public void cancel() {
        LOG.info("Cancelling Pub/Sub transaction source");
        running = false;
        stopSubscriber();
    }

    /**
     * Releases the subscriber if it is still active and then closes the parent.
     *
     * @throws Exception if the parent close fails
     */
    @Override
    public void close() throws Exception {
        try {
            running = false;
            stopSubscriber();
        } finally {
            super.close();
        }
    }

    /**
     * Stops the subscriber at most once, waiting a bounded time for it to terminate.
     * Safe to call from both cancel() and close(), in any order and from any thread.
     */
    private void stopSubscriber() {
        Subscriber current;
        synchronized (this) {
            current = subscriber;
            subscriber = null;
        }
        if (current == null) {
            return;
        }
        try {
            current.stopAsync().awaitTerminated(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
            LOG.info("Pub/Sub subscriber stopped");
        } catch (TimeoutException e) {
            LOG.warn("Timeout while stopping Pub/Sub subscriber", e);
        } catch (IllegalStateException e) {
            // Raised when the subscriber ended in a failed state rather than terminating
            // cleanly; shutdown should still proceed.
            LOG.warn("Pub/Sub subscriber did not terminate cleanly", e);
        }
    }
}
}