package com.hazelcast.impl;

import com.hazelcast.config.Config;
import com.hazelcast.config.MapStoreConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IQueue;
import com.hazelcast.core.MapStore;
import com.hazelcast.core.Transaction;
import com.hazelcast.util.RandomBlockJUnit4ClassRunner;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(RandomBlockJUnit4ClassRunner.class)
/* loaded from: input_file:com/hazelcast/impl/ClusterQueueTest.class */
public class ClusterQueueTest {
    /**
     * One-time setup for the whole test class.
     *
     * <p>Sets the {@code hazelcast.version.check.enabled} and
     * {@code hazelcast.wait.seconds.before.join} system properties and then shuts down
     * every Hazelcast instance that may still be running in this JVM.
     */
    @BeforeClass
    public static void init() throws Exception {
        System.setProperty("hazelcast.version.check.enabled", "false");
        System.setProperty("hazelcast.wait.seconds.before.join", "1");
        Hazelcast.shutdownAll();
    }

    /**
     * Runs after every test method and shuts down all Hazelcast instances in this JVM,
     * so that the members a test created are not left running for the next test.
     */
    @After
    public void cleanup() throws Exception {
        Hazelcast.shutdownAll();
    }

    /**
     * Offers each of 100 items twice (once through each of two members' handles to the
     * "default" queue), then checks that both handles peek "item0" and that alternating
     * polls through the two handles return the items in insertion order.
     */
    @Test
    public void testOffer() throws Exception {
        HazelcastInstance h1 = Hazelcast.newHazelcastInstance(new Config());
        HazelcastInstance h2 = Hazelcast.newHazelcastInstance(new Config());
        IQueue q1 = h1.getQueue("default");
        IQueue q2 = h2.getQueue("default");

        int produced = 0;
        while (produced < 100) {
            String item = "item" + produced;
            Assert.assertTrue(q1.offer(item, 100L, TimeUnit.SECONDS));
            Assert.assertTrue(q2.offer(item, 100L, TimeUnit.SECONDS));
            produced++;
        }

        Assert.assertEquals("item0", q1.peek());
        Assert.assertEquals("item0", q2.peek());

        int consumed = 0;
        while (consumed < 100) {
            String expected = "item" + consumed;
            Assert.assertEquals(expected, q1.poll());
            Assert.assertEquals(expected, q2.poll());
            consumed++;
        }
    }

