package org.springframework.cloud.stream.function;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.cloud.function.context.FunctionType;
import org.springframework.cloud.function.context.catalog.FunctionInspector;
import org.springframework.cloud.stream.binder.ConsumerProperties;
import org.springframework.cloud.stream.binder.ProducerProperties;
import org.springframework.cloud.stream.config.BindingServiceProperties;
import org.springframework.cloud.stream.converter.CompositeMessageConverterFactory;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.converter.CompositeMessageConverter;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;
import org.springframework.util.MimeType;
import org.springframework.util.ReflectionUtils;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:BOOT-INF/lib/spring-cloud-stream-2.1.4.RELEASE.jar:org/springframework/cloud/stream/function/FunctionInvoker.class */
/**
 * Applies a user function, looked up by its configured definition, to a stream of
 * {@link Message}s.
 *
 * <p>For every incoming message the invoker resolves the function argument (converting the
 * payload with the composite message converter when required), feeds it to the user function,
 * and turns each result back into a {@link Message}. Failures of the per-message pipeline are
 * retried with back-off using the consumer properties and, once retries are exhausted, handed
 * to {@link #onError(Throwable, Message)}; the failed message then produces no output.
 *
 * @param <I> payload type of the incoming messages
 * @param <O> payload type of the outgoing messages
 */
class FunctionInvoker<I, O> implements Function<Flux<Message<I>>, Flux<Message<O>>> {
    private static final Log logger = LogFactory.getLog(FunctionInvoker.class);

    /**
     * The private {@code headers} field of {@link MessageHeaders}; used by
     * {@link #toMessage(Object, Message)} to add a content type header to an already built message.
     */
    private static final Field MESSAGE_HEADERS_FIELD = ReflectionUtils.findField(MessageHeaders.class, "headers");

    static {
        // Fail with a clear message instead of a bare NullPointerException if the field is missing.
        Assert.state(MESSAGE_HEADERS_FIELD != null, "MessageHeaders does not declare a 'headers' field");
        MESSAGE_HEADERS_FIELD.setAccessible(true);
    }

    private final Class<?> inputClass;
    private final Class<?> outputClass;
    private final Function<Flux<?>, Flux<?>> userFunction;
    private final CompositeMessageConverter messageConverter;
    private final MessageChannel errorChannel;
    private final boolean isInputArgumentMessage;
    private final ConsumerProperties consumerProperties;
    private final ProducerProperties producerProperties;
    private final BindingServiceProperties bindingServiceProperties;
    private final StreamFunctionProperties functionProperties;

    /**
     * Creates an invoker without an error channel; failures are only logged.
     *
     * @param streamFunctionProperties function definition and binding configuration
     * @param functionCatalogWrapper catalog used to look up the user function
     * @param functionInspector used to obtain the function's input/output types
     * @param compositeMessageConverterFactory source of the message converter
     */
    FunctionInvoker(StreamFunctionProperties streamFunctionProperties, FunctionCatalogWrapper functionCatalogWrapper, FunctionInspector functionInspector, CompositeMessageConverterFactory compositeMessageConverterFactory) {
        this(streamFunctionProperties, functionCatalogWrapper, functionInspector, compositeMessageConverterFactory, null);
    }

