package org.springframework.cloud.sleuth.instrument.web.client;

import brave.Span;
import brave.Tracer;
import brave.Tracing;
import brave.http.HttpClientAdapter;
import brave.http.HttpClientHandler;
import brave.http.HttpTracing;
import brave.propagation.Propagation;
import brave.propagation.TraceContext;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import org.springframework.beans.factory.BeanFactory;
import org.springframework.cloud.sleuth.instrument.reactor.ReactorSleuth;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.web.client.RestClientException;
import org.springframework.web.reactive.function.client.ClientRequest;
import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeFunction;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
import reactor.util.context.Context;

/* JADX INFO: Access modifiers changed from: package-private */
/* compiled from: TraceWebClientBeanPostProcessor.java */
/* loaded from: input_file:BOOT-INF/lib/spring-cloud-sleuth-core-2.1.0.RELEASE.jar:org/springframework/cloud/sleuth/instrument/web/client/TraceExchangeFilterFunction.class */
public final class TraceExchangeFilterFunction implements ExchangeFilterFunction {
    /** Reactor {@link Context} key under which the subscriber stores the client span. */
    private static final String CLIENT_SPAN_KEY = "sleuth.webclient.clientSpan";
    /** Value written to the span's {@code error} tag when the subscription is cancelled. */
    private static final String CANCELLED_SUBSCRIPTION_ERROR = "CANCELLED";
    /** Bean factory from which {@link HttpTracing} is looked up on first use. */
    final BeanFactory beanFactory;
    /** Transformer applied to the response body publisher; obtained from {@code ReactorSleuth.scopePassingSpanOperator}. */
    final Function<? super Publisher<DataBuffer>, ? extends Publisher<DataBuffer>> scopePassingTransformer;
    // The four fields below start out null and are filled in lazily by the
    // accessor methods of the same name (tracer(), httpTracing(), handler(), injector()).
    Tracer tracer;
    HttpTracing httpTracing;
    HttpClientHandler<ClientRequest, ClientResponse> handler;
    TraceContext.Injector<ClientRequest.Builder> injector;
    /** Logger shared by this class and its nested classes. */
    private static final Log log = LogFactory.getLog((Class<?>) TraceExchangeFilterFunction.class);
    /**
     * Propagation setter that writes a trace header onto a {@link ClientRequest.Builder},
     * replacing any value already present under the same header name.
     */
    static final Propagation.Setter<ClientRequest.Builder, String> SETTER = new Propagation.Setter<ClientRequest.Builder, String>() {
        /**
         * Sets header {@code str} to the single value {@code str2} on the given builder.
         *
         * @param builder request builder whose headers are modified
         * @param str header name
         * @param str2 header value
         */
        @Override // brave.propagation.Propagation.Setter
        public void put(ClientRequest.Builder builder, String str, String str2) {
            builder.headers(httpHeaders -> {
                if (TraceExchangeFilterFunction.log.isTraceEnabled()) {
                    TraceExchangeFilterFunction.log.trace("Replacing [" + str + "] with value [" + str2 + "]");
                }
                // set(...) replaces the existing values with a fresh, mutable single-element list.
                // The previous merge(...) with Collections.singletonList left an immutable list in
                // the headers, so a later add(...) for the same header name would throw
                // UnsupportedOperationException.
                httpHeaders.set(str, str2);
            });
        }

        @Override
        public String toString() {
            return "ClientRequest.Builder::header";
        }
    };

    /* JADX INFO: Access modifiers changed from: package-private */
    /* compiled from: TraceWebClientBeanPostProcessor.java */
    /* loaded from: input_file:BOOT-INF/lib/spring-cloud-sleuth-core-2.1.0.RELEASE.jar:org/springframework/cloud/sleuth/instrument/web/client/TraceExchangeFilterFunction$HttpAdapter.class */
    /**
     * Adapter that lets Brave's HTTP client instrumentation read the method, URL,
     * request headers and status code from WebClient request/response objects.
     */
    public static final class HttpAdapter extends HttpClientAdapter<ClientRequest, ClientResponse> {

        HttpAdapter() {
        }

