AlertSink.java
package com.hsbc.fraud.sink;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import com.hsbc.fraud.model.AlertSeverity;
import com.hsbc.fraud.model.FraudAlert;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
/**
* Alert sink implementations for outputting fraud alerts.
* Supports Kafka, logging, and custom sink implementations.
*/
public class AlertSink {
private static final Logger LOG = LoggerFactory.getLogger(AlertSink.class);
/**
* Creates a Kafka sink for fraud alerts.
*/
public static KafkaSink<FraudAlert> createKafkaSink(String bootstrapServers, String topic) {
return KafkaSink.<FraudAlert>builder()
.setBootstrapServers(bootstrapServers)
.setRecordSerializer(new AlertKafkaSerializer(topic))
.build();
}
/**
* Kafka serialization schema for FraudAlert.
*/
public static class AlertKafkaSerializer implements KafkaRecordSerializationSchema<FraudAlert> {
private static final long serialVersionUID = 1L;
private final String topic;
private transient ObjectMapper objectMapper;
public AlertKafkaSerializer(String topic) {
this.topic = topic;
}
@Override
public void open(SerializationSchema.InitializationContext context, KafkaSinkContext sinkContext) {
objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
}
@Override
public ProducerRecord<byte[], byte[]> serialize(FraudAlert alert, KafkaSinkContext context, Long timestamp) {
try {
if (objectMapper == null) {
objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
}
byte[] key = alert.getTransaction().getAccountId().getBytes(StandardCharsets.UTF_8);
byte[] value = objectMapper.writeValueAsBytes(alert);
return new ProducerRecord<>(topic, null, timestamp, key, value);
} catch (JsonProcessingException e) {
LOG.error("Failed to serialize alert: {}", alert.getAlertId(), e);
return null;
}
}
}
/**
* Sink function that logs fraud alerts with appropriate severity levels.
*/
public static class LoggingSink extends RichSinkFunction<FraudAlert> {
private static final long serialVersionUID = 1L;
private static final Logger ALERT_LOG = LoggerFactory.getLogger("FraudAlerts");
private transient ObjectMapper objectMapper;
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
super.open(parameters);
objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
objectMapper.enable(SerializationFeature.INDENT_OUTPUT);
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
}
@Override
public void invoke(FraudAlert alert, Context context) throws Exception {
String alertJson = objectMapper.writeValueAsString(alert);
// Log with appropriate severity
switch (alert.getSeverity()) {
case CRITICAL:
ALERT_LOG.error("[CRITICAL FRAUD ALERT] {}\n{}", formatAlertSummary(alert), alertJson);
break;
case HIGH:
ALERT_LOG.warn("[HIGH SEVERITY ALERT] {}\n{}", formatAlertSummary(alert), alertJson);
break;
case MEDIUM:
ALERT_LOG.warn("[MEDIUM SEVERITY ALERT] {}", formatAlertSummary(alert));
break;
case LOW:
ALERT_LOG.info("[LOW SEVERITY ALERT] {}", formatAlertSummary(alert));
break;
}
}
private String formatAlertSummary(FraudAlert alert) {
return String.format(
"AlertID=%s | TxID=%s | Account=%s | Type=%s | Score=%.2f | Rules=%s",
alert.getAlertId(),
alert.getTransaction().getTransactionId(),
alert.getTransaction().getAccountId(),
alert.getAlertType(),
alert.getRiskScore(),
alert.getTriggeredRules()
);
}
}
/**
* Sink function that sends alerts to Google Cloud Pub/Sub.
*/
public static class PubSubSink extends RichSinkFunction<FraudAlert> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(PubSubSink.class);
private final String projectId;
private final String topicId;
private transient Publisher publisher;
private transient ObjectMapper objectMapper;
public PubSubSink(String projectId, String topicId) {
this.projectId = projectId;
this.topicId = topicId;
}
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
super.open(parameters);
objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
TopicName topicName = TopicName.of(projectId, topicId);
try {
publisher = Publisher.newBuilder(topicName).build();
LOG.info("Pub/Sub publisher created for topic: {}", topicName);
} catch (IOException e) {
LOG.error("Failed to create Pub/Sub publisher", e);
throw e;
}
}
@Override
public void invoke(FraudAlert alert, Context context) throws Exception {
if (publisher == null) {
LOG.error("Publisher not initialized, skipping alert: {}", alert.getAlertId());
return;
}
try {
String alertJson = objectMapper.writeValueAsString(alert);
PubsubMessage message = PubsubMessage.newBuilder()
.setData(ByteString.copyFromUtf8(alertJson))
.putAttributes("alertId", alert.getAlertId())
.putAttributes("severity", alert.getSeverity().name())
.putAttributes("alertType", alert.getAlertType().name())
.putAttributes("accountId", alert.getTransaction().getAccountId())
.build();
ApiFuture<String> future = publisher.publish(message);
ApiFutures.addCallback(future, new ApiFutureCallback<String>() {
@Override
public void onSuccess(String messageId) {
LOG.info("Published alert {} to Pub/Sub with messageId: {}",
alert.getAlertId(), messageId);
}
@Override
public void onFailure(Throwable t) {
LOG.error("Failed to publish alert {} to Pub/Sub", alert.getAlertId(), t);
}
}, MoreExecutors.directExecutor());
} catch (JsonProcessingException e) {
LOG.error("Failed to serialize alert: {}", alert.getAlertId(), e);
}
}
@Override
public void close() throws Exception {
if (publisher != null) {
publisher.shutdown();
LOG.info("Pub/Sub publisher shutdown completed");
}
super.close();
}
}
/**
* Sink function that sends alerts to an external webhook/API.
*/
public static class WebhookSink extends RichSinkFunction<FraudAlert> {
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(WebhookSink.class);
private final String webhookUrl;
private final AlertSeverity minSeverity;
private transient ObjectMapper objectMapper;
public WebhookSink(String webhookUrl) {
this(webhookUrl, AlertSeverity.MEDIUM);
}
public WebhookSink(String webhookUrl, AlertSeverity minSeverity) {
this.webhookUrl = webhookUrl;
this.minSeverity = minSeverity;
}
@Override
public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
super.open(parameters);
objectMapper = new ObjectMapper();
objectMapper.registerModule(new JavaTimeModule());
objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
}
@Override
public void invoke(FraudAlert alert, Context context) throws Exception {
// Only send alerts meeting minimum severity
if (alert.getSeverity().getLevel() < minSeverity.getLevel()) {
return;
}
// In production, this would make an HTTP call to the webhook
// For demo purposes, we just log
LOG.info("Would send alert {} to webhook {}", alert.getAlertId(), webhookUrl);
// Example webhook call (commented out for demo):
/*
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
.uri(URI.create(webhookUrl))
.header("Content-Type", "application/json")
.POST(HttpRequest.BodyPublishers.ofString(objectMapper.writeValueAsString(alert)))
.build();
client.sendAsync(request, HttpResponse.BodyHandlers.ofString())
.thenAccept(response -> {
if (response.statusCode() >= 400) {
LOG.error("Failed to send alert: HTTP {}", response.statusCode());
}
});
*/
}
}
}