package com.hazelcast.spi.impl.eventservice.impl;

import com.hazelcast.instance.EndpointQualifier;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.internal.metrics.MetricsProvider;
import com.hazelcast.internal.metrics.MetricsRegistry;
import com.hazelcast.internal.metrics.Probe;
import com.hazelcast.internal.metrics.ProbeLevel;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.util.InvocationUtil;
import com.hazelcast.internal.util.counters.MwCounter;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Address;
import com.hazelcast.nio.Connection;
import com.hazelcast.nio.Packet;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.spi.EventFilter;
import com.hazelcast.spi.EventRegistration;
import com.hazelcast.spi.EventService;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.impl.NodeEngineImpl;
import com.hazelcast.spi.impl.eventservice.InternalEventService;
import com.hazelcast.spi.impl.eventservice.impl.operations.DeregistrationOperationSupplier;
import com.hazelcast.spi.impl.eventservice.impl.operations.OnJoinRegistrationOperation;
import com.hazelcast.spi.impl.eventservice.impl.operations.RegistrationOperationSupplier;
import com.hazelcast.spi.impl.eventservice.impl.operations.SendEventOperation;
import com.hazelcast.spi.properties.GroupProperty;
import com.hazelcast.spi.properties.HazelcastProperties;
import com.hazelcast.util.EmptyStatement;
import com.hazelcast.util.ExceptionUtil;
import com.hazelcast.util.ThreadUtil;
import com.hazelcast.util.UuidUtil;
import com.hazelcast.util.executor.StripedExecutor;
import com.hazelcast.util.function.Supplier;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

/* loaded from: input_file:WEB-INF/lib/hazelcast-3.12.13.jar:com/hazelcast/spi/impl/eventservice/impl/EventServiceImpl.class */
/**
 * Event service implementation that keeps listener registrations grouped per service name
 * (one {@link EventServiceSegment} per service) and delivers published events either to a local
 * striped executor or to the subscriber's address.
 *
 * <p>Remote delivery normally sends an {@code EVENT} packet without waiting for a result. Every
 * {@code eventSyncFrequency}-th publish of a segment is instead sent as a {@link SendEventOperation}
 * and awaited for up to {@code sendEventSyncTimeoutMillis} milliseconds.
 */
public class EventServiceImpl implements InternalEventService, MetricsProvider {
    /** Service name constant of the event service. */
    public static final String SERVICE_NAME = "hz:core:eventService";
    /** System property read by {@link #loadEventSyncFrequency()} to override the sync frequency. */
    public static final String EVENT_SYNC_FREQUENCY_PROP = "hazelcast.event.sync.frequency";
    private static final EventRegistration[] EMPTY_REGISTRATIONS = new EventRegistration[0];
    /** Sync frequency used when the system property is unset, malformed or not positive. */
    private static final int EVENT_SYNC_FREQUENCY = 100000;
    /** Try count configured on the invocation used for synchronous event delivery. */
    private static final int SEND_RETRY_COUNT = 50;
    /** Every n-th failure is logged at WARNING level, all others at FINEST. */
    private static final int WARNING_LOG_FREQUENCY = 1000;
    /** Retry argument passed to {@code InvocationUtil.invokeOnStableClusterSerial}. */
    private static final int MAX_RETRIES = 100;
    final ILogger logger;
    final NodeEngineImpl nodeEngine;
    /** Registrations and publish counters, keyed by service name. */
    private final ConcurrentMap<String, EventServiceSegment> segments;
    private final StripedExecutor eventExecutor;
    private final long eventQueueTimeoutMs;

    @Probe(name = "threadCount")
    private final int eventThreadCount;

    @Probe(name = "queueCapacity")
    private final int eventQueueCapacity;

    @Probe(name = "totalFailureCount")
    private final MwCounter totalFailures = MwCounter.newMwCounter();

    @Probe(name = "rejectedCount")
    private final MwCounter rejectedCount = MwCounter.newMwCounter();

    @Probe(name = "syncDeliveryFailureCount")
    private final MwCounter syncDeliveryFailureCount = MwCounter.newMwCounter();
    private final int sendEventSyncTimeoutMillis;
    private final InternalSerializationService serializationService;
    private final int eventSyncFrequency;

