package org.apache.activemq.artemis.protocol.amqp.federation.internal;

import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/artemis/protocol/amqp/federation/internal/FederationAddressPolicyManager.class */
/**
 * Base manager that tracks local demand for addresses matching a
 * {@link FederationReceiveFromAddressPolicy} and keeps one federation consumer per
 * address for as long as demand exists.
 * <p>
 * Demand is registered by queue bindings on a matching address and, when the policy
 * enables divert bindings, by diverts on a matching address whose forward address(es)
 * have at least one queue binding. The manager registers itself as a broker plugin
 * while started so that it is notified when bindings and addresses are added or removed.
 * <p>
 * All mutation of the tracking maps happens while holding this instance's monitor.
 */
public abstract class FederationAddressPolicyManager implements ActiveMQServerBindingPlugin, ActiveMQServerAddressPlugin {
    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    protected final ActiveMQServer server;
    protected final FederationInternal federation;
    protected final FederationReceiveFromAddressPolicy policy;

    /** Demand entries keyed by the address name they were created for. */
    protected final Map<String, FederationAddressEntry> demandTracking = new HashMap<>();

    /** Tracked diverts mapped to the queue bindings found on their forward address(es). */
    protected final Map<DivertBinding, Set<QueueBinding>> divertsTracking = new HashMap<>();

    private volatile boolean started;

    /**
     * Creates a new manager for the given federation and address policy.
     *
     * @param federationInternal the federation instance this manager belongs to, must not be null
     * @param federationReceiveFromAddressPolicy the policy used to match addresses, must not be null
     * @throws ActiveMQException declared for subclasses and callers; not thrown by this constructor itself
     */
    public FederationAddressPolicyManager(FederationInternal federationInternal, FederationReceiveFromAddressPolicy federationReceiveFromAddressPolicy) throws ActiveMQException {
        Objects.requireNonNull(federationInternal, "The Federation instance cannot be null");
        Objects.requireNonNull(federationReceiveFromAddressPolicy, "The Address match policy cannot be null");
        this.federation = federationInternal;
        this.policy = federationReceiveFromAddressPolicy;
        this.server = federationInternal.getServer();
    }

    /**
     * Starts the manager if it is not already started: notifies the subclass, registers
     * this instance as a broker plugin and scans the existing bindings for demand.
     */
    public synchronized void start() {
        if (started) {
            return;
        }

        started = true;
        handlePolicyManagerStarted(policy);
        server.registerBrokerPlugin(this);
        scanAllBindings();
    }

    /**
     * Stops the manager if it is started: unregisters the broker plugin, closes every
     * tracked consumer and drops all tracked demand.
     */
    public synchronized void stop() {
        if (!started) {
            return;
        }

        started = false;
        server.unRegisterBrokerPlugin(this);

        try {
            for (FederationAddressEntry entry : demandTracking.values()) {
                if (entry.hasConsumer()) {
                    entry.getConsumer().close();
                }
            }
        } finally {
            // Always discard tracked state, even if a close fails, so that a later
            // start() does not find stale entries still holding closed consumers.
            demandTracking.clear();
            divertsTracking.clear();
        }
    }

    /**
     * Address plugin callback: drops the demand entry of a removed address and closes
     * its consumer if one exists.
     *
     * @param simpleString the name of the removed address
     * @param addressInfo the info of the removed address (unused here)
     * @throws ActiveMQException declared by the callback signature
     */
    public synchronized void afterRemoveAddress(SimpleString simpleString, AddressInfo addressInfo) throws ActiveMQException {
        if (!started) {
            return;
        }

        final FederationAddressEntry removed = demandTracking.remove(simpleString.toString());
        if (removed != null && removed.hasConsumer()) {
            removed.getConsumer().close();
        }
    }

