package org.apache.camel.component.aws2.sqs;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.ExchangePropertyKey;
import org.apache.camel.Processor;
import org.apache.camel.spi.HeaderFilterStrategy;
import org.apache.camel.spi.ScheduledPollConsumerScheduler;
import org.apache.camel.spi.Synchronization;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.support.DefaultScheduledPollConsumerScheduler;
import org.apache.camel.support.ScheduledBatchPollingConsumer;
import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.core.exception.SdkException;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequest;
import software.amazon.awssdk.services.sqs.model.ChangeMessageVisibilityBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.MessageNotInflightException;
import software.amazon.awssdk.services.sqs.model.QueueDeletedRecentlyException;
import software.amazon.awssdk.services.sqs.model.QueueDoesNotExistException;
import software.amazon.awssdk.services.sqs.model.ReceiptHandleIsInvalidException;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SqsException;

/* loaded from: input_file:docker/prediction-applier/alfresco-hxinsight-connector-prediction-applier-0.0.6-app.jar:BOOT-INF/lib/camel-aws2-sqs-4.4.1.jar:org/apache/camel/component/aws2/sqs/Sqs2Consumer.class */
public class Sqs2Consumer extends ScheduledBatchPollingConsumer {
    // Shared logger for the consumer and its inner TimeoutExtender.
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) Sqs2Consumer.class);
    // Visibility-extension task; created in doStart() and cleared in doShutdown(). Null when extension is not active.
    private TimeoutExtender timeoutExtender;
    // Handle of the scheduled TimeoutExtender task; cancelled in doShutdown().
    private ScheduledFuture<?> scheduledFuture;
    // Executor that runs the TimeoutExtender; created in doStart() and shut down in doShutdown().
    private ScheduledExecutorService scheduledExecutor;
    // Lazily computed result of toString().
    private transient String sqsConsumerToString;
    // Comma-split attribute names from the configuration; stays null when none are configured.
    private Collection<String> attributeNames;
    // Comma-split message attribute names from the configuration; stays null when none are configured.
    private Collection<String> messageAttributeNames;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:docker/prediction-applier/alfresco-hxinsight-connector-prediction-applier-0.0.6-app.jar:BOOT-INF/lib/camel-aws2-sqs-4.4.1.jar:org/apache/camel/component/aws2/sqs/Sqs2Consumer$TimeoutExtender.class */
    /**
     * Periodic task that re-sends a visibility timeout for every exchange that is still being
     * processed, so that the corresponding messages are not redelivered mid-processing.
     * <p>
     * Exchanges are registered through {@link #add(Exchange)} and removed automatically when
     * they complete or fail. The task must never let an exception escape {@link #run()}: a
     * task scheduled with {@code scheduleAtFixedRate} is not executed again once it throws.
     */
    public class TimeoutExtender implements Runnable {
        /** Upper bound on the number of entries packed into one batch request. */
        private static final int MAX_REQUESTS = 10;
        /** Error text of an {@link SqsException} that is treated as harmless and not logged. */
        private static final String MESSAGE_GONE = "Message does not exist or is not available for visibility timeout change";
        /** Visibility timeout, in seconds, applied on every extension. */
        private final int repeatSeconds;
        /** Cleared by {@link #cancel()} to turn subsequent runs into no-ops. */
        private final AtomicBoolean run = new AtomicBoolean(true);
        /** Pending extension entries keyed by exchange id. */
        private final Map<String, ChangeMessageVisibilityBatchRequestEntry> entries = new ConcurrentHashMap<>();

        /**
         * @param repeatSeconds visibility timeout (seconds) to apply on every extension
         */
        TimeoutExtender(int repeatSeconds) {
            this.repeatSeconds = repeatSeconds;
        }

        /**
         * Starts tracking the given exchange until it completes or fails.
         *
         * @param exchange exchange whose receipt-handle header identifies the message to extend
         */
        public void add(Exchange exchange) {
            exchange.getExchangeExtension().addOnCompletion(new Synchronization() {
                @Override
                public void onComplete(Exchange exchange2) {
                    remove(exchange2);
                }

                @Override
                public void onFailure(Exchange exchange2) {
                    remove(exchange2);
                }

                private void remove(Exchange exchange2) {
                    Sqs2Consumer.LOG.trace("Removing exchangeId {} from the TimeoutExtender, processing done", exchange2.getExchangeId());
                    TimeoutExtender.this.entries.remove(exchange2.getExchangeId());
                }
            });
            ChangeMessageVisibilityBatchRequestEntry entry = (ChangeMessageVisibilityBatchRequestEntry) ChangeMessageVisibilityBatchRequestEntry.builder()
                    .id(exchange.getExchangeId())
                    .visibilityTimeout(Integer.valueOf(this.repeatSeconds))
                    .receiptHandle((String) exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class))
                    .mo5335build();
            this.entries.put(exchange.getExchangeId(), entry);
        }

        /** Disables the task; runs that start after this call do nothing. */
        public void cancel() {
            this.run.set(false);
        }

        /**
         * Sends the currently tracked entries in batches of at most {@link #MAX_REQUESTS}.
         * Failures of a batch are logged (or ignored when known to be harmless) and do not
         * prevent the remaining batches from being sent.
         */
        @Override // java.lang.Runnable
        public void run() {
            if (!this.run.get()) {
                return;
            }
            // Work on a snapshot so concurrent add/remove calls do not affect this run.
            Queue<ChangeMessageVisibilityBatchRequestEntry> pending = new LinkedList<>(this.entries.values());
            while (!pending.isEmpty()) {
                List<ChangeMessageVisibilityBatchRequestEntry> batch = new LinkedList<>();
                while (!pending.isEmpty() && batch.size() < MAX_REQUESTS) {
                    batch.add(pending.poll());
                }
                ChangeMessageVisibilityBatchRequest changeMessageVisibilityBatchRequest = (ChangeMessageVisibilityBatchRequest) ChangeMessageVisibilityBatchRequest.builder().queueUrl(Sqs2Consumer.this.getQueueUrl()).entries(batch).mo5335build();
                try {
                    Sqs2Consumer.LOG.trace("Extending visibility window by {} seconds for request entries {}", Integer.valueOf(this.repeatSeconds), batch);
                    Sqs2Consumer.this.getEndpoint().getClient().changeMessageVisibilityBatch(changeMessageVisibilityBatchRequest);
                    Sqs2Consumer.LOG.debug("Extended visibility window for request entries {}", batch);
                } catch (MessageNotInflightException | ReceiptHandleIsInvalidException ignored) {
                    // Deliberately ignored: presumably the message was already handled, so there
                    // is nothing left to extend.
                } catch (SqsException e) {
                    // getMessage() may be null; a NullPointerException here would escape run()
                    // and stop all future executions of this scheduled task.
                    String message = e.getMessage();
                    if (message == null || !message.contains(MESSAGE_GONE)) {
                        logException(e, batch);
                    }
                } catch (Exception e) {
                    logException(e, batch);
                }
            }
        }

        /** Logs a failed extension attempt at WARN level; the exception is not rethrown. */
        private void logException(Exception exc, List<ChangeMessageVisibilityBatchRequestEntry> list) {
            Sqs2Consumer.LOG.warn("Extending visibility window failed for entries {}. Will not attempt to extend visibility further. This exception will be ignored.", list, exc);
        }
    }

    public Sqs2Consumer(Sqs2Endpoint sqs2Endpoint, Processor processor) {
        super(sqs2Endpoint, processor);
        if (getConfiguration().getAttributeNames() != null) {
            this.attributeNames = Arrays.asList(getConfiguration().getAttributeNames().split(","));
        }
        if (getConfiguration().getMessageAttributeNames() != null) {
            this.messageAttributeNames = Arrays.asList(getConfiguration().getMessageAttributeNames().split(","));
        }
    }

    /**
     * Receives one batch of messages from the queue and hands them to
     * {@link #processBatch(Queue)}. If the queue does not exist, a reconnect is attempted
     * and the receive call is retried once.
     *
     * @return the value returned by {@link #processBatch(Queue)}
     * @throws Exception if receiving or processing fails
     */
    @Override // org.apache.camel.support.ScheduledPollConsumer
    protected int poll() throws Exception {
        // Reset per-poll bookkeeping.
        this.shutdownRunningTask = null;
        this.pendingExchanges = 0;

        ReceiveMessageRequest.Builder requestBuilder = ReceiveMessageRequest.builder().queueUrl(getQueueUrl());
        Integer maxMessages = null;
        if (getMaxMessagesPerPoll() > 0) {
            maxMessages = Integer.valueOf(getMaxMessagesPerPoll());
        }
        requestBuilder.maxNumberOfMessages(maxMessages);
        requestBuilder.visibilityTimeout(getConfiguration().getVisibilityTimeout());
        requestBuilder.waitTimeSeconds(getConfiguration().getWaitTimeSeconds());
        if (this.attributeNames != null) {
            requestBuilder.attributeNamesWithStrings(this.attributeNames);
        }
        if (this.messageAttributeNames != null) {
            requestBuilder.messageAttributeNames(this.messageAttributeNames);
        }
        LOG.trace("Receiving messages with request [{}]...", requestBuilder);

        ReceiveMessageRequest request = (ReceiveMessageRequest) requestBuilder.mo5335build();
        ReceiveMessageResponse response;
        try {
            response = getClient().receiveMessage(request);
        } catch (QueueDoesNotExistException missingQueue) {
            LOG.info("Queue does not exist....recreating now...");
            reConnectToQueue();
            response = getClient().receiveMessage(request);
        }

        if (LOG.isTraceEnabled()) {
            LOG.trace("Received {} messages", Integer.valueOf(response.messages().size()));
        }
        forceConsumerAsReady();
        return processBatch(CastUtils.cast((Queue<?>) createExchanges(response.messages())));
    }

    /**
     * Tries to (re)create the queue when auto-creation is enabled. If the queue was deleted
     * recently, waits 30 seconds and tries once more. All failures are logged and swallowed.
     */
    public void reConnectToQueue() {
        try {
            boolean autoCreate = getEndpoint().getConfiguration().isAutoCreateQueue();
            if (autoCreate) {
                getEndpoint().createQueue(getClient());
            }
        } catch (QueueDeletedRecentlyException recentlyDeleted) {
            LOG.debug("Queue recently deleted, will retry in 30 seconds.");
            final long retryDelayMillis = 30000L;
            try {
                Thread.sleep(retryDelayMillis);
                getEndpoint().createQueue(getClient());
            } catch (InterruptedException interrupted) {
                LOG.warn("Interrupted while retrying queue connection.", interrupted);
                // Restore the interrupt flag for the caller.
                Thread.currentThread().interrupt();
            } catch (Exception retryFailure) {
                LOG.warn("failed to retry queue connection.", retryFailure);
            }
        } catch (Exception connectFailure) {
            LOG.warn("Could not connect to queue in amazon.", connectFailure);
        }
    }

    /**
     * Converts the received messages into exchanges, preserving their order.
     *
     * @param list messages returned by the receive call
     * @return queue holding one exchange per message
     */
    protected Queue<Exchange> createExchanges(List<Message> list) {
        if (LOG.isTraceEnabled()) {
            LOG.trace("Received {} messages in this poll", Integer.valueOf(list.size()));
        }
        Queue<Exchange> exchanges = new LinkedList<>();
        for (Message message : list) {
            exchanges.add(createExchange(message));
        }
        return exchanges;
    }

    /**
     * Processes the queued exchanges one by one while the batch is still allowed to run.
     * Each exchange gets its batch properties, is registered with the timeout extender (if
     * one is active), and receives an on-completion hook that commits or rolls back.
     *
     * @param queue exchanges to process
     * @return the size of the queue when the method was entered
     * @throws Exception if processing fails
     */
    @Override // org.apache.camel.BatchConsumer
    public int processBatch(Queue<Object> queue) throws Exception {
        final int total = queue.size();
        for (int index = 0; index < total && isBatchAllowed(); index++) {
            Exchange exchange = (Exchange) ObjectHelper.cast(Exchange.class, queue.poll());

            boolean lastInBatch = index == total - 1;
            exchange.setProperty(ExchangePropertyKey.BATCH_INDEX, Integer.valueOf(index));
            exchange.setProperty(ExchangePropertyKey.BATCH_SIZE, Integer.valueOf(total));
            exchange.setProperty(ExchangePropertyKey.BATCH_COMPLETE, Boolean.valueOf(lastInBatch));
            this.pendingExchanges = (total - index) - 1;

            // The extender is registered before the commit/rollback hook.
            if (this.timeoutExtender != null) {
                this.timeoutExtender.add(exchange);
            }
            exchange.getExchangeExtension().addOnCompletion(new Synchronization() {
                @Override
                public void onComplete(Exchange completed) {
                    Sqs2Consumer.this.processCommit(completed);
                }

                @Override
                public void onFailure(Exchange failed) {
                    Sqs2Consumer.this.processRollback(failed);
                }

                @Override
                public String toString() {
                    return "SqsConsumerOnCompletion";
                }
            });

            getAsyncProcessor().process(exchange, defaultConsumerCallback(exchange, true));
        }
        return total;
    }

    /**
     * Deletes the message behind a successfully processed exchange, when deletion applies.
     * SDK failures are passed to the exception handler and not rethrown.
     *
     * @param exchange completed exchange carrying the receipt handle header
     */
    protected void processCommit(Exchange exchange) {
        try {
            if (!shouldDelete(exchange)) {
                return;
            }
            String receiptHandle = exchange.getIn().getHeader(Sqs2Constants.RECEIPT_HANDLE, String.class);
            DeleteMessageRequest.Builder deleteBuilder = DeleteMessageRequest.builder()
                    .queueUrl(getQueueUrl())
                    .receiptHandle(receiptHandle);
            LOG.trace("Deleting message with receipt handle {}...", receiptHandle);
            getClient().deleteMessage((DeleteMessageRequest) deleteBuilder.mo5335build());
            LOG.trace("Deleted message with receipt handle {}...", receiptHandle);
        } catch (SdkException sdkFailure) {
            getExceptionHandler().handleException("Error occurred during deleting message. This exception is ignored.", exchange, sdkFailure);
        }
    }

    /**
     * Decides whether the message of the given exchange is deleted on commit: always when
     * delete-after-read is enabled, otherwise only when the delete-filtered property is
     * present, delete-if-filtered is enabled, and the property evaluates to true.
     */
    private boolean shouldDelete(Exchange exchange) {
        if (getConfiguration().isDeleteAfterRead()) {
            return true;
        }
        if (exchange.getProperty(Sqs2Constants.SQS_DELETE_FILTERED) == null) {
            return false;
        }
        return getConfiguration().isDeleteIfFiltered() && passedThroughFilter(exchange);
    }

    /**
     * Reads the delete-filtered exchange property as a boolean, defaulting to false.
     */
    private boolean passedThroughFilter(Exchange exchange) {
        Boolean passed = exchange.getProperty(Sqs2Constants.SQS_DELETE_FILTERED, Boolean.FALSE, Boolean.class);
        return passed.booleanValue();
    }

    /**
     * Reports the exception of a failed exchange, if it carries one, to the exception handler.
     *
     * @param exchange failed exchange
     */
    protected void processRollback(Exchange exchange) {
        Exception cause = exchange.getException();
        if (cause == null) {
            return;
        }
        getExceptionHandler().handleException("Error during processing exchange. Will attempt to process the message on next poll.", exchange, cause);
    }

    /** Returns the configuration held by this consumer's endpoint. */
    protected Sqs2Configuration getConfiguration() {
        Sqs2Endpoint endpoint = getEndpoint();
        return endpoint.getConfiguration();
    }

    /** Returns the SQS client held by this consumer's endpoint. */
    protected SqsClient getClient() {
        Sqs2Endpoint endpoint = getEndpoint();
        return endpoint.getClient();
    }

    /** Returns the queue URL reported by this consumer's endpoint. */
    protected String getQueueUrl() {
        Sqs2Endpoint endpoint = getEndpoint();
        return endpoint.getQueueUrl();
    }

    /** Returns the endpoint of this consumer, narrowed to {@link Sqs2Endpoint}. */
    @Override // org.apache.camel.support.DefaultConsumer, org.apache.camel.EndpointAware
    public Sqs2Endpoint getEndpoint() {
        Sqs2Endpoint endpoint = (Sqs2Endpoint) super.getEndpoint();
        return endpoint;
    }

    /**
     * Creates an exchange for the given message using the endpoint's exchange pattern.
     *
     * @param message received SQS message
     * @return exchange populated from the message
     */
    public Exchange createExchange(Message message) {
        ExchangePattern pattern = getEndpoint().getExchangePattern();
        return createExchange(pattern, message);
    }

    /**
     * Builds an exchange from an SQS message: the body, the message's string attributes and
     * the SQS-specific headers are copied first, then every message attribute that the
     * endpoint's header filter strategy does not filter out is added as a header.
     */
    private Exchange createExchange(ExchangePattern exchangePattern, Message message) {
        Exchange exchange = createExchange(true);
        exchange.setPattern(exchangePattern);

        org.apache.camel.Message camelMessage = exchange.getIn();
        camelMessage.setBody(message.body());
        // setHeaders comes first; the individual setHeader calls below add to that map.
        camelMessage.setHeaders(new HashMap<>(message.attributesAsStrings()));
        camelMessage.setHeader(Sqs2Constants.MESSAGE_ID, message.messageId());
        camelMessage.setHeader(Sqs2Constants.MD5_OF_BODY, message.md5OfBody());
        camelMessage.setHeader(Sqs2Constants.RECEIPT_HANDLE, message.receiptHandle());
        camelMessage.setHeader(Sqs2Constants.ATTRIBUTES, message.attributes());
        camelMessage.setHeader(Sqs2Constants.MESSAGE_ATTRIBUTES, message.messageAttributes());

        HeaderFilterStrategy filter = getEndpoint().getHeaderFilterStrategy();
        for (Map.Entry<String, MessageAttributeValue> attribute : message.messageAttributes().entrySet()) {
            String headerName = attribute.getKey();
            Object headerValue = Sqs2MessageHelper.fromMessageAttributeValue(attribute.getValue());
            if (filter.applyFilterToExternalHeaders(headerName, headerValue, exchange)) {
                continue;
            }
            camelMessage.setHeader(headerName, headerValue);
        }
        return exchange;
    }

    /**
     * Returns {@code SqsConsumer[<sanitized endpoint URI>]}; the text is computed on first
     * use and cached.
     */
    @Override // org.apache.camel.support.DefaultConsumer
    public String toString() {
        String cached = this.sqsConsumerToString;
        if (cached == null) {
            cached = "SqsConsumer[" + URISupport.sanitizeUri(getEndpoint().getEndpointUri()) + "]";
            this.sqsConsumerToString = cached;
        }
        return cached;
    }

    /**
     * When the flag is set and the scheduler is a {@link DefaultScheduledPollConsumerScheduler},
     * applies the configured number of concurrent consumers and raises the pool size to at
     * least that number.
     */
    @Override // org.apache.camel.support.ScheduledPollConsumer
    protected void afterConfigureScheduler(ScheduledPollConsumerScheduler scheduledPollConsumerScheduler, boolean z) {
        if (!z || !(scheduledPollConsumerScheduler instanceof DefaultScheduledPollConsumerScheduler)) {
            return;
        }
        DefaultScheduledPollConsumerScheduler scheduler = (DefaultScheduledPollConsumerScheduler) scheduledPollConsumerScheduler;
        scheduler.setConcurrentConsumers(getConfiguration().getConcurrentConsumers());
        int requiredPoolSize = Math.max(scheduler.getPoolSize(), getConfiguration().getConcurrentConsumers());
        scheduler.setPoolSize(requiredPoolSize);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Starts the consumer. When message-visibility extension is enabled and a positive
     * visibility timeout is configured, a single-threaded scheduled executor is created and a
     * {@link TimeoutExtender} is scheduled to run every {@code visibilityTimeout} seconds.
     * <p>
     * The executor is only created when the extender will actually be scheduled, so a
     * configuration without a usable visibility timeout does not leave an idle thread pool
     * behind. The timeout applied on each extension is 1.5 times the configured value, capped
     * at the SQS maximum so that extension requests are not rejected for large timeouts.
     *
     * @throws Exception if the superclass fails to start
     */
    @Override // org.apache.camel.support.ScheduledPollConsumer, org.apache.camel.support.DefaultConsumer, org.apache.camel.support.service.BaseService
    public void doStart() throws Exception {
        // SQS rejects visibility timeouts above 43200 seconds (12 hours).
        final int maxVisibilitySeconds = 43200;

        Integer visibilityTimeout = getConfiguration().getVisibilityTimeout();
        boolean hasVisibilityTimeout = visibilityTimeout != null && visibilityTimeout.intValue() > 0;
        if (getConfiguration().isExtendMessageVisibility() && this.scheduledExecutor == null && hasVisibilityTimeout) {
            ThreadPoolProfile threadPoolProfile = new ThreadPoolProfile("SqsTimeoutExtender");
            threadPoolProfile.setPoolSize(1);
            threadPoolProfile.setAllowCoreThreadTimeOut(false);
            threadPoolProfile.setMaxQueueSize(-1);
            this.scheduledExecutor = getEndpoint().getCamelContext().getExecutorServiceManager().newScheduledThreadPool(this, "SqsTimeoutExtender", threadPoolProfile);

            int periodSeconds = visibilityTimeout.intValue();
            int repeatSeconds = (int) Math.min((double) maxVisibilitySeconds, visibilityTimeout.doubleValue() * 1.5d);
            this.timeoutExtender = new TimeoutExtender(repeatSeconds);
            if (LOG.isDebugEnabled()) {
                LOG.debug("Scheduled TimeoutExtender task to start after {} delay, and run with {}/{} period/repeat (seconds)", Integer.valueOf(periodSeconds), Integer.valueOf(periodSeconds), Integer.valueOf(repeatSeconds));
            }
            this.scheduledFuture = this.scheduledExecutor.scheduleAtFixedRate(this.timeoutExtender, periodSeconds, periodSeconds, TimeUnit.SECONDS);
        }
        super.doStart();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Shuts the consumer down: disables the timeout extender, cancels its scheduled task,
     * shuts down the executor, and finally delegates to the superclass. Each field is
     * cleared after it has been released.
     *
     * @throws Exception if the superclass fails to shut down
     */
    @Override // org.apache.camel.support.ScheduledPollConsumer, org.apache.camel.support.DefaultConsumer, org.apache.camel.support.service.BaseService
    public void doShutdown() throws Exception {
        TimeoutExtender extender = this.timeoutExtender;
        if (extender != null) {
            extender.cancel();
            this.timeoutExtender = null;
        }

        ScheduledFuture<?> future = this.scheduledFuture;
        if (future != null) {
            future.cancel(true);
            this.scheduledFuture = null;
        }

        ScheduledExecutorService executor = this.scheduledExecutor;
        if (executor != null) {
            getEndpoint().getCamelContext().getExecutorServiceManager().shutdownNow(executor);
            this.scheduledExecutor = null;
        }

        super.doShutdown();
    }
}
