package org.springframework.integration.channel;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-core-5.0.0.M6.jar:org/springframework/integration/channel/FluxMessageChannel.class */
/**
 * A message channel that is also a Reactive Streams {@link Publisher}: every message
 * sent to the channel is pushed into a shared {@link Flux} and delivered to the
 * {@link Subscriber}s registered through {@link #subscribe(Subscriber)}.
 *
 * <p>The channel can additionally consume upstream publishers registered through
 * {@link #subscribeTo(Publisher)}; each element they emit is forwarded to
 * {@code send(...)}. Upstream publishers are only connected once the channel has at
 * least one subscriber.
 *
 * <p>Subscriber and publisher registrations are kept in concurrent collections because
 * registration, cancellation and sending are not confined to a single thread by
 * anything in this class.
 */
public class FluxMessageChannel extends AbstractMessageChannel implements Publisher<Message<?>>, ReactiveStreamsSubscribableChannel {

    /** Currently registered downstream subscribers; an entry is removed when its subscription is cancelled. */
    private final List<Subscriber<? super Message<?>>> subscribers = new CopyOnWriteArrayList<>();

    /** Upstream publishers mapped to their connectable wrappers; an entry is removed when the upstream completes. */
    private final Map<Publisher<Message<?>>, ConnectableFlux<Message<?>>> publishers = new ConcurrentHashMap<>();

    /**
     * Emitter used by {@link #doSend} to push messages into {@link #flux}. It is assigned
     * by the {@code Flux.create} callback below, so it stays {@code null} until that
     * callback has run. Declared volatile because the callback and {@code doSend} may
     * execute on different threads.
     */
    private volatile FluxSink<Message<?>> sink;

    /**
     * The shared stream all subscribers attach to. The explicit type witness is required:
     * without it the chained call infers {@code Flux<Object>}.
     */
    private final Flux<Message<?>> flux =
            Flux.<Message<?>>create(emitter -> this.sink = emitter, FluxSink.OverflowStrategy.IGNORE)
                    .publish()
                    .autoConnect();

    /**
     * Pushes the message into the shared flux.
     *
     * @param message the message to emit
     * @param timeout the send timeout; not used by this implementation
     * @return always {@code true}
     * @throws IllegalStateException if no subscriber is currently registered
     */
    @Override // org.springframework.integration.channel.AbstractMessageChannel
    protected boolean doSend(Message<?> message, long timeout) {
        Assert.state(!this.subscribers.isEmpty(),
                () -> "The [" + this + "] doesn't have subscribers to accept messages");
        this.sink.next(message);
        return true;
    }

    /**
     * Registers the subscriber, attaches it to the shared flux and then connects every
     * upstream publisher currently registered via {@link #subscribeTo(Publisher)}.
     *
     * @param subscriber the subscriber that will receive messages sent to this channel
     */
    @Override // org.reactivestreams.Publisher
    public void subscribe(Subscriber<? super Message<?>> subscriber) {
        // Register first so that doSend() accepts messages as soon as the subscription is live.
        this.subscribers.add(subscriber);

        this.flux
                .doOnCancel(() -> this.subscribers.remove(subscriber))
                .retry()
                .subscribe(subscriber);

        // There is now at least one consumer, so start the upstream publishers.
        this.publishers.values().forEach(ConnectableFlux::connect);
    }

    /**
     * Registers an upstream publisher whose elements are forwarded to this channel via
     * {@code send(...)}. The publisher is connected immediately only when the channel
     * already has a subscriber; otherwise the connection is deferred until
     * {@link #subscribe(Subscriber)} is called.
     *
     * @param publisher the upstream source of messages
     */
    @Override // org.springframework.integration.channel.ReactiveStreamsSubscribableChannel
    public void subscribeTo(Publisher<Message<?>> publisher) {
        ConnectableFlux<Message<?>> connectableFlux =
                Flux.from(publisher)
                        .doOnComplete(() -> this.publishers.remove(publisher))
                        .doOnNext(this::send)
                        .publish();

        this.publishers.put(publisher, connectableFlux);

        if (!this.subscribers.isEmpty()) {
            connectableFlux.connect();
        }
    }
}