    /**
     * Creates the service, reading thread count, queue capacity and timeouts from the node's
     * properties and creating the striped event executor.
     *
     * @param nodeEngineImpl the node engine this service belongs to
     */
    public EventServiceImpl(NodeEngineImpl nodeEngineImpl) {
        this.nodeEngine = nodeEngineImpl;
        this.serializationService = (InternalSerializationService) nodeEngineImpl.getSerializationService();
        this.logger = nodeEngineImpl.getLogger(EventService.class.getName());
        HazelcastProperties properties = nodeEngineImpl.getProperties();
        this.eventThreadCount = properties.getInteger(GroupProperty.EVENT_THREAD_COUNT);
        this.eventQueueCapacity = properties.getInteger(GroupProperty.EVENT_QUEUE_CAPACITY);
        this.eventQueueTimeoutMs = properties.getMillis(GroupProperty.EVENT_QUEUE_TIMEOUT_MILLIS);
        this.sendEventSyncTimeoutMillis = properties.getInteger(GroupProperty.EVENT_SYNC_TIMEOUT_MILLIS);
        this.eventSyncFrequency = loadEventSyncFrequency();
        String threadNamePrefix = ThreadUtil.createThreadName(nodeEngineImpl.getHazelcastInstance().getName(), "event");
        this.eventExecutor = new StripedExecutor(nodeEngineImpl.getNode().getLogger(EventServiceImpl.class),
                threadNamePrefix, this.eventThreadCount, this.eventQueueCapacity);
        this.segments = new ConcurrentHashMap<String, EventServiceSegment>();
    }

    /**
     * Reads the sync frequency from the {@link #EVENT_SYNC_FREQUENCY_PROP} system property.
     *
     * @return the configured value when it is a positive integer, otherwise
     *         {@link #EVENT_SYNC_FREQUENCY}
     */
    private static int loadEventSyncFrequency() {
        try {
            String configured = System.getProperty(EVENT_SYNC_FREQUENCY_PROP);
            // An unset property is the normal case; handle it explicitly instead of via an exception.
            if (configured != null) {
                int frequency = Integer.parseInt(configured);
                if (frequency > 0) {
                    return frequency;
                }
            }
        } catch (NumberFormatException e) {
            // malformed value: fall back to the default
            EmptyStatement.ignore(e);
        } catch (SecurityException e) {
            // property not readable: fall back to the default
            EmptyStatement.ignore(e);
        }
        return EVENT_SYNC_FREQUENCY;
    }

    /** Registers the probes of this instance under the {@code "event"} prefix. */
    @Override // com.hazelcast.internal.metrics.MetricsProvider
    public void provideMetrics(MetricsRegistry metricsRegistry) {
        metricsRegistry.scanAndRegister(this, "event");
    }

    /**
     * Closes the listener of the given registration if it implements {@link Closeable}.
     * An {@link IOException} raised while closing is ignored.
     *
     * @param eventRegistration registration whose listener should be closed; must be a {@link Registration}
     */
    @Override // com.hazelcast.spi.impl.eventservice.InternalEventService
    public void close(EventRegistration eventRegistration) {
        Object listener = ((Registration) eventRegistration).getListener();
        if (!(listener instanceof Closeable)) {
            return;
        }
        try {
            ((Closeable) listener).close();
        } catch (IOException e) {
            EmptyStatement.ignore(e);
        }
    }

    /** @return the number of event threads configured for the executor */
    @Override // com.hazelcast.spi.EventService
    public int getEventThreadCount() {
        return this.eventThreadCount;
    }

    /** @return the configured capacity of the event queue */
    @Override // com.hazelcast.spi.EventService
    public int getEventQueueCapacity() {
        return this.eventQueueCapacity;
    }

    /** @return the current work queue size reported by the event executor */
    @Override // com.hazelcast.spi.EventService
    @Probe(name = "eventQueueSize", level = ProbeLevel.MANDATORY)
    public int getEventQueueSize() {
        return this.eventExecutor.getWorkQueueSize();
    }

    /** @return the processed count reported by the event executor */
    @Probe(level = ProbeLevel.MANDATORY)
    private long eventsProcessed() {
        return this.eventExecutor.processedCount();
    }

    /** Registers a local-only listener that receives every event of the topic. */
    @Override // com.hazelcast.spi.EventService
    public EventRegistration registerLocalListener(String serviceName, String topic, Object listener) {
        return registerListenerInternal(serviceName, topic, TrueEventFilter.INSTANCE, listener, true);
    }

