package org.springframework.cloud.stream.reactive;

import java.io.Closeable;
import java.io.IOException;
import java.lang.annotation.Annotation;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.BeanInitializationException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.cloud.stream.binding.MessageChannelStreamListenerResultAdapter;
import org.springframework.cloud.stream.binding.StreamAnnotationCommonMethodUtils;
import org.springframework.cloud.stream.binding.StreamListenerParameterAdapter;
import org.springframework.cloud.stream.binding.StreamListenerResultAdapter;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.SmartLifecycle;
import org.springframework.core.MethodParameter;
import org.springframework.core.annotation.AnnotatedElementUtils;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.core.annotation.SynthesizingMethodParameter;
import org.springframework.util.Assert;
import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
import org.springframework.util.ReflectionUtils;
import org.springframework.util.StringUtils;

/* loaded from: input_file:BOOT-INF/lib/spring-cloud-stream-reactive-2.1.4.RELEASE.jar:org/springframework/cloud/stream/reactive/StreamEmitterAnnotationBeanPostProcessor.class */
public class StreamEmitterAnnotationBeanPostProcessor implements BeanPostProcessor, SmartInitializingSingleton, ApplicationContextAware, SmartLifecycle {
    private static final Log log = LogFactory.getLog(StreamEmitterAnnotationBeanPostProcessor.class);
    private Collection<StreamListenerParameterAdapter> parameterAdapters;
    private Collection<StreamListenerResultAdapter> resultAdapters;
    private ConfigurableApplicationContext applicationContext;
    private volatile boolean running;
    private final List<Closeable> closeableFluxResources = new ArrayList();
    private final Lock lock = new ReentrantLock();
    private MultiValueMap<Object, Method> mappedStreamEmitterMethods = new LinkedMultiValueMap();

    private static void validateStreamEmitterMethod(Method method, int i, String str) {
        if (StringUtils.hasText(str)) {
            Assert.isTrue(i == 0, "@Output annotations are not permitted on method parameters while using the @StreamEmitter and a method-level output specification");
        } else {
            Assert.isTrue(i > 0, "No method level or parameter level @Output annotations are detected. @StreamEmitter requires a method or parameter level @Output annotation.");
        }
        if (!method.getReturnType().equals(Void.TYPE)) {
            Assert.isTrue(StringUtils.hasText(str), "A method annotated with @StreamEmitter having a return type should also have an outbound target specified at the method level.");
            Assert.isTrue(method.getParameterCount() == 0, "A method annotated with @StreamEmitter having a return type should not have any method arguments");
        } else {
            if (StringUtils.hasText(str)) {
                return;
            }
            int length = method.getParameterTypes().length;
            for (int i2 = 0; i2 < length; i2++) {
                MethodParameter methodParameter = new MethodParameter(method, i2);
                if (!methodParameter.hasParameterAnnotation(Output.class)) {
                    throw new IllegalArgumentException("A method annotated with @StreamEmitter and void return type without method level @Output annotation requires @Output on each of the method parameter");
                }
                Assert.isTrue(StringUtils.hasText((String) AnnotationUtils.getValue(methodParameter.getParameterAnnotation(Output.class))), "The @Output annotation must have the name of an input as value");
            }
        }
    }

