/*
 * Decompiled with CFR 0.152.
 */
package org.alfresco.solr.tracker;

import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.Semaphore;
import java.util.stream.Collectors;
import org.alfresco.httpclient.AuthenticationException;
import org.alfresco.solr.InformationServer;
import org.alfresco.solr.client.NodeMetaData;
import org.alfresco.solr.client.SOLRAPIClient;
import org.alfresco.solr.client.Transaction;
import org.alfresco.solr.tracker.AbstractWorker;
import org.alfresco.solr.tracker.ActivatableTracker;
import org.alfresco.solr.tracker.ModelTracker;
import org.alfresco.solr.tracker.Tracker;
import org.alfresco.solr.utils.Utils;
import org.json.JSONException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CascadeTracker
extends ActivatableTracker {
    protected static final Logger LOGGER = LoggerFactory.getLogger(CascadeTracker.class);
    private static final int DEFAULT_CASCADE_TRACKER_MAX_PARALLELISM = 32;
    private static final int DEFAULT_CASCADE_NODE_BATCH_SIZE = 10;
    private static Map<String, Semaphore> RUN_LOCK_BY_CORE = new ConcurrentHashMap<String, Semaphore>();
    private static Map<String, Semaphore> WRITE_LOCK_BY_CORE = new ConcurrentHashMap<String, Semaphore>();
    private int cascadeBatchSize;
    private ForkJoinPool forkJoinPool;
    private int cascadeTrackerParallelism;

    @Override
    public Semaphore getWriteLock() {
        return WRITE_LOCK_BY_CORE.get(this.coreName);
    }

    @Override
    public Semaphore getRunLock() {
        return RUN_LOCK_BY_CORE.get(this.coreName);
    }

    public CascadeTracker(Properties p, SOLRAPIClient client, String coreName, InformationServer informationServer) {
        super(p, client, coreName, informationServer, Tracker.Type.CASCADE);
        this.cascadeTrackerParallelism = Integer.parseInt(p.getProperty("alfresco.cascade.tracker.maxParallelism", String.valueOf(32)));
        this.cascadeBatchSize = Integer.parseInt(p.getProperty("alfresco.cascade.tracker.nodeBatchSize", String.valueOf(10)));
        this.forkJoinPool = new ForkJoinPool(this.cascadeTrackerParallelism);
        RUN_LOCK_BY_CORE.put(coreName, new Semaphore(1, true));
        WRITE_LOCK_BY_CORE.put(coreName, new Semaphore(1, true));
    }

    CascadeTracker() {
        super(Tracker.Type.CASCADE);
    }

    @Override
    protected void doTrack(String iterationId) throws IOException, JSONException {
        ModelTracker modelTracker = this.infoSrv.getAdminHandler().getTrackerRegistry().getModelTracker();
        if (modelTracker != null && modelTracker.hasModels()) {
            this.trackRepository(iterationId);
        }
    }

    @Override
    public void maintenance() {
    }

    @Override
    public boolean hasMaintenance() {
        return false;
    }

    private void trackRepository(String iterationId) throws IOException, JSONException {
        this.checkShutdown();
        this.processCascades(iterationId);
    }

    private void updateTransactionsAfterWorker(List<Transaction> txsIndexed) throws IOException {
        for (Transaction tx : txsIndexed) {
            this.infoSrv.updateTransaction(tx);
        }
    }

    @Override
    public void invalidateState() {
        super.invalidateState();
        this.infoSrv.setCleanCascadeTxnFloor(-1L);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void processCascades(String iterationId) throws IOException {
        int num = 50;
        List<Transaction> txBatch = null;
        long totalUpdatedDocs = 0L;
        do {
            try {
                this.getWriteLock().acquire();
                txBatch = this.infoSrv.getCascades(num);
                if (txBatch.size() > 0) {
                    LOGGER.info("{}-[CORE {}] Found {} transactions, transactions from {} to {}", new Object[]{Thread.currentThread().getId(), this.coreName, txBatch.size(), txBatch.get(0), txBatch.get(txBatch.size() - 1)});
                } else {
                    LOGGER.info("{}-[CORE {}] No transaction found", (Object)Thread.currentThread().getId(), (Object)this.coreName);
                }
                if (txBatch.size() == 0) {
                    return;
                }
                ArrayList<Long> txIds = new ArrayList<Long>();
                HashSet<Long> txIdSet = new HashSet<Long>();
                for (Transaction tx : txBatch) {
                    txIds.add(tx.getId());
                    txIdSet.add(tx.getId());
                }
                List<NodeMetaData> nodeMetaDatas = this.infoSrv.getCascadeNodes(txIds);
                Integer processedCascades = 0;
                if (nodeMetaDatas.size() > 0) {
                    List nodeBatches = Lists.partition(nodeMetaDatas, (int)this.cascadeBatchSize);
                    processedCascades = (Integer)((ForkJoinTask)this.forkJoinPool.submit(() -> nodeBatches.parallelStream().map(batch -> {
                        CascadeIndexWorker worker = new CascadeIndexWorker((List<NodeMetaData>)batch, this.infoSrv);
                        worker.run();
                        if (LOGGER.isTraceEnabled()) {
                            String nodes = Utils.notNullOrEmpty(batch).stream().map(NodeMetaData::getId).map(Object::toString).collect(Collectors.joining(","));
                            LOGGER.trace("[{} / {} / {} / {}] Worker has been created for nodes {}", new Object[]{this.coreName, this.trackerId, iterationId, worker.hashCode(), nodes});
                        }
                        return batch.size();
                    }).reduce(0, Integer::sum))).get();
                }
                this.updateTransactionsAfterWorker(txBatch);
                totalUpdatedDocs += (long)processedCascades.intValue();
            }
            catch (AuthenticationException e) {
                throw new IOException(e);
            }
            catch (JSONException e) {
                throw new IOException(e);
            }
            catch (InterruptedException e) {
                throw new IOException(e);
            }
            catch (ExecutionException e) {
                e.printStackTrace();
            }
            finally {
                this.getWriteLock().release();
            }
        } while (txBatch.size() > 0);
        LOGGER.info("{}-[CORE {}] Updated {} DOCs", new Object[]{Thread.currentThread().getId(), this.coreName, totalUpdatedDocs});
    }

    class CascadeIndexWorker
    extends AbstractWorker {
        InformationServer infoServer;
        List<NodeMetaData> nodes;

        CascadeIndexWorker(List<NodeMetaData> nodes, InformationServer infoServer) {
            this.infoServer = infoServer;
            this.nodes = nodes;
        }

        @Override
        protected void doWork() throws IOException, AuthenticationException, JSONException {
            this.infoServer.cascadeNodes(this.nodes, true);
        }

        @Override
        protected void onFail(Throwable failCausedBy) {
            CascadeTracker.this.setRollback(true, failCausedBy);
        }
    }
}

