package org.activiti.cloud.services.messages.core.integration;

import java.util.List;
import java.util.Objects;
import org.activiti.api.process.model.payloads.MessageEventPayload;
import org.activiti.cloud.services.messages.core.aggregator.MessageConnectorAggregator;
import org.activiti.cloud.services.messages.core.config.MessageAggregatorProperties;
import org.activiti.cloud.services.messages.core.correlation.Correlations;
import org.aopalliance.aop.Advice;
import org.springframework.integration.annotation.Filter;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.annotation.Transformer;
import org.springframework.integration.dsl.IntegrationFlowAdapter;
import org.springframework.integration.dsl.IntegrationFlowDefinition;
import org.springframework.integration.dsl.Transformers;
import org.springframework.integration.handler.AbstractMessageProducingHandler;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.integration.handler.advice.HandleMessageAdvice;
import org.springframework.integration.handler.advice.IdempotentReceiverInterceptor;
import org.springframework.integration.router.AbstractMessageRouter;
import org.springframework.messaging.Message;

/* loaded from: input_file:org/activiti/cloud/services/messages/core/integration/MessageConnectorIntegrationFlow.class */
public class MessageConnectorIntegrationFlow extends IntegrationFlowAdapter {
    private static final String MESSAGE_GATEWAY = "messageGateway";
    private static final String AGGREGATOR = "aggregator";
    private static final String ENRICH_HEADERS = "enrichHeaders";
    private static final String FILTER_MESSAGE = "filterMessage";
    public static final String DISCARD_CHANNEL = "discardChannel";
    public static final String REPLY_CHANNEL = "replyChannel";
    public static final String ERROR_CHANNEL = "errorChannel";
    private final MessageConnectorAggregator aggregator;
    private final IdempotentReceiverInterceptor interceptor;
    private final HandleMessageAdvice[] advices;
    private final MessageAggregatorProperties properties;
    private final AbstractMessageRouter router;

    public MessageConnectorIntegrationFlow(MessageConnectorAggregator messageConnectorAggregator, IdempotentReceiverInterceptor idempotentReceiverInterceptor, List<? extends HandleMessageAdvice> list, MessageAggregatorProperties messageAggregatorProperties, AbstractMessageRouter abstractMessageRouter) {
        this.aggregator = messageConnectorAggregator;
        this.interceptor = idempotentReceiverInterceptor;
        this.advices = (HandleMessageAdvice[]) list.toArray(new HandleMessageAdvice[0]);
        this.properties = messageAggregatorProperties;
        this.router = abstractMessageRouter;
    }

    protected IntegrationFlowDefinition<?> buildFlow() {
        return from("messageConnectorInput-in-0").headerFilter(this.properties.getInputHeadersToRemove()).gateway(integrationFlowDefinition -> {
            integrationFlowDefinition.log(LoggingHandler.Level.DEBUG).enrichHeaders(headerEnricherSpec -> {
                headerEnricherSpec.headerChannelsToString(this.properties.getHeaderChannelsTimeToLiveExpression());
            }).filter(Message.class, this::filterMessage, filterEndpointSpec -> {
                filterEndpointSpec.id(FILTER_MESSAGE).discardChannel(DISCARD_CHANNEL);
            }).enrichHeaders(headerEnricherSpec2 -> {
                headerEnricherSpec2.id(ENRICH_HEADERS).headerFunction("correlationId", this::enrichHeaders);
            }).transform(Transformers.fromJson(MessageEventPayload.class)).handle(aggregator(), genericEndpointSpec -> {
                genericEndpointSpec.id(AGGREGATOR).advice(this.advices);
            }).route(output());
        }, gatewayEndpointSpec -> {
            gatewayEndpointSpec.transactional().id(MESSAGE_GATEWAY).requiresReply(false).async(true).replyTimeout(0L).advice(new Advice[]{this.interceptor});
        });
    }

    public AbstractMessageRouter output() {
        return this.router;
    }

    public AbstractMessageProducingHandler aggregator() {
        return this.aggregator;
    }

    @ServiceActivator
    public void aggregator(Message<?> message) {
        this.aggregator.handleMessage(message);
    }

    @Filter
    public boolean filterMessage(Message<?> message) {
        return Objects.nonNull(message.getHeaders().get(MessageEventHeaders.MESSAGE_EVENT_TYPE));
    }

    @Transformer
    public String enrichHeaders(Message<?> message) {
        return Correlations.getCorrelationId(message);
    }
}
