/*
 * Decompiled with CFR 0.152.
 */
package org.activiti.cloud.common.messaging.config;

import java.lang.reflect.Type;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.activiti.cloud.common.messaging.ActivitiCloudMessagingProperties;
import org.activiti.cloud.common.messaging.config.AbstractFunctionalBindingConfiguration;
import org.activiti.cloud.common.messaging.config.FunctionAnnotationService;
import org.activiti.cloud.common.messaging.config.FunctionBindingConfiguration;
import org.activiti.cloud.common.messaging.functional.Connector;
import org.activiti.cloud.common.messaging.functional.ConnectorBinding;
import org.activiti.cloud.common.messaging.functional.ConsumerConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.boot.autoconfigure.AutoConfiguration;
import org.springframework.cloud.function.context.FunctionRegistration;
import org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry;
import org.springframework.cloud.stream.config.BinderFactoryAutoConfiguration;
import org.springframework.cloud.stream.function.FunctionConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.core.GenericHandler;
import org.springframework.integration.core.GenericSelector;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlowBuilder;
import org.springframework.integration.dsl.IntegrationFlowDefinition;
import org.springframework.integration.dsl.StandardIntegrationFlow;
import org.springframework.integration.dsl.context.IntegrationFlowContext;
import org.springframework.integration.filter.ExpressionEvaluatingSelector;
import org.springframework.integration.handler.LoggingHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.util.StringUtils;