    @Override // org.springframework.context.ApplicationContextAware
    public final void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        Assert.isTrue(applicationContext instanceof ConfigurableApplicationContext, "ConfigurableApplicationContext is required");
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }

    @Override // org.springframework.beans.factory.SmartInitializingSingleton
    public void afterSingletonsInstantiated() {
        this.parameterAdapters = this.applicationContext.getBeansOfType(StreamListenerParameterAdapter.class).values();
        this.resultAdapters = new ArrayList(this.applicationContext.getBeansOfType(StreamListenerResultAdapter.class).values());
        this.resultAdapters.add(new MessageChannelStreamListenerResultAdapter());
    }

    @Override // org.springframework.beans.factory.config.BeanPostProcessor
    public Object postProcessAfterInitialization(Object obj, String str) throws BeansException {
        ReflectionUtils.doWithMethods(AopUtils.getTargetClass(obj), method -> {
            if (AnnotatedElementUtils.isAnnotated(method, (Class<? extends Annotation>) StreamEmitter.class)) {
                this.mappedStreamEmitterMethods.add(obj, method);
            }
        }, ReflectionUtils.USER_DECLARED_METHODS);
        return obj;
    }

    @Override // org.springframework.context.Lifecycle
    public void start() {
        try {
            this.lock.lock();
            if (!this.running) {
                this.mappedStreamEmitterMethods.forEach((obj, list) -> {
                    list.forEach(method -> {
                        Assert.isTrue(method.getAnnotation(Input.class) == null, "A method annotated with @StreamEmitter cannot contain @Input annotations");
                        String outboundBindingTargetName = StreamAnnotationCommonMethodUtils.getOutboundBindingTargetName(method);
                        validateStreamEmitterMethod(method, StreamAnnotationCommonMethodUtils.outputAnnotationCount(method), outboundBindingTargetName);
                        invokeSetupMethodOnToTargetChannel(method, obj, outboundBindingTargetName);
                    });
                });
                this.running = true;
            }
        } finally {
            this.lock.unlock();
        }
    }

    private void invokeSetupMethodOnToTargetChannel(Method method, Object obj, String str) {
        Object[] objArr = new Object[method.getParameterCount()];
        Object obj2 = null;
        for (int i = 0; i < objArr.length; i++) {
            SynthesizingMethodParameter synthesizingMethodParameter = new SynthesizingMethodParameter(method, i);
            Class<?> parameterType = synthesizingMethodParameter.getParameterType();
            Object obj3 = null;
            if (synthesizingMethodParameter.hasParameterAnnotation(Output.class)) {
                obj3 = AnnotationUtils.getValue(synthesizingMethodParameter.getParameterAnnotation(Output.class));
            } else if (objArr.length == 1 && StringUtils.hasText(str)) {
                obj3 = str;
            }
            if (obj3 == null) {
                throw new IllegalStateException("At least one output must be specified");
            }
            obj2 = this.applicationContext.getBean((String) obj3);
            Iterator<StreamListenerParameterAdapter> it = this.parameterAdapters.iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                StreamListenerParameterAdapter next = it.next();
                if (next.supports(obj2.getClass(), synthesizingMethodParameter)) {
                    objArr[i] = next.adapt(obj2, synthesizingMethodParameter);
                    if (objArr[i] instanceof FluxSender) {
                        this.closeableFluxResources.add((FluxSender) objArr[i]);
                    }
                }
            }
            Assert.notNull(objArr[i], "Cannot convert argument " + i + " of " + method + "from " + obj2.getClass() + " to " + parameterType);
        }
        try {
            Object invoke = method.invoke(obj, objArr);
            if (Void.TYPE.equals(method.getReturnType())) {
                return;
            }
            if (obj2 == null) {
                obj2 = this.applicationContext.getBean(str);
            }
            boolean z = false;
            Iterator<StreamListenerResultAdapter> it2 = this.resultAdapters.iterator();
            while (true) {
                if (!it2.hasNext()) {
                    break;
                }
                StreamListenerResultAdapter next2 = it2.next();
                if (next2.supports(invoke.getClass(), obj2.getClass())) {
                    this.closeableFluxResources.add(next2.adapt(invoke, obj2));
                    z = true;
                    break;
                }
            }
            Assert.state(z, "No suitable adapters are found that can convert the return type");
        } catch (Exception e) {
            throw new BeanInitializationException("Cannot setup StreamEmitter for " + method, e);
        }
    }

    @Override // org.springframework.context.SmartLifecycle
    public boolean isAutoStartup() {
        return true;
    }

    @Override // org.springframework.context.SmartLifecycle
    public void stop(Runnable runnable) {
        stop();
        if (runnable != null) {
            runnable.run();
        }
    }

    @Override // org.springframework.context.Lifecycle
    public void stop() {
        try {
            this.lock.lock();
            if (this.running) {
                Iterator<Closeable> it = this.closeableFluxResources.iterator();
                while (it.hasNext()) {
                    try {
                        it.next().close();
                    } catch (IOException e) {
                        log.error("Error closing reactive source", e);
                    }
                }
                this.running = false;
            }
        } finally {
            this.lock.unlock();
        }
    }

    @Override // org.springframework.context.Lifecycle
    public boolean isRunning() {
        return this.running;
    }

    @Override // org.springframework.context.SmartLifecycle, org.springframework.context.Phased
    public int getPhase() {
        return 0;
    }
}