    /** Registers a local-only listener with the given filter. */
    @Override // com.hazelcast.spi.EventService
    public EventRegistration registerLocalListener(String serviceName, String topic, EventFilter eventFilter,
                                                   Object listener) {
        return registerListenerInternal(serviceName, topic, eventFilter, listener, true);
    }

    /** Registers a listener that receives every event of the topic and announces it to all members. */
    @Override // com.hazelcast.spi.EventService
    public EventRegistration registerListener(String serviceName, String topic, Object listener) {
        return registerListenerInternal(serviceName, topic, TrueEventFilter.INSTANCE, listener, false);
    }

    /** Registers a listener with the given filter and announces it to all members. */
    @Override // com.hazelcast.spi.EventService
    public EventRegistration registerListener(String serviceName, String topic, EventFilter eventFilter,
                                              Object listener) {
        return registerListenerInternal(serviceName, topic, eventFilter, listener, false);
    }

    /**
     * Creates a registration with a fresh id, adds it to the service's segment and, unless it is
     * local-only, invokes a registration operation on all members.
     *
     * @return the new registration, or {@code null} when the segment did not accept it
     * @throws IllegalArgumentException if {@code listener} or {@code eventFilter} is {@code null}
     */
    private EventRegistration registerListenerInternal(String serviceName, String topic, EventFilter eventFilter,
                                                       Object listener, boolean localOnly) {
        if (listener == null) {
            throw new IllegalArgumentException("Listener required!");
        }
        if (eventFilter == null) {
            throw new IllegalArgumentException("EventFilter required!");
        }
        EventServiceSegment segment = getSegment(serviceName, true);
        String registrationId = UuidUtil.newUnsecureUuidString();
        Registration registration = new Registration(registrationId, serviceName, topic, eventFilter,
                this.nodeEngine.getThisAddress(), listener, localOnly);
        if (!segment.addRegistration(topic, registration)) {
            return null;
        }
        if (!localOnly) {
            invokeOnAllMembers(new RegistrationOperationSupplier(registration, this.nodeEngine.getClusterService()));
        }
        return registration;
    }

    /**
     * Adds a registration whose subscriber is another address to the matching segment.
     *
     * @return {@code false} when the subscriber is this node's own address, otherwise the result of
     *         adding the registration to the segment
     */
    public boolean handleRegistration(Registration registration) {
        if (this.nodeEngine.getThisAddress().equals(registration.getSubscriber())) {
            return false;
        }
        EventServiceSegment segment = getSegment(registration.getServiceName(), true);
        return segment.addRegistration(registration.getTopic(), registration);
    }

    /**
     * Removes the registration identified by {@code String.valueOf(registrationId)} from the topic
     * and, when it was not local-only, invokes a deregistration operation on all members.
     *
     * @return {@code true} if a registration was removed
     */
    @Override // com.hazelcast.spi.EventService
    public boolean deregisterListener(String serviceName, String topic, Object registrationId) {
        EventServiceSegment segment = getSegment(serviceName, false);
        if (segment == null) {
            return false;
        }
        Registration removed = segment.removeRegistration(topic, String.valueOf(registrationId));
        if (removed == null) {
            return false;
        }
        if (!removed.isLocalOnly()) {
            invokeOnAllMembers(new DeregistrationOperationSupplier(removed, this.nodeEngine.getClusterService()));
        }
        return true;
    }

    /**
     * Invokes the supplied operations via {@code InvocationUtil.invokeOnStableClusterSerial} and
     * blocks until the returned future completes. Failures are rethrown through
     * {@link ExceptionUtil#rethrow}; on interruption the interrupt flag is restored first.
     */
    private void invokeOnAllMembers(Supplier<Operation> supplier) {
        try {
            InvocationUtil.invokeOnStableClusterSerial(this.nodeEngine, supplier, MAX_RETRIES).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw ExceptionUtil.rethrow(e);
        } catch (ExecutionException e) {
            throw ExceptionUtil.rethrow(e);
        }
    }

    /** Removes all registrations of the topic from the service's segment, if the segment exists. */
    @Override // com.hazelcast.spi.EventService
    public void deregisterAllListeners(String serviceName, String topic) {
        EventServiceSegment segment = getSegment(serviceName, false);
        if (segment != null) {
            segment.removeRegistrations(topic);
        }
    }