    /**
     * Binding plugin callback: withdraws the demand that the removed binding represented.
     * <ul>
     * <li>A queue binding on a tracked address removes its demand from that address.</li>
     * <li>Otherwise, with divert bindings enabled, the queue binding is removed from every
     * tracked divert that forwards to its address; a divert left with no queue bindings
     * has its demand removed from the divert's own address.</li>
     * <li>A removed, tracked divert binding removes its demand from the divert's address.</li>
     * </ul>
     *
     * @param binding the binding that was removed
     * @param transaction the transaction of the removal (unused here)
     * @param z flag passed by the broker (unused here)
     * @throws ActiveMQException declared by the callback signature
     */
    public synchronized void afterRemoveBinding(Binding binding, Transaction transaction, boolean z) throws ActiveMQException {
        if (!started) {
            return;
        }

        if (binding instanceof QueueBinding) {
            final FederationAddressEntry directEntry = demandTracking.get(binding.getAddress().toString());

            if (directEntry != null) {
                tryRemoveDemandOnAddress(directEntry, binding);
            } else if (policy.isEnableDivertBindings()) {
                // A divert may forward to several addresses, so every tracked divert
                // has to be checked against the address of the removed binding.
                divertsTracking.entrySet().forEach(divertEntry -> {
                    final DivertBinding divert = divertEntry.getKey();
                    final Set<QueueBinding> forwardedQueues = divertEntry.getValue();
                    final String sourceAddress = divert.getAddress().toString();

                    if (isAddressInDivertForwards(binding.getAddress(), divert.getDivert().getForwardAddress())) {
                        forwardedQueues.remove(binding);
                        if (forwardedQueues.isEmpty()) {
                            tryRemoveDemandOnAddress(demandTracking.get(sourceAddress), divert);
                        }
                    }
                });
            }
        } else if (policy.isEnableDivertBindings() && binding instanceof DivertBinding) {
            final DivertBinding divert = (DivertBinding) binding;

            if (divertsTracking.remove(divert) != null) {
                try {
                    tryRemoveDemandOnAddress(demandTracking.get(divert.getAddress().toString()), divert);
                } catch (Exception e) {
                    ActiveMQServerLogger.LOGGER.federationBindingsLookupError(divert.getDivert().getForwardAddress(), e);
                }
            }
        }
    }

    /**
     * Removes the demand of the given binding from the entry and, when no demand remains
     * and a consumer exists, closes the consumer (signalling before and after) and stops
     * tracking the address.
     *
     * @param federationAddressEntry the entry to reduce demand on; ignored when null
     * @param binding the binding whose demand is withdrawn
     */
    protected final void tryRemoveDemandOnAddress(FederationAddressEntry federationAddressEntry, Binding binding) {
        if (federationAddressEntry == null) {
            return;
        }

        federationAddressEntry.removeDemand(binding);
        logger.trace("Reducing demand on federated address {}, remaining demand? {}", federationAddressEntry.getAddress(), federationAddressEntry.hasDemand());

        if (federationAddressEntry.hasDemand() || !federationAddressEntry.hasConsumer()) {
            return;
        }

        final FederationConsumerInternal consumer = federationAddressEntry.getConsumer();
        try {
            signalBeforeCloseFederationConsumer(consumer);
            consumer.close();
            signalAfterCloseFederationConsumer(consumer);
        } finally {
            // The entry is dropped whether or not the close sequence completed normally.
            demandTracking.remove(federationAddressEntry.getAddress());
        }
    }

    /**
     * Feeds every existing queue binding (and divert binding, when enabled by the policy)
     * through {@link #afterAddBinding(Binding)}.
     */
    protected final void scanAllBindings() {
        server.getPostOffice()
              .getAllBindings()
              .filter(candidate -> candidate instanceof QueueBinding
                  || (policy.isEnableDivertBindings() && candidate instanceof DivertBinding))
              .forEach(this::afterAddBinding);
    }

