package org.activiti.cloud.acc.core.steps.notifications;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import net.thucydides.core.annotations.Step;
import org.activiti.cloud.acc.core.config.RuntimeTestsConfigurationProperties;
import org.activiti.cloud.acc.core.rest.RuntimeDirtyContextHandler;
import org.activiti.cloud.acc.core.rest.feign.EnableRuntimeFeignContext;
import org.activiti.cloud.acc.shared.service.BaseService;
import org.assertj.core.api.Assertions;
import org.reactivestreams.Subscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import reactor.core.publisher.Mono;
import reactor.core.publisher.ReplayProcessor;
import reactor.netty.http.client.HttpClient;
import reactor.test.StepVerifier;

@EnableRuntimeFeignContext
/* loaded from: input_file:org/activiti/cloud/acc/core/steps/notifications/NotificationsSteps.class */
public class NotificationsSteps {
    private static final String GRAPHQL_WS = "graphql-ws";
    private static final String AUTHORIZATION = "Authorization";
    private static final Duration TIMEOUT = Duration.ofMillis(90000);

    @Autowired
    private RuntimeDirtyContextHandler dirtyContextHandler;

    @Autowired
    private RuntimeTestsConfigurationProperties properties;

    @Autowired
    @Qualifier("runtimeBundleBaseService")
    private BaseService baseService;

    @Autowired
    private ObjectMapper objectMapper;

    @Step
    public void checkServicesHealth() {
        Assertions.assertThat(this.baseService.isServiceUp()).isTrue();
    }

    public String getRuntimeBundleServiceName() {
        return this.properties.getRuntimeBundleServiceName();
    }

    @Step
    public ReplayProcessor<String> subscribe(String str, final String str2, final Map<String, Object> map, Consumer<Subscription> consumer) throws InterruptedException {
        ReplayProcessor<String> create = ReplayProcessor.create();
        HttpClient.WebsocketSender uri = HttpClient.create().wiretap(true).headers(httpHeaders -> {
            httpHeaders.add(AUTHORIZATION, "Bearer " + str);
        }).websocket(GRAPHQL_WS).uri(this.properties.getGraphqlWsUrl());
        try {
            String writeValueAsString = this.objectMapper.writeValueAsString(new LinkedHashMap<String, Object>() { // from class: org.activiti.cloud.acc.core.steps.notifications.NotificationsSteps.1
                {
                    put("type", "start");
                    put("id", "1");
                    put("payload", new LinkedHashMap<String, Object>() { // from class: org.activiti.cloud.acc.core.steps.notifications.NotificationsSteps.1.1
                        {
                            put("query", str2);
                            put("variables", map);
                        }
                    });
                }
            });
            uri.handle((websocketInbound, websocketOutbound) -> {
                websocketOutbound.sendString(Mono.just(writeValueAsString)).then().log("send").subscribe();
                return websocketInbound.aggregateFrames().receive().asString().log("receive").subscribeWith(create).doOnCancel(() -> {
                    Mono sendClose = websocketOutbound.sendClose();
                    Objects.requireNonNull(create);
                    sendClose.doOnTerminate(create::onComplete).block(Duration.ofSeconds(2L));
                }).doOnSubscribe(consumer);
            }).log("handle").subscribe();
            return create;
        } catch (JsonProcessingException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    @Step
    public void verifyData(ReplayProcessor<String> replayProcessor, String... strArr) {
        StepVerifier.create(replayProcessor).expectNext(strArr).expectComplete().verify(TIMEOUT);
    }
}
