package org.gytheio.messaging.camel;

import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.gytheio.content.ContentIOException;
import org.gytheio.messaging.Reply;
import org.gytheio.messaging.Request;
import org.gytheio.messaging.RequestReplyMessageProducer;

/* loaded from: input_file:org/gytheio/messaging/camel/CamelRequestReplyMessageProducer.class */
public class CamelRequestReplyMessageProducer<RQ extends Request<RP>, RP extends Reply> extends CamelMessageProducer implements RequestReplyMessageProducer<RQ, RP> {
    private static final Log logger = LogFactory.getLog(CamelRequestReplyMessageProducer.class);
    private static final long DEFAULT_PENDING_REQUEST_POLLING_INTERVAL_MS = 500;
    private static final long DEFAULT_PENDING_REQUEST_TIMEOUT_MS = 20000;
    protected Map<String, RP> pendingRequests = new HashMap();
    protected long pollingIntervalMs = DEFAULT_PENDING_REQUEST_POLLING_INTERVAL_MS;
    protected long timeoutMs = DEFAULT_PENDING_REQUEST_TIMEOUT_MS;
    protected ExecutorService executorService;

    /* loaded from: input_file:org/gytheio/messaging/camel/CamelRequestReplyMessageProducer$ReplyCallable.class */
    public class ReplyCallable implements Callable<RP> {
        private String requestId;

        public ReplyCallable(String str) {
            this.requestId = str;
        }

        @Override // java.util.concurrent.Callable
        public RP call() throws Exception {
            RP rp;
            long time = new Date().getTime();
            do {
                try {
                    if (new Date().getTime() - time > CamelRequestReplyMessageProducer.this.timeoutMs && CamelRequestReplyMessageProducer.this.timeoutMs != -1) {
                        throw new ContentIOException("Timeout while waiting for reply");
                    }
                    if (CamelRequestReplyMessageProducer.logger.isDebugEnabled()) {
                        CamelRequestReplyMessageProducer.logger.debug("Polling for pending request " + this.requestId + " completion in " + CamelRequestReplyMessageProducer.this.pollingIntervalMs + "ms...");
                    }
                    Thread.sleep(CamelRequestReplyMessageProducer.this.pollingIntervalMs);
                    rp = CamelRequestReplyMessageProducer.this.pendingRequests.get(this.requestId);
                } catch (InterruptedException e) {
                    return null;
                }
            } while (rp == null);
            CamelRequestReplyMessageProducer.this.pendingRequests.remove(this.requestId);
            return rp;
        }
    }

    public void setPollingIntervalMs(long j) {
        this.pollingIntervalMs = j;
    }

    public void setTimeoutMs(long j) {
        this.timeoutMs = j;
    }

    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    public void init() {
        if (this.executorService == null) {
            this.executorService = Executors.newCachedThreadPool();
        }
    }

    public Future<RP> asyncRequest(RQ rq) {
        send(rq);
        this.pendingRequests.put(rq.getRequestId(), null);
        FutureTask futureTask = new FutureTask(new ReplyCallable(rq.getRequestId()));
        this.executorService.execute(futureTask);
        return futureTask;
    }

    public void onReceive(Object obj) {
        Reply reply = (Reply) obj;
        if (logger.isDebugEnabled()) {
            logger.debug("Received reply for request " + reply.getRequestId());
        }
        if (this.pendingRequests.containsKey(reply.getRequestId())) {
            this.pendingRequests.put(reply.getRequestId(), reply);
        } else {
            logger.error("Unknown pending request: " + reply.getRequestId());
        }
    }
}