    /**
     * Creates an invoker for the function named by {@code streamFunctionProperties.getDefinition()}.
     *
     * <p>A {@link Consumer} is wrapped in a {@code FluxedConsumerWrapper}; anything else is
     * expected to be a {@link Function}.
     *
     * @param streamFunctionProperties function definition and binding configuration
     * @param functionCatalogWrapper catalog used to look up the user function
     * @param functionInspector used to obtain the function's input/output types
     * @param compositeMessageConverterFactory source of the message converter
     * @param messageChannel channel that receives an {@link ErrorMessage} for each failed
     *        message, or {@code null} to only log failures
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public FunctionInvoker(StreamFunctionProperties streamFunctionProperties, FunctionCatalogWrapper functionCatalogWrapper, FunctionInspector functionInspector, CompositeMessageConverterFactory compositeMessageConverterFactory, MessageChannel messageChannel) {
        this.functionProperties = streamFunctionProperties;
        Object lookup = functionCatalogWrapper.lookup(streamFunctionProperties.getDefinition());
        this.userFunction = lookup instanceof Consumer ? new FluxedConsumerWrapper<>((Consumer) lookup) : (Function) lookup;
        Assert.isInstanceOf(Function.class, this.userFunction);
        this.messageConverter = compositeMessageConverterFactory.getMessageConverterForAllRegistered();
        FunctionType type = functionInspector.getRegistration(lookup).getType();
        this.isInputArgumentMessage = type.isMessage();
        this.inputClass = type.getInputType();
        this.outputClass = type.getOutputType();
        this.errorChannel = messageChannel;
        this.bindingServiceProperties = streamFunctionProperties.getBindingServiceProperties();
        this.consumerProperties = this.bindingServiceProperties.getConsumerProperties(streamFunctionProperties.getInputDestinationName());
        this.producerProperties = this.bindingServiceProperties.getProducerProperties(streamFunctionProperties.getOutputDestinationName());
    }

    /**
     * Runs the user function over every message of {@code flux}.
     *
     * <p>Each message is processed in its own inner pipeline (via {@code concatMap}): the message
     * is remembered, its argument resolved, the user function applied, and the whole inner
     * pipeline retried with back-off on error. When retries are exhausted the error is passed to
     * {@link #onError(Throwable, Message)} and the inner pipeline completes empty.
     *
     * @param flux incoming messages
     * @return the function results converted to messages
     */
    @Override // java.util.function.Function
    public Flux<Message<O>> apply(Flux<Message<I>> flux) {
        // Holds the message currently being processed so that error handling and result
        // conversion can refer back to it.
        AtomicReference<Message<I>> originalMessage = new AtomicReference<>();
        return flux.concatMap(message -> {
            return Flux.just(message)
                    .doOnNext(originalMessage::set)
                    .map(this::resolveArgument)
                    .transform(this.userFunction::apply)
                    .retryBackoff(this.consumerProperties.getMaxAttempts(),
                            Duration.ofMillis(this.consumerProperties.getBackOffInitialInterval()),
                            Duration.ofMillis(this.consumerProperties.getBackOffMaxInterval()))
                    .onErrorResume(th -> {
                        onError(th, originalMessage.get());
                        return Mono.empty();
                    });
        }).log().map(result -> toMessage(result, originalMessage.get()));
    }

    /**
     * Reports a processing failure: logs it and, when an error channel is configured, sends an
     * {@link ErrorMessage} carrying the failure and the original message to that channel.
     *
     * @param th the failure
     * @param message the message whose processing failed
     */
    private void onError(Throwable th, Message<I> message) {
        if (this.errorChannel == null) {
            // Use the (message, throwable) overload so the stack trace is logged.
            logger.error("Failed to process message: " + message, th);
            return;
        }
        ErrorMessage errorMessage = new ErrorMessage(th, message);
        logger.error(errorMessage, th);
        this.errorChannel.send(errorMessage);
    }