    /**
     * Address plugin callback: when divert bindings are enabled and the new address matches
     * the policy, checks the divert bindings bound directly to that address for demand.
     * Lookup failures are logged and not propagated.
     *
     * @param addressInfo the address that was added
     * @param z flag passed by the broker (unused here)
     */
    public synchronized void afterAddAddress(AddressInfo addressInfo, boolean z) {
        if (!started || !policy.isEnableDivertBindings() || !policy.test(addressInfo)) {
            return;
        }

        try {
            server.getPostOffice()
                  .getDirectBindings(addressInfo.getName())
                  .stream()
                  .filter(candidate -> candidate instanceof DivertBinding)
                  .forEach(this::checkBindingForMatch);
        } catch (Exception e) {
            ActiveMQServerLogger.LOGGER.federationBindingsLookupError(addressInfo.getName(), e);
        }
    }

    /**
     * Binding plugin callback: checks a newly added binding for federation demand while
     * the manager is started.
     *
     * @param binding the binding that was added
     */
    public synchronized void afterAddBinding(Binding binding) {
        if (started) {
            checkBindingForMatch(binding);
        }
    }

    /**
     * Evaluates a binding for demand. A queue binding on an address matching the policy
     * adds demand on that address unless a plugin blocks it; a queue binding on a
     * non-matching address is checked against the tracked diverts. A divert binding is
     * checked against the queue bindings on its forward address(es). Other binding types
     * are ignored.
     *
     * @param binding the binding to evaluate
     */
    protected final void checkBindingForMatch(Binding binding) {
        if (binding instanceof QueueBinding) {
            final QueueBinding queueBinding = (QueueBinding) binding;
            final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(binding.getAddress());

            if (testIfAddressMatchesPolicy(addressInfo)) {
                if (!isPluginBlockingFederationConsumerCreate(queueBinding.getQueue())) {
                    createOrUpdateFederatedAddressConsumerForBinding(addressInfo, queueBinding);
                }
            } else {
                reactIfQueueBindingMatchesAnyDivertTarget(queueBinding);
            }
        } else if (binding instanceof DivertBinding) {
            reactIfAnyQueueBindingMatchesDivertTarget((DivertBinding) binding);
        }
    }

    /**
     * Starts tracking a divert whose address matches the policy (only the first time the
     * divert is seen) and registers the divert as demand on its address for each queue
     * binding found on its forward address(es) that no plugin blocks. Lookup failures are
     * logged and not propagated.
     *
     * @param divertBinding the divert binding to evaluate
     */
    protected final void reactIfAnyQueueBindingMatchesDivertTarget(DivertBinding divertBinding) {
        if (!policy.isEnableDivertBindings()) {
            return;
        }

        final AddressInfo addressInfo = server.getPostOffice().getAddressInfo(divertBinding.getAddress());
        if (!testIfAddressMatchesPolicy(addressInfo) || divertsTracking.get(divertBinding) != null) {
            return;
        }

        final Set<QueueBinding> matchingQueues = new HashSet<>();
        divertsTracking.put(divertBinding, matchingQueues);

        // The forward address may be a comma separated list of addresses.
        final SimpleString forwardAddress = divertBinding.getDivert().getForwardAddress();

        try {
            for (SimpleString forward : forwardAddress.split(',')) {
                server.getPostOffice()
                      .getBindingsForAddress(forward)
                      .getBindings()
                      .stream()
                      .filter(candidate -> candidate instanceof QueueBinding)
                      .map(candidate -> (QueueBinding) candidate)
                      .forEach(queueBinding -> {
                          if (isPluginBlockingFederationConsumerCreate(divertBinding.getDivert(), queueBinding.getQueue())
                              || isPluginBlockingFederationConsumerCreate(queueBinding.getQueue())) {
                              return;
                          }

                          matchingQueues.add(queueBinding);
                          createOrUpdateFederatedAddressConsumerForBinding(addressInfo, divertBinding);
                      });
            }
        } catch (Exception e) {
            ActiveMQServerLogger.LOGGER.federationBindingsLookupError(forwardAddress, e);
        }
    }