    /** @return the striped executor used for event processing */
    public StripedExecutor getEventExecutor() {
        return this.eventExecutor;
    }

    /**
     * @return the registrations of the topic as an array; a shared empty array when the segment or
     *         topic has no registrations
     */
    @Override // com.hazelcast.spi.EventService
    public EventRegistration[] getRegistrationsAsArray(String serviceName, String topic) {
        EventServiceSegment segment = getSegment(serviceName, false);
        if (segment == null) {
            return EMPTY_REGISTRATIONS;
        }
        Collection<Registration> registrations = segment.getRegistrations(topic, false);
        if (registrations == null || registrations.isEmpty()) {
            return EMPTY_REGISTRATIONS;
        }
        return registrations.toArray(new Registration[0]);
    }

    /**
     * @return an unmodifiable view of the topic's registrations; an empty set when the segment or
     *         topic has no registrations
     */
    @Override // com.hazelcast.spi.EventService
    public Collection<EventRegistration> getRegistrations(String serviceName, String topic) {
        EventServiceSegment segment = getSegment(serviceName, false);
        if (segment == null) {
            return Collections.<EventRegistration>emptySet();
        }
        Collection<Registration> registrations = segment.getRegistrations(topic, false);
        if (registrations == null || registrations.isEmpty()) {
            return Collections.<EventRegistration>emptySet();
        }
        return Collections.<EventRegistration>unmodifiableCollection(registrations);
    }

    /** @return {@code true} if the service's segment exists and reports a registration for the topic */
    @Override // com.hazelcast.spi.EventService
    public boolean hasEventRegistration(String serviceName, String topic) {
        EventServiceSegment segment = getSegment(serviceName, false);
        return segment != null && segment.hasRegistration(topic);
    }

    /** Publishes the event to all registrations currently known for the topic. */
    @Override // com.hazelcast.spi.EventService
    public void publishEvent(String serviceName, String topic, Object event, int orderKey) {
        publishEvent(serviceName, getRegistrations(serviceName, topic), event, orderKey);
    }

    /**
     * Publishes the event to a single registration: executed locally when the subscriber is this
     * node, otherwise sent to the subscriber's address.
     *
     * @throws IllegalArgumentException if the registration is not a {@link Registration}
     */
    @Override // com.hazelcast.spi.EventService
    public void publishEvent(String serviceName, EventRegistration eventRegistration, Object event, int orderKey) {
        if (!(eventRegistration instanceof Registration)) {
            throw new IllegalArgumentException();
        }
        if (isLocal(eventRegistration)) {
            executeLocal(serviceName, event, eventRegistration, orderKey);
            return;
        }
        EventEnvelope envelope = new EventEnvelope(eventRegistration.getId(), serviceName, event);
        sendEvent(eventRegistration.getSubscriber(), envelope, orderKey);
    }

    /**
     * Publishes the event to every registration in the collection. Local subscribers receive the
     * event object itself; for remote subscribers the event is serialized once, on first need.
     *
     * @throws IllegalArgumentException if an element is not a {@link Registration}
     */
    @Override // com.hazelcast.spi.EventService
    public void publishEvent(String serviceName, Collection<EventRegistration> collection, Object event,
                             int orderKey) {
        Data serializedEvent = null;
        for (EventRegistration eventRegistration : collection) {
            if (!(eventRegistration instanceof Registration)) {
                throw new IllegalArgumentException();
            }
            if (isLocal(eventRegistration)) {
                executeLocal(serviceName, event, eventRegistration, orderKey);
                continue;
            }
            if (serializedEvent == null) {
                serializedEvent = this.serializationService.toData(event);
            }
            EventEnvelope envelope = new EventEnvelope(eventRegistration.getId(), serviceName, serializedEvent);
            sendEvent(eventRegistration.getSubscriber(), envelope, orderKey);
        }
    }

    /**
     * Publishes the event only to the non-local registrations of the collection; local ones are
     * skipped. Does nothing for an empty collection.
     *
     * @throws IllegalArgumentException if an element is not a {@link Registration}
     */
    @Override // com.hazelcast.spi.EventService
    public void publishRemoteEvent(String serviceName, Collection<EventRegistration> collection, Object event,
                                   int orderKey) {
        if (collection.isEmpty()) {
            return;
        }
        Data serializedEvent = this.serializationService.toData(event);
        for (EventRegistration eventRegistration : collection) {
            if (!(eventRegistration instanceof Registration)) {
                throw new IllegalArgumentException();
            }
            if (isLocal(eventRegistration)) {
                continue;
            }
            EventEnvelope envelope = new EventEnvelope(eventRegistration.getId(), serviceName, serializedEvent);
            sendEvent(eventRegistration.getSubscriber(), envelope, orderKey);
        }
    }