        /** Returns the name of the request's HTTP method. */
        @Override // brave.http.HttpAdapter
        public String method(ClientRequest clientRequest) {
            return clientRequest.method().name();
        }

        /** Returns the request URL in its string form. */
        @Override // brave.http.HttpAdapter
        public String url(ClientRequest clientRequest) {
            return clientRequest.url().toString();
        }

        /**
         * Returns the first value of the named request header.
         * A {@code null} lookup result is passed through unchanged.
         */
        @Override // brave.http.HttpAdapter
        public String requestHeader(ClientRequest clientRequest, String str) {
            return clientRequest.headers().getFirst(str);
        }

        /** Returns the numeric status code of the response. */
        @Override // brave.http.HttpAdapter
        public Integer statusCode(ClientResponse clientResponse) {
            final int numericStatus = clientResponse.statusCode().value();
            return numericStatus;
        }
    }

    /* compiled from: TraceWebClientBeanPostProcessor.java */
    /* loaded from: input_file:BOOT-INF/lib/spring-cloud-sleuth-core-2.1.0.RELEASE.jar:org/springframework/cloud/sleuth/instrument/web/client/TraceExchangeFilterFunction$MonoWebClientTrace.class */
    private static final class MonoWebClientTrace extends Mono<ClientResponse> {
        /** Downstream exchange function that is invoked when this Mono is subscribed. */
        final ExchangeFunction next;
        /** Original request; it is copied into a builder at subscription time. */
        final ClientRequest request;
        /** Tracer used to create the next span and to put it in scope. */
        final Tracer tracer;
        /** Brave HTTP client handler used for the send and receive callbacks. */
        final HttpClientHandler<ClientRequest, ClientResponse> handler;
        /** Injector handed to the handler together with the request builder on send. */
        final TraceContext.Injector<ClientRequest.Builder> injector;
        /** Tracing component taken from the filter's HttpTracing. */
        final Tracing tracing;
        /** Transformer applied to the response body publisher. */
        final Function<? super Publisher<DataBuffer>, ? extends Publisher<DataBuffer>> scopePassingTransformer;

        /* compiled from: TraceWebClientBeanPostProcessor.java */
        /* loaded from: input_file:BOOT-INF/lib/spring-cloud-sleuth-core-2.1.0.RELEASE.jar:org/springframework/cloud/sleuth/instrument/web/client/TraceExchangeFilterFunction$MonoWebClientTrace$WebClientTracerSubscriber.class */
        static final class WebClientTracerSubscriber implements CoreSubscriber<ClientResponse> {
            final CoreSubscriber<? super ClientResponse> actual;
            final Context context;
            final Span span;
            final Tracer.SpanInScope ws;
            final HttpClientHandler<ClientRequest, ClientResponse> handler;
            final Function<? super Publisher<DataBuffer>, ? extends Publisher<DataBuffer>> scopePassingTransformer;
            final Tracing tracing;
            boolean done;

            WebClientTracerSubscriber(CoreSubscriber<? super ClientResponse> coreSubscriber, Context context, Span span, MonoWebClientTrace monoWebClientTrace) {
                this.actual = coreSubscriber;
                this.span = span;
                this.handler = monoWebClientTrace.handler;
                this.tracing = monoWebClientTrace.tracing;
                this.scopePassingTransformer = monoWebClientTrace.scopePassingTransformer;
                if (!context.hasKey(Span.class)) {
                    context = context.put(Span.class, span);
                    if (TraceExchangeFilterFunction.log.isDebugEnabled()) {
                        TraceExchangeFilterFunction.log.debug("Reactor Context got injected with the client span " + span);
                    }
                }
                this.context = context.put(TraceExchangeFilterFunction.CLIENT_SPAN_KEY, span);
                this.ws = monoWebClientTrace.tracer.withSpanInScope(span);
            }

            @Override // reactor.core.CoreSubscriber, org.reactivestreams.Subscriber
            public void onSubscribe(final Subscription subscription) {
                this.actual.onSubscribe(new Subscription() { // from class: org.springframework.cloud.sleuth.instrument.web.client.TraceExchangeFilterFunction.MonoWebClientTrace.WebClientTracerSubscriber.1
                    @Override // org.reactivestreams.Subscription
                    public void request(long j) {
                        subscription.request(j);
                    }

                    @Override // org.reactivestreams.Subscription
                    public void cancel() {
                        WebClientTracerSubscriber.this.terminateSpanOnCancel();
                        subscription.cancel();
                    }
                });
            }

