package com.hazelcast.impl.executor;

import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.StandardLoggerFactory;
import com.hazelcast.util.RandomBlockJUnit4ClassRunner;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

/**
 * Stress test that submits {@link #TASK_COUNT} tasks, spread randomly over
 * {@link #CONCURRENCY_LEVEL} segments, to a {@link ParallelExecutor} obtained from a
 * {@link ParallelExecutorService} backed by a fixed pool of {@link #POOL_SIZE} threads.
 *
 * <p>Each task verifies two properties for its segment and records a violation when either
 * is broken: no other task of the same segment is running at the same time, and tasks of the
 * same segment run in the order in which they were created.
 */
@RunWith(RandomBlockJUnit4ClassRunner.class)
public class ParallelExecutorServiceImplStressTest {
    /** Exclusive upper bound, in milliseconds, of the random sleep performed by every task. */
    public static final int TASK_DURATION_MS = 50;
    /** Core and maximum size of the backing thread pool. */
    public static final int POOL_SIZE = 10;
    /** Number of segments, also passed as the first argument when creating the executor. */
    public static final int CONCURRENCY_LEVEL = 20;
    /** Total number of tasks submitted by a single test run. */
    public static final int TASK_COUNT = 10000;

    /** Number of ordering/exclusivity violations observed by the tasks. */
    private AtomicLong violationCounter;
    /** Counted down once per finished task; reaches zero when all tasks have run. */
    private CountDownLatch completedLatch;
    private ThreadPoolExecutor executorService;
    private ParallelExecutorService parallelExecutorService;

    /**
     * Per-segment bookkeeping shared by all tasks that are submitted for the same segment.
     */
    public static class Segment {
        /** Identifier of this segment; only used in progress output. */
        final long segment;
        /** Number of tasks of this segment that are currently inside {@code run()}. */
        final AtomicLong activeCounter = new AtomicLong();
        /** Hands out the sequence number of each newly created task. */
        final AtomicLong sequenceIdGenerator = new AtomicLong();
        /** Sequence number the next executing task of this segment is expected to carry. */
        final AtomicLong expectedSequenceNumber = new AtomicLong();
        /** Source of the random task duration. */
        final Random random = new Random();

        Segment(long segment) {
            this.segment = segment;
        }
    }

    /**
     * Task that checks the per-segment invariants, sleeps for a random time and then signals
     * completion.
     */
    public class TestRunnable implements Runnable {
        private final Segment segment;
        private final long sequenceNumber;

        /**
         * Creates a task for the given segment and assigns it the segment's next sequence number.
         *
         * @param segment the segment this task belongs to
         */
        TestRunnable(Segment segment) {
            this.segment = segment;
            this.sequenceNumber = segment.sequenceIdGenerator.getAndIncrement();
        }

        @Override
        public void run() {
            try {
                // Any value other than zero means another task of this segment is still running.
                if (segment.activeCounter.getAndIncrement() != 0) {
                    System.out.println("ERROR: Concurrent execution within a segment has taken place");
                    violationCounter.incrementAndGet();
                }
                // Tasks must start in exactly the order in which they were created.
                if (segment.expectedSequenceNumber.getAndIncrement() != sequenceNumber) {
                    System.out.println("ERROR: An out of order execution within a segment has taken place");
                    violationCounter.incrementAndGet();
                }
                if (sequenceNumber % 100 == 0) {
                    System.out.println("Segment [" + segment.segment + "] is at element [" + sequenceNumber + "]");
                }
                sleepMs(segment.random.nextInt(TASK_DURATION_MS));
            } finally {
                // Always release the segment and signal completion so that an unexpected failure
                // in one task cannot stall the test until the latch timeout expires.
                segment.activeCounter.decrementAndGet();
                completedLatch.countDown();
            }
        }
    }

    /**
     * Creates the backing thread pool, the service under test and fresh counters.
     */
    @Before
    public void setUp() {
        ILogger logger = new StandardLoggerFactory().getLogger(ParallelExecutorServiceImplStressTest.class.getName());
        this.executorService = new ThreadPoolExecutor(
                POOL_SIZE, POOL_SIZE, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
        this.parallelExecutorService = new ParallelExecutorService(logger, this.executorService);
        this.violationCounter = new AtomicLong();
        this.completedLatch = new CountDownLatch(TASK_COUNT);
    }

    /**
     * Stops the backing thread pool and fails if it does not terminate within ten seconds.
     *
     * @throws InterruptedException if interrupted while waiting for termination
     */
    @After
    public void tearDown() throws InterruptedException {
        this.executorService.shutdownNow();
        Assert.assertTrue("ExecutorService failed to terminate within timeout window",
                this.executorService.awaitTermination(10L, TimeUnit.SECONDS));
    }

    /** Runs the stress scenario with a small second argument (the pool size). */
    @Test
    public void testBlocking() throws Exception {
        test(POOL_SIZE);
    }

    /** Runs the stress scenario with {@link Integer#MAX_VALUE} as the second argument. */
    @Test
    public void testNonBlocking() throws Exception {
        test(Integer.MAX_VALUE);
    }

    /**
     * Submits {@link #TASK_COUNT} tasks to randomly chosen segments and asserts that all of them
     * complete within 120 seconds without any recorded violation.
     *
     * @param capacity second argument handed to {@code newBlockingParallelExecutor}; presumably
     *                 the bound on queued tasks before submission blocks (confirm against
     *                 {@code ParallelExecutorService})
     * @throws Exception if waiting for completion is interrupted or submission fails
     */
    public void test(int capacity) throws Exception {
        ParallelExecutor executor =
                this.parallelExecutorService.newBlockingParallelExecutor(CONCURRENCY_LEVEL, capacity);
        Random random = new Random();
        Segment[] segments = new Segment[CONCURRENCY_LEVEL];
        for (int index = 0; index < segments.length; index++) {
            segments[index] = new Segment(index);
        }
        for (int submitted = 0; submitted < TASK_COUNT; submitted++) {
            // Sleep 0 or 1 ms between submissions.
            sleepMs(random.nextInt(2));
            int segmentIndex = random.nextInt(segments.length);
            // The segment index is passed as the second argument of execute().
            executor.execute(new TestRunnable(segments[segmentIndex]), segmentIndex);
        }
        Assert.assertTrue("The tasks were not executed in the given timeout",
                this.completedLatch.await(120L, TimeUnit.SECONDS));
        Assert.assertEquals("Ordering or exclusivity violations were detected",
                0L, this.violationCounter.get());
    }

    /**
     * Sleeps for the given number of milliseconds; if interrupted, restores the interrupt flag
     * and returns early.
     *
     * @param millis time to sleep in milliseconds
     */
    static void sleepMs(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