    /**
     * Hands the event to the event executor for a local listener. Nothing happens when the node is
     * not running. A rejected submission increments {@code rejectedCount} and is logged while the
     * executor is still live.
     */
    private void executeLocal(String serviceName, Object event, EventRegistration eventRegistration, int orderKey) {
        if (!this.nodeEngine.isRunning()) {
            return;
        }
        Registration registration = (Registration) eventRegistration;
        try {
            if (registration.getListener() == null) {
                this.logger.warning("Something seems wrong! Listener instance is null! -> " + registration);
                return;
            }
            this.eventExecutor.execute(new LocalEventDispatcher(this, serviceName, event,
                    registration.getListener(), orderKey, this.eventQueueTimeoutMs));
        } catch (RejectedExecutionException e) {
            this.rejectedCount.inc();
            if (this.eventExecutor.isLive()) {
                logFailure("EventQueue overloaded! %s failed to publish to %s:%s", event,
                        registration.getServiceName(), registration.getTopic());
            }
        }
    }

    /**
     * Sends the envelope to the subscriber. Each call increments the segment's publish counter;
     * when the counter is a multiple of {@code eventSyncFrequency} the event is delivered through a
     * blocking operation, otherwise as a packet.
     */
    private void sendEvent(Address subscriber, EventEnvelope eventEnvelope, int orderKey) {
        String serviceName = eventEnvelope.getServiceName();
        EventServiceSegment segment = getSegment(serviceName, true);
        boolean sync = segment.incrementPublish() % this.eventSyncFrequency == 0;
        if (sync) {
            sendEventSync(subscriber, serviceName, eventEnvelope, orderKey);
        } else {
            sendEventPacket(subscriber, eventEnvelope, orderKey);
        }
    }

    /** Transmits the envelope as an {@code EVENT} packet; a failed transmit is logged while the node runs. */
    private void sendEventPacket(Address subscriber, EventEnvelope eventEnvelope, int orderKey) {
        Packet packet = new Packet(this.serializationService.toBytes(eventEnvelope), orderKey)
                .setPacketType(Packet.Type.EVENT);
        boolean transmitted = this.nodeEngine.getNode().getNetworkingService()
                .getEndpointManager(EndpointQualifier.MEMBER).transmit(packet, subscriber);
        if (!transmitted && this.nodeEngine.isRunning()) {
            logFailure("Failed to send event packet to: %s, connection might not be alive.", subscriber);
        }
    }

    /**
     * Invokes a {@link SendEventOperation} on the subscriber and waits up to
     * {@code sendEventSyncTimeoutMillis} for it. Any failure is counted and logged at finest level
     * rather than propagated.
     */
    private void sendEventSync(Address subscriber, String serviceName, EventEnvelope eventEnvelope, int orderKey) {
        try {
            this.nodeEngine.getOperationService()
                    .createInvocationBuilder(serviceName, new SendEventOperation(eventEnvelope, orderKey), subscriber)
                    .setTryCount(SEND_RETRY_COUNT)
                    .invoke()
                    .get(this.sendEventSyncTimeoutMillis, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // The broad catch would otherwise clear the caller's interrupt request.
                Thread.currentThread().interrupt();
            }
            this.syncDeliveryFailureCount.inc();
            if (this.logger.isFinestEnabled()) {
                this.logger.finest("Sync event delivery failed. Event: " + eventEnvelope, e);
            }
        }
    }

