package org.activiti.cloud.services.messages.tests;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.activiti.api.process.model.builders.MessageEventPayloadBuilder;
import org.activiti.api.process.model.events.BPMNMessageEvent;
import org.activiti.api.process.model.events.MessageDefinitionEvent;
import org.activiti.api.process.model.events.MessageSubscriptionEvent;
import org.activiti.api.process.model.payloads.MessageEventPayload;
import org.activiti.cloud.services.messages.core.aggregator.MessageConnectorAggregator;
import org.activiti.cloud.services.messages.core.channels.MessageConnectorProcessor;
import org.activiti.cloud.services.messages.core.controlbus.ControlBusGateway;
import org.activiti.cloud.services.messages.core.correlation.Correlations;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactories;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.integration.annotation.BridgeFrom;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.store.MessageGroup;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageDeliveryException;
import org.springframework.messaging.support.ChannelInterceptor;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.transaction.PlatformTransactionManager;

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE, properties = {"spring.cloud.stream.bindings.input.contentType=application/x-java-object", "spring.cloud.stream.bindings.output.contentType=application/x-java-object"})
@DirtiesContext
@Import({TestConfigurationContext.class})
/* loaded from: input_file:org/activiti/cloud/services/messages/tests/AbstractMessagesCoreIntegrationTests.class */
public abstract class AbstractMessagesCoreIntegrationTests {
    protected ObjectMapper objectMapper = new ObjectMapper();

    @Autowired
    protected MessageConnectorProcessor channels;

    @Autowired
    protected MessageCollector collector;

    @Autowired
    protected MessageGroupStore messageGroupStore;

    @Autowired
    protected MessageConnectorAggregator aggregatingMessageHandler;

    @Autowired
    protected QueueChannel errorQueue;

    @Autowired
    protected QueueChannel discardQueue;

    @Autowired
    protected ControlBusGateway controlBus;

    @Autowired
    protected PlatformTransactionManager transactionManager;

    @Autowired
    protected AbstractMessageChannel output;

    /**
     * Test-only bean definitions: two queue channels, one bridged from {@code errorChannel}
     * and one bridged from {@code discardChannel}, so that tests can receive from them.
     */
    @TestConfiguration
    static class TestConfigurationContext {

        /** Queue channel bridged from {@code errorChannel}. */
        @Bean
        @BridgeFrom("errorChannel")
        MessageChannel errorQueue() {
            return MessageChannels.queue().get();
        }

        /** Queue channel bridged from {@code discardChannel}. */
        @Bean
        @BridgeFrom("discardChannel")
        MessageChannel discardQueue() {
            return MessageChannels.queue().get();
        }
    }

    /**
     * Small helpers for running code that throws checked exceptions from places
     * (such as lambdas) that cannot declare them.
     */
    static class Try {

        /**
         * Converts a caught checked exception into the exception that should be thrown instead.
         *
         * @param <E> type of the replacement exception
         */
        @FunctionalInterface
        public interface ExceptionWrapper<E> {
            E wrap(Exception exc);
        }

        /** A {@link Runnable}-like action whose body may throw a checked exception. */
        @FunctionalInterface
        public interface RunnableExceptionWrapper {
            void run() throws Exception;
        }

        /**
         * Invokes {@code callable}, wrapping any checked exception in a {@link RuntimeException}.
         *
         * @param callable the computation to run
         * @param <T> result type
         * @return the value returned by {@code callable}
         * @throws RuntimeException the original runtime exception, or a wrapper around a checked one
         */
        public static <T> T call(Callable<T> callable) throws RuntimeException {
            return Try.<T, RuntimeException>call(callable, RuntimeException::new);
        }

        /**
         * Runs the action and rethrows any exception it raises unchanged, without forcing the
         * caller to declare it.
         *
         * @param runnableExceptionWrapper the action to run
         */
        public static void run(RunnableExceptionWrapper runnableExceptionWrapper) {
            try {
                runnableExceptionWrapper.run();
            } catch (Exception e) {
                // T is fixed to RuntimeException only to satisfy the compiler; the
                // exception object that is actually thrown is still 'e' itself.
                Try.<RuntimeException>sneakyThrow(e);
            }
        }

