Паттерн Event sourcing
Event sourcing разработчики никогда не теряют кошелек, так как всегда могут восстановить историю событий за день
order_id
customer_id
total_items order_id
customer_id
total_items
total_price ItemAdded {sku: "A100", qty: 2, price: 10}
ItemAdded {sku: "B222", qty: 1, price: 45}
DiscountApplied {amount: 5} @EventHandler
public void on(ItemAdded e) {
OrderSummary summary = repo.find(e.getOrderId());
summary.totalItems += e.getQty();
summary.totalPrice += e.getPrice() * e.getQty();
repo.save(summary);
} List<Event> events = eventStore.loadAll();
for(Event e : events) {
projectionHandler.apply(e);
}
public class OrderProjectionRebuilder {
private final OrderProjectionService projectionService;
private final ObjectMapper objectMapper = new ObjectMapper();
public OrderProjectionRebuilder(OrderProjectionService projectionService) {
this.projectionService = projectionService;
}
public void replayAll() {
Properties props = createConsumerProps();
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
consumer.subscribe(List.of("order-events"));
// Можно предварительно подчистить read-модель
// projectionService.clearAll(); // например, repo.deleteAll();
boolean keepRunning = true;
while (keepRunning) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1));
if (records.isEmpty()) {
// если долго пусто – считаем, что дошли до конца топика
// и выходим; можно сделать более умную логику
keepRunning = false;
}
for (ConsumerRecord<String, String> record : records) {
String json = record.value();
try {
OrderEvent event = objectMapper.readValue(json, OrderEvent.class);
projectionService.apply(event);
} catch (Exception ex) {
// логируем и решаем, что делать с битым событием
System.err.println("Failed to process event: " + json);
ex.printStackTrace();
}
}
}
}
}
private Properties createConsumerProps() {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-projection-rebuild-" + System.currentTimeMillis());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return props;
}
}