package org.activiti.cloud.services.notifications.graphql.subscriptions.datafetcher;

import graphql.schema.DataFetchingEnvironment;
import java.util.List;
import java.util.function.Predicate;
import java.util.logging.Level;
import org.activiti.cloud.services.notifications.graphql.events.model.EngineEvent;
import org.springframework.messaging.Message;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SignalType;
import reactor.util.Logger;
import reactor.util.Loggers;

/* loaded from: input_file:BOOT-INF/lib/activiti-cloud-services-notifications-graphql-subscriptions-7.1.414.jar:org/activiti/cloud/services/notifications/graphql/subscriptions/datafetcher/EngineEventsFluxPublisherFactory.class */
public class EngineEventsFluxPublisherFactory implements EngineEventsPublisherFactory {
    private static Logger logger = Loggers.getLogger((Class<?>) EngineEventsFluxPublisherFactory.class);
    private final Flux<Message<List<EngineEvent>>> engineEventsFlux;
    private final EngineEventsPredicateFactory predicateFactory;

    public EngineEventsFluxPublisherFactory(Flux<Message<List<EngineEvent>>> flux, EngineEventsPredicateFactory engineEventsPredicateFactory) {
        this.engineEventsFlux = flux;
        this.predicateFactory = engineEventsPredicateFactory;
    }

    @Override // org.activiti.cloud.services.notifications.graphql.subscriptions.datafetcher.EngineEventsPublisherFactory
    public Flux<List<EngineEvent>> getPublisher(DataFetchingEnvironment dataFetchingEnvironment) {
        Predicate<? super EngineEvent> predicate = this.predicateFactory.getPredicate(dataFetchingEnvironment);
        return Flux.from(this.engineEventsFlux.log(logger, Level.CONFIG, true, new SignalType[0]).flatMapSequential(message -> {
            return Flux.fromIterable((Iterable) message.getPayload()).filter(predicate).collectList().filter(list -> {
                return !list.isEmpty();
            });
        }));
    }
}
