package org.apache.camel.processor.loadbalancer;

import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.spi.ReactiveExecutor;

/* loaded from: input_file:BOOT-INF/lib/camel-core-processor-4.4.1.jar:org/apache/camel/processor/loadbalancer/TopicLoadBalancer.class */
/**
 * Load balancer that offers each incoming exchange to every one of its processors,
 * one after another, giving each processor its own copy of the exchange.
 *
 * <p>Delivery stops at the first processor whose copy ends up carrying an exception;
 * that exception is then recorded on the original exchange.
 */
public class TopicLoadBalancer extends LoadBalancerSupport {

    /**
     * Schedules delivery of the exchange to the processors returned by
     * {@code doGetProcessors()} on the reactive executor of the exchange's context.
     *
     * @param exchange      the exchange to distribute
     * @param asyncCallback invoked with {@code false} once delivery has finished or failed
     * @return always {@code false}
     */
    @Override
    public boolean process(Exchange exchange, AsyncCallback asyncCallback) {
        State delivery = new State(exchange, asyncCallback, doGetProcessors());
        reactiveExecutorOf(exchange).schedule(delivery::run);
        return false;
    }

    /**
     * Produces the exchange instance that is handed to a single processor.
     * This implementation ignores the processor and returns {@code exchange.copy()}.
     *
     * @param processor the processor about to receive the exchange
     * @param exchange  the original exchange
     * @return the exchange to pass to {@code processor}
     */
    protected Exchange copyExchangeStrategy(Processor processor, Exchange exchange) {
        return exchange.copy();
    }

    /** Looks up the reactive executor through the Camel context of the given exchange. */
    private static ReactiveExecutor reactiveExecutorOf(Exchange exchange) {
        return exchange.getContext().getCamelContextExtension().getReactiveExecutor();
    }

    /**
     * Progress of one exchange through the processor array: each {@link #run()} call
     * dispatches to the next processor, and {@link #done(Exchange)} decides whether to
     * continue or to finish.
     */
    protected class State {
        final Exchange exchange;
        final AsyncCallback callback;
        final AsyncProcessor[] processors;
        /** Position of the next processor to invoke. */
        int index;

        public State(Exchange exchange, AsyncCallback asyncCallback, AsyncProcessor[] asyncProcessorArr) {
            this.exchange = exchange;
            this.callback = asyncCallback;
            this.processors = asyncProcessorArr;
        }

        /**
         * Hands a copy of the exchange to the next processor, or signals completion
         * through the callback when every processor has already been invoked.
         */
        public void run() {
            if (this.index < this.processors.length) {
                // Advance the cursor before the copy is made and the processor is called.
                AsyncProcessor next = this.processors[this.index++];
                Exchange copy = TopicLoadBalancer.this.copyExchangeStrategy(next, this.exchange);
                next.process(copy, doneSync -> done(copy));
            } else {
                this.callback.done(false);
            }
        }

        /**
         * Called when a processor has finished with its copy of the exchange.
         *
         * @param exchange the copy that was given to the processor
         */
        public void done(Exchange exchange) {
            if (exchange.getException() != null) {
                // Record the failure on the original exchange and stop delivering.
                this.exchange.setException(exchange.getException());
                this.callback.done(false);
                return;
            }
            // No failure: continue with the next processor via the reactive executor.
            reactiveExecutorOf(this.exchange).schedule(this::run);
        }
    }
}