    /**
     * Checks a queue binding against every tracked divert and, for each divert that
     * forwards to the binding's address, does not already track the binding and is not
     * blocked by a plugin, records the binding and registers the divert as demand on the
     * divert's address.
     *
     * @param queueBinding the queue binding to evaluate
     */
    protected final void reactIfQueueBindingMatchesAnyDivertTarget(QueueBinding queueBinding) {
        if (!policy.isEnableDivertBindings()) {
            return;
        }

        final SimpleString queueAddress = queueBinding.getAddress();

        divertsTracking.entrySet().forEach(divertEntry -> {
            final DivertBinding divert = divertEntry.getKey();
            final Set<QueueBinding> forwardedQueues = divertEntry.getValue();
            final SimpleString forwardAddress = divert.getDivert().getForwardAddress();

            if (forwardedQueues.contains(queueBinding)
                || !isAddressInDivertForwards(queueAddress, forwardAddress)
                || isPluginBlockingFederationConsumerCreate(divert.getDivert(), queueBinding.getQueue())
                || isPluginBlockingFederationConsumerCreate(queueBinding.getQueue())) {
                return;
            }

            forwardedQueues.add(queueBinding);
            createOrUpdateFederatedAddressConsumerForBinding(server.getPostOffice().getAddressInfo(divert.getAddress()), divert);
        });
    }

    /**
     * Tests whether an address is one of the comma separated forward addresses of a divert.
     *
     * @param simpleString the address to look for
     * @param simpleString2 the divert forward address, possibly a comma separated list
     * @return true if the address equals one of the forward addresses
     */
    private static boolean isAddressInDivertForwards(SimpleString simpleString, SimpleString simpleString2) {
        for (SimpleString forward : simpleString2.split(',')) {
            if (simpleString.equals(forward)) {
                return true;
            }
        }

        return false;
    }

    /**
     * Adds the binding as demand on the given address, creating the tracking entry through
     * {@link #createConsumerEntry(AddressInfo)} when the address is not yet tracked, then
     * attempts to create the federation consumer for the address.
     *
     * @param addressInfo the address the demand applies to
     * @param binding the binding that represents the demand
     */
    protected final void createOrUpdateFederatedAddressConsumerForBinding(AddressInfo addressInfo, Binding binding) {
        logger.trace("Federation Address Policy matched on for demand on address: {} : binding: {}", addressInfo, binding);

        final String addressName = addressInfo.getName().toString();
        final FederationAddressEntry entry =
            demandTracking.computeIfAbsent(addressName, unused -> createConsumerEntry(addressInfo));

        entry.addDemand(binding);
        tryCreateFederationConsumerForAddress(entry);
    }

    /**
     * Creates and starts a federation consumer for the entry when it has demand, has no
     * consumer yet and no plugin blocks creation for the address. The consumer's remote
     * closed handler clears the consumer from the tracked entry and closes it.
     *
     * @param federationAddressEntry the entry to create a consumer for
     */
    private void tryCreateFederationConsumerForAddress(FederationAddressEntry federationAddressEntry) {
        final AddressInfo addressInfo = federationAddressEntry.getAddressInfo();

        if (!federationAddressEntry.hasDemand()
            || federationAddressEntry.hasConsumer()
            || isPluginBlockingFederationConsumerCreate(addressInfo)) {
            return;
        }

        logger.trace("Federation Address Policy manager creating remote consumer for address: {}", addressInfo);

        final FederationConsumerInfo consumerInfo = createConsumerInfo(addressInfo);
        final FederationConsumerInternal consumer = createFederationConsumer(consumerInfo);

        signalBeforeCreateFederationConsumer(consumerInfo);

        consumer.setRemoteClosedHandler(closedConsumer -> {
            synchronized (this) {
                try {
                    final FederationAddressEntry tracked =
                        demandTracking.get(closedConsumer.getConsumerInfo().getAddress());

                    if (tracked != null) {
                        tracked.clearConsumer();
                    }
                } finally {
                    closedConsumer.close();
                }
            }
        });

        federationAddressEntry.setConsumer(consumer);
        consumer.start();

        signalAfterCreateFederationConsumer(consumer);
    }

