package org.activiti.cloud.query.controller;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import javax.transaction.Transactional;
import org.activiti.cloud.query.QueryApplication;
import org.activiti.cloud.query.configuration.QueryConfiguration;
import org.activiti.cloud.query.model.Tweet;
import org.activiti.cloud.query.repository.ExtendedProcessInstanceRepository;
import org.activiti.cloud.query.repository.ExtendedVariableRepository;
import org.activiti.cloud.services.query.model.ProcessVariableEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.context.config.annotation.RefreshScope;
import org.springframework.data.domain.Pageable;
import org.springframework.http.MediaType;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;

/**
 * Streams per-campaign tweet feeds as server-sent events.
 *
 * <p>Two in-memory caches (processed and discarded) are kept per campaign name. A campaign is
 * registered in a cache the first time a client subscribes to the corresponding endpoint, and
 * {@link #refreshCampaignFeed()} periodically appends newly completed results to every registered
 * campaign's list.
 *
 * <p>The caches are shared between request threads and the scheduler thread, so they are backed by
 * {@link ConcurrentHashMap} with {@link CopyOnWriteArrayList} values.
 *
 * <p>NOTE(review): entries are only ever appended, never evicted, so the cached lists grow for as
 * long as this bean instance lives.
 */
@RefreshScope
@RestController
public class ReactiveProcessedFeedController {

    /** How often each subscriber is sent the current content of its campaign feed. */
    private static final Duration EMIT_INTERVAL = Duration.ofSeconds(5L);

    @Autowired
    private ExtendedProcessInstanceRepository repository;

    @Autowired
    private ExtendedVariableRepository variableRepository;

    // Logger category is QueryApplication, as in the original code; the logger is currently unused.
    private Logger logger = LoggerFactory.getLogger(QueryApplication.class);

    /** Campaign name -> tweets whose process completed with a "matched" flag. */
    private final Map<String, List<Tweet>> cacheProcessedTweetsForFlux = new ConcurrentHashMap<>();

    /** Campaign name -> tweets whose process completed as discarded. */
    private final Map<String, List<Tweet>> cacheDiscardedTweetsForFlux = new ConcurrentHashMap<>();

    @Autowired
    private QueryConfiguration queryConfiguration;

    /**
     * Subscribes to the processed-tweets feed of a campaign, registering the campaign for periodic
     * refresh if it is not registered yet.
     *
     * @param str      campaign name taken from the request path
     * @param pageable unused; kept for interface compatibility
     * @return an endless stream that re-emits the full cached feed every {@link #EMIT_INTERVAL}
     */
    @RequestMapping(path = {"/reactive/processed/{campaign}"}, produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public Flux<Tweet> getProcessedTweets(@PathVariable("campaign") String str, Pageable pageable) {
        return streamFeed(this.cacheProcessedTweetsForFlux, str);
    }

    /**
     * Subscribes to the discarded-tweets feed of a campaign, registering the campaign for periodic
     * refresh if it is not registered yet.
     *
     * @param str      campaign name taken from the request path
     * @param pageable unused; kept for interface compatibility
     * @return an endless stream that re-emits the full cached feed every {@link #EMIT_INTERVAL}
     */
    @RequestMapping(path = {"/reactive/discarded/{campaign}"}, produces = {MediaType.TEXT_EVENT_STREAM_VALUE})
    public Flux<Tweet> getDiscardedTweets(@PathVariable("campaign") String str, Pageable pageable) {
        return streamFeed(this.cacheDiscardedTweetsForFlux, str);
    }

    /**
     * Appends results completed within the last refresh period to every registered feed.
     *
     * <p>Each cache is refreshed over its own registered campaigns. The previous implementation
     * walked only the processed cache and dereferenced the discarded cache for the same key, which
     * threw a {@link NullPointerException} for campaigns subscribed only to the processed endpoint
     * and never refreshed campaigns subscribed only to the discarded endpoint.
     */
    @Scheduled(fixedRateString = "${query.refresh}")
    @Transactional
    public void refreshCampaignFeed() {
        for (Map.Entry<String, List<Tweet>> processed : this.cacheProcessedTweetsForFlux.entrySet()) {
            refreshProcessedFeed(processed.getKey(), processed.getValue());
        }
        for (Map.Entry<String, List<Tweet>> discarded : this.cacheDiscardedTweetsForFlux.entrySet()) {
            refreshDiscardedFeed(discarded.getKey(), discarded.getValue());
        }
    }

    /** Registers the campaign in the given cache (atomically, if absent) and returns its periodic stream. */
    private static Flux<Tweet> streamFeed(Map<String, List<Tweet>> cache, String campaign) {
        // computeIfAbsent avoids the check-then-put race between concurrent first subscribers.
        List<Tweet> feed = cache.computeIfAbsent(campaign, key -> new CopyOnWriteArrayList<>());
        // Entries are never removed or replaced, so the captured list is the one the scheduler appends to.
        return Flux.interval(EMIT_INTERVAL).flatMapIterable(tick -> feed);
    }

    /** Appends tweets for process instances that recently completed with a "matched" = "true" variable. */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void refreshProcessedFeed(String campaign, List<Tweet> feed) {
        List<ProcessVariableEntity> candidates =
                this.variableRepository.findAllCompletedAndMatchedSince(campaign, refreshWindowStart());
        // Raw on purpose: the element type is whatever getProcessInstance() returns, and that type
        // is not imported in this file.
        ArrayList matchedInstances = new ArrayList();
        for (ProcessVariableEntity variable : candidates) {
            if (isMatchedFlag(variable)) {
                matchedInstances.add(variable.getProcessInstance());
            }
        }
        feed.addAll(ControllersUtil.createTweetsFromProcessInstances(matchedInstances));
    }

    /** Appends tweets for process instances that recently completed as discarded. */
    private void refreshDiscardedFeed(String campaign, List<Tweet> feed) {
        feed.addAll(ControllersUtil.createTweetsFromProcessInstances(
                this.repository.findAllCompletedAndDiscardedSince(campaign, refreshWindowStart())));
    }

    /** Returns the instant one configured refresh period before now. */
    private Date refreshWindowStart() {
        return new Date(System.currentTimeMillis() - this.queryConfiguration.getRefresh());
    }

    /** Tells whether the variable is the string-typed "matched" flag holding "true" (case-insensitive). */
    private static boolean isMatchedFlag(ProcessVariableEntity variable) {
        // Literal-first comparisons keep a null name or type from throwing.
        if (!"matched".equalsIgnoreCase(variable.getName())) {
            return false;
        }
        if (!"string".equalsIgnoreCase(variable.getType())) {
            return false;
        }
        Object value = variable.getValue();
        return value instanceof String && ((String) value).equalsIgnoreCase("true");
    }
}