        /**
         * Invokes {@code callable}; runtime exceptions propagate as-is, checked exceptions are
         * converted by {@code exceptionWrapper} and the result is thrown.
         *
         * @param callable the computation to run
         * @param exceptionWrapper converts a checked exception into the exception to throw
         * @param <T> result type
         * @param <E> type of the exception produced by {@code exceptionWrapper}
         * @return the value returned by {@code callable}
         * @throws E the wrapped exception when {@code callable} throws a checked exception
         */
        public static <T, E extends Throwable> T call(Callable<T> callable, ExceptionWrapper<E> exceptionWrapper) throws E {
            try {
                return callable.call();
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                throw exceptionWrapper.wrap(e);
            }
        }

        /**
         * Throws {@code th} while declaring only the generic type {@code T}; the cast is
         * erased at runtime, so any throwable passes through untouched.
         */
        @SuppressWarnings("unchecked")
        private static <T extends Throwable> void sneakyThrow(Throwable th) throws T {
            throw (T) th;
        }
    }

    /**
     * Sends a start-message-deployed event, then submits 100 message-sent events through an
     * executor and expects one output message per sent event, with only the start message
     * left in the message group afterwards.
     *
     * <p>The timeout unit is explicit: {@code @Timeout} defaults to seconds, so a bare
     * {@code 20000} would allow the test to run for hours.
     */
    @Timeout(value = 20000, unit = TimeUnit.MILLISECONDS)
    @Test
    public void shouldProcessMessageEventsConcurrently() throws InterruptedException, JsonProcessingException {
        String messageEventName = "start";
        int messageCount = 100;

        Message<MessageEventPayload> startMessage = startMessageDeployedEvent(messageEventName);
        String correlationId = correlationId(startMessage);
        removeMessageGroup(correlationId);

        send(startMessage);
        Assertions.assertThat(messageGroup(correlationId).getMessages()).hasSize(1);

        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch allSent = new CountDownLatch(messageCount);
        // NOTE(review): a single-thread executor runs the sends one after another, not in
        // parallel - confirm whether a multi-threaded pool was intended for this test.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < messageCount; i++) {
                sendAsync(messageSentEvent(messageEventName), startSignal, allSent, executor);
            }

            // Release the queued senders and wait until every one of them has finished.
            startSignal.countDown();
            allSent.await();