            @Override // org.reactivestreams.Subscriber
            public void onNext(ClientResponse clientResponse) {
                this.done = true;
                try {
                    this.actual.onNext(ClientResponse.from(clientResponse).body(clientResponse.bodyToFlux(DataBuffer.class).transform(this.scopePassingTransformer)).build());
                } finally {
                    terminateSpan(clientResponse, null);
                }
            }

            @Override // org.reactivestreams.Subscriber
            public void onError(Throwable th) {
                try {
                    this.actual.onError(th);
                } finally {
                    terminateSpan(null, th);
                }
            }

            @Override // org.reactivestreams.Subscriber
            public void onComplete() {
                try {
                    this.actual.onComplete();
                } finally {
                    if (!this.done) {
                        terminateSpan(null, null);
                    }
                }
            }

            @Override // reactor.core.CoreSubscriber
            public Context currentContext() {
                return this.context;
            }

            void handleReceive(Span span, Tracer.SpanInScope spanInScope, ClientResponse clientResponse, Throwable th) {
                this.handler.handleReceive(clientResponse, th, span);
                spanInScope.close();
            }

            void terminateSpanOnCancel() {
                if (TraceExchangeFilterFunction.log.isDebugEnabled()) {
                    TraceExchangeFilterFunction.log.debug("Subscription was cancelled. Will close the span [" + this.span + "]");
                }
                this.span.tag("error", TraceExchangeFilterFunction.CANCELLED_SUBSCRIPTION_ERROR);
                handleReceive(this.span, this.ws, null, null);
            }

            void terminateSpan(@Nullable ClientResponse clientResponse, @Nullable Throwable th) {
                if (clientResponse == null || clientResponse.statusCode() == null) {
                    if (TraceExchangeFilterFunction.log.isDebugEnabled()) {
                        TraceExchangeFilterFunction.log.debug("No response was returned. Will close the span [" + this.span + "]");
                    }
                    handleReceive(this.span, this.ws, clientResponse, th);
                } else {
                    if (clientResponse.statusCode().is4xxClientError() || clientResponse.statusCode().is5xxServerError()) {
                        if (TraceExchangeFilterFunction.log.isDebugEnabled()) {
                            TraceExchangeFilterFunction.log.debug("Non positive status code was returned from the call. Will close the span [" + this.span + "]");
                        }
                        th = new RestClientException("Status code of the response is [" + clientResponse.statusCode().value() + "] and the reason is [" + clientResponse.statusCode().getReasonPhrase() + "]");
                    }
                    handleReceive(this.span, this.ws, clientResponse, th);
                }
            }
        }

        MonoWebClientTrace(ExchangeFunction exchangeFunction, ClientRequest clientRequest, TraceExchangeFilterFunction traceExchangeFilterFunction) {
            this.next = exchangeFunction;
            this.request = clientRequest;
            this.tracer = traceExchangeFilterFunction.tracer();
            this.handler = traceExchangeFilterFunction.handler();
            this.injector = traceExchangeFilterFunction.injector();
            this.tracing = traceExchangeFilterFunction.httpTracing().tracing();
            this.scopePassingTransformer = traceExchangeFilterFunction.scopePassingTransformer;
        }

        /**
         * Starts the client span, builds the outgoing request and subscribes a
         * {@link WebClientTracerSubscriber} to the downstream exchange.
         *
         * @param coreSubscriber subscriber that receives the traced response
         */
        @Override // reactor.core.publisher.Mono
        public void subscribe(CoreSubscriber<? super ClientResponse> coreSubscriber) {
            Context context = coreSubscriber.currentContext();
            ClientRequest.Builder builder = ClientRequest.from(this.request);
            // The span must be created, and the builder handed to the injector, BEFORE the
            // request is built. Otherwise the built request only carries the trace headers
            // if it happens to share the builder's header storage.
            Span span = findOrCreateSpan(builder);
            Mono<ClientResponse> exchange = this.next.exchange(builder.build());
            // Declared as CoreSubscriber so the subscribe(CoreSubscriber) overload is selected.
            CoreSubscriber<ClientResponse> tracingSubscriber = new WebClientTracerSubscriber(coreSubscriber, context, span, this);
            exchange.subscribe(tracingSubscriber);
        }

