TransactionProducer.java
package com.hsbc.fraud.producer;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.hsbc.fraud.model.Transaction;
import com.hsbc.fraud.model.TransactionType;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
/**
* Standalone Transaction Producer that generates simulated transactions and sends to Kafka.
* Can be run independently on the Kafka server or any machine with Kafka connectivity.
*
* Usage:
* java -cp fraud-detection-system-1.0.0.jar com.hsbc.fraud.producer.TransactionProducer
*
* Environment variables:
* KAFKA_BOOTSTRAP_SERVERS - Kafka broker address (default: 10.113.192.45:9092)
* KAFKA_TOPIC - Target topic (default: transactions)
* TPS - Transactions per second (default: 10)
* FRAUD_RATE - Percentage of fraudulent transactions (default: 15)
*/
public class TransactionProducer {
private static final Logger LOG = LoggerFactory.getLogger(TransactionProducer.class);
// Configuration with defaults
private static final String DEFAULT_BOOTSTRAP_SERVERS = "10.113.192.45:9092";
private static final String DEFAULT_TOPIC = "transactions";
private static final int DEFAULT_TPS = 10; // transactions per second
private static final int DEFAULT_FRAUD_RATE = 15; // percentage
// Account pools
private static final String[] NORMAL_ACCOUNTS = {
"ACC-001", "ACC-002", "ACC-003", "ACC-004", "ACC-005",
"ACC-006", "ACC-007", "ACC-008", "ACC-009", "ACC-010",
"ACC-011", "ACC-012", "ACC-013", "ACC-014", "ACC-015"
};
private static final String[] SUSPICIOUS_ACCOUNTS = {
"SUSP-001", "SUSP-002", "WATCH-001"
};
private static final String[] BLACKLISTED_ACCOUNTS = {
"BLACK-001", "BLACK-002", "FRAUD-001"
};
private static final String[] NORMAL_COUNTRIES = {
"US", "UK", "DE", "FR", "JP", "CA", "AU", "SG", "HK", "CN"
};
private static final String[] HIGH_RISK_COUNTRIES = {
"XX", "YY" // Fictional high-risk countries
};
private static final String[] BLOCKED_COUNTRIES = {
"ZZ" // Fictional blocked country
};
private static final String[] CHANNELS = {"ONLINE", "ATM", "POS", "MOBILE"};
private static final String[] CURRENCIES = {"USD", "EUR", "GBP", "CNY", "JPY"};
private static final TransactionType[] TX_TYPES = TransactionType.values();
private final KafkaProducer<String, String> producer;
private final ObjectMapper objectMapper;
private final String topic;
private final int tps;
private final int fraudRate;
private final Random random;
private final AtomicBoolean running;
private final AtomicLong totalSent;
private final AtomicLong fraudSent;
public TransactionProducer(String bootstrapServers, String topic, int tps, int fraudRate) {
this.topic = topic;
this.tps = tps;
this.fraudRate = fraudRate;
this.random = new Random();
this.running = new AtomicBoolean(true);
this.totalSent = new AtomicLong(0);
this.fraudSent = new AtomicLong(0);
// Configure Kafka producer
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.ACKS_CONFIG, "1");
props.put(ProducerConfig.RETRIES_CONFIG, 3);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
// 减少超时时间以便快速发现连接问题
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 10000); // 10 seconds
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 30000); // 30 seconds
props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000); // 10 seconds
this.producer = new KafkaProducer<>(props);
// 测试连接
LOG.info("Testing Kafka connection to {}...", bootstrapServers);
testConnection(bootstrapServers, topic);
// Configure JSON serializer
this.objectMapper = new ObjectMapper();
this.objectMapper.registerModule(new JavaTimeModule());
this.objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
LOG.info("TransactionProducer initialized - Kafka: {}, Topic: {}, TPS: {}, FraudRate: {}%",
bootstrapServers, topic, tps, fraudRate);
}
/**
* Start producing transactions at the configured rate.
*/
public void start() {
LOG.info("Starting transaction producer...");
long intervalMs = 1000 / tps; // interval between transactions in ms
long lastLogTime = System.currentTimeMillis();
// Add shutdown hook
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
LOG.info("Shutdown signal received, stopping producer...");
running.set(false);
}));
while (running.get()) {
try {
// Decide if this should be a fraudulent transaction
boolean isFraud = random.nextInt(100) < fraudRate;
Transaction transaction = isFraud ? generateFraudulentTransaction() : generateNormalTransaction();
String json = objectMapper.writeValueAsString(transaction);
ProducerRecord<String, String> record = new ProducerRecord<>(
topic,
transaction.getAccountId(), // Use account as key for partitioning
json
);
producer.send(record, (metadata, exception) -> {
if (exception != null) {
LOG.error("Failed to send transaction: {}", exception.getMessage());
} else {
totalSent.incrementAndGet();
if (isFraud) {
fraudSent.incrementAndGet();
}
}
});
// Log stats every 10 seconds
long now = System.currentTimeMillis();
if (now - lastLogTime >= 10000) {
LOG.info("Stats - Total sent: {}, Fraudulent: {} ({}%), Rate: {} tx/s",
totalSent.get(),
fraudSent.get(),
totalSent.get() > 0 ? (fraudSent.get() * 100 / totalSent.get()) : 0,
tps);
lastLogTime = now;
}
// Sleep to maintain the target TPS
Thread.sleep(intervalMs);
} catch (InterruptedException e) {
LOG.info("Producer interrupted");
running.set(false);
} catch (Exception e) {
LOG.error("Error producing transaction: {}", e.getMessage(), e);
}
}
close();
}
/**
* Generate a normal (non-fraudulent) transaction.
*/
private Transaction generateNormalTransaction() {
String accountId = NORMAL_ACCOUNTS[random.nextInt(NORMAL_ACCOUNTS.length)];
String targetAccountId = "ACC-" + String.format("%03d", random.nextInt(1000));
BigDecimal amount = new BigDecimal(random.nextInt(5000) + 10); // $10 - $5010
String country = NORMAL_COUNTRIES[random.nextInt(NORMAL_COUNTRIES.length)];
return buildTransaction(accountId, targetAccountId, amount, country);
}
/**
* Generate a fraudulent transaction with various fraud patterns.
*/
private Transaction generateFraudulentTransaction() {
int fraudType = random.nextInt(5);
switch (fraudType) {
case 0:
// High-value transaction
return generateHighValueTransaction();
case 1:
// Suspicious account transaction
return generateSuspiciousAccountTransaction();
case 2:
// Blacklisted account transaction
return generateBlacklistedAccountTransaction();
case 3:
// High-risk country transaction
return generateHighRiskCountryTransaction();
case 4:
// Blocked country transaction
return generateBlockedCountryTransaction();
default:
return generateHighValueTransaction();
}
}
private Transaction generateHighValueTransaction() {
String accountId = NORMAL_ACCOUNTS[random.nextInt(NORMAL_ACCOUNTS.length)];
String targetAccountId = "ACC-" + String.format("%03d", random.nextInt(1000));
// High value: $10,000 - $200,000
BigDecimal amount = new BigDecimal(random.nextInt(190000) + 10000);
String country = NORMAL_COUNTRIES[random.nextInt(NORMAL_COUNTRIES.length)];
Transaction tx = buildTransaction(accountId, targetAccountId, amount, country);
LOG.debug("Generated HIGH VALUE transaction: {} - Amount: {}", tx.getTransactionId(), amount);
return tx;
}
private Transaction generateSuspiciousAccountTransaction() {
String accountId = SUSPICIOUS_ACCOUNTS[random.nextInt(SUSPICIOUS_ACCOUNTS.length)];
String targetAccountId = "ACC-" + String.format("%03d", random.nextInt(1000));
BigDecimal amount = new BigDecimal(random.nextInt(10000) + 100);
String country = NORMAL_COUNTRIES[random.nextInt(NORMAL_COUNTRIES.length)];
Transaction tx = buildTransaction(accountId, targetAccountId, amount, country);
LOG.debug("Generated SUSPICIOUS ACCOUNT transaction: {} - Account: {}", tx.getTransactionId(), accountId);
return tx;
}
private Transaction generateBlacklistedAccountTransaction() {
String accountId = BLACKLISTED_ACCOUNTS[random.nextInt(BLACKLISTED_ACCOUNTS.length)];
String targetAccountId = "ACC-" + String.format("%03d", random.nextInt(1000));
BigDecimal amount = new BigDecimal(random.nextInt(50000) + 100);
String country = NORMAL_COUNTRIES[random.nextInt(NORMAL_COUNTRIES.length)];
Transaction tx = buildTransaction(accountId, targetAccountId, amount, country);
LOG.debug("Generated BLACKLISTED ACCOUNT transaction: {} - Account: {}", tx.getTransactionId(), accountId);
return tx;
}
private Transaction generateHighRiskCountryTransaction() {
String accountId = NORMAL_ACCOUNTS[random.nextInt(NORMAL_ACCOUNTS.length)];
String targetAccountId = "ACC-" + String.format("%03d", random.nextInt(1000));
BigDecimal amount = new BigDecimal(random.nextInt(20000) + 500);
String country = HIGH_RISK_COUNTRIES[random.nextInt(HIGH_RISK_COUNTRIES.length)];
Transaction tx = buildTransaction(accountId, targetAccountId, amount, country);
LOG.debug("Generated HIGH-RISK COUNTRY transaction: {} - Country: {}", tx.getTransactionId(), country);
return tx;
}
private Transaction generateBlockedCountryTransaction() {
String accountId = NORMAL_ACCOUNTS[random.nextInt(NORMAL_ACCOUNTS.length)];
String targetAccountId = "ACC-" + String.format("%03d", random.nextInt(1000));
BigDecimal amount = new BigDecimal(random.nextInt(30000) + 1000);
String country = BLOCKED_COUNTRIES[random.nextInt(BLOCKED_COUNTRIES.length)];
Transaction tx = buildTransaction(accountId, targetAccountId, amount, country);
LOG.debug("Generated BLOCKED COUNTRY transaction: {} - Country: {}", tx.getTransactionId(), country);
return tx;
}
private Transaction buildTransaction(String accountId, String targetAccountId,
BigDecimal amount, String countryCode) {
return Transaction.builder()
.transactionId("TX-" + UUID.randomUUID().toString().substring(0, 8).toUpperCase())
.accountId(accountId)
.targetAccountId(targetAccountId)
.amount(amount)
.currency(CURRENCIES[random.nextInt(CURRENCIES.length)])
.transactionType(TX_TYPES[random.nextInt(TX_TYPES.length)])
.timestamp(Instant.now())
.merchantId("MERCH-" + String.format("%04d", random.nextInt(10000)))
.location("City-" + random.nextInt(100))
.countryCode(countryCode)
.ipAddress(generateRandomIp())
.deviceId("DEV-" + String.format("%06d", random.nextInt(1000000)))
.channel(CHANNELS[random.nextInt(CHANNELS.length)])
.build();
}
private String generateRandomIp() {
return random.nextInt(256) + "." +
random.nextInt(256) + "." +
random.nextInt(256) + "." +
random.nextInt(256);
}
/**
* Test connection to Kafka and create topic if needed.
*/
private void testConnection(String bootstrapServers, String topic) {
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
adminProps.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
adminProps.put(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 10000);
try (AdminClient adminClient = AdminClient.create(adminProps)) {
// 尝试列出主题以测试连接
ListTopicsResult topicsResult = adminClient.listTopics();
Set<String> topics = topicsResult.names().get(10, TimeUnit.SECONDS);
LOG.info("Successfully connected to Kafka! Found {} topics", topics.size());
// 检查目标主题是否存在
if (!topics.contains(topic)) {
LOG.warn("Topic '{}' does not exist. Creating it...", topic);
try {
NewTopic newTopic = new NewTopic(topic, 4, (short) 1);
adminClient.createTopics(Collections.singletonList(newTopic)).all().get(10, TimeUnit.SECONDS);
LOG.info("Topic '{}' created successfully!", topic);
} catch (Exception e) {
LOG.warn("Could not create topic (may already exist or auto-create is enabled): {}", e.getMessage());
}
} else {
LOG.info("Topic '{}' exists.", topic);
}
} catch (TimeoutException e) {
LOG.error("==============================================");
LOG.error("KAFKA CONNECTION TIMEOUT!");
LOG.error("Cannot connect to Kafka at: {}", bootstrapServers);
LOG.error("Please check:");
LOG.error(" 1. Kafka broker is running");
LOG.error(" 2. Network connectivity (try: telnet {} {})",
bootstrapServers.split(":")[0],
bootstrapServers.contains(":") ? bootstrapServers.split(":")[1] : "9092");
LOG.error(" 3. Firewall settings");
LOG.error(" 4. Kafka advertised.listeners configuration");
LOG.error("==============================================");
throw new RuntimeException("Cannot connect to Kafka: " + e.getMessage(), e);
} catch (ExecutionException | InterruptedException e) {
LOG.error("Failed to connect to Kafka: {}", e.getMessage());
throw new RuntimeException("Cannot connect to Kafka: " + e.getMessage(), e);
}
}
public void close() {
LOG.info("Closing producer... Total transactions sent: {}, Fraudulent: {}",
totalSent.get(), fraudSent.get());
producer.flush();
producer.close();
}
public static void main(String[] args) {
// Get configuration from environment variables or command line
String bootstrapServers = getConfig("KAFKA_BOOTSTRAP_SERVERS", DEFAULT_BOOTSTRAP_SERVERS, args, 0);
String topic = getConfig("KAFKA_TOPIC", DEFAULT_TOPIC, args, 1);
int tps = Integer.parseInt(getConfig("TPS", String.valueOf(DEFAULT_TPS), args, 2));
int fraudRate = Integer.parseInt(getConfig("FRAUD_RATE", String.valueOf(DEFAULT_FRAUD_RATE), args, 3));
System.out.println("==============================================");
System.out.println(" Transaction Producer Starting ");
System.out.println("==============================================");
System.out.println("Kafka Brokers: " + bootstrapServers);
System.out.println("Topic: " + topic);
System.out.println("Target TPS: " + tps);
System.out.println("Fraud Rate: " + fraudRate + "%");
System.out.println("==============================================");
System.out.println("Press Ctrl+C to stop");
System.out.println();
TransactionProducer producer = new TransactionProducer(bootstrapServers, topic, tps, fraudRate);
producer.start();
}
private static String getConfig(String envName, String defaultValue, String[] args, int argIndex) {
// Priority: command line args > environment variable > default
if (args != null && args.length > argIndex && args[argIndex] != null && !args[argIndex].isEmpty()) {
return args[argIndex];
}
String envValue = System.getenv(envName);
if (envValue != null && !envValue.isEmpty()) {
return envValue;
}
return defaultValue;
}
}