package io.moquette.spi.impl;

import io.moquette.BrokerConstants;
import io.moquette.interception.InterceptHandler;
import io.moquette.interception.Interceptor;
import io.moquette.interception.messages.InterceptAcknowledgedMessage;
import io.moquette.interception.messages.InterceptConnectMessage;
import io.moquette.interception.messages.InterceptConnectionLostMessage;
import io.moquette.interception.messages.InterceptDisconnectMessage;
import io.moquette.interception.messages.InterceptPublishMessage;
import io.moquette.interception.messages.InterceptSubscribeMessage;
import io.moquette.interception.messages.InterceptUnsubscribeMessage;
import io.moquette.logging.LoggingUtils;
import io.moquette.server.config.IConfig;
import io.moquette.spi.impl.subscriptions.Subscription;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/moquette/spi/impl/BrokerInterceptor.class */
/**
 * Fans broker events out to the registered {@link InterceptHandler}s.
 *
 * <p>Handlers are stored per intercepted message type. Every notification is handed to an
 * internal fixed-size thread pool, so handler callbacks run on a pool thread rather than on the
 * thread that calls the {@code notify*} method.
 *
 * <p>The handler map is fully populated in the constructor (one list per entry of
 * {@link InterceptHandler#ALL_MESSAGE_TYPES}) and is only read afterwards; the per-type lists are
 * {@link CopyOnWriteArrayList}s, so handlers may be added or removed while notifications are
 * being dispatched.
 */
final class BrokerInterceptor implements Interceptor {
    private static final Logger LOG = LoggerFactory.getLogger(BrokerInterceptor.class);

    /** How long {@link #stop()} waits for queued notifications before forcing the shutdown. */
    private static final long SHUTDOWN_TIMEOUT_SECONDS = 10L;

    private final Map<Class<?>, List<InterceptHandler>> handlers;
    private final ExecutorService executor;

    /**
     * Creates the interceptor, registers the initial handlers and starts the thread pool.
     *
     * @param i    number of threads of the notification pool;
     *             {@link Executors#newFixedThreadPool(int)} rejects values {@code <= 0} with an
     *             {@link IllegalArgumentException}
     * @param list handlers to register up front; must not be {@code null}
     */
    private BrokerInterceptor(int i, List<InterceptHandler> list) {
        LOG.info("Initializing broker interceptor. InterceptorIds={}", LoggingUtils.getInterceptorIds(list));
        this.handlers = new HashMap<>();
        for (Class<?> messageType : InterceptHandler.ALL_MESSAGE_TYPES) {
            this.handlers.put(messageType, new CopyOnWriteArrayList<>());
        }
        // Registration only touches the handler map, so it is safe to run before the executor exists.
        for (InterceptHandler handler : list) {
            addInterceptHandler(handler);
        }
        this.executor = Executors.newFixedThreadPool(i);
    }

    /**
     * Creates an interceptor backed by a single notification thread.
     *
     * @param list handlers to register up front; must not be {@code null}
     */
    BrokerInterceptor(List<InterceptHandler> list) {
        this(1, list);
    }

    /**
     * Creates an interceptor whose pool size is read from the
     * {@link BrokerConstants#BROKER_INTERCEPTOR_THREAD_POOL_SIZE} property (default {@code "1"}).
     *
     * @param iConfig broker configuration to read the pool size from
     * @param list    handlers to register up front; must not be {@code null}
     * @throws NumberFormatException if the configured value is not a parsable integer
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    public BrokerInterceptor(IConfig iConfig, List<InterceptHandler> list) {
        this(Integer.parseInt(iConfig.getProperty(BrokerConstants.BROKER_INTERCEPTOR_THREAD_POOL_SIZE, "1")), list);
    }

    /**
     * Shuts the notification pool down.
     *
     * <p>Already submitted notifications get up to {@value #SHUTDOWN_TIMEOUT_SECONDS} seconds to
     * complete; if the pool has not terminated by then (or the wait was interrupted) the shutdown
     * is forced with {@link ExecutorService#shutdownNow()}.
     */
    void stop() {
        LOG.info("Shutting down interceptor thread pool...");
        this.executor.shutdown();
        try {
            LOG.info("Waiting for thread pool tasks to terminate...");
            this.executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Do not swallow the interruption: restore the flag so callers further up the stack
            // can still observe it, then fall through to the forced shutdown below.
            Thread.currentThread().interrupt();
        }
        if (!this.executor.isTerminated()) {
            LOG.warn("Forcing shutdown of interceptor thread pool...");
            this.executor.shutdownNow();
        }
    }

    /**
     * Asynchronously passes a CONNECT message to every handler registered for
     * {@link InterceptConnectMessage}.
     *
     * @param mqttConnectMessage the CONNECT message to wrap and forward
     */
    @Override
    public void notifyClientConnected(MqttConnectMessage mqttConnectMessage) {
        for (InterceptHandler interceptHandler : this.handlers.get(InterceptConnectMessage.class)) {
            LOG.debug("Sending MQTT CONNECT message to interceptor. CId={}, interceptorId={}", mqttConnectMessage.payload().clientIdentifier(), interceptHandler.getID());
            this.executor.execute(() -> interceptHandler.onConnect(new InterceptConnectMessage(mqttConnectMessage)));
        }
    }

    /**
     * Asynchronously reports a client disconnection to every handler registered for
     * {@link InterceptDisconnectMessage}.
     *
     * @param str  client identifier (logged as {@code CId})
     * @param str2 username of the client
     */
    @Override
    public void notifyClientDisconnected(String str, String str2) {
        for (InterceptHandler interceptHandler : this.handlers.get(InterceptDisconnectMessage.class)) {
            LOG.debug("Notifying MQTT client disconnection to interceptor. CId={}, username={}, interceptorId={}", str, str2, interceptHandler.getID());
            this.executor.execute(() -> interceptHandler.onDisconnect(new InterceptDisconnectMessage(str, str2)));
        }
    }

    /**
     * Asynchronously reports an unexpected connection loss to every handler registered for
     * {@link InterceptConnectionLostMessage}.
     *
     * @param str  client identifier (logged as {@code CId})
     * @param str2 username of the client
     */
    @Override
    public void notifyClientConnectionLost(String str, String str2) {
        for (InterceptHandler interceptHandler : this.handlers.get(InterceptConnectionLostMessage.class)) {
            LOG.debug("Notifying unexpected MQTT client disconnection to interceptor CId={}, username={}, interceptorId={}", str, str2, interceptHandler.getID());
            this.executor.execute(() -> interceptHandler.onConnectionLost(new InterceptConnectionLostMessage(str, str2)));
        }
    }

    /**
     * Asynchronously passes a PUBLISH message to every handler registered for
     * {@link InterceptPublishMessage}.
     *
     * <p>NOTE(review): the message object itself is captured and only read later on a pool
     * thread. If it is reference-counted and released by the caller before the task runs, the
     * handler may observe a released payload - confirm the caller's retain/release policy.
     *
     * @param mqttPublishMessage the PUBLISH message to wrap and forward
     * @param str                client identifier (logged as {@code CId})
     * @param str2               forwarded unchanged as the third argument of
     *                           {@link InterceptPublishMessage} (presumably the username - confirm)
     */
    @Override
    public void notifyTopicPublished(MqttPublishMessage mqttPublishMessage, String str, String str2) {
        // Read the header fields once, on the calling thread; they are only needed for logging.
        int messageId = mqttPublishMessage.variableHeader().messageId();
        String topic = mqttPublishMessage.variableHeader().topicName();
        for (InterceptHandler interceptHandler : this.handlers.get(InterceptPublishMessage.class)) {
            LOG.debug("Notifying MQTT PUBLISH message to interceptor. CId={}, messageId={}, topic={}, interceptorId={}", str, messageId, topic, interceptHandler.getID());
            this.executor.execute(() -> interceptHandler.onPublish(new InterceptPublishMessage(mqttPublishMessage, str, str2)));
        }
    }

    /**
     * Asynchronously reports a new subscription to every handler registered for
     * {@link InterceptSubscribeMessage}.
     *
     * @param subscription the subscription that was created
     * @param str          forwarded unchanged as the second argument of
     *                     {@link InterceptSubscribeMessage} (presumably the username - confirm)
     */
    @Override
    public void notifyTopicSubscribed(Subscription subscription, String str) {
        for (InterceptHandler interceptHandler : this.handlers.get(InterceptSubscribeMessage.class)) {
            LOG.debug("Notifying MQTT SUBSCRIBE message to interceptor. CId={}, topicFilter={}, interceptorId={}", subscription.getClientId(), subscription.getTopicFilter(), interceptHandler.getID());
            this.executor.execute(() -> interceptHandler.onSubscribe(new InterceptSubscribeMessage(subscription, str)));
        }
    }

    /**
     * Asynchronously reports a removed subscription to every handler registered for
     * {@link InterceptUnsubscribeMessage}.
     *
     * @param str  topic that was unsubscribed (logged as {@code topic})
     * @param str2 client identifier (logged as {@code CId})
     * @param str3 forwarded unchanged as the third argument of
     *             {@link InterceptUnsubscribeMessage} (presumably the username - confirm)
     */
    @Override
    public void notifyTopicUnsubscribed(String str, String str2, String str3) {
        for (InterceptHandler interceptHandler : this.handlers.get(InterceptUnsubscribeMessage.class)) {
            LOG.debug("Notifying MQTT UNSUBSCRIBE message to interceptor. CId={}, topic={}, interceptorId={}", str2, str, interceptHandler.getID());
            this.executor.execute(() -> interceptHandler.onUnsubscribe(new InterceptUnsubscribeMessage(str, str2, str3)));
        }
    }

    /**
     * Asynchronously passes an acknowledgement event to every handler registered for
     * {@link InterceptAcknowledgedMessage}. Unlike the other notifications the event object is
     * forwarded as-is, i.e. all handlers receive the same instance.
     *
     * @param interceptAcknowledgedMessage the acknowledgement event to forward
     */
    @Override
    public void notifyMessageAcknowledged(InterceptAcknowledgedMessage interceptAcknowledgedMessage) {
        for (InterceptHandler interceptHandler : this.handlers.get(InterceptAcknowledgedMessage.class)) {
            LOG.debug("Notifying MQTT ACK message to interceptor. CId={}, messageId={}, topic={}, interceptorId={}", interceptAcknowledgedMessage.getMsg().getClientID(), interceptAcknowledgedMessage.getPacketID(), interceptAcknowledgedMessage.getTopic(), interceptHandler.getID());
            this.executor.execute(() -> interceptHandler.onMessageAcknowledged(interceptAcknowledgedMessage));
        }
    }

    /**
     * Registers a handler for each message type it declares, or for all types when it declares
     * none.
     *
     * <p>NOTE(review): only the types in {@link InterceptHandler#ALL_MESSAGE_TYPES} have a list in
     * the handler map; a handler declaring any other type makes the lookup return {@code null}
     * and this method fail with a {@link NullPointerException}.
     *
     * @param interceptHandler the handler to register
     */
    @Override
    public void addInterceptHandler(InterceptHandler interceptHandler) {
        Class<?>[] interceptedMessageTypes = getInterceptedMessageTypes(interceptHandler);
        LOG.info("Adding MQTT message interceptor. InterceptorId={}, handledMessageTypes={}", interceptHandler.getID(), interceptedMessageTypes);
        for (Class<?> messageType : interceptedMessageTypes) {
            this.handlers.get(messageType).add(interceptHandler);
        }
    }

    /**
     * Unregisters a handler from each message type it declares, or from all types when it
     * declares none.
     *
     * @param interceptHandler the handler to unregister
     */
    @Override
    public void removeInterceptHandler(InterceptHandler interceptHandler) {
        Class<?>[] interceptedMessageTypes = getInterceptedMessageTypes(interceptHandler);
        LOG.info("Removing MQTT message interceptor. InterceptorId={}, handledMessageTypes={}", interceptHandler.getID(), interceptedMessageTypes);
        for (Class<?> messageType : interceptedMessageTypes) {
            this.handlers.get(messageType).remove(interceptHandler);
        }
    }

    /**
     * Resolves the message types a handler wants to receive.
     *
     * @param interceptHandler the handler to inspect
     * @return the types declared by the handler, or {@link InterceptHandler#ALL_MESSAGE_TYPES}
     *         when the handler returns {@code null}
     */
    private static Class<?>[] getInterceptedMessageTypes(InterceptHandler interceptHandler) {
        Class<?>[] declaredTypes = interceptHandler.getInterceptedMessageTypes();
        if (declaredTypes == null) {
            return InterceptHandler.ALL_MESSAGE_TYPES;
        }
        return declaredTypes;
    }
}