        /**
         * Asks the tracer for the next span and passes it, with the injector and the
         * request builder, to the handler's send callback.
         *
         * @param builder builder of the outgoing request
         * @return the span returned by the handler
         */
        private Span findOrCreateSpan(ClientRequest.Builder builder) {
            final Log logger = TraceExchangeFilterFunction.log;
            if (logger.isDebugEnabled()) {
                logger.debug("Instrumenting WebClient call");
            }
            final Span nextSpan = this.tracer.nextSpan();
            final Span clientSpan = this.handler.handleSend(this.injector, builder, this.request, nextSpan);
            if (logger.isDebugEnabled()) {
                logger.debug("Handled send of " + clientSpan);
            }
            return clientSpan;
        }
    }

    /**
     * Static factory for the tracing filter.
     *
     * @param beanFactory bean factory from which tracing components are looked up
     * @return a new {@link TraceExchangeFilterFunction}
     */
    public static ExchangeFilterFunction create(BeanFactory beanFactory) {
        final TraceExchangeFilterFunction tracingFilter = new TraceExchangeFilterFunction(beanFactory);
        return tracingFilter;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Stores the bean factory and obtains the scope-passing transformer from
     * {@link ReactorSleuth}. The tracing beans themselves are looked up lazily.
     *
     * @param beanFactory bean factory used for later bean lookups
     */
    public TraceExchangeFilterFunction(BeanFactory beanFactory) {
        this.scopePassingTransformer = ReactorSleuth.scopePassingSpanOperator(beanFactory);
        this.beanFactory = beanFactory;
    }

    /**
     * Wraps the exchange in a tracing Mono. The exchange function itself is only
     * invoked when the returned Mono is subscribed.
     *
     * @param clientRequest request to send
     * @param exchangeFunction next exchange function in the chain
     * @return Mono emitting the traced response
     */
    @Override
    public Mono<ClientResponse> filter(ClientRequest clientRequest, ExchangeFunction exchangeFunction) {
        final Mono<ClientResponse> tracedExchange = new MonoWebClientTrace(exchangeFunction, clientRequest, this);
        return tracedExchange;
    }

    /**
     * Returns the HTTP client handler, creating it on first use.
     * It is built from the cached {@link #httpTracing()} instance, so the handler and
     * {@link #tracer()} share one {@link HttpTracing} and no extra bean lookup is made.
     *
     * @return the lazily created handler
     */
    HttpClientHandler<ClientRequest, ClientResponse> handler() {
        if (this.handler == null) {
            this.handler = HttpClientHandler.create(httpTracing(), new HttpAdapter());
        }
        return this.handler;
    }

    /**
     * Returns the tracer, resolving it from {@link #httpTracing()} on first use.
     *
     * @return the lazily resolved tracer
     */
    Tracer tracer() {
        if (this.tracer != null) {
            return this.tracer;
        }
        final Tracing resolvedTracing = httpTracing().tracing();
        this.tracer = resolvedTracing.tracer();
        return this.tracer;
    }

    /**
     * Returns the {@link HttpTracing} bean, looking it up in the bean factory on first use.
     *
     * @return the lazily resolved HttpTracing
     */
    HttpTracing httpTracing() {
        if (this.httpTracing != null) {
            return this.httpTracing;
        }
        this.httpTracing = this.beanFactory.getBean(HttpTracing.class);
        return this.httpTracing;
    }

    /**
     * Returns the trace-context injector bound to {@link #SETTER}, creating it on first use.
     * It is derived from the cached {@link #httpTracing()} instance, consistent with
     * {@link #tracer()}, so no extra bean lookup is made.
     *
     * @return the lazily created injector
     */
    TraceContext.Injector<ClientRequest.Builder> injector() {
        if (this.injector == null) {
            this.injector = httpTracing().tracing().propagation().injector(SETTER);
        }
        return this.injector;
    }
}