            for (int i = 0; i < messageCount; i++) {
                Message<?> delivered = poll(1L, TimeUnit.SECONDS);
                Assertions.assertThat(delivered).isNotNull();
            }
        } finally {
            // Always release the worker thread, even when an assertion above fails.
            executor.shutdownNow();
        }

        Assertions.assertThat(messageGroup(correlationId).getMessages()).hasSize(1);
        Assertions.assertThat(peek()).isNull();
    }

    /**
     * Submits 100 message-sent events through an executor before the start message is
     * deployed, checks that all of them are held in the message group, then sends the
     * start-message-deployed event and expects one output message per sent event, with a
     * single message left in the group afterwards.
     *
     * <p>The timeout unit is explicit: {@code @Timeout} defaults to seconds, so a bare
     * {@code 20000} would allow the test to run for hours.
     */
    @Timeout(value = 20000, unit = TimeUnit.MILLISECONDS)
    @Test
    public void shouldProcessMessageEventsConcurrentlyInReversedOrder() throws InterruptedException, JsonProcessingException {
        String messageEventName = "start";
        int messageCount = 100;

        Message<MessageEventPayload> startMessage = startMessageDeployedEvent(messageEventName);
        String correlationId = correlationId(startMessage);
        removeMessageGroup(correlationId);

        CountDownLatch startSignal = new CountDownLatch(1);
        CountDownLatch allSent = new CountDownLatch(messageCount);
        // NOTE(review): a single-thread executor runs the sends one after another, not in
        // parallel - confirm whether a multi-threaded pool was intended for this test.
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            for (int i = 0; i < messageCount; i++) {
                sendAsync(messageSentEvent(messageEventName), startSignal, allSent, executor);
            }

            // Release the queued senders and wait until every one of them has finished.
            startSignal.countDown();
            allSent.await();

            Assertions.assertThat(messageGroup(correlationId).getMessages()).hasSize(messageCount);

            send(startMessage);

            for (int i = 0; i < messageCount; i++) {
                Message<?> delivered = poll(3L, TimeUnit.SECONDS);
                Assertions.assertThat(delivered).isNotNull();
            }
        } finally {
            // Always release the worker thread, even when an assertion above fails.
            executor.shutdownNow();
        }

        Assertions.assertThat(messageGroup(correlationId).getMessages()).hasSize(1);
        Assertions.assertThat(peek()).isNull();
    }

    /**
     * Deploys a start message, then sends two message-sent events. Each sent event must
     * produce exactly one output message carrying its variables, while the message group
     * keeps holding only the deployed start message.
     */
    @Test
    public void testStartMessageBeforeSent() throws Exception {
        String messageName = "start1";
        Message<MessageEventPayload> startMessage = startMessageDeployedEvent(messageName);
        String correlationId = correlationId(startMessage);
        removeMessageGroup(correlationId);

        send(startMessage);
        Assertions.assertThat(messageGroup(correlationId).getMessages()).hasSize(1);

        // First sent event.
        send(messageSentEvent(messageName, null, "sent1"));
        Message<?> firstOut = poll(0L, TimeUnit.SECONDS);
        Assertions.assertThat(peek()).isNull();
        Assertions.assertThat(firstOut)
                  .isNotNull()
                  .extracting(Message::getPayload)
                  .extracting("name", "variables")
                  .contains(messageName, Collections.singletonMap("key", "sent1"));
        Assertions.assertThat(messageGroup(correlationId).getMessages())
                  .hasSize(1)
                  .extracting(Message::getPayload)
                  .asList()
                  .extracting("name")
                  .containsOnly(messageName);

        // Second sent event.
        send(messageSentEvent(messageName, null, "sent2"));
        Message<?> secondOut = poll(0L, TimeUnit.SECONDS);
        Assertions.assertThat(peek()).isNull();
        Assertions.assertThat(secondOut)
                  .isNotNull()
                  .extracting(Message::getPayload)
                  .extracting("name", "variables")
                  .contains(messageName, Collections.singletonMap("key", "sent2"));
        Assertions.assertThat(messageGroup(correlationId).getMessages())
                  .hasSize(1)
                  .extracting(Message::getPayload)
                  .asList()
                  .extracting("name")
                  .containsOnly(messageName);
    }

    /**
     * Sends a message-sent event before the start message is deployed, then deploys the
     * start message and sends a second event. Each sent event must produce one output
     * message with its business key and variables, and the group must end up holding only
     * the start message.
     */
    @Test
    public void testStartMessageAfterSent() throws Exception {
        String messageName = "start2";
        Message<MessageEventPayload> firstSent = messageSentEvent(messageName, null, "sent1");
        String correlationId = correlationId(firstSent);
        this.messageGroupStore.removeMessageGroup(correlationId);

        send(firstSent);
        send(startMessageDeployedEvent(messageName));

        Message<?> firstOut = poll(0L, TimeUnit.SECONDS);
        Assertions.assertThat(peek()).isNull();
        Assertions.assertThat(firstOut)
                  .isNotNull()
                  .extracting(Message::getPayload)
                  .extracting("name", "businessKey", "variables")
                  .contains(messageName, "sent1", Collections.singletonMap("key", "sent1"));
        Assertions.assertThat(messageGroup(correlationId).getMessages())
                  .hasSize(1)
                  .extracting(Message::getPayload)
                  .asList()
                  .extracting("name")
                  .containsOnly(messageName);

        send(messageSentEvent(messageName, null, "sent2"));

        Message<?> secondOut = poll(0L, TimeUnit.SECONDS);
        Assertions.assertThat(peek()).isNull();
        Assertions.assertThat(secondOut)
                  .isNotNull()
                  .extracting(Message::getPayload)
                  .extracting("name", "businessKey", "variables")
                  .contains(messageName, "sent2", Collections.singletonMap("key", "sent2"));
        Assertions.assertThat(messageGroup(correlationId).getMessages())
                  .hasSize(1)
                  .extracting(Message::getPayload)
                  .asList()
                  .extracting("name")
                  .containsOnly(messageName);
    }

    /**
     * Walks through a sent / waiting / waiting / received / sent / received sequence for one
     * correlation key and checks, after every step, what is emitted on the output and which
     * messages remain in the message group.
     */
    @Test
    public void testSentMessagesWithBuffer() throws Exception {
        String messageName = "message";
        String correlationKey = "1";
        Message<MessageEventPayload> firstSent = messageSentEvent(messageName, correlationKey, "sent1");
        String correlationId = correlationId(firstSent);
        this.messageGroupStore.removeMessageGroup(correlationId);

        send(firstSent);
        send(messageWaitingEvent(messageName, correlationKey, "waiting1"));
        send(messageWaitingEvent(messageName, correlationKey, "waiting2"));

        // The first sent message is delivered; both waiting events stay in the group.
        Assertions.assertThat(poll(0L, TimeUnit.SECONDS))
                  .isNotNull()
                  .extracting(Message::getPayload)
                  .extracting("variables")
                  .asInstanceOf(InstanceOfAssertFactories.MAP)
                  .containsEntry("key", "sent1");
        Assertions.assertThat(peek()).isNull();
        Assertions.assertThat(messageGroup(correlationId).getMessages())
                  .hasSize(2)
                  .extracting(Message::getPayload)
                  .asList()
                  .extracting("variables")
                  .containsOnly(Collections.singletonMap("key", "waiting1"),
                                Collections.singletonMap("key", "waiting2"));

        send(messageReceivedEvent(messageName, correlationKey));

        // After the received event only the second waiting event is left.
        Assertions.assertThat(peek()).isNull();
        Assertions.assertThat(messageGroup(correlationId).getMessages())
                  .hasSize(1)
                  .extracting(Message::getPayload)
                  .asList()
                  .extracting("variables")
                  .containsOnly(Collections.singletonMap("key", "waiting2"));

        send(messageSentEvent(messageName, correlationKey, "sent2"));

        Assertions.assertThat(poll(1L, TimeUnit.SECONDS))
                  .isNotNull()
                  .extracting(Message::getPayload)
                  .extracting("variables")
                  .asInstanceOf(InstanceOfAssertFactories.MAP)
                  .containsEntry("key", "sent2");
        Assertions.assertThat(peek()).isNull();
        Assertions.assertThat(messageGroup(correlationId).getMessages())
                  .hasSize(1)
                  .extracting(Message::getPayload)
                  .asList()
                  .extracting("variables")
                  .containsOnly(Collections.singletonMap("key", "waiting2"));

        send(messageReceivedEvent(messageName, correlationKey));

        Assertions.assertThat(peek()).isNull();
        Assertions.assertThat(messageGroup(correlationId).getMessages()).isEmpty();
    }

    /**
     * Sends a message-sent event followed by a matching message-waiting event and checks
     * the name, correlation key and variables of the single output message.
     */
    @Test
    public void testReceiveMessagePayload() throws Exception {
        String messageName = "message";
        String correlationKey = "1";
        String businessKey = "businessKey";
        Message<MessageEventPayload> sentMessage = messageSentEvent(messageName, correlationKey, businessKey);
        String correlationId = correlationId(sentMessage);
        this.messageGroupStore.removeMessageGroup(correlationId);

        send(sentMessage);
        send(messageWaitingEvent(messageName, correlationKey, businessKey));

        Assertions.assertThat(poll(0L, TimeUnit.SECONDS))
                  .isNotNull()
                  .extracting(Message::getPayload)
                  .extracting("name", "correlationKey", "variables")
                  .contains(messageName, correlationKey, Collections.singletonMap("key", businessKey));
        Assertions.assertThat(peek()).isNull();

        this.messageGroupStore.removeMessageGroup(correlationId);
    }

    /**
     * Deploys a start message, sends a message-sent event for it and checks the name,
     * business key and variables of the single output message.
     */
    @Test
    public void testStartMessagePayload() throws Exception {
        String messageName = "message";
        String businessKey = "businessKey";
        Message<MessageEventPayload> startMessage = startMessageDeployedEvent(messageName);
        String correlationId = correlationId(startMessage);
        this.messageGroupStore.removeMessageGroup(correlationId);

        send(startMessage);
        send(messageSentEvent(messageName, null, businessKey));

        Assertions.assertThat(poll(0L, TimeUnit.SECONDS))
                  .isNotNull()
                  .extracting(Message::getPayload)
                  .extracting("name", "businessKey", "variables")
                  .contains(messageName, businessKey, Collections.singletonMap("key", businessKey));
        Assertions.assertThat(peek()).isNull();

        this.messageGroupStore.removeMessageGroup(correlationId);
    }

    /**
     * Walks through a sent / sent / waiting / received / waiting / received sequence for one
     * correlation key and checks, after every step, what is emitted on the output and which
     * messages remain in the message group.
     */
    @Test
    public void testSentMessagesWithBufferInDifferentOrder() throws Exception {
        String messageName = "message";
        String correlationKey = "1";
        String correlationId = correlationId(messageWaitingEvent(messageName, correlationKey));
        this.messageGroupStore.removeMessageGroup(correlationId);

        send(messageSentEvent(messageName, correlationKey, "sent1"));
        send(messageSentEvent(messageName, correlationKey, "sent2"));
        send(messageWaitingEvent(messageName, correlationKey, "waiting1"));

        // The first sent message is delivered; the second one stays with the waiting event.
        Assertions.assertThat(poll(0L, TimeUnit.SECONDS))
                  .isNotNull()
                  .extracting(Message::getPayload)
                  .extracting("variables")
                  .asInstanceOf(InstanceOfAssertFactories.MAP)
                  .containsEntry("key", "sent1");
        Assertions.assertThat(peek()).isNull();
        Assertions.assertThat(messageGroup(correlationId).getMessages())
                  .hasSize(2)
                  .extracting(Message::getPayload)
                  .asList()
                  .extracting("variables")
                  .contains(Collections.singletonMap("key", "sent2"),
                            Collections.singletonMap("key", "waiting1"));

        send(messageReceivedEvent(messageName, correlationKey, "received1"));

        Assertions.assertThat(peek()).isNull();
        Assertions.assertThat(messageGroup(correlationId).getMessages())
                  .hasSize(1)
                  .extracting(Message::getPayload)
                  .asList()
                  .extracting("variables")
                  .containsOnly(Collections.singletonMap("key", "sent2"));

        send(messageWaitingEvent(messageName, correlationKey, "waiting2"));

        Message<?> secondOut = poll(0L, TimeUnit.SECONDS);
        Assertions.assertThat(peek()).isNull();
        Assertions.assertThat(secondOut)
                  .isNotNull()
                  .extracting(Message::getPayload)
                  .extracting("variables")
                  .asInstanceOf(InstanceOfAssertFactories.MAP)
                  .containsEntry("key", "sent2");
        Assertions.assertThat(messageGroup(correlationId).getMessages())
                  .hasSize(1)
                  .extracting(Message::getPayload)
                  .asList()
                  .extracting("variables")
                  .containsOnly(Collections.singletonMap("key", "waiting2"));

        send(messageReceivedEvent(messageName, correlationKey, "received2"));

        Assertions.assertThat(peek()).isNull();
        Assertions.assertThat(messageGroup(correlationId).getMessages()).isEmpty();
    }

    /**
     * Stores two message-waiting events in the group and checks that a subscription-cancelled
     * event empties the group without emitting anything on the output.
     */
    @Test
    public void testSubscriptionCancelled() throws Exception {
        String messageName = "message";
        String correlationKey = "1";
        Message<MessageEventPayload> cancelled = subscriptionCancelledEvent(messageName, correlationKey);
        String correlationId = correlationId(cancelled);
        this.messageGroupStore.removeMessageGroup(correlationId);

        send(messageWaitingEvent(messageName, correlationKey));
        send(messageWaitingEvent(messageName, correlationKey));
        Assertions.assertThat(messageGroup(correlationId).getMessages()).hasSize(2);

        send(cancelled);

        Assertions.assertThat(peek()).isNull();
        Assertions.assertThat(messageGroup(correlationId).getMessages()).isEmpty();
    }

    /**
     * Sends the very same message twice, first a waiting event and then a received event,
     * and checks that each duplicate shows up on the error queue while the message group
     * only reflects a single delivery.
     */
    @Test
    public void testIdempotentMessageInterceptor() throws Exception {
        String messageName = "message";
        String correlationKey = "1";
        Message<MessageEventPayload> waiting = messageWaitingEvent(messageName, correlationKey);
        String correlationId = correlationId(waiting);
        this.messageGroupStore.removeMessageGroup(correlationId);

        // Same message instance, hence the same messageEventId header, sent twice.
        send(waiting);
        send(waiting);

        Assertions.assertThat(peek()).isNull();
        Assertions.assertThat(this.errorQueue.receive(0L)).isNotNull();
        Assertions.assertThat(messageGroup(correlationId).getMessages()).isNotNull().hasSize(1);

        Message<MessageEventPayload> received = messageReceivedEvent(messageName, correlationKey);
        send(received);
        send(received);

        Assertions.assertThat(peek()).isNull();
        Assertions.assertThat(this.errorQueue.receive(0L)).isNotNull();
        Assertions.assertThat(messageGroup(correlationId).getMessages()).hasSize(0);
    }

    /**
     * Sends a plain-text message without a {@code messageEventType} header and checks that
     * nothing reaches the output while the message lands on the discard queue.
     */
    @Test
    public void testMessageFilterDiscardChannel() throws Exception {
        Message<String> plainText = MessageBuilder.withPayload("message")
                                                  .setHeader("contentType", "text/plain")
                                                  .build();

        this.channels.input().send(plainText);

        Assertions.assertThat(peek()).isNull();
        // Assert non-null first so a missing discard fails with a readable assertion
        // message instead of a NullPointerException.
        Assertions.assertThat(this.discardQueue.receive(0L))
                  .isNotNull()
                  .extracting(Message::getPayload)
                  .isEqualTo("message");
    }

    /**
     * Sends a plain-text payload flagged as a {@code MESSAGE_SENT} event and checks that the
     * send fails with a {@link MessageTransformationException}, with nothing on the output
     * or on the error queue.
     */
    @Test
    public void testInvalidMessagePayloadDiscardChannel() throws Exception {
        Message<String> invalid = MessageBuilder.withPayload("payload")
                                                .setHeader("contentType", "text/plain")
                                                .setHeader("messageEventType", BPMNMessageEvent.MessageEvents.MESSAGE_SENT.name())
                                                .build();

        Throwable thrown = Assertions.catchThrowable(() -> this.channels.input().send(invalid));

        Assertions.assertThat(peek()).isNull();
        Assertions.assertThat(this.errorQueue.receive(0L)).isNull();
        Assertions.assertThat(thrown).isInstanceOf(MessageTransformationException.class);
    }

    /**
     * Stops the aggregator through the control bus, checks that sending a message then
     * fails with a {@link MessageDeliveryException}, and starts the aggregator again.
     */
    @Test
    public void testControlBusStartStopComponents() throws Exception {
        String messageName = "test";

        this.controlBus.send("@aggregator.stop()");
        try {
            Throwable thrown = Assertions.catchThrowable(() -> send(messageSentEvent(messageName, null, "error")));

            Assertions.assertThat(thrown).isInstanceOf(MessageDeliveryException.class);
        } finally {
            // Restart even when the assertion fails so the aggregator is not left stopped.
            this.controlBus.send("@aggregator.start()");
        }
    }

    /**
     * Makes the output channel reject every message, sends a message-sent event for a
     * deployed start message, and checks that the send fails with a
     * {@link MessageDeliveryException} while the message group still holds one message.
     */
    @Test
    public void testTransactionException() throws Exception {
        String messageName = "start1";
        Message<MessageEventPayload> startMessage = startMessageDeployedEvent(messageName);
        String correlationId = correlationId(startMessage);
        removeMessageGroup(correlationId);

        send(startMessage);
        Assertions.assertThat(messageGroup(correlationId).getMessages()).hasSize(1);

        // Interceptor that fails every send on the output channel.
        ChannelInterceptor failingInterceptor = new ChannelInterceptor() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel messageChannel) {
                throw new RuntimeException("transaction failed");
            }
        };

        this.output.addInterceptor(failingInterceptor);
        Throwable thrown = Assertions.catchThrowable(() -> send(messageSentEvent(messageName, null, "error")));
        this.output.removeInterceptor(failingInterceptor);

        Assertions.assertThat(messageGroup(correlationId).getMessages()).hasSize(1);
        Assertions.assertThat(thrown).isInstanceOf(MessageDeliveryException.class);
    }

    /**
     * Starts a message builder for the given message name with no correlation key and no
     * business key.
     *
     * @param messageName name of the message event
     * @return a builder that still needs its {@code messageEventType} header
     */
    protected MessageBuilder<MessageEventPayload> messageBuilder(String messageName) {
        String correlationKey = null;
        String businessKey = null;
        return messageBuilder(messageName, correlationKey, businessKey);
    }

    /**
     * Starts a message builder for the given message name and correlation key, with no
     * business key.
     *
     * @param messageName name of the message event
     * @param correlationKey correlation key, may be {@code null}
     * @return a builder that still needs its {@code messageEventType} header
     */
    protected MessageBuilder<MessageEventPayload> messageBuilder(String messageName, String correlationKey) {
        String businessKey = null;
        return messageBuilder(messageName, correlationKey, businessKey);
    }

    /**
     * Starts a message builder whose payload carries the given name, correlation key and
     * business key, plus a single variable {@code "key"} set to the business key. The
     * builder also gets the {@code messageEventName}, {@code messageEventCorrelationKey},
     * {@code messageEventId} (a random UUID) and {@code serviceFullName} headers.
     *
     * @param messageName name of the message event
     * @param correlationKey correlation key, may be {@code null}
     * @param businessKey business key, also used as the value of the {@code "key"} variable
     * @return a builder that still needs its {@code messageEventType} header
     */
    protected MessageBuilder<MessageEventPayload> messageBuilder(String messageName, String correlationKey, String businessKey) {
        MessageEventPayload payload = MessageEventPayloadBuilder.messageEvent(messageName)
                                                                .withCorrelationKey(correlationKey)
                                                                .withBusinessKey(businessKey)
                                                                .withVariables(Collections.singletonMap("key", businessKey))
                                                                .build();

        return MessageBuilder.withPayload(payload)
                             .setHeader("messageEventName", messageName)
                             .setHeader("messageEventCorrelationKey", correlationKey)
                             .setHeader("messageEventId", UUID.randomUUID())
                             .setHeader("serviceFullName", "rb");
    }

    /**
     * Builds a {@code START_MESSAGE_DEPLOYED} event for the given message name, without a
     * correlation key.
     *
     * @param messageName name of the start message
     * @return the built message
     */
    protected Message<MessageEventPayload> startMessageDeployedEvent(String messageName) {
        String eventType = MessageDefinitionEvent.MessageDefinitionEvents.START_MESSAGE_DEPLOYED.name();
        return messageBuilder(messageName, null)
            .setHeader("messageEventType", eventType)
            .build();
    }

    /**
     * Builds a {@code MESSAGE_SENT} event for the given message name with no correlation
     * key and no business key.
     *
     * @param messageName name of the message event
     * @return the built message
     */
    protected Message<MessageEventPayload> messageSentEvent(String messageName) {
        String correlationKey = null;
        return messageSentEvent(messageName, correlationKey);
    }

    /**
     * Builds a {@code MESSAGE_SENT} event for the given message name and correlation key,
     * with no business key.
     *
     * @param messageName name of the message event
     * @param correlationKey correlation key, may be {@code null}
     * @return the built message
     */
    protected Message<MessageEventPayload> messageSentEvent(String messageName, String correlationKey) {
        String businessKey = null;
        return messageSentEvent(messageName, correlationKey, businessKey);
    }

    /**
     * Builds a {@code MESSAGE_SENT} event.
     *
     * @param messageName name of the message event
     * @param correlationKey correlation key, may be {@code null}
     * @param businessKey business key, also used as the value of the {@code "key"} variable
     * @return the built message
     */
    protected Message<MessageEventPayload> messageSentEvent(String messageName, String correlationKey, String businessKey) {
        String eventType = BPMNMessageEvent.MessageEvents.MESSAGE_SENT.name();
        return messageBuilder(messageName, correlationKey, businessKey)
            .setHeader("messageEventType", eventType)
            .build();
    }

    /**
     * Builds a {@code MESSAGE_WAITING} event for the given message name with no correlation
     * key.
     *
     * @param messageName name of the message event
     * @return the built message
     */
    protected Message<MessageEventPayload> messageWaitingEvent(String messageName) {
        String correlationKey = null;
        return messageWaitingEvent(messageName, correlationKey);
    }

    /**
     * Builds a {@code MESSAGE_WAITING} event.
     *
     * @param messageName name of the message event
     * @param correlationKey correlation key, may be {@code null}
     * @param businessKey business key, also used as the value of the {@code "key"} variable
     * @return the built message
     */
    protected Message<MessageEventPayload> messageWaitingEvent(String messageName, String correlationKey, String businessKey) {
        String eventType = BPMNMessageEvent.MessageEvents.MESSAGE_WAITING.name();
        return messageBuilder(messageName, correlationKey, businessKey)
            .setHeader("messageEventType", eventType)
            .build();
    }

    /**
     * Builds a {@code MESSAGE_WAITING} event for the given message name and correlation key,
     * with no business key.
     *
     * @param messageName name of the message event
     * @param correlationKey correlation key, may be {@code null}
     * @return the built message
     */
    protected Message<MessageEventPayload> messageWaitingEvent(String messageName, String correlationKey) {
        String eventType = BPMNMessageEvent.MessageEvents.MESSAGE_WAITING.name();
        return messageBuilder(messageName, correlationKey)
            .setHeader("messageEventType", eventType)
            .build();
    }

    /**
     * Builds a {@code MESSAGE_RECEIVED} event for the given message name and correlation
     * key, with no business key.
     *
     * @param messageName name of the message event
     * @param correlationKey correlation key, may be {@code null}
     * @return the built message
     */
    protected Message<MessageEventPayload> messageReceivedEvent(String messageName, String correlationKey) {
        String businessKey = null;
        return messageReceivedEvent(messageName, correlationKey, businessKey);
    }

    /**
     * Builds a {@code MESSAGE_RECEIVED} event.
     *
     * @param messageName name of the message event
     * @param correlationKey correlation key, may be {@code null}
     * @param businessKey business key, also used as the value of the {@code "key"} variable
     * @return the built message
     */
    protected Message<MessageEventPayload> messageReceivedEvent(String messageName, String correlationKey, String businessKey) {
        String eventType = BPMNMessageEvent.MessageEvents.MESSAGE_RECEIVED.name();
        return messageBuilder(messageName, correlationKey, businessKey)
            .setHeader("messageEventType", eventType)
            .build();
    }

    /**
     * Builds a {@code MESSAGE_SUBSCRIPTION_CANCELLED} event for the given message name and
     * correlation key, with no business key.
     *
     * @param messageName name of the message event
     * @param correlationKey correlation key, may be {@code null}
     * @return the built message
     */
    protected Message<MessageEventPayload> subscriptionCancelledEvent(String messageName, String correlationKey) {
        String eventType = MessageSubscriptionEvent.MessageSubscriptionEvents.MESSAGE_SUBSCRIPTION_CANCELLED.name();
        return messageBuilder(messageName, correlationKey)
            .setHeader("messageEventType", eventType)
            .build();
    }

    /**
     * Serializes the payload of {@code message} to a JSON string, copies the original
     * headers onto it and sends the result to the input channel.
     *
     * @param message the message whose payload and headers are sent
     * @throws RuntimeException wrapping the {@link JsonProcessingException} when the payload
     *         cannot be serialized
     */
    protected void send(Message<?> message) {
        final String json;
        try {
            json = this.objectMapper.writeValueAsString(message.getPayload());
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }

        Message<String> outbound = MessageBuilder.withPayload(json)
                                                 .copyHeaders(message.getHeaders())
                                                 .build();
        this.channels.input().send(outbound);
    }

    /**
     * Polls the collector queue of the output channel, waiting up to the given timeout.
     *
     * @param timeout how long to wait
     * @param timeUnit unit of {@code timeout}
     * @param <T> payload type the caller expects; the cast is not checked
     * @return the value returned by the queue's {@code poll}
     * @throws InterruptedException if interrupted while waiting
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected <T> Message<T> poll(long timeout, TimeUnit timeUnit) throws InterruptedException {
        Message polled = this.collector.forChannel(this.channels.output()).poll(timeout, timeUnit);
        return polled;
    }

    /**
     * Peeks at the collector queue of the output channel without removing anything.
     *
     * @param <T> payload type the caller expects; the cast is not checked
     * @return the value returned by the queue's {@code peek}
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    protected <T> Message<T> peek() {
        Message head = this.collector.forChannel(this.channels.output()).peek();
        return head;
    }

    /**
     * Looks up a message group in the aggregator's message store.
     *
     * @param groupId identifier of the group, as returned by {@link #correlationId(Message)}
     * @return the group returned by the store for that identifier
     */
    protected MessageGroup messageGroup(String groupId) {
        MessageGroup group = this.aggregatingMessageHandler.getMessageStore().getMessageGroup(groupId);
        return group;
    }

    /**
     * Computes the correlation id of a message by delegating to
     * {@link Correlations#getCorrelationId}.
     *
     * @param message the message to correlate
     * @return the correlation id
     */
    protected String correlationId(Message<?> message) {
        String id = Correlations.getCorrelationId(message);
        return id;
    }

    /**
     * Removes the message group with the given id from the autowired message group store.
     *
     * @param groupId identifier of the group to remove
     */
    protected void removeMessageGroup(String groupId) {
        MessageGroupStore store = this.messageGroupStore;
        store.removeMessageGroup(groupId);
    }

    /**
     * Submits a task that waits for the start signal, sends {@code message} and then counts
     * down the completion latch.
     *
     * <p>The completion latch is counted down in a {@code finally} block, so a failing
     * {@link #send(Message)} cannot leave the thread awaiting that latch blocked. An
     * exception thrown by the send still propagates out of the task.
     *
     * @param message the message to send
     * @param startSignal latch the task awaits before sending
     * @param doneSignal latch counted down once the send has finished, successfully or not
     * @param executorService executor that runs the task
     */
    protected void sendAsync(Message<?> message, CountDownLatch startSignal, CountDownLatch doneSignal, ExecutorService executorService) {
        executorService.execute(() -> {
            try {
                try {
                    startSignal.await();
                } catch (InterruptedException e) {
                    // Restore the interrupt flag; the send below is still attempted.
                    Thread.currentThread().interrupt();
                }
                Try.run(() -> send(message));
            } finally {
                doneSignal.countDown();
            }
        });
    }
}
