package org.activiti.cloud.services.notifications.graphql.events.consumer;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.activiti.cloud.services.notifications.graphql.events.model.EngineEvent;
import org.activiti.cloud.services.notifications.graphql.events.transformer.Transformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:org/activiti/cloud/services/notifications/graphql/events/consumer/EngineEventsConsumerMessageHandler.class */
/**
 * Stream listener that consumes batches of raw engine events (each event a
 * {@code Map<String, Object>}), converts every batch with the configured
 * {@link Transformer}, and pushes the resulting message into the supplied
 * {@link FluxSink}.
 *
 * <p>The headers of the incoming message are copied unchanged onto the
 * outgoing message.
 */
public class EngineEventsConsumerMessageHandler {
    private static final Logger logger = LoggerFactory.getLogger(EngineEventsConsumerMessageHandler.class);

    /** Name of the message header that is logged for every received message. */
    private static final String ROUTING_KEY_HEADER = "routingKey";

    private final FluxSink<Message<List<EngineEvent>>> processorSink;
    private final Transformer transformer;

    /**
     * Creates a handler.
     *
     * @param transformer converts the raw event maps of a message payload into engine events
     * @param fluxSink    sink that receives one transformed message per incoming message
     */
    public EngineEventsConsumerMessageHandler(Transformer transformer, FluxSink<Message<List<EngineEvent>>> fluxSink) {
        this.processorSink = fluxSink;
        this.transformer = transformer;
    }

    /**
     * Subscribes to the incoming stream of engine event messages and forwards each
     * transformed message to the sink.
     *
     * <p>Messages are processed with {@code flatMapSequential}, so results are emitted
     * in the order of the source messages. Errors are logged and followed by
     * {@code retry()}, which re-subscribes to the source instead of terminating the
     * pipeline.
     *
     * @param flux stream of messages whose payload is a list of raw event maps
     */
    @StreamListener
    public void receive(@Input("graphQLEngineEventsConsumerSource") Flux<Message<List<Map<String, Object>>>> flux) {
        Flux<Message<List<EngineEvent>>> transformed = flux.flatMapSequential(message -> {
            List<Map<String, Object>> rawEvents = message.getPayload();
            // The header value is only logged, so pass it through as-is rather than
            // casting to String (a non-String header would otherwise fail the message).
            logger.info("Recieved source message with routingKey: {}", message.getHeaders().get(ROUTING_KEY_HEADER));
            // NOTE(review): assumes Transformer.transform returns an Iterable of EngineEvent
            // (e.g. List<EngineEvent>) - confirm against the Transformer interface.
            return Flux.fromIterable(this.transformer.transform(rawEvents))
                    .collectList()
                    .map(events -> MessageBuilder.createMessage(events, message.getHeaders()));
        });

        // The bound method reference evaluates (and null-checks) the sink once, up front.
        transformed
                .doOnNext(this.processorSink::next)
                .doOnError(error -> logger.error("Error handling message ", error))
                .retry()
                .subscribe();
    }
}
