Паттерн Event sourcing

Event sourcing разработчики никогда не теряют кошелек, так как всегда могут восстановить историю событий за день

Контекст
ПРоблемный Толя:
Мы хотим отслеживать, как изменяются данные со временем. Но обычные CRUD-системы затирают старые значения. Нам нужно видеть всю историю изменений!
Плюс, откаты и восстановление — это боль.
Что делать?
Выручающая
Варвара:
Тебе поможет Event Sourcing. Вместо хранения текущего состояния, ты сохраняешь все события, которые к нему привели.
ПРоблемный Толя:
Подожди, то есть вообще не хранить текущие данные?
Выручающая
Варвара:
Да! Храни только события вроде OrderCreated, ItemAdded, PaymentConfirmed.
А текущее состояние восстанавливается проигрыванием событий.
Можно добавить снапшот - текущее состояние, чтобы ускорить работу при восстановлении.
ПРоблемный Толя:
А как это помогает?
Выручающая
Варвара:
  1. Полный лог того, что произошло - аудит
  2. Просто "отмени" событие или переиграй до нужного момента чтобы откатиться
  3. Публикуешь событие - другие системы реагируют
Главное — правильно проектировать события, они твоя единственная истина.
Проблема
Как поддерживать целостность и отслеживать изменения состояний в распределенных системах
Решение
Вместо хранения текущего состояния сущности вести историю изменений состояний данной сущности.
Для этого разработать статусную модель, к примеру:

OrderCreated, OrderApproved, OrderShipped, OrederRejectes, ...

И вести историю состояний по каждому ордеру, при это история не может меняться, только добавляться.
В случае восстановления актуального состояния мы проходимся по всей истории до последнего события, либо еще делаем shipment - табличку на чтение, в которой храним последнее состояние для ускорения восстановления.
Пример Event sourcing
Read Model для таблицы order_summary содержит:
order_id
customer_id
total_items
Но теперь бизнесу нужно, чтобы там ещё была сумма заказа:
order_id
customer_id
total_items
total_price
Проблема:
В read-модели total_price нет, а в Event Store есть события:
ItemAdded {sku: "A100", qty: 2, price: 10}
ItemAdded {sku: "B222", qty: 1, price: 45}
DiscountApplied {amount: 5}
Решение:
Пишем новый projection handler, где для ItemAdded мы делаем:
@EventHandler
public void on(ItemAdded e) {
    OrderSummary summary = repo.find(e.getOrderId());
    summary.totalItems += e.getQty();
    summary.totalPrice += e.getPrice() * e.getQty();
    repo.save(summary);
}
Стираем read-модель (например, очищаем таблицу order_summary).
Запускаем replay всех событий:
К примеру поднимаем новый консьюмер с auto.offset.reset=earliest.
В кастомном коде:
List<Event> events = eventStore.loadAll();
for(Event e : events) {
    projectionHandler.apply(e);
}
Пример таблицы для Event Store
Код replay events (rebuild aggregate). Кода в качестве event store выступает Kafka
public class OrderProjectionRebuilder {

    private final OrderProjectionService projectionService;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public OrderProjectionRebuilder(OrderProjectionService projectionService) {
        this.projectionService = projectionService;
    }

    public void replayAll() {
        Properties props = createConsumerProps();
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("order-events"));

            // Можно предварительно подчистить read-модель
            // projectionService.clearAll();  // например, repo.deleteAll();

            boolean keepRunning = true;
            while (keepRunning) {
                ConsumerRecords<String, String> records =
                        consumer.poll(Duration.ofSeconds(1));

                if (records.isEmpty()) {
                    // если долго пусто – считаем, что дошли до конца топика
                    // и выходим; можно сделать более умную логику
                    keepRunning = false;
                }

                for (ConsumerRecord<String, String> record : records) {
                    String json = record.value();
                    try {
                        OrderEvent event = objectMapper.readValue(json, OrderEvent.class);
                        projectionService.apply(event);
                    } catch (Exception ex) {
                        // логируем и решаем, что делать с битым событием
                        System.err.println("Failed to process event: " + json);
                        ex.printStackTrace();
                    }
                }
            }
        }
    }

    private Properties createConsumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "order-projection-rebuild-" + System.currentTimeMillis());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return props;
    }
}
Как это выглядит в живой системе
  1. Программист допустил ошибку в расчёте read-модели.
  2. Ошибка замечена через неделю.
  3. В классической CRUD-та же ошибка уже попортила “настоящие” данные.
  4. Нужно чинить руками или восстанавливать бэкапы.

В Event Sourcing:
  1. Чиним логику в projection handler.
  2. Очищаем read-модель.
  3. Запускаем replay.
  4. Через минуту/час проекция пересчитана как будто ошибки никогда не было.
Исторические события остаются, но состояние выстроено заново уже по исправленной логике.
Преимущества
  • Возможность воссоздать состояние системы в любой момент времени путем воспроизведения событий, что полезно при восстановлении после сбоев или для тестирования.
  • Можно вернуться в прошлое состояние, «проиграв» события до определенного момента.
    Полезно для отладки, восстановления после багов или анализа поведения системы.
  • Реплей и проекции - Можно повторно воспроизвести события для:
    Построения новых read-моделей (CQRS)
    Миграции данных
  • Упрощает аудит, отладку и анализ. Это позволяет легко понять, как система пришла к текущему состоянию
  • События можно сразу публиковать в очереди (Kafka, RabbitMQ и т.д.). Проще интегрироваться с другими сервисами
  • События неизменяемы (append-only)
    Повышает надежность
    Позволяет эффективно кэшировать
    Делает данные удобными для репликации и масштабирования
Недостатки
  • Повышенная сложность разработки
    Состояние не хранится напрямую — нужно восстанавливать его из цепочки событий.
    Нужны агрегаты, обработчики событий, read-модели и прочая инфраструктура.
  • События нельзя менять задним числом
    Самое важное сразу разработать хорошую событийную модель.
  • Нельзя просто изменить структуру события
    Вводить версии событий
    Делать миграции
    Использовать upcaster(в рантайме преобразовывать старые версии событий)
  • Производительность восстановления
    При большом числе событий может быть медленно восстанавливать агрегат
    Поэтому часто используют:
    • Снапшоты
    • Или read-модели (CQRS)
Онлайн-школа профессионального программирования на java для коммерческих разработчиков и соискателей.
2 главные задачи, которые мы решаем:
  1. Трудоустройство и успешное прохождение испыталки.
  2. Переход на современный стек middle+
Наше главное достояние:
Менторская поддержка 24/7 и обучение в формате живого общения
Получить консультацию по обучению