package org.activiti.cloud.services.messages.core.advice;

import java.util.Collection;
import java.util.stream.Collectors;
import org.activiti.cloud.services.messages.core.support.LockTemplate;
import org.activiti.cloud.services.messages.core.support.Predicates;
import org.springframework.integration.aggregator.CorrelationStrategy;
import org.springframework.integration.store.MessageGroupStore;
import org.springframework.integration.util.UUIDConverter;
import org.springframework.messaging.Message;

/* loaded from: input_file:org/activiti/cloud/services/messages/core/advice/SubscriptionCancelledHandlerAdvice.class */
public class SubscriptionCancelledHandlerAdvice extends AbstractMessageConnectorHandlerAdvice {
    private final MessageGroupStore messageStore;
    private final LockTemplate lockTemplate;
    private final CorrelationStrategy correlationStrategy;

    public SubscriptionCancelledHandlerAdvice(MessageGroupStore messageGroupStore, CorrelationStrategy correlationStrategy, LockTemplate lockTemplate) {
        this.messageStore = messageGroupStore;
        this.lockTemplate = lockTemplate;
        this.correlationStrategy = correlationStrategy;
    }

    @Override // org.activiti.cloud.services.messages.core.advice.AbstractMessageConnectorHandlerAdvice
    public <T> T doHandle(Message<?> message) {
        Object correlationKey = this.correlationStrategy.getCorrelationKey(message);
        this.lockTemplate.lockInterruptibly(UUIDConverter.getUUID(correlationKey).toString(), () -> {
            Collection collection = (Collection) this.messageStore.getMessageGroup(correlationKey).getMessages().stream().filter(Predicates.not(Predicates.START_MESSAGE_DEPLOYED)).collect(Collectors.toList());
            if (collection.isEmpty()) {
                return;
            }
            this.messageStore.removeMessagesFromGroup(correlationKey, collection);
        });
        return null;
    }

    @Override // org.activiti.cloud.services.messages.core.advice.AbstractMessageConnectorHandlerAdvice
    public boolean canHandle(Message<?> message) {
        return Predicates.MESSAGE_SUBSCRIPTION_CANCELLED.test(message);
    }
}
