package org.alfresco.repo.event2;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.alfresco.repo.event.v1.model.RepoEvent;
import org.alfresco.util.PropertyCheck;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.InitializingBean;

/* loaded from: input_file:org/alfresco/repo/event2/EventGeneratorQueue.class */
public class EventGeneratorQueue implements InitializingBean {
    protected static final Log LOGGER = LogFactory.getLog(EventGeneratorQueue.class);
    protected Executor enqueueThreadPoolExecutor;
    protected Executor dequeueThreadPoolExecutor;
    protected Event2MessageProducer event2MessageProducer;
    protected BlockingQueue<EventInMaking> queue = new LinkedBlockingQueue();
    protected Runnable listener = createListener();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/alfresco/repo/event2/EventGeneratorQueue$EventInMaking.class */
    public static class EventInMaking {
        private Callable<RepoEvent<?>> maker;
        private volatile RepoEvent<?> event;
        private CountDownLatch latch = new CountDownLatch(1);

        public EventInMaking(Callable<RepoEvent<?>> callable) {
            this.maker = callable;
        }

        public void make() throws Exception {
            try {
                this.event = this.maker.call();
            } finally {
                this.latch.countDown();
            }
        }

        public RepoEvent<?> getEventWhenReady() throws InterruptedException {
            this.latch.await(30L, TimeUnit.SECONDS);
            return this.event;
        }

        public String toString() {
            return this.maker.toString();
        }
    }

    public void afterPropertiesSet() throws Exception {
        PropertyCheck.mandatory(this, "enqueueThreadPoolExecutor", this.enqueueThreadPoolExecutor);
        PropertyCheck.mandatory(this, "dequeueThreadPoolExecutor", this.dequeueThreadPoolExecutor);
        PropertyCheck.mandatory(this, "event2MessageProducer", this.event2MessageProducer);
    }

    public void setEvent2MessageProducer(Event2MessageProducer event2MessageProducer) {
        this.event2MessageProducer = event2MessageProducer;
    }

    public void setEnqueueThreadPoolExecutor(Executor executor) {
        this.enqueueThreadPoolExecutor = executor;
    }

    public void setDequeueThreadPoolExecutor(Executor executor) {
        this.dequeueThreadPoolExecutor = executor;
        executor.execute(this.listener);
    }

    public void accept(Callable<RepoEvent<?>> callable) {
        EventInMaking eventInMaking = new EventInMaking(callable);
        this.queue.offer(eventInMaking);
        this.enqueueThreadPoolExecutor.execute(() -> {
            try {
                eventInMaking.make();
            } catch (Exception e) {
                LOGGER.error("Unexpected error while enqueuing maker function for repository event" + e);
            }
        });
    }

    private Runnable createListener() {
        return new Runnable() { // from class: org.alfresco.repo.event2.EventGeneratorQueue.1
            @Override // java.lang.Runnable
            public void run() {
                while (!Thread.interrupted()) {
                    try {
                        try {
                            RepoEvent<?> eventWhenReady = EventGeneratorQueue.this.queue.take().getEventWhenReady();
                            if (eventWhenReady != null) {
                                EventGeneratorQueue.this.event2MessageProducer.send(eventWhenReady);
                            }
                        } catch (Exception e) {
                            EventGeneratorQueue.LOGGER.error("Unexpected error while dequeuing and sending repository event" + e);
                        }
                    } finally {
                        EventGeneratorQueue.LOGGER.warn("Unexpected: rescheduling the listener thread.");
                        EventGeneratorQueue.this.dequeueThreadPoolExecutor.execute(EventGeneratorQueue.this.listener);
                    }
                }
            }
        };
    }
}