@AutoConfiguration(after={BinderFactoryAutoConfiguration.class, FunctionBindingConfiguration.class}, before={FunctionConfiguration.class})
public class ConnectorConfiguration
extends AbstractFunctionalBindingConfiguration {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConnectorConfiguration.class);
    public static final String CONNECTOR_BINDING_SELECTOR_DISCARD_FLOW = "connectorBindingSelectorDiscardFlow";
    public static final String CONNECTOR_BINDING_SELECTOR_DISCARD_CHANNEL = "connectorBindingSelectorDiscardChannel";
    public static final String NULL_CHANNEL = "nullChannel";
    public static final String RETRY_COUNT = "x-retry-count";

    @Bean(name={"connectorBindingSelectorDiscardFlow"})
    IntegrationFlow functionBindingSelectorDiscardFlow() {
        return ((IntegrationFlowBuilder)((IntegrationFlowBuilder)IntegrationFlow.from((String)CONNECTOR_BINDING_SELECTOR_DISCARD_CHANNEL).log(LoggingHandler.Level.DEBUG, CONNECTOR_BINDING_SELECTOR_DISCARD_FLOW)).channel(NULL_CHANNEL)).get();
    }

    @Bean(name={"connectorBindingPostProcessor"})
    public BeanPostProcessor connectorBindingPostProcessor(final FunctionAnnotationService functionAnnotationService, final IntegrationFlowContext integrationFlowContext, final Function<String, String> resolveExpression, final ActivitiCloudMessagingProperties messagingProperties, final @Value(value="${activiti.connector.retry.default.max:-1}") int defaultMaxRetry, final @Value(value="${activiti.connector.retry.default.delay:0}") Long defaultRetryDelay) {
        return new BeanPostProcessor(){

            public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
                if (Connector.class.isInstance(bean) || ConsumerConnector.class.isInstance(bean)) {
                    AtomicReference responseDestination = new AtomicReference();
                    Optional.ofNullable(functionAnnotationService.findAnnotationOnBean(beanName, ConnectorBinding.class)).ifPresent(connectorBinding -> {
                        Type functionType = ConnectorConfiguration.this.discoverFunctionType(bean, beanName);
                        ActivitiCloudMessagingProperties.FunctionRouterProperties functionRouter = messagingProperties.getFunctionRouter();
                        FunctionRegistration functionRegistration = new FunctionRegistration(bean, new String[0]).type(functionType);
                        String functionBeanName = ConnectorConfiguration.this.registerFunctionRegistration(beanName, functionRegistration);
                        if (functionRouter.isEnabled()) {
                            Optional.ofNullable(connectorBinding.connectorType()).filter(StringUtils::hasText).map(resolveExpression).ifPresentOrElse(connectorType -> functionRouter.register(connectorBinding.input(), functionBeanName, (String)connectorType), () -> functionRouter.register(connectorBinding.input(), functionBeanName));
                        }
                        responseDestination.set(connectorBinding.outputHeader());
                        GenericHandler handler = (message, headers) -> {
                            SimpleFunctionRegistry.FunctionInvocationWrapper function = ConnectorConfiguration.this.functionFromDefinition(beanName);
                            Object result = function.apply(message);
                            Message response = null;
                            if (result != null) {
                                response = MessageBuilder.withPayload((Object)result).build();
                                String destination = (String)headers.get(responseDestination.get(), String.class);
                                if (StringUtils.hasText((String)destination)) {
                                    ConnectorConfiguration.this.getStreamBridge().send(destination, (Object)response);
                                    return null;
                                }
                            }
                            return response;
                        };
                        GenericSelector selector = (GenericSelector)Optional.ofNullable(connectorBinding).map(ConnectorBinding::condition).filter(StringUtils::hasText).map(resolveExpression).map(ExpressionEvaluatingSelector::new).orElseGet(() -> new ExpressionEvaluatingSelector("true"));
                        GenericSelector connectorType2 = (GenericSelector)Optional.ofNullable(connectorBinding).map(ConnectorBinding::connectorType).filter(StringUtils::hasText).map(resolveExpression).map(it -> "headers.containsKey('connectorType') && headers['connectorType']=='" + it + "'").map(ExpressionEvaluatingSelector::new).orElseGet(() -> new ExpressionEvaluatingSelector("true"));
                        StandardIntegrationFlow connectorFlow = ((IntegrationFlowBuilder)((IntegrationFlowBuilder)((IntegrationFlowBuilder)((IntegrationFlowBuilder)((IntegrationFlowBuilder)((IntegrationFlowBuilder)IntegrationFlow.from(ConnectorConfiguration.this.getGatewayInterface(Function.class.isInstance(bean)), gateway -> gateway.replyTimeout(0L)).log(LoggingHandler.Level.DEBUG, beanName + ".integrationRequest")).filter(selector, filter -> {
                            int retry;
                            int n = retry = connectorBinding.retry() != 0 ? connectorBinding.retry() : defaultMaxRetry;
                            if (retry > 0) {
                                long retryDelay = connectorBinding.retryDelay() == 0L ? defaultRetryDelay.longValue() : connectorBinding.retryDelay();
                                LOGGER.info("Configure filter retry count to {} with delay {} for bean {}", new Object[]{retry, retryDelay, beanName});
                                filter.discardFlow(flow -> ConnectorConfiguration.this.handleRetryDiscardFlow(flow, retry, retryDelay)).throwExceptionOnRejection(false);
                            } else {
                                LOGGER.debug("Configure default discard for bean {}", (Object)beanName);
                                filter.discardChannel(ConnectorConfiguration.CONNECTOR_BINDING_SELECTOR_DISCARD_CHANNEL).throwExceptionOnRejection(false);
                            }
                        })).filter(connectorType2, filter -> filter.discardChannel(ConnectorConfiguration.CONNECTOR_BINDING_SELECTOR_DISCARD_CHANNEL).throwExceptionOnRejection(false))).handle(Message.class, handler)).log(LoggingHandler.Level.DEBUG, beanName + ".integrationResult")).bridge()).get();
                        String inputChannel = connectorBinding.input();
                        StandardIntegrationFlow inputChannelFlow = ((IntegrationFlowBuilder)IntegrationFlow.from((String)inputChannel).gateway((IntegrationFlow)connectorFlow, spec -> spec.replyTimeout(Long.valueOf(0L)))).get();
                        integrationFlowContext.registration((IntegrationFlow)inputChannelFlow).register();
                    });
                }
                return bean;
            }
        };
    }

    private void handleRetryDiscardFlow(IntegrationFlowDefinition<?> flow, int maxRetry, long retryDelay) {
        flow.handle((payload, headers) -> {
            Message<?> newMessage = this.handleMessagingExceptionIfPossible(payload, headers).orElse(this.buildNewMessage(headers, payload));
            Object destination = headers.get((Object)"spring.cloud.function.destination");
            if (destination != null) {
                int retryCount = ConnectorConfiguration.getRetryCount(headers);
                if (retryCount < maxRetry - 1) {
                    ConnectorConfiguration.safeSleep(retryDelay);
                    this.getStreamBridge().send((String)destination, newMessage);
                } else {
                    LOGGER.error("Cannot retry message because retry limited exceeded: {}", (Object)maxRetry);
                }
            } else {
                LOGGER.error("Cannot retry message because destination from headers is null: {}", (Object)headers);
            }
            return null;
        });
    }

    private static void safeSleep(long retryDelay) {
        try {
            TimeUnit.SECONDS.sleep(retryDelay);
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private Optional<Message<?>> handleMessagingExceptionIfPossible(Object payload, MessageHeaders headers) {
        MessagingException messagingException;
        Message failedMessage;
        if (payload instanceof MessagingException && (failedMessage = (messagingException = (MessagingException)payload).getFailedMessage()) instanceof Message) {
            Message originalMessage = failedMessage;
            LOGGER.debug("Handling failed message for {}", payload);
            return Optional.of(this.buildNewMessage(headers, originalMessage.getPayload()));
        }
        LOGGER.debug("Handled message exception for {}", payload);
        return Optional.empty();
    }

    private Message<?> buildNewMessage(MessageHeaders headers, Object payload) {
        int retryCount = this.handleRetryCount(headers);
        Message message = MessageBuilder.withPayload((Object)payload).copyHeaders((Map)headers).setHeader(RETRY_COUNT, (Object)retryCount).build();
        LOGGER.info("New message for retry #{}: {}", (Object)retryCount, (Object)message);
        return message;
    }

    private int handleRetryCount(MessageHeaders headers) {
        int retryCount = ConnectorConfiguration.getRetryCount(headers);
        return ++retryCount;
    }

    private static int getRetryCount(MessageHeaders headers) {
        int n;
        Object object = headers.getOrDefault((Object)RETRY_COUNT, (Object)0);
        if (object instanceof Integer) {
            Integer count = (Integer)object;
            n = count;
        } else {
            n = 0;
        }
        return n;
    }
}

