package org.activiti.cloud.services.notifications.graphql.events.consumer;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.activiti.cloud.services.notifications.graphql.events.RoutingKeyResolver;
import org.activiti.cloud.services.notifications.graphql.events.SpELTemplateRoutingKeyResolver;
import org.activiti.cloud.services.notifications.graphql.events.model.EngineEvent;
import org.activiti.cloud.services.notifications.graphql.events.transformer.EngineEventsTransformer;
import org.activiti.cloud.services.notifications.graphql.events.transformer.Transformer;
import org.reactivestreams.Subscriber;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.context.SmartLifecycle;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.context.annotation.PropertySources;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.TopicProcessor;

@EnableConfigurationProperties({EngineEventsConsumerProperties.class})
@Configuration
@PropertySources({@PropertySource({"classpath:META-INF/graphql-events.properties"}), @PropertySource(value = {"classpath:graphql-events.properties"}, ignoreResourceNotFound = true)})
@ConditionalOnProperty(name = {"spring.activiti.cloud.services.notifications.graphql.events.enabled"}, matchIfMissing = true)
@EnableBinding({EngineEventsConsumerChannels.class})
/* loaded from: input_file:BOOT-INF/lib/activiti-cloud-services-notifications-graphql-events-7.1.434.jar:org/activiti/cloud/services/notifications/graphql/events/consumer/EngineEventsConsumerAutoConfiguration.class */
public class EngineEventsConsumerAutoConfiguration {

    /**
     * Default bean definitions for consuming engine events.
     *
     * <p>Every bean method is annotated with {@link ConditionalOnMissingBean}, so each
     * definition only applies when the application has not supplied its own bean of
     * that type.
     */
    @Configuration
    public static class DefaultEngineEventsConsumerConfiguration {

        /** Consumer settings used to build the events transformer. */
        private final EngineEventsConsumerProperties properties;

        /**
         * Creates the configuration with the bound consumer properties.
         *
         * @param engineEventsConsumerProperties settings read when building the transformer
         */
        @Autowired
        public DefaultEngineEventsConsumerConfiguration(EngineEventsConsumerProperties engineEventsConsumerProperties) {
            properties = engineEventsConsumerProperties;
        }

        /**
         * Provides the default routing key resolver.
         *
         * @return a new {@link SpELTemplateRoutingKeyResolver}
         */
        @Bean
        @ConditionalOnMissingBean
        public RoutingKeyResolver routingKeyResolver() {
            return new SpELTemplateRoutingKeyResolver();
        }

        /**
         * Provides the default engine events transformer.
         *
         * <p>The configured attribute keys value is split on {@code ","}; the resulting
         * entries are passed on as-is (they are not trimmed or filtered).
         *
         * @return a new {@link EngineEventsTransformer} built from the attribute keys and
         *         the event type key taken from the properties
         */
        @Bean
        @ConditionalOnMissingBean
        public Transformer engineEventsTransformer() {
            List<String> attributeKeys =
                Arrays.asList(properties.getProcessEngineEventAttributeKeys().split(","));
            return new EngineEventsTransformer(attributeKeys, properties.getProcessEngineEventTypeKey());
        }

        /**
         * Provides the default message handler for consumed engine events.
         *
         * @param transformer the transformer handed to the handler
         * @param fluxSink    the sink handed to the handler
         * @return a new {@link EngineEventsConsumerMessageHandler} wired with both arguments
         */
        @Bean
        @ConditionalOnMissingBean
        public EngineEventsConsumerMessageHandler engineEventsMessageHandler(Transformer transformer, FluxSink<Message<List<EngineEvent>>> fluxSink) {
            return new EngineEventsConsumerMessageHandler(transformer, fluxSink);
        }
    }

    /**
     * Owns the {@link TopicProcessor} that carries engine event messages and exposes it
     * as a {@link Flux} (for readers) and a {@link FluxSink} (for writers).
     *
     * <p>Implements {@link SmartLifecycle}: {@link #start()} attaches the collected
     * subscribers to the processor and {@link #stop()} completes the processor.
     */
    @Configuration
    public static class EngineEventsFluxProcessorConfiguration implements SmartLifecycle {

        // volatile because lifecycle callbacks are driven by the container and may be
        // invoked from a different thread (e.g. a shutdown hook) than the one that
        // called start(); without it isRunning() could observe a stale value.
        private volatile boolean running;

        /** Subscribers collected through {@link #setSubscribers(List)}; attached on {@link #start()}. */
        private final List<Subscriber<Message<List<EngineEvent>>>> subscribers = new ArrayList<>();

        // The explicit type witness is required: without it the builder chain infers
        // Builder<Object> and build() yields a TopicProcessor<Object>, which is not
        // assignable to this field.
        private final TopicProcessor<Message<List<EngineEvent>>> engineEventsProcessor =
            TopicProcessor.<Message<List<EngineEvent>>>builder()
                .autoCancel(false)
                .share(true)
                .bufferSize(1024)
                .build();

        @Autowired
        public EngineEventsFluxProcessorConfiguration() {
        }

        /**
         * Registers subscribers to be attached to the processor when {@link #start()} runs.
         *
         * <p>Injection is optional ({@code required = false}). A {@code null} list is
         * ignored rather than causing a {@link NullPointerException}.
         *
         * @param list subscribers to add; may be {@code null}
         */
        @Autowired(required = false)
        public void setSubscribers(List<Subscriber<Message<List<EngineEvent>>>> list) {
            if (list != null) {
                this.subscribers.addAll(list);
            }
        }

        /**
         * Exposes the processor output as a flux bean.
         *
         * @return the processor published via {@code publish().autoConnect(0)}
         */
        @ConditionalOnMissingBean
        @Bean
        public Flux<Message<List<EngineEvent>>> engineEventsFlux() {
            return this.engineEventsProcessor.publish().autoConnect(0);
        }

        /**
         * Exposes the processor input as a sink bean.
         *
         * @return the sink obtained from the processor
         */
        @ConditionalOnMissingBean
        @Bean
        public FluxSink<Message<List<EngineEvent>>> engineEventsSink() {
            return this.engineEventsProcessor.sink();
        }

        /** Subscribes every registered subscriber to the processor, then marks this component running. */
        @Override // org.springframework.context.Lifecycle
        public void start() {
            for (Subscriber<Message<List<EngineEvent>>> subscriber : this.subscribers) {
                this.engineEventsProcessor.subscribe(subscriber);
            }
            this.running = true;
        }

        /** Completes the processor; the running flag is cleared even if completion throws. */
        @Override // org.springframework.context.Lifecycle
        public void stop() {
            try {
                this.engineEventsProcessor.onComplete();
            } finally {
                this.running = false;
            }
        }

        /**
         * @return {@code true} between a completed {@link #start()} and the next {@link #stop()}
         */
        @Override // org.springframework.context.Lifecycle
        public boolean isRunning() {
            return this.running;
        }
    }
}