    /**
     * Attempts to create the federation consumer for an already tracked address once that
     * address has been reported as added remotely, provided the address matches the policy
     * as a multicast address.
     *
     * @param str the name of the address that was added
     * @throws Exception declared for callers; propagates any failure from consumer creation
     */
    public synchronized void afterRemoteAddressAdded(String str) throws Exception {
        if (!started || !testIfAddressMatchesPolicy(str, RoutingType.MULTICAST)) {
            return;
        }

        final FederationAddressEntry entry = demandTracking.get(str);
        if (entry != null) {
            tryCreateFederationConsumerForAddress(entry);
        }
    }

    /**
     * @param addressInfo the address to test
     * @return the result of testing the address against the policy
     */
    protected boolean testIfAddressMatchesPolicy(AddressInfo addressInfo) {
        return policy.test(addressInfo);
    }

    /**
     * @param str the address name to test
     * @param routingType the routing type to test the address with
     * @return the result of testing the address name and routing type against the policy
     */
    protected boolean testIfAddressMatchesPolicy(String str, RoutingType routingType) {
        return policy.test(str, routingType);
    }

    /**
     * Called from {@link #start()} after the manager is marked started and before it is
     * registered as a broker plugin.
     *
     * @param federationReceiveFromAddressPolicy the policy this manager was created with
     */
    protected abstract void handlePolicyManagerStarted(FederationReceiveFromAddressPolicy federationReceiveFromAddressPolicy);

    /**
     * @param addressInfo the address a consumer is about to be created for
     * @return the consumer information used to create the federation consumer
     */
    protected abstract FederationConsumerInfo createConsumerInfo(AddressInfo addressInfo);

    /**
     * Creates the entry used to track demand and the consumer for an address. Subclasses
     * may override this to supply their own entry type.
     *
     * @param addressInfo the address the entry is created for
     * @return a new tracking entry for the address
     */
    protected FederationAddressEntry createConsumerEntry(AddressInfo addressInfo) {
        return new FederationAddressEntry(addressInfo);
    }

    /**
     * @param federationConsumerInfo the information describing the consumer to create
     * @return a new, not yet started, federation consumer
     */
    protected abstract FederationConsumerInternal createFederationConsumer(FederationConsumerInfo federationConsumerInfo);

    /** Signals that a federation consumer is about to be configured and started. */
    protected abstract void signalBeforeCreateFederationConsumer(FederationConsumerInfo federationConsumerInfo);

    /** Signals that a federation consumer has been started. */
    protected abstract void signalAfterCreateFederationConsumer(FederationConsumer federationConsumer);

    /** Signals that a federation consumer is about to be closed because demand is gone. */
    protected abstract void signalBeforeCloseFederationConsumer(FederationConsumer federationConsumer);

    /** Signals that a federation consumer has been closed because demand is gone. */
    protected abstract void signalAfterCloseFederationConsumer(FederationConsumer federationConsumer);

    /**
     * @param addressInfo the address a consumer would be created for
     * @return true if consumer creation for the address must be skipped
     */
    protected abstract boolean isPluginBlockingFederationConsumerCreate(AddressInfo addressInfo);

    /**
     * @param divert the divert whose address would receive the demand
     * @param queue a queue bound to one of the divert's forward addresses
     * @return true if the divert and queue pair must not register demand
     */
    protected abstract boolean isPluginBlockingFederationConsumerCreate(Divert divert, Queue queue);

    /**
     * @param queue the queue whose binding would register demand
     * @return true if the queue must not register demand
     */
    protected abstract boolean isPluginBlockingFederationConsumerCreate(Queue queue);
}