    /**
     * Converts a function result into an outgoing message.
     *
     * <p>With native encoding enabled the result is simply wrapped. Otherwise a result that is
     * already a {@link Message} is used as is, and any other value goes through the message
     * converter; if the converter yields nothing the value may still be wrapped directly. A
     * converted message lacking a content type header gets the content type configured for the
     * output binding.
     *
     * @param t the function result
     * @param message the message the result was produced from
     * @return the outgoing message, never {@code null}
     * @throws IllegalArgumentException if the result cannot be turned into a message
     */
    @SuppressWarnings("unchecked")
    private <T> Message<O> toMessage(T t, Message<I> message) {
        if (logger.isDebugEnabled()) {
            logger.debug("Converting result back to message using the original message: " + message);
        }
        if (this.producerProperties.isUseNativeEncoding()) {
            if (logger.isDebugEnabled()) {
                logger.debug("Native encoding enabled wrapping result to message using the original message: " + message);
            }
            return wrapOutputToMessage(t, message);
        }

        Message<O> result = (Message<O>) (t instanceof Message ? t : this.messageConverter.toMessage(t, message.getHeaders()));
        String outputName = this.functionProperties.getOutputDestinationName();
        if (result == null) {
            // NOTE(review): this test holds when outputClass is the same as or a subtype of the
            // value's class, which looks reversed for an "is the value an O?" check - confirm intent.
            if (t.getClass().isAssignableFrom(this.outputClass)) {
                result = wrapOutputToMessage(t, message);
            }
            // Otherwise fall through to the assertion below rather than dereferencing null.
        } else if (this.bindingServiceProperties != null
                && this.bindingServiceProperties.getBindingProperties(outputName) != null
                && !result.getHeaders().containsKey(MessageHeaders.CONTENT_TYPE)) {
            // Read the content type from the same binding the guard checked, not a fixed "output".
            Map<String, Object> headers = (Map<String, Object>) ReflectionUtils.getField(MESSAGE_HEADERS_FIELD, result.getHeaders());
            headers.put(MessageHeaders.CONTENT_TYPE,
                    MimeType.valueOf(this.bindingServiceProperties.getBindingProperties(outputName).getContentType()));
        }
        Assert.notNull(result, "Failed to convert result value '" + t + "' to message.");
        return result;
    }

    /**
     * Wraps a value in a message that copies the original message's headers, minus the
     * content type header.
     *
     * @param t the payload
     * @param message the message whose headers are copied
     * @return the new message
     */
    @SuppressWarnings("unchecked")
    private <T> Message<O> wrapOutputToMessage(T t, Message<I> message) {
        // Unchecked: the payload is treated as the declared output type O.
        return MessageBuilder.withPayload((O) t)
                .copyHeaders(message.getHeaders())
                .removeHeader(MessageHeaders.CONTENT_TYPE)
                .build();
    }

    /**
     * Produces the argument to pass to the user function for an incoming message.
     *
     * <p>The payload is converted to the function's input type when
     * {@link #shouldConvertFromMessage(Message)} says so. The result is then wrapped in a
     * message (copying the original headers) if the function takes a {@link Message}, or
     * unwrapped to its payload if it does not.
     *
     * @param message the incoming message
     * @return the function argument, never {@code null}
     * @throws IllegalArgumentException if the converter cannot produce the input type
     */
    @SuppressWarnings("unchecked")
    private <T> T resolveArgument(Message<I> message) {
        if (logger.isDebugEnabled()) {
            logger.debug("Resolving input argument from message: " + message);
        }
        Object argument = shouldConvertFromMessage(message) ? this.messageConverter.fromMessage(message, this.inputClass) : message;
        Assert.notNull(argument, "Failed to resolve argument type '" + this.inputClass + "' from message: " + message);
        boolean argumentIsMessage = argument instanceof Message;
        if (this.isInputArgumentMessage && !argumentIsMessage) {
            argument = MessageBuilder.withPayload(argument).copyHeaders(message.getHeaders()).build();
        } else if (!this.isInputArgumentMessage && argumentIsMessage) {
            argument = ((Message<?>) argument).getPayload();
        }
        return (T) argument;
    }

    /**
     * Tells whether the payload has to go through the message converter: conversion is skipped
     * when the input type is assignable from {@link Message}, from the payload's own class, or
     * from {@link Object}.
     *
     * @param message the incoming message
     * @return {@code true} if the payload must be converted to the input type
     */
    private boolean shouldConvertFromMessage(Message<?> message) {
        return !this.inputClass.isAssignableFrom(Message.class)
                && !this.inputClass.isAssignableFrom(message.getPayload().getClass())
                && !this.inputClass.isAssignableFrom(Object.class);
    }
}