    /**
     * Looks up the segment of a service.
     *
     * @param serviceName   name of the service
     * @param forceCreate   when {@code true}, a missing segment is created; the instance that wins
     *                      the {@code putIfAbsent} race is also registered with the metrics registry
     * @return the segment, or {@code null} when it is missing and {@code forceCreate} is {@code false}
     */
    public EventServiceSegment getSegment(String serviceName, boolean forceCreate) {
        EventServiceSegment existing = this.segments.get(serviceName);
        if (existing != null || !forceCreate) {
            return existing;
        }
        EventServiceSegment created = new EventServiceSegment(serviceName, this.nodeEngine.getService(serviceName));
        EventServiceSegment raced = this.segments.putIfAbsent(serviceName, created);
        if (raced != null) {
            // another thread installed a segment first; use theirs
            return raced;
        }
        this.nodeEngine.getMetricsRegistry().scanAndRegister(created, "event.[" + serviceName + "]");
        return created;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** @return {@code true} if the registration's subscriber is this node's own address */
    public boolean isLocal(EventRegistration eventRegistration) {
        return this.nodeEngine.getThisAddress().equals(eventRegistration.getSubscriber());
    }

    /**
     * Submits the callback to the event executor when the node is running. A rejected submission
     * increments {@code rejectedCount} and is logged while the executor is still live.
     */
    @Override // com.hazelcast.spi.EventService
    public void executeEventCallback(Runnable runnable) {
        if (!this.nodeEngine.isRunning()) {
            return;
        }
        try {
            this.eventExecutor.execute(runnable);
        } catch (RejectedExecutionException e) {
            this.rejectedCount.inc();
            if (this.eventExecutor.isLive()) {
                logFailure("EventQueue overloaded! Failed to execute event callback: %s", runnable);
            }
        }
    }

    /**
     * Submits an incoming event packet to the event executor for processing. A rejected submission
     * increments {@code rejectedCount} and is logged, with the sending endpoint, while the executor
     * is still live.
     */
    @Override // com.hazelcast.util.function.Consumer
    public void accept(Packet packet) {
        try {
            this.eventExecutor.execute(new RemoteEventProcessor(this, packet));
        } catch (RejectedExecutionException e) {
            this.rejectedCount.inc();
            if (this.eventExecutor.isLive()) {
                Connection conn = packet.getConn();
                String sender = conn.getEndPoint() != null ? conn.getEndPoint().toString() : conn.toString();
                logFailure("EventQueue overloaded! Failed to process event packet sent from: %s", sender);
            }
        }
    }

    /** @return an operation carrying the collected registrations, or {@code null} when there are none */
    @Override // com.hazelcast.spi.PreJoinAwareService
    public Operation getPreJoinOperation() {
        return getOnJoinRegistrationOperation();
    }

    /**
     * @return {@code null} on the master; otherwise an operation carrying the collected
     *         registrations, or {@code null} when there are none
     */
    @Override // com.hazelcast.spi.PostJoinAwareService
    public Operation getPostJoinOperation() {
        if (this.nodeEngine.getClusterService().isMaster()) {
            return null;
        }
        return getOnJoinRegistrationOperation();
    }

    /**
     * Asks every segment to contribute via {@code collectRemoteRegistrations} and wraps the result.
     *
     * @return the operation, or {@code null} when no registration was collected
     */
    private OnJoinRegistrationOperation getOnJoinRegistrationOperation() {
        Collection<Registration> registrations = new LinkedList<Registration>();
        for (EventServiceSegment segment : this.segments.values()) {
            segment.collectRemoteRegistrations(registrations);
        }
        return registrations.isEmpty() ? null : new OnJoinRegistrationOperation(registrations);
    }

    /** Shuts down the event executor, then clears every segment and the segment map. */
    public void shutdown() {
        this.logger.finest("Stopping event executor...");
        this.eventExecutor.shutdown();
        for (EventServiceSegment segment : this.segments.values()) {
            segment.clear();
        }
        this.segments.clear();
    }

    /** Notifies every segment that the member at the given member's address has left. */
    public void onMemberLeft(MemberImpl memberImpl) {
        Address address = memberImpl.getAddress();
        for (EventServiceSegment segment : this.segments.values()) {
            segment.onMemberLeft(address);
        }
    }

    /**
     * Counts a failure and logs the formatted message. The level is WARNING when the failure count
     * is a multiple of {@link #WARNING_LOG_FREQUENCY} and FINEST otherwise, which keeps repeated
     * failures from flooding the log.
     */
    private void logFailure(String message, Object... args) {
        this.totalFailures.inc();
        // inc() and get() are separate calls, so concurrent failures may occasionally pick the same level.
        Level level = this.totalFailures.get() % WARNING_LOG_FREQUENCY == 0 ? Level.WARNING : Level.FINEST;
        if (this.logger.isLoggable(level)) {
            this.logger.log(level, String.format(message, args));
        }
    }
}