    /**
     * Starts a thread that calls {@code take()} through the second member's queue handle,
     * kills that member from another thread after a two second pause, and then verifies
     * that an item offered through the first member is counted by {@code size()} and
     * returned by {@code poll()} on the first member.
     */
    @Test
    public void testDeadTaker() throws Exception {
        HazelcastInstance survivor = Hazelcast.newHazelcastInstance(new Config());
        final HazelcastInstance victim = Hazelcast.newHazelcastInstance(new Config());
        IQueue survivorQueue = survivor.getQueue("default");
        final IQueue victimQueue = victim.getQueue("default");

        // Taker running against the member that is about to be killed.
        Runnable taker = new Runnable() {
            @Override
            public void run() {
                try {
                    victimQueue.take();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        new Thread(taker).start();

        final CountDownLatch killed = new CountDownLatch(1);
        // Kills the second member after giving the taker time to start.
        Runnable killer = new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(2000L);
                    victim.getLifecycleService().kill();
                    killed.countDown();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        new Thread(killer).start();

        boolean memberKilled = killed.await(100L, TimeUnit.SECONDS);
        Assert.assertTrue(memberKilled);

        survivorQueue.offer("item");
        Assert.assertEquals(1L, survivorQueue.size());
        Assert.assertEquals("item", survivorQueue.poll());
    }

    /**
     * Polls an item inside a transaction that is begun but never committed, shuts down
     * the member that polled it, and asserts the item can still be polled from another
     * member. The scenario runs twice: first with the third member as the polling member,
     * then (after a fourth member has joined) with the first member.
     */
    @Test
    public void testPollTxn() throws Exception {
        final HazelcastInstance h1 = Hazelcast.newHazelcastInstance(new Config());
        HazelcastInstance h2 = Hazelcast.newHazelcastInstance(new Config());
        final HazelcastInstance h3 = Hazelcast.newHazelcastInstance(new Config());
        final IQueue q1 = h1.getQueue("default");
        IQueue q2 = h2.getQueue("default");
        final IQueue q3 = h3.getQueue("default");

        // Round 1: the third member polls inside an open transaction, then shuts down.
        Assert.assertTrue(q2.offer("item"));
        Assert.assertEquals(1L, h2.getMap("q:default").size());
        final CountDownLatch firstPolled = new CountDownLatch(1);
        Runnable pollOnThird = new Runnable() {
            @Override
            public void run() {
                h3.getTransaction().begin();
                Assert.assertEquals("item", q3.poll());
                firstPolled.countDown();
            }
        };
        new Thread(pollOnThird).start();
        Assert.assertTrue(firstPolled.await(50L, TimeUnit.SECONDS));
        h3.getLifecycleService().shutdown();
        Thread.sleep(4000L);
        Assert.assertEquals(1L, h2.getMap("q:default").size());
        Assert.assertEquals("item", q2.poll());

        // A fourth member joins and touches the queue before the second round.
        Hazelcast.newHazelcastInstance(new Config()).getQueue("default");

        // Round 2: same scenario, this time the first member is the one that goes away.
        final CountDownLatch secondPolled = new CountDownLatch(1);
        Assert.assertTrue(q2.offer("item2"));
        Runnable pollOnFirst = new Runnable() {
            @Override
            public void run() {
                h1.getTransaction().begin();
                Assert.assertEquals("item2", q1.poll());
                secondPolled.countDown();
            }
        };
        new Thread(pollOnFirst).start();
        Assert.assertTrue(secondPolled.await(100L, TimeUnit.SECONDS));
        h1.getLifecycleService().shutdown();
        Thread.sleep(4000L);
        Assert.assertEquals("item2", q2.poll());
    }

    /**
     * Offers items 0..39 alternately through two members (even indexes via the first,
     * odd indexes via the second), shuts the first member down, offers items 40..99
     * through the remaining member, and asserts that all 100 items are polled from the
     * remaining member in order.
     */
    @Test
    public void testShutdown() throws Exception {
        HazelcastInstance h1 = Hazelcast.newHazelcastInstance(new Config());
        HazelcastInstance h2 = Hazelcast.newHazelcastInstance(new Config());
        IQueue q1 = h1.getQueue("default");
        IQueue q2 = h2.getQueue("default");

        // Two items per iteration: the even one through q1, the following odd one through q2.
        for (int even = 0; even < 40; even += 2) {
            int odd = even + 1;
            Assert.assertTrue(q1.offer("item" + even, 100L, TimeUnit.SECONDS));
            Assert.assertTrue(q2.offer("item" + odd, 100L, TimeUnit.SECONDS));
        }

        h1.getLifecycleService().shutdown();

        for (int n = 40; n < 100; n++) {
            Assert.assertTrue(q2.offer("item" + n, 100L, TimeUnit.SECONDS));
        }
        for (int n = 0; n < 100; n++) {
            Assert.assertEquals("item" + n, q2.poll());
        }
    }

    /**
     * Asserts that polling an empty queue returns {@code null} through both members'
     * handles, for the non-blocking {@code poll()} (100 rounds) and for {@code poll}
     * with a two second timeout.
     */
    @Test
    public void testPollNull() throws Exception {
        HazelcastInstance h1 = Hazelcast.newHazelcastInstance(new Config());
        HazelcastInstance h2 = Hazelcast.newHazelcastInstance(new Config());
        IQueue q1 = h1.getQueue("default");
        IQueue q2 = h2.getQueue("default");

        int round = 0;
        while (round < 100) {
            Object fromFirst = q1.poll();
            Assert.assertNull(fromFirst);
            Object fromSecond = q2.poll();
            Assert.assertNull(fromSecond);
            round++;
        }

        Object timedFromFirst = q1.poll(2L, TimeUnit.SECONDS);
        Assert.assertNull(timedFromFirst);
        Object timedFromSecond = q2.poll(2L, TimeUnit.SECONDS);
        Assert.assertNull(timedFromSecond);
    }

    /**
     * Submits 200 {@code take()} tasks (alternating between two members' handles to the
     * "default" queue) to a 50-thread pool, while a producer thread waits three seconds
     * and then offers 200 items. Passes when every task has taken an item within 100
     * seconds.
     *
     * <p>The pool is shut down in a {@code finally} block with {@code shutdownNow()} so
     * that workers still parked in {@code take()} are interrupted when the latch
     * assertion fails, instead of being left behind as live non-daemon threads.
     */
    @Test
    public void testTake() throws Exception {
        HazelcastInstance h1 = Hazelcast.newHazelcastInstance(new Config());
        HazelcastInstance h2 = Hazelcast.newHazelcastInstance(new Config());
        final IQueue q1 = h1.getQueue("default");
        final IQueue q2 = h2.getQueue("default");

        // Producer: starts late so that the takers are already waiting.
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000L);
                    for (int i = 0; i < 100; i++) {
                        Assert.assertTrue(q1.offer("item"));
                        Assert.assertTrue(q2.offer("item"));
                    }
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to whoever owns this thread.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        }).start();

        ExecutorService takers = Executors.newFixedThreadPool(50);
        try {
            final CountDownLatch taken = new CountDownLatch(200);
            for (int i = 0; i < 100; i++) {
                // Same submission order as before: one task on q1, then one on q2.
                for (final IQueue source : new IQueue[] {q1, q2}) {
                    takers.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                Assert.assertEquals("item", source.take());
                                taken.countDown();
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }
            Assert.assertTrue(taken.await(100L, TimeUnit.SECONDS));
        } finally {
            // Runs on failure too; interrupts any worker still blocked in take().
            takers.shutdownNow();
        }
    }

    /**
     * A producer thread offers 200 items (100 through each of two members' handles to
     * the "default" queue). After a three second pause, 200 pool tasks each call
     * {@code poll} with a five second timeout, alternating between the two handles.
     * Passes when every task has received an item within 100 seconds.
     *
     * <p>The pool is shut down in a {@code finally} block so its worker threads are
     * released even when an assertion fails.
     */
    @Test
    public void testPollLong() throws Exception {
        HazelcastInstance h1 = Hazelcast.newHazelcastInstance(new Config());
        HazelcastInstance h2 = Hazelcast.newHazelcastInstance(new Config());
        final IQueue q1 = h1.getQueue("default");
        final IQueue q2 = h2.getQueue("default");
        Thread.sleep(1000L);

        // Producer runs in the background while the main thread waits below.
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 100; i++) {
                    Assert.assertTrue(q1.offer("item"));
                    Assert.assertTrue(q2.offer("item"));
                }
            }
        }).start();

        ExecutorService pollers = Executors.newFixedThreadPool(50);
        try {
            final CountDownLatch polled = new CountDownLatch(200);
            Thread.sleep(3000L);
            for (int i = 0; i < 100; i++) {
                // Same submission order as before: one task on q1, then one on q2.
                for (final IQueue source : new IQueue[] {q1, q2}) {
                    pollers.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                Assert.assertEquals("item", source.poll(5L, TimeUnit.SECONDS));
                                polled.countDown();
                            } catch (InterruptedException e) {
                                // Keep the interrupt visible to the pool.
                                Thread.currentThread().interrupt();
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }
            Assert.assertTrue(polled.await(100L, TimeUnit.SECONDS));
        } finally {
            // Runs on failure too, so pool threads never outlive the test.
            pollers.shutdownNow();
        }
    }

    /**
     * Configures the "default" queue with {@code setMaxSizePerJVM(100)}, offers 200 items
     * through two members, and asserts that further offers are rejected both immediately
     * and with a two second timeout. A consumer thread then waits three seconds and polls
     * the 200 original items in order, while 200 pool tasks offer with a 30 second
     * timeout. Passes when all 200 pending offers succeed within 100 seconds.
     *
     * <p>The pool is shut down in a {@code finally} block so its worker threads are
     * released even when an assertion fails.
     */
    @Test
    public void testOfferLong() throws Exception {
        Config config = new Config();
        config.getQueueConfig("default").setBackingMapRef("default").setMaxSizePerJVM(100);
        HazelcastInstance h1 = Hazelcast.newHazelcastInstance(config);
        HazelcastInstance h2 = Hazelcast.newHazelcastInstance(config);
        final IQueue q1 = h1.getQueue("default");
        final IQueue q2 = h2.getQueue("default");

        for (int i = 0; i < 100; i++) {
            Assert.assertTrue(q1.offer("item" + i, 100L, TimeUnit.SECONDS));
            Assert.assertTrue(q2.offer("item" + i, 100L, TimeUnit.SECONDS));
        }

        // From here on, both plain and timed offers are expected to be rejected.
        Assert.assertFalse(q1.offer("item"));
        Assert.assertFalse(q2.offer("item"));
        Assert.assertFalse(q1.offer("item", 2L, TimeUnit.SECONDS));
        Assert.assertFalse(q2.offer("item", 2L, TimeUnit.SECONDS));

        // Consumer: starts late so that the offer tasks below are already waiting.
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(3000L);
                    for (int i = 0; i < 100; i++) {
                        Assert.assertEquals("item" + i, q1.poll(2L, TimeUnit.SECONDS));
                        Assert.assertEquals("item" + i, q2.poll(2L, TimeUnit.SECONDS));
                    }
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to whoever owns this thread.
                    Thread.currentThread().interrupt();
                    e.printStackTrace();
                }
            }
        }).start();

        ExecutorService offerers = Executors.newFixedThreadPool(50);
        try {
            final CountDownLatch offered = new CountDownLatch(200);
            for (int i = 0; i < 100; i++) {
                // Same submission order as before: one task on q1, then one on q2.
                for (final IQueue target : new IQueue[] {q1, q2}) {
                    offerers.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                Assert.assertTrue(target.offer("item", 30L, TimeUnit.SECONDS));
                                offered.countDown();
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                e.printStackTrace();
                            }
                        }
                    });
                }
            }
            Assert.assertTrue(offered.await(100L, TimeUnit.SECONDS));
        } finally {
            // Runs on failure too; interrupts any worker still waiting in offer().
            offerers.shutdownNow();
        }
    }

    /**
     * Offers one item through the second member, takes it through the first member, and
     * asserts that both handles report the same size before and after the take. The
     * first member is then shut down and the second member's handle must still report an
     * empty queue.
     *
     * <p>The second member's queue handle is held in a local variable so that the size
     * assertions have something to call; previously they referenced an undeclared
     * identifier.
     */
    @Test
    public void testQueueAfterShutdown() throws Exception {
        HazelcastInstance h1 = Hazelcast.newHazelcastInstance((Config) null);
        HazelcastInstance h2 = Hazelcast.newHazelcastInstance((Config) null);
        IQueue q1 = h1.getQueue("default");
        IQueue q2 = h2.getQueue("default");
        q2.offer("item");
        Assert.assertEquals(1L, q1.size());
        Assert.assertEquals(1L, q2.size());
        Assert.assertEquals("item", q1.take());
        Assert.assertEquals(0L, q1.size());
        Assert.assertEquals(0L, q2.size());
        h1.getLifecycleService().shutdown();
        Assert.assertEquals(0L, q2.size());
    }

    /**
     * Offers one item through the first member, takes it through the second member, and
     * checks that both handles agree on the size before and after. The second member is
     * then shut down and the first member's handle must still report an empty queue.
     */
    @Test
    public void testQueueAfterShutdown2() throws Exception {
        HazelcastInstance staying = Hazelcast.newHazelcastInstance((Config) null);
        HazelcastInstance leaving = Hazelcast.newHazelcastInstance((Config) null);
        IQueue stayingQueue = staying.getQueue("default");
        IQueue leavingQueue = leaving.getQueue("default");

        stayingQueue.offer("item");
        Assert.assertEquals(1L, stayingQueue.size());
        Assert.assertEquals(1L, leavingQueue.size());

        Object taken = leavingQueue.take();
        Assert.assertEquals("item", taken);
        Assert.assertEquals(0L, stayingQueue.size());
        Assert.assertEquals(0L, leavingQueue.size());

        leaving.getLifecycleService().shutdown();
        Assert.assertEquals(0L, stayingQueue.size());
    }

    /**
     * Offers five items to queue "q" through the first member and polls three of them
     * through the second member. After a ten second pause both handles must report two
     * remaining items; after the first member shuts down and a further five second
     * pause, the second member must still report two.
     */
    @Test
    public void queueEntriesShouldBeConsistentAfterShutdown() throws Exception {
        HazelcastInstance h1 = Hazelcast.newHazelcastInstance((Config) null);
        HazelcastInstance h2 = Hazelcast.newHazelcastInstance((Config) null);
        IQueue q1 = h1.getQueue("q");
        IQueue q2 = h2.getQueue("q");

        int next = 0;
        while (next < 5) {
            q1.offer("item" + next);
            next++;
        }
        Assert.assertEquals(5L, q1.size());
        Assert.assertEquals(5L, q2.size());

        // Consume the first three items through the second member.
        for (int k = 0; k < 3; k++) {
            Assert.assertEquals("item" + k, q2.poll());
        }

        Thread.sleep(10000L);
        Assert.assertEquals(2L, q1.size());
        Assert.assertEquals(2L, q2.size());

        h1.getLifecycleService().shutdown();
        Thread.sleep(5000L);
        Assert.assertEquals(2L, q2.size());
    }

    /**
     * Offers five items to queue "q" through the first member and polls three of them
     * through that same member. After a ten second pause both handles must report two
     * remaining items; after the first member shuts down and a further five second
     * pause, the second member must still report two.
     */
    @Test
    public void queueEntriesShouldBeConsistentAfterShutdown2() throws Exception {
        HazelcastInstance h1 = Hazelcast.newHazelcastInstance((Config) null);
        HazelcastInstance h2 = Hazelcast.newHazelcastInstance((Config) null);
        IQueue q1 = h1.getQueue("q");
        IQueue q2 = h2.getQueue("q");

        int next = 0;
        while (next < 5) {
            q1.offer("item" + next);
            next++;
        }
        Assert.assertEquals(5L, q1.size());
        Assert.assertEquals(5L, q2.size());

        // Consume the first three items through the member that offered them.
        for (int k = 0; k < 3; k++) {
            Assert.assertEquals("item" + k, q1.poll());
        }

        Thread.sleep(10000L);
        Assert.assertEquals(2L, q1.size());
        Assert.assertEquals(2L, q2.size());

        h1.getLifecycleService().shutdown();
        Thread.sleep(5000L);
        Assert.assertEquals(2L, q2.size());
    }

    /**
     * Starts one regular member and one member configured with
     * {@code setLiteMember(true)}, then passes items through the "default" queue in both
     * directions: 12000 items offered via the lite member and polled via the regular
     * member, followed by 5000 items the other way round.
     */
    @Test
    public void testSuperClientWithQueues() {
        Config liteConfig = new Config();
        liteConfig.setLiteMember(true);
        HazelcastInstance regular = Hazelcast.newHazelcastInstance(new Config());
        HazelcastInstance lite = Hazelcast.newHazelcastInstance(liteConfig);
        IQueue liteQueue = lite.getQueue("default");
        IQueue regularQueue = regular.getQueue("default");

        // Lite member produces, regular member consumes.
        int n = 0;
        while (n < 12000) {
            String item = "item" + n;
            liteQueue.offer(item);
            Assert.assertEquals(item, regularQueue.poll());
            n++;
        }

        // Regular member produces, lite member consumes.
        n = 0;
        while (n < 5000) {
            String item = "item" + n;
            regularQueue.offer(item);
            Assert.assertEquals(item, liteQueue.poll());
            n++;
        }
    }

    /**
     * Exercises a queue ("tasks") whose backing map ("queue-map") is configured with a
     * {@link MapStore} that already holds six entries (keys 1..6, values "Event1".."Event6").
     *
     * <p>Steps:
     * <ol>
     *   <li>the queue must report as many items as the store holds;</li>
     *   <li>inside a transaction two items are removed ({@code poll} and {@code take}) and
     *       the two records then present in the {@code q:tasks} CMap are inspected;</li>
     *   <li>the transaction is rolled back; {@code store()} must not be called within five
     *       seconds, and the store and the queue must both still hold six entries;</li>
     *   <li>all six items are polled in order, after which the CMap must be empty and hold
     *       no active record.</li>
     * </ol>
     *
     * <p>{@code deleteAll} in the store fixture removes only the keys it is given; it used
     * to ignore its argument and wipe the whole backing store.
     */
    @Test(timeout = 100000)
    public void storedQueueWithExistingItemsAndTransactionRollback() throws InterruptedException {
        final ConcurrentHashMap<Long, String> store = new ConcurrentHashMap<Long, String>();
        for (long key = 1; key < 7; key++) {
            store.put(key, "Event" + key);
        }
        // Counted down by store(); the test asserts below that this never happens.
        final CountDownLatch storeCalled = new CountDownLatch(1);
        Config config = new Config();
        config.getMapConfig("queue-map").setMapStoreConfig(new MapStoreConfig().setWriteDelaySeconds(1).setImplementation(new MapStore<Long, String>() {
            public String load(Long l) {
                return store.get(l);
            }

            public Map<Long, String> loadAll(Collection<Long> collection) {
                Map<Long, String> loaded = new HashMap<Long, String>();
                for (Long key : collection) {
                    String value = load(key);
                    if (value != null) {
                        loaded.put(key, value);
                    }
                }
                return loaded;
            }

            public Set<Long> loadAllKeys() {
                return store.keySet();
            }

            public void store(Long l, String str) {
                storeCalled.countDown();
            }

            public void storeAll(Map<Long, String> map) {
                for (Map.Entry<Long, String> entry : map.entrySet()) {
                    store(entry.getKey(), entry.getValue());
                }
            }

            public void delete(Long l) {
                store.remove(l);
            }

            public void deleteAll(Collection<Long> collection) {
                // Delete only the requested keys, not every key in the store.
                for (Long key : collection) {
                    delete(key);
                }
            }
        }));
        config.getQueueConfig("tasks").setBackingMapRef("queue-map");
        HazelcastInstance instance = Hazelcast.newHazelcastInstance(config);
        IQueue queue = instance.getQueue("tasks");
        Assert.assertEquals(store.size(), queue.size());

        Transaction transaction = instance.getTransaction();
        transaction.begin();
        Assert.assertEquals(store.get(1L), queue.poll());
        Assert.assertEquals(store.get(2L), queue.take());

        // While the transaction is open, exactly the two removed entries are expected as
        // active, non-removable records that still carry their values.
        CMap cMap = TestUtil.getCMap(instance, "q:tasks");
        Assert.assertEquals(2L, cMap.mapRecords.size());
        for (Record record : cMap.mapRecords.values()) {
            Assert.assertTrue(record.isActive());
            Assert.assertFalse(record.isRemovable());
            Assert.assertNotNull(record.getValueData());
            if (record.getKey().equals(1L)) {
                Assert.assertEquals("Event1", record.getValue());
            } else if (record.getKey().equals(2L)) {
                Assert.assertEquals("Event2", record.getValue());
            } else {
                Assert.fail("Invalid key: " + record.getKey());
            }
        }

        transaction.rollback();
        // A rollback must not write anything to the store.
        Assert.assertFalse(storeCalled.await(5L, TimeUnit.SECONDS));
        Assert.assertEquals(2L, cMap.mapRecords.size());
        Assert.assertEquals(6L, store.size());
        Assert.assertEquals(6L, queue.size());

        for (int i = 1; i < 7; i++) {
            Assert.assertEquals("Event" + i, queue.poll());
        }
        Assert.assertEquals(0L, cMap.size());
        for (Record record : cMap.mapRecords.values()) {
            Assert.assertFalse(record.isActive());
        }
    }
}
