ARTEMIS-5157 Add management for AMQP broker connections

Add basic management views for AMQP broker connections and implement control
types for AMQP federation features along with the broker connection management
views. Some initial work also to provide support for other broker connection
features to add management control and also plan for future views of incoming
broker connections and management of AMQP federation resources.
This commit is contained in:
Timothy Bish 2024-11-19 12:53:49 -05:00 committed by Robbie Gemmell
parent 8afa3d8a41
commit e9c06bd9f8
36 changed files with 2486 additions and 452 deletions

View File

@ -2744,4 +2744,61 @@ public interface AuditLogger {
@LogMessage(id = 601781, value = "User {} is getting authorization failure count on target resource: {}", level = LogMessage.Level.INFO)
void getAuthorizationFailureCount(String user, Object source);
static void isConnected(Object source) {
BASE_LOGGER.isConnected(getCaller(), source);
}
@LogMessage(id = 601782, value = "User {} is getting connected state on target resource: {}", level = LogMessage.Level.INFO)
void isConnected(String user, Object source);
static void getUri(Object source) {
BASE_LOGGER.getUri(getCaller(), source);
}
@LogMessage(id = 601783, value = "User {} is getting URI on target resource: {}", level = LogMessage.Level.INFO)
void getUri(String user, Object source);
static void getProtocol(Object source) {
BASE_LOGGER.getProtocol(getCaller(), source);
}
@LogMessage(id = 601784, value = "User {} is getting the protocol on target resource: {}", level = LogMessage.Level.INFO)
void getProtocol(String user, Object source);
static void getType(Object source) {
BASE_LOGGER.getType(getCaller(), source);
}
@LogMessage(id = 601785, value = "User {} is getting the type on target resource: {}", level = LogMessage.Level.INFO)
void getType(String user, Object source);
static void getRole(Object source) {
BASE_LOGGER.getRole(getCaller(), source);
}
@LogMessage(id = 601786, value = "User {} is getting the role on target resource: {}", level = LogMessage.Level.INFO)
void getRole(String user, Object source);
static void getFqqn(Object source) {
BASE_LOGGER.getFqqn(getCaller(), source);
}
@LogMessage(id = 601787, value = "User {} is getting the FQQN on target resource: {}", level = LogMessage.Level.INFO)
void getFqqn(String user, Object source);
static void getPriority(Object source) {
BASE_LOGGER.getPriority(getCaller(), source);
}
@LogMessage(id = 601788, value = "User {} is getting the priority on target resource: {}", level = LogMessage.Level.INFO)
void getPriority(String user, Object source);
static void getMessagesReceived(Object source) {
BASE_LOGGER.getMessagesReceived(getCaller(), source);
}
@LogMessage(id = 601789, value = "User {} is getting the number of messages received on target resource: {}", level = LogMessage.Level.INFO)
void getMessagesReceived(String user, Object source);
}

View File

@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.api.core.management;
/**
* An API for a BrokerConnectionControl object that is used to view and manage BrokerConnection state.
*/
public interface BrokerConnectionControl extends ActiveMQComponentControl {
/**
* Returns if this broker connection is currently connected to the remote.
*/
@Attribute(desc = "whether this broker connection is currently connected to the remote")
boolean isConnected();
/**
* Returns the name of this broker connection
*/
@Attribute(desc = "name of this broker connection")
String getName();
/**
* Returns the connection URI for this broker connection.
*/
@Attribute(desc = "connection URI for this broker connection")
String getUri();
/**
* Returns the user this broker connection is using.
*/
@Attribute(desc = "the user this broker connection is using")
String getUser();
/**
* Returns the wire protocol this broker connection is using.
*/
@Attribute(desc = "the wire protocol this broker connection is using")
String getProtocol();
/**
* Returns the retry interval configured for this broker connection.
*/
@Attribute(desc = "Configured retry interval of this broker connection")
long getRetryInterval();
/**
* Returns the number of reconnection attempts configured for this broker connection.
*/
@Attribute(desc = "Configured number of reconnection attempts of this broker connection")
int getReconnectAttempts();
}

View File

@ -126,6 +126,29 @@ public final class ObjectNameBuilder {
return createObjectName("broadcast-group", name);
}
/**
* Returns the ObjectName used by broker connection management objects for outgoing connections.
*
* @see BrokerConnectionControl
*/
public ObjectName getBrokerConnectionObjectName(String name) throws Exception {
return ObjectName.getInstance(getBrokerConnectionBaseObjectNameString(name));
}
/**
* Returns the base ObjectName string used by for outgoing broker connections and possibly broker connection
* services that are registered by broker connection specific features. This value is pre-quoted and ready
* for use in an ObjectName.getIstnace call but is intended for use by broker connection components to create
* names specific to a broker connection feature that registers its own object for management.
*
* @return the base object name string that is used for broker connection Object names.
*
* @see BrokerConnectionControl
*/
public String getBrokerConnectionBaseObjectNameString(String name) throws Exception {
return getActiveMQServerName() + ",component=broker-connections,name=" + ObjectName.quote(name);
}
/**
* Returns the ObjectName used by BridgeControl.
*

View File

@ -34,6 +34,8 @@ public final class ResourceNames {
public static final String ADDRESS = "address.";
public static final String BROKER_CONNECTION = "brokerconnection.";
public static final String BRIDGE = "bridge.";
public static final String ACCEPTOR = "acceptor.";

View File

@ -194,6 +194,17 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
return brokerConnectConfiguration;
}
@Override
public boolean isConnected() {
final NettyConnection connection = this.connection;
if (connection != null) {
return connection.isOpen();
} else {
return false;
}
}
@Override
public boolean isStarted() {
return started;
@ -208,49 +219,58 @@ public class AMQPBrokerConnection implements ClientConnectionLifeCycleListener,
}
@Override
public void stop() {
if (!started) return;
started = false;
if (protonRemotingConnection != null) {
protonRemotingConnection.fail(new ActiveMQException("Stopping Broker Connection"));
protonRemotingConnection = null;
connection = null;
}
ScheduledFuture<?> scheduledFuture = reconnectFuture;
reconnectFuture = null;
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
if (brokerFederation != null) {
try {
brokerFederation.stop();
} catch (ActiveMQException e) {
public synchronized void stop() {
if (started) {
started = false;
if (protonRemotingConnection != null) {
protonRemotingConnection.fail(new ActiveMQException("Stopping Broker Connection"));
protonRemotingConnection = null;
connection = null;
}
final ScheduledFuture<?> scheduledFuture = reconnectFuture;
reconnectFuture = null;
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
if (brokerFederation != null) {
try {
brokerFederation.stop();
} catch (ActiveMQException e) {
logger.debug("Error caught while stopping federation instance.", e);
} finally {
brokerFederation = null;
}
}
}
}
@Override
public void start() throws Exception {
if (started) return;
started = true;
server.getConfiguration().registerBrokerPlugin(this);
try {
if (brokerConnectConfiguration != null && brokerConnectConfiguration.getConnectionElements() != null) {
for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) {
final AMQPBrokerConnectionAddressType elementType = connectionElement.getType();
public synchronized void start() throws Exception {
if (!started) {
started = true;
if (elementType == AMQPBrokerConnectionAddressType.MIRROR) {
installMirrorController((AMQPMirrorBrokerConnectionElement) connectionElement, server);
} else if (elementType == AMQPBrokerConnectionAddressType.FEDERATION) {
installFederation((AMQPFederatedBrokerConnectionElement) connectionElement, server);
server.getConfiguration().registerBrokerPlugin(this);
try {
if (brokerConnectConfiguration != null && brokerConnectConfiguration.getConnectionElements() != null) {
for (AMQPBrokerConnectionElement connectionElement : brokerConnectConfiguration.getConnectionElements()) {
final AMQPBrokerConnectionAddressType elementType = connectionElement.getType();
if (elementType == AMQPBrokerConnectionAddressType.MIRROR) {
installMirrorController((AMQPMirrorBrokerConnectionElement) connectionElement, server);
} else if (elementType == AMQPBrokerConnectionAddressType.FEDERATION) {
installFederation((AMQPFederatedBrokerConnectionElement) connectionElement, server);
}
}
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
return;
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
return;
connectExecutor.execute(() -> doConnect());
}
connectExecutor.execute(() -> doConnect());
}
public ActiveMQServer getServer() {

View File

@ -89,6 +89,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
AMQPBrokerConnection amqpBrokerConnection = new AMQPBrokerConnection(this, configuration, protonProtocolManagerFactory, server);
amqpBrokerConnections.put(configuration.getName(), amqpBrokerConnection);
server.registerBrokerConnection(amqpBrokerConnection);
server.getManagementService().registerBrokerConnection(amqpBrokerConnection);
if (start) {
amqpBrokerConnection.start();
@ -142,6 +143,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
connection.stop();
} finally {
server.unregisterBrokerConnection(connection);
server.getManagementService().unregisterBrokerConnection(connection.getName());
}
}
@ -183,6 +185,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
connection.stop();
} finally {
server.unregisterBrokerConnection(connection);
server.getManagementService().unregisterBrokerConnection(connection.getName());
}
}
}
@ -211,6 +214,7 @@ public class AMQPBrokerConnectionManager implements ActiveMQComponent, ClientCon
connection.stop();
} finally {
server.unregisterBrokerConnection(connection);
server.getManagementService().unregisterBrokerConnection(connection.getName());
}
}
} finally {

View File

@ -34,9 +34,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationAddressPolicyManager;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationInternal;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationQueuePolicyManager;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.qpid.proton.engine.Link;
@ -63,13 +61,14 @@ public abstract class AMQPFederation implements FederationInternal {
private static final WildcardConfiguration DEFAULT_WILDCARD_CONFIGURATION = new WildcardConfiguration();
// Local policies that should be matched against demand on local addresses and queues.
protected final Map<String, FederationQueuePolicyManager> queueMatchPolicies = new ConcurrentHashMap<>();
protected final Map<String, FederationAddressPolicyManager> addressMatchPolicies = new ConcurrentHashMap<>();
protected final Map<String, AMQPFederationQueuePolicyManager> queueMatchPolicies = new ConcurrentHashMap<>();
protected final Map<String, AMQPFederationAddressPolicyManager> addressMatchPolicies = new ConcurrentHashMap<>();
protected final Map<String, Predicate<Link>> linkClosedinterceptors = new ConcurrentHashMap<>();
protected final WildcardConfiguration wildcardConfiguration;
protected final ScheduledExecutorService scheduler;
protected final String brokerConnectionName;
protected final String name;
protected final ActiveMQServer server;
@ -83,10 +82,11 @@ public abstract class AMQPFederation implements FederationInternal {
protected boolean started;
protected volatile boolean connected;
public AMQPFederation(String name, ActiveMQServer server) {
public AMQPFederation(String brokerConnectionName, String name, ActiveMQServer server) {
Objects.requireNonNull(name, "Federation name cannot be null");
Objects.requireNonNull(server, "Provided server instance cannot be null");
this.brokerConnectionName = brokerConnectionName;
this.name = name;
this.server = server;
this.scheduler = server.getScheduledPool();
@ -260,7 +260,7 @@ public abstract class AMQPFederation implements FederationInternal {
* @throws ActiveMQException if an error occurs processing the added policy
*/
public synchronized AMQPFederation addQueueMatchPolicy(FederationReceiveFromQueuePolicy queuePolicy) throws ActiveMQException {
final FederationQueuePolicyManager manager = new AMQPFederationQueuePolicyManager(this, queuePolicy);
final AMQPFederationQueuePolicyManager manager = new AMQPFederationQueuePolicyManager(this, queuePolicy);
queueMatchPolicies.put(queuePolicy.getPolicyName(), manager);
@ -286,7 +286,7 @@ public abstract class AMQPFederation implements FederationInternal {
* @throws ActiveMQException if an error occurs processing the added policy
*/
public synchronized AMQPFederation addAddressMatchPolicy(FederationReceiveFromAddressPolicy addressPolicy) throws ActiveMQException {
final FederationAddressPolicyManager manager = new AMQPFederationAddressPolicyManager(this, addressPolicy);
final AMQPFederationAddressPolicyManager manager = new AMQPFederationAddressPolicyManager(this, addressPolicy);
addressMatchPolicies.put(addressPolicy.getPolicyName(), manager);
@ -479,4 +479,64 @@ public abstract class AMQPFederation implements FederationInternal {
ActiveMQServerLogger.LOGGER.federationPluginExecutionError("federationStopped", t);
}
}
/*
* This section contains internal management support APIs for resources managed by this
* Federation instance. The resources that are managed by a federation source or target
* call into this batch of API to add and remove themselves into management which allows
* the given federation source or target the control over how the resources are represented
* in the management hierarchy.
*
* NOTE: Currently the fact that broker connection name is null indicates that the resource
* is from a remote broker connection and no management types are registered. This should
* be improved upon when remote broker connection management registration is implemented.
*/
void registerAddressPolicyManagement(AMQPFederationAddressPolicyManager manager) throws Exception {
if (brokerConnectionName != null) {
AMQPFederationManagementSupport.registerAddressPolicyControl(brokerConnectionName, manager);
}
}
void unregisterAddressPolicyManagement(AMQPFederationAddressPolicyManager manager) throws Exception {
if (brokerConnectionName != null) {
AMQPFederationManagementSupport.unregisterAddressPolicyControl(brokerConnectionName, manager);
}
}
void registerAddressConsumerManagement(AMQPFederationAddressPolicyManager manager, AMQPFederationAddressConsumer consumer) throws Exception {
if (brokerConnectionName != null) {
AMQPFederationManagementSupport.registerAddressConsumerControl(brokerConnectionName, manager, consumer);
}
}
void unregisterAddressConsumerManagement(AMQPFederationAddressPolicyManager manager, AMQPFederationAddressConsumer consumer) throws Exception {
if (brokerConnectionName != null) {
AMQPFederationManagementSupport.unregisterAddressConsumerControl(brokerConnectionName, manager, consumer);
}
}
void registerQueuePolicyManagement(AMQPFederationQueuePolicyManager manager) throws Exception {
if (brokerConnectionName != null) {
AMQPFederationManagementSupport.registerQueuePolicyControl(brokerConnectionName, manager);
}
}
void unregisterQueuePolicyManagement(AMQPFederationQueuePolicyManager manager) throws Exception {
if (brokerConnectionName != null) {
AMQPFederationManagementSupport.unregisterQueuePolicyControl(brokerConnectionName, manager);
}
}
void registerQueueConsumerManagement(AMQPFederationQueuePolicyManager manager, AMQPFederationQueueConsumer consumer) throws Exception {
if (brokerConnectionName != null) {
AMQPFederationManagementSupport.registerQueueConsumerControl(brokerConnectionName, manager, consumer);
}
}
void unregisterQueueConsumerManagement(AMQPFederationQueuePolicyManager manager, AMQPFederationQueueConsumer consumer) throws Exception {
if (brokerConnectionName != null) {
AMQPFederationManagementSupport.unregisterQueueConsumerControl(brokerConnectionName, manager, consumer);
}
}
}

View File

@ -26,10 +26,6 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LINK_INITIALIZER_KEY;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DETACH_FORCED;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NOT_FOUND;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.RESOURCE_DELETED;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.HashMap;
@ -38,8 +34,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.BiConsumer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
@ -55,10 +50,8 @@ import org.apache.activemq.artemis.protocol.amqp.broker.AMQPMessage;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import org.apache.activemq.artemis.protocol.amqp.federation.Federation;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
@ -71,11 +64,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.MessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
@ -83,7 +72,6 @@ import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -93,7 +81,7 @@ import org.slf4j.LoggerFactory;
* AMQP peer and forwards those messages onto the internal broker Address for
* consumption by an attached consumers.
*/
public class AMQPFederationAddressConsumer implements FederationConsumerInternal {
public class AMQPFederationAddressConsumer extends AMQPFederationConsumer {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@ -106,32 +94,20 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
// arrives resulting in an unintended link stealing scenario in the proton engine.
private static final AtomicLong LINK_SEQUENCE_ID = new AtomicLong();
private static final Symbol[] DEFAULT_OUTCOMES = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL,
Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL};
private final AMQPFederation federation;
private final AMQPFederationConsumerConfiguration configuration;
private final FederationConsumerInfo consumerInfo;
private final AMQPFederationAddressPolicyManager manager;
private final FederationReceiveFromAddressPolicy policy;
private final AMQPConnectionContext connection;
private final AMQPSessionContext session;
private final Predicate<Link> remoteCloseInterceptor = this::remoteLinkClosedInterceptor;
private final Transformer transformer;
private AMQPFederatedAddressDeliveryReceiver receiver;
private Receiver protonReceiver;
private boolean started;
private volatile boolean closed;
private Consumer<FederationConsumerInternal> remoteCloseHandler;
public AMQPFederationAddressConsumer(AMQPFederation federation, AMQPFederationConsumerConfiguration configuration,
AMQPSessionContext session, FederationConsumerInfo consumerInfo, FederationReceiveFromAddressPolicy policy) {
this.federation = federation;
this.consumerInfo = consumerInfo;
this.policy = policy;
this.connection = session.getAMQPConnectionContext();
this.session = session;
this.configuration = configuration;
public AMQPFederationAddressConsumer(AMQPFederationAddressPolicyManager manager,
AMQPFederationConsumerConfiguration configuration,
AMQPSessionContext session, FederationConsumerInfo consumerInfo,
BiConsumer<FederationConsumerInfo, Message> messageObserver) {
super(manager.getFederation(), configuration, session, consumerInfo, messageObserver);
this.manager = manager;
this.policy = manager.getPolicy();
final TransformerConfiguration transformerConfiguration = policy.getTransformerConfiguration();
if (transformerConfiguration != null) {
@ -141,16 +117,6 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
}
}
@Override
public Federation getFederation() {
return federation;
}
@Override
public FederationConsumerInfo getConsumerInfo() {
return consumerInfo;
}
/**
* @return the {@link FederationReceiveFromAddressPolicy} that initiated this consumer.
*/
@ -158,82 +124,6 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
return policy;
}
@Override
public synchronized void start() {
if (!started && !closed) {
started = true;
asyncCreateReceiver();
}
}
@Override
public synchronized void close() {
if (!closed) {
closed = true;
if (started) {
started = false;
connection.runLater(() -> {
federation.removeLinkClosedInterceptor(consumerInfo.getId());
if (receiver != null) {
try {
receiver.close(false);
} catch (ActiveMQAMQPException e) {
} finally {
receiver = null;
}
}
// Need to track the proton receiver and close it here as the default
// context implementation doesn't do that and could result in no detach
// being sent in some cases and possible resources leaks.
if (protonReceiver != null) {
try {
protonReceiver.close();
} finally {
protonReceiver = null;
}
}
connection.flush();
});
}
}
}
@Override
public synchronized AMQPFederationAddressConsumer setRemoteClosedHandler(Consumer<FederationConsumerInternal> handler) {
if (started) {
throw new IllegalStateException("Cannot set a remote close handler after the consumer is started");
}
this.remoteCloseHandler = handler;
return this;
}
protected boolean remoteLinkClosedInterceptor(Link link) {
if (link == protonReceiver && link.getRemoteCondition() != null && link.getRemoteCondition().getCondition() != null) {
final Symbol errorCondition = link.getRemoteCondition().getCondition();
// Cases where remote link close is not considered terminal, additional checks
// should be added as needed for cases where the remote has closed the link either
// during the attach or at some point later.
if (RESOURCE_DELETED.equals(errorCondition)) {
// Remote side manually deleted this queue.
return true;
} else if (NOT_FOUND.equals(errorCondition)) {
// Remote did not have a queue that matched.
return true;
} else if (DETACH_FORCED.equals(errorCondition)) {
// Remote operator forced the link to detach.
return true;
}
}
return false;
}
private void signalBeforeFederationConsumerMessageHandled(Message message) throws ActiveMQException {
try {
federation.getServer().callBrokerAMQPFederationPlugins((plugin) -> {
@ -265,7 +155,8 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
"-" + LINK_SEQUENCE_ID.incrementAndGet();
}
private void asyncCreateReceiver() {
@Override
protected final void asyncCreateReceiver() {
connection.runLater(() -> {
if (closed) {
return;
@ -292,7 +183,8 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
filtersMap.put(AmqpSupport.JMS_SELECTOR_KEY, jmsFilter);
}
source.setOutcomes(Arrays.copyOf(DEFAULT_OUTCOMES, DEFAULT_OUTCOMES.length));
source.setOutcomes(Arrays.copyOf(OUTCOMES, OUTCOMES.length));
source.setDefaultOutcome(DEFAULT_OUTCOME);
source.setDurable(TerminusDurability.NONE);
source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
source.setAddress(address);
@ -386,6 +278,35 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
});
}
@Override
protected final void asyncCloseReceiver() {
connection.runLater(() -> {
federation.removeLinkClosedInterceptor(consumerInfo.getId());
if (receiver != null) {
try {
receiver.close(false);
} catch (ActiveMQAMQPException e) {
} finally {
receiver = null;
}
}
// Need to track the proton receiver and close it here as the default
// context implementation doesn't do that and could result in no detach
// being sent in some cases and possible resources leaks.
if (protonReceiver != null) {
try {
protonReceiver.close();
} finally {
protonReceiver = null;
}
}
connection.flush();
});
}
private static AMQPMessage incrementAMQPMessageHops(AMQPMessage message) {
Object hops = message.getAnnotation(MESSAGE_HOPS_ANNOTATION);
@ -424,8 +345,8 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
private final SimpleString cachedAddress;
private MessageReader coreMessageReader;
private MessageReader coreLargeMessageReader;
private boolean closed;
/**
* Creates the federation receiver instance.
@ -443,15 +364,25 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
@Override
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
super.close(remoteLinkClose);
if (!closed) {
super.close(remoteLinkClose);
closed = true;
if (remoteLinkClose && remoteCloseHandler != null) {
try {
remoteCloseHandler.accept(AMQPFederationAddressConsumer.this);
federation.unregisterAddressConsumerManagement(manager, AMQPFederationAddressConsumer.this);
} catch (Exception e) {
logger.debug("User remote closed handler threw error: ", e);
} finally {
remoteCloseHandler = null;
logger.trace("Error thrown when unregistering federation address consumer from management", e);
}
if (remoteLinkClose && remoteCloseHandler != null) {
try {
remoteCloseHandler.accept(AMQPFederationAddressConsumer.this);
} catch (Exception e) {
logger.debug("User remote closed handler threw error: ", e);
} finally {
remoteCloseHandler = null;
}
}
}
}
@ -509,6 +440,12 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
try {
federation.registerAddressConsumerManagement(manager, AMQPFederationAddressConsumer.this);
} catch (Exception e) {
logger.debug("Error caught when trying to add federation address consumer to management", e);
}
flow();
}
@ -559,6 +496,8 @@ public class AMQPFederationAddressConsumer implements FederationConsumerInternal
} catch (Exception e) {
logger.warn("Inbound delivery for {} encountered an error: {}", consumerInfo, e.getMessage(), e);
deliveryFailed(delivery, receiver, e);
} finally {
recordFederatedMessageReceived(message);
}
}
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.connect.federation;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import javax.management.NotCompliantMBeanException;
import org.apache.activemq.artemis.core.management.impl.AbstractControl;
import org.apache.activemq.artemis.core.management.impl.MBeanInfoHelper;
import org.apache.activemq.artemis.logs.AuditLogger;
/**
* Management service control for an AMQP Federation address policy manager instance.
*/
public class AMQPFederationAddressPolicyControl extends AbstractControl implements AMQPFederationPolicyControl {
private final AMQPFederationAddressPolicyManager policyManager;
public AMQPFederationAddressPolicyControl(AMQPFederationAddressPolicyManager policyManager) throws NotCompliantMBeanException {
super(AMQPFederationPolicyControl.class, policyManager.getFederation().getServer().getStorageManager());
this.policyManager = policyManager;
}
@Override
public String getType() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getType(this.policyManager);
}
clearIO();
try {
return policyManager.getPolicyType().toString();
} finally {
blockOnIO();
}
}
@Override
public String getName() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getName(this.policyManager);
}
clearIO();
try {
return policyManager.getPolicyName();
} finally {
blockOnIO();
}
}
@Override
public long getMessagesReceived() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getMessagesReceived(this.policyManager);
}
clearIO();
try {
return policyManager.getMessagesReceived();
} finally {
blockOnIO();
}
}
@Override
protected MBeanOperationInfo[] fillMBeanOperationInfo() {
return MBeanInfoHelper.getMBeanOperationsInfo(AMQPFederationPolicyControl.class);
}
@Override
protected MBeanAttributeInfo[] fillMBeanAttributeInfo() {
return MBeanInfoHelper.getMBeanAttributesInfo(AMQPFederationPolicyControl.class);
}
}

View File

@ -22,9 +22,11 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.Queue;
@ -32,6 +34,7 @@ import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromResourcePolicy;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo.Role;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationAddressPolicyManager;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal;
@ -48,6 +51,8 @@ public class AMQPFederationAddressPolicyManager extends FederationAddressPolicyM
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final AtomicLong messageCount = new AtomicLong();
protected final BiConsumer<FederationConsumerInfo, Message> messageObserver = (i, m) -> messageCount.incrementAndGet();
protected final AMQPFederation federation;
protected final String remoteQueueFilter;
@ -62,7 +67,19 @@ public class AMQPFederationAddressPolicyManager extends FederationAddressPolicyM
}
@Override
protected void handlePolicyManagerStarted(FederationReceiveFromAddressPolicy policy) {
public AMQPFederation getFederation() {
return federation;
}
/**
* @return the number of messages that all federation consumer of this policy have received from the remote.
*/
public long getMessagesReceived() {
return messageCount.get();
}
@Override
protected void handlePolicyManagerStarted(FederationReceiveFromResourcePolicy policy) {
// Capture state for the current connection on each start of the policy manager.
configuration = new AMQPFederationConsumerConfiguration(federation.getConfiguration(), policy.getProperties());
session = federation.getSessionContext();
@ -96,7 +113,7 @@ public class AMQPFederationAddressPolicyManager extends FederationAddressPolicyM
// Don't initiate anything yet as the caller might need to register error handlers etc
// before the attach is sent otherwise they could miss the failure case.
return new AMQPFederationAddressConsumer(federation, configuration, session, consumerInfo, policy);
return new AMQPFederationAddressConsumer(this, configuration, session, consumerInfo, messageObserver);
}
@Override

View File

@ -0,0 +1,175 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.connect.federation;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DETACH_FORCED;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NOT_FOUND;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.RESOURCE_DELETED;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Predicate;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
/**
* Base class for AMQP Federation consumers that implements some of the common functionality.
*/
public abstract class AMQPFederationConsumer implements FederationConsumerInternal {
protected static final Symbol[] OUTCOMES = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL,
Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL};
protected static final Modified DEFAULT_OUTCOME;
static {
DEFAULT_OUTCOME = new Modified();
DEFAULT_OUTCOME.setDeliveryFailed(true);
}
protected final AMQPFederation federation;
protected final AMQPFederationConsumerConfiguration configuration;
protected final FederationConsumerInfo consumerInfo;
protected final AMQPConnectionContext connection;
protected final AMQPSessionContext session;
protected final Predicate<Link> remoteCloseInterceptor = this::remoteLinkClosedInterceptor;
protected final BiConsumer<FederationConsumerInfo, Message> messageObserver;
protected final AtomicLong messageCount = new AtomicLong();
protected Receiver protonReceiver;
protected boolean started;
protected volatile boolean closed;
protected Consumer<FederationConsumerInternal> remoteCloseHandler;
public AMQPFederationConsumer(AMQPFederation federation, AMQPFederationConsumerConfiguration configuration,
AMQPSessionContext session, FederationConsumerInfo consumerInfo,
BiConsumer<FederationConsumerInfo, Message> messageObserver) {
this.federation = federation;
this.consumerInfo = consumerInfo;
this.connection = session.getAMQPConnectionContext();
this.session = session;
this.configuration = configuration;
this.messageObserver = messageObserver;
}
/**
* @return the number of messages this consumer has received from the remote during its lifetime.
*/
public final long getMessagesReceived() {
return messageCount.get();
}
@Override
public final AMQPFederation getFederation() {
return federation;
}
@Override
public final FederationConsumerInfo getConsumerInfo() {
return consumerInfo;
}
@Override
public final synchronized void start() {
if (!started && !closed) {
started = true;
asyncCreateReceiver();
}
}
@Override
public final synchronized void close() {
if (!closed) {
closed = true;
if (started) {
started = false;
asyncCloseReceiver();
}
}
}
@Override
public final AMQPFederationConsumer setRemoteClosedHandler(Consumer<FederationConsumerInternal> handler) {
if (started) {
throw new IllegalStateException("Cannot set a remote close handler after the consumer is started");
}
this.remoteCloseHandler = handler;
return this;
}
protected final boolean remoteLinkClosedInterceptor(Link link) {
if (link == protonReceiver && link.getRemoteCondition() != null && link.getRemoteCondition().getCondition() != null) {
final Symbol errorCondition = link.getRemoteCondition().getCondition();
// Cases where remote link close is not considered terminal, additional checks
// should be added as needed for cases where the remote has closed the link either
// during the attach or at some point later.
if (RESOURCE_DELETED.equals(errorCondition)) {
// Remote side manually deleted this queue.
return true;
} else if (NOT_FOUND.equals(errorCondition)) {
// Remote did not have a queue that matched.
return true;
} else if (DETACH_FORCED.equals(errorCondition)) {
// Remote operator forced the link to detach.
return true;
}
}
return false;
}
/**
* Called from a subclass upon handling an incoming federated message from the remote.
*
* @param message
* The original message that arrived from the remote.
*/
protected final void recordFederatedMessageReceived(Message message) {
messageCount.incrementAndGet();
if (messageObserver != null) {
messageObserver.accept(consumerInfo, message);
}
}
/**
* Called during the start of the consumer to trigger an asynchronous link attach
* of the underlying AMQP receiver that backs this federation consumer.
*/
protected abstract void asyncCreateReceiver();
/**
* Called during the close of the consumer to trigger an asynchronous link detach
* of the underlying AMQP receiver that backs this federation consumer.
*/
protected abstract void asyncCloseReceiver();
}

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.connect.federation;
import org.apache.activemq.artemis.api.core.management.Attribute;
/**
* Management interface that is backed by an active federation consumer
* that was created when demand was applied to a matching address or queue.
*/
public interface AMQPFederationConsumerControl {
/**
* Returns the number of messages this federation consumer has received from the remote
*/
@Attribute(desc = "returns the number of messages this federation consumer has received from the remote")
long getMessagesReceived();
/**
* @return the type of federation consumer being represented.
*/
@Attribute(desc = "AMQP federation consumer type (address or queue) that backs this instance.")
String getRole();
/**
* Gets the queue name that will be used for this federation consumer instance.
*
* For Queue federation this will be the name of the queue whose messages are
* being federated to this server instance. For an Address federation this will
* be an automatically generated name that should be unique to a given federation
* instance
*
* @return the queue name associated with the federation consumer
*/
@Attribute(desc = "the queue name associated with the federation consumer.")
String getQueueName();
/**
* Gets the address that will be used for this federation consumer instance.
*
* For Queue federation this is the address under which the matching queue must
* reside. For Address federation this is the actual address whose messages are
* being federated.
*
* @return the address associated with this federation consumer.
*/
@Attribute(desc = "the address name associated with the federation consumer.")
String getAddress();
/**
* Gets the FQQN that comprises the address and queue where the remote consumer
* will be attached.
*
* @return provides the FQQN that can be used to address the consumer queue directly.
*/
@Attribute(desc = "the FQQN associated with the federation consumer.")
String getFqqn();
/**
* Gets the routing type that will be requested when creating a consumer on the
* remote server.
*
* @return the routing type of the remote consumer.
*/
@Attribute(desc = "the Routing Type associated with the federation consumer.")
String getRoutingType();
/**
* Gets the filter string that will be used when creating the remote consumer.
*
* For Queue federation this will be the filter that exists on the local queue that
* is requesting federation of messages from the remote. For address federation this
* filter will be used to restrict some movement of messages amongst federated server
* addresses.
*
* @return the filter string in use for the federation consumer.
*/
@Attribute(desc = "the filter string associated with the federation consumer.")
String getFilterString();
/**
* Gets the priority value that will be requested for the remote consumer that is
* created.
*
* @return the assigned consumer priority for the federation consumer.
*/
@Attribute(desc = "the assigned priority of the the federation consumer.")
int getPriority();
}

View File

@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.connect.federation;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import javax.management.NotCompliantMBeanException;
import org.apache.activemq.artemis.core.management.impl.AbstractControl;
import org.apache.activemq.artemis.core.management.impl.MBeanInfoHelper;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
/**
* Management object used for AMQP federation address and queue consumers to expose consumer state.
*/
public class AMQPFederationConsumerControlType extends AbstractControl implements AMQPFederationConsumerControl {
private final AMQPFederationConsumer consumer;
private final FederationConsumerInfo consumerInfo;
public AMQPFederationConsumerControlType(AMQPFederationConsumer consumer) throws NotCompliantMBeanException {
super(AMQPFederationConsumerControl.class, consumer.getFederation().getServer().getStorageManager());
this.consumer = consumer;
this.consumerInfo = consumer.getConsumerInfo();
}
@Override
public long getMessagesReceived() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getMessagesReceived(this.consumerInfo);
}
clearIO();
try {
return consumer.getMessagesReceived();
} finally {
blockOnIO();
}
}
@Override
public String getRole() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getRole(this.consumerInfo);
}
clearIO();
try {
return consumerInfo.getRole().toString();
} finally {
blockOnIO();
}
}
@Override
public String getQueueName() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getQueueName(this.consumerInfo);
}
clearIO();
try {
return consumerInfo.getQueueName();
} finally {
blockOnIO();
}
}
@Override
public String getAddress() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getAddress(this.consumerInfo);
}
clearIO();
try {
return consumerInfo.getAddress();
} finally {
blockOnIO();
}
}
@Override
public String getFqqn() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getFqqn(this.consumerInfo);
}
clearIO();
try {
return consumerInfo.getFqqn();
} finally {
blockOnIO();
}
}
@Override
public String getRoutingType() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getRoutingType(this.consumerInfo);
}
clearIO();
try {
return consumerInfo.getRoutingType().toString();
} finally {
blockOnIO();
}
}
@Override
public String getFilterString() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getFilterString(this.consumerInfo);
}
clearIO();
try {
return consumerInfo.getFilterString();
} finally {
blockOnIO();
}
}
@Override
public int getPriority() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getPriority(this.consumerInfo);
}
clearIO();
try {
return consumerInfo.getPriority();
} finally {
blockOnIO();
}
}
@Override
protected MBeanOperationInfo[] fillMBeanOperationInfo() {
return MBeanInfoHelper.getMBeanOperationsInfo(AMQPFederationConsumerControl.class);
}
@Override
protected MBeanAttributeInfo[] fillMBeanAttributeInfo() {
return MBeanInfoHelper.getMBeanAttributesInfo(AMQPFederationConsumerControl.class);
}
}

View File

@ -0,0 +1,193 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.connect.federation;
import javax.management.ObjectName;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.management.ManagementService;
/**
* Support methods for working with the AMQP Federation management types
*/
public abstract class AMQPFederationManagementSupport {
/**
* Template used to denote local federation policy managers in the server management registry.
*/
public static final String FEDERATION_POLICY_RESOURCE_TEMPLATE = "brokerconnection.local.federation.policy.%s";
/**
* Template used to denote local federation consumers in the server management registry. Since policy names
* are unique on the local broker AMQP federation configuration these names should not collide as each policy
* will create only one consumer for a given address or queue.
*/
public static final String FEDERATION_CONSUMER_RESOURCE_TEMPLATE = "brokerconnection.local.federation.policy.%s.consumer.%s";
/**
* The template used to create the object name suffix that is appending to the broker connection
* object name when adding and removing AMQP federation policy control elements.
*/
public static final String FEDERATION_POLICY_NAME_TEMPLATE = "serviceCatagory=federations,federationName=%s,policyType=%s,policyName=%s";
/**
* The template used to create the object name suffix that is appending to the broker connection
* object name when adding and removing AMQP federation queue consumer control elements.
*/
public static final String FEDERATION_QUEUE_CONSUMER_NAME_TEMPLATE = FEDERATION_POLICY_NAME_TEMPLATE + ",linkType=consumers,fqqn=%s";
/**
* The template used to create the object name suffix that is appending to the broker connection
* object name when adding and removing AMQP federation address consumer control elements.
*/
public static final String FEDERATION_ADDRESS_CONSUMER_NAME_TEMPLATE = FEDERATION_POLICY_NAME_TEMPLATE + ",linkType=consumers,address=%s";
// APIs for federation address and queue policy management
public static void registerAddressPolicyControl(String brokerConnection, AMQPFederationAddressPolicyManager manager) throws Exception {
final AMQPFederation federation = manager.getFederation();
final ActiveMQServer server = federation.getServer();
final ManagementService management = server.getManagementService();
final AMQPFederationAddressPolicyControl control = new AMQPFederationAddressPolicyControl(manager);
final String policyName = manager.getPolicyName();
management.registerInJMX(getFederationPolicyObjectName(management, brokerConnection, federation.getName(), manager.getPolicyType().toString(), policyName), control);
management.registerInRegistry(getFederationPolicyResourceName(policyName), control);
}
public static void unregisterAddressPolicyControl(String brokerConnection, AMQPFederationAddressPolicyManager manager) throws Exception {
final AMQPFederation federation = manager.getFederation();
final ActiveMQServer server = federation.getServer();
final ManagementService management = server.getManagementService();
final String policyName = manager.getPolicyName();
management.unregisterFromJMX(getFederationPolicyObjectName(management, brokerConnection, federation.getName(), manager.getPolicyType().toString(), policyName));
management.unregisterFromRegistry(getFederationPolicyResourceName(policyName));
}
public static void registerQueuePolicyControl(String brokerConnection, AMQPFederationQueuePolicyManager manager) throws Exception {
final AMQPFederation federation = manager.getFederation();
final ActiveMQServer server = federation.getServer();
final ManagementService management = server.getManagementService();
final AMQPFederationQueuePolicyControl control = new AMQPFederationQueuePolicyControl(manager);
final String policyName = manager.getPolicyName();
management.registerInJMX(getFederationPolicyObjectName(management, brokerConnection, federation.getName(), manager.getPolicyType().toString(), policyName), control);
management.registerInRegistry(getFederationPolicyResourceName(policyName), control);
}
public static void unregisterQueuePolicyControl(String brokerConnection, AMQPFederationQueuePolicyManager manager) throws Exception {
final AMQPFederation federation = manager.getFederation();
final ActiveMQServer server = federation.getServer();
final ManagementService management = server.getManagementService();
final String policyName = manager.getPolicyName();
management.unregisterFromJMX(getFederationPolicyObjectName(management, brokerConnection, federation.getName(), manager.getPolicyType().toString(), policyName));
management.unregisterFromRegistry(getFederationPolicyResourceName(policyName));
}
public static String getFederationPolicyResourceName(String policyName) {
return String.format(FEDERATION_POLICY_RESOURCE_TEMPLATE, policyName);
}
public static ObjectName getFederationPolicyObjectName(ManagementService management, String brokerConnection, String federationName, String policyType, String policyName) throws Exception {
final String brokerConnectionName = management.getObjectNameBuilder().getBrokerConnectionBaseObjectNameString(brokerConnection);
return ObjectName.getInstance(
String.format("%s," + FEDERATION_POLICY_NAME_TEMPLATE,
brokerConnectionName,
ObjectName.quote(federationName),
ObjectName.quote(policyType),
ObjectName.quote(policyName)));
}
// APIs for federation consumer management
public static void registerAddressConsumerControl(String brokerConnection, AMQPFederationAddressPolicyManager manager, AMQPFederationAddressConsumer consumer) throws Exception {
final AMQPFederation federation = consumer.getFederation();
final ActiveMQServer server = federation.getServer();
final ManagementService management = server.getManagementService();
final AMQPFederationConsumerControlType control = new AMQPFederationConsumerControlType(consumer);
final String policyName = manager.getPolicyName();
management.registerInJMX(getFederationAddressConsumerObjectName(management, brokerConnection, federation.getName(), manager.getPolicyType().toString(), policyName, consumer.getConsumerInfo().getAddress()), control);
management.registerInRegistry(getFederationAddressConsumerResourceName(policyName, consumer.getConsumerInfo().getAddress()), control);
}
public static void unregisterAddressConsumerControl(String brokerConnection, AMQPFederationAddressPolicyManager manager, AMQPFederationAddressConsumer consumer) throws Exception {
final AMQPFederation federation = consumer.getFederation();
final ActiveMQServer server = federation.getServer();
final ManagementService management = server.getManagementService();
final String policyName = manager.getPolicyName();
management.unregisterFromJMX(getFederationAddressConsumerObjectName(management, brokerConnection, federation.getName(), manager.getPolicyType().toString(), policyName, consumer.getConsumerInfo().getAddress()));
management.unregisterFromRegistry(getFederationAddressConsumerResourceName(policyName, consumer.getConsumerInfo().getAddress()));
}
public static void registerQueueConsumerControl(String brokerConnection, AMQPFederationQueuePolicyManager manager, AMQPFederationQueueConsumer consumer) throws Exception {
final AMQPFederation federation = consumer.getFederation();
final ActiveMQServer server = federation.getServer();
final ManagementService management = server.getManagementService();
final AMQPFederationConsumerControlType control = new AMQPFederationConsumerControlType(consumer);
final String policyName = manager.getPolicyName();
management.registerInJMX(getFederationQueueConsumerObjectName(management, brokerConnection, federation.getName(), manager.getPolicyType().toString(), policyName, consumer.getConsumerInfo().getFqqn()), control);
management.registerInRegistry(getFederationQueueConsumerResourceName(policyName, consumer.getConsumerInfo().getFqqn()), control);
}
public static void unregisterQueueConsumerControl(String brokerConnection, AMQPFederationQueuePolicyManager manager, AMQPFederationQueueConsumer consumer) throws Exception {
final AMQPFederation federation = consumer.getFederation();
final ActiveMQServer server = federation.getServer();
final ManagementService management = server.getManagementService();
final String policyName = manager.getPolicyName();
management.unregisterFromJMX(getFederationQueueConsumerObjectName(management, brokerConnection, federation.getName(), manager.getPolicyType().toString(), policyName, consumer.getConsumerInfo().getFqqn()));
management.unregisterFromRegistry(getFederationQueueConsumerResourceName(policyName, consumer.getConsumerInfo().getFqqn()));
}
public static String getFederationAddressConsumerResourceName(String policyName, String address) {
return String.format(FEDERATION_CONSUMER_RESOURCE_TEMPLATE, policyName, address);
}
public static String getFederationQueueConsumerResourceName(String policyName, String fqqn) {
return String.format(FEDERATION_CONSUMER_RESOURCE_TEMPLATE, policyName, fqqn);
}
public static ObjectName getFederationAddressConsumerObjectName(ManagementService management, String brokerConnection, String federationName, String policyType, String policyName, String address) throws Exception {
final String brokerConnectionName = management.getObjectNameBuilder().getBrokerConnectionBaseObjectNameString(brokerConnection);
return ObjectName.getInstance(
String.format("%s," + FEDERATION_ADDRESS_CONSUMER_NAME_TEMPLATE,
brokerConnectionName,
ObjectName.quote(federationName),
ObjectName.quote(policyType),
ObjectName.quote(policyName),
ObjectName.quote(address)));
}
public static ObjectName getFederationQueueConsumerObjectName(ManagementService management, String brokerConnection, String federationName, String policyType, String policyName, String fqqn) throws Exception {
final String brokerConnectionName = management.getObjectNameBuilder().getBrokerConnectionBaseObjectNameString(brokerConnection);
return ObjectName.getInstance(
String.format("%s," + FEDERATION_QUEUE_CONSUMER_NAME_TEMPLATE,
brokerConnectionName,
ObjectName.quote(federationName),
ObjectName.quote(policyType),
ObjectName.quote(policyName),
ObjectName.quote(fqqn)));
}
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.connect.federation;
import org.apache.activemq.artemis.api.core.management.Attribute;
/**
* Management service control interface for an AMQPFederation policy manager instance.
*/
public interface AMQPFederationPolicyControl {
/**
* Returns the type of the AMQP federation policy manager being controlled
*/
@Attribute(desc = "AMQP federation policy manager type that backs this control instance.")
String getType();
/**
* Returns the configured name the AMQP federation policy manager being controlled
*/
@Attribute(desc = "The configured AMQP federation policy name that backs this control instance.")
String getName();
/**
* Returns the number of messages this federation policy has received from the remote.
*/
@Attribute(desc = "returns the number of messages this federation policy has received from the remote")
long getMessagesReceived();
}

View File

@ -22,10 +22,6 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_LARGE_MESSAGE_FORMAT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledMessageConstants.AMQP_TUNNELED_CORE_MESSAGE_FORMAT;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.AMQP_LINK_INITIALIZER_KEY;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.DETACH_FORCED;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.NOT_FOUND;
import static org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport.RESOURCE_DELETED;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.HashMap;
@ -34,9 +30,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.function.BiConsumer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
@ -50,10 +44,8 @@ import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException;
import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException;
import org.apache.activemq.artemis.protocol.amqp.federation.Federation;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal;
import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPSessionContext;
@ -64,11 +56,7 @@ import org.apache.activemq.artemis.protocol.amqp.proton.AMQPTunneledCoreMessageR
import org.apache.activemq.artemis.protocol.amqp.proton.MessageReader;
import org.apache.activemq.artemis.protocol.amqp.proton.ProtonServerReceiverContext;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
@ -77,7 +65,6 @@ import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.Receiver;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -87,44 +74,31 @@ import org.slf4j.LoggerFactory;
* AMQP peer and forwards those messages onto the internal broker Queue for
* consumption by an attached resource.
*/
public class AMQPFederationQueueConsumer implements FederationConsumerInternal {
public class AMQPFederationQueueConsumer extends AMQPFederationConsumer {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
public static final int DEFAULT_PENDING_MSG_CHECK_BACKOFF_MULTIPLIER = 2;
public static final int DEFAULT_PENDING_MSG_CHECK_MAX_DELAY = 30;
private static final Symbol[] DEFAULT_OUTCOMES = new Symbol[]{Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL,
Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL};
// Sequence ID value used to keep links that would otherwise have the same name from overlapping
// this generally occurs when consumers on the same queue have differing filters.
private static final AtomicLong LINK_SEQUENCE_ID = new AtomicLong();
private final AMQPFederation federation;
private final AMQPFederationConsumerConfiguration configuration;
private final FederationConsumerInfo consumerInfo;
private final AMQPFederationQueuePolicyManager manager;
private final FederationReceiveFromQueuePolicy policy;
private final AMQPConnectionContext connection;
private final AMQPSessionContext session;
private final Predicate<Link> remoteCloseIntercepter = this::remoteLinkClosedIntercepter;
private final Transformer transformer;
private AMQPFederatedQueueDeliveryReceiver receiver;
private Receiver protonReceiver;
private boolean started;
private volatile boolean closed;
private Consumer<FederationConsumerInternal> remoteCloseHandler;
public AMQPFederationQueueConsumer(AMQPFederation federation, AMQPFederationConsumerConfiguration configuration,
AMQPSessionContext session, FederationConsumerInfo consumerInfo, FederationReceiveFromQueuePolicy policy) {
this.federation = federation;
this.consumerInfo = consumerInfo;
this.policy = policy;
this.connection = session.getAMQPConnectionContext();
this.session = session;
this.configuration = configuration;
public AMQPFederationQueueConsumer(AMQPFederationQueuePolicyManager manager,
AMQPFederationConsumerConfiguration configuration,
AMQPSessionContext session, FederationConsumerInfo consumerInfo,
BiConsumer<FederationConsumerInfo, Message> messageObserver) {
super(manager.getFederation(), configuration, session, consumerInfo, messageObserver);
this.manager = manager;
this.policy = manager.getPolicy();
final TransformerConfiguration transformerConfiguration = policy.getTransformerConfiguration();
if (transformerConfiguration != null) {
@ -134,16 +108,6 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal {
}
}
@Override
public Federation getFederation() {
return federation;
}
@Override
public FederationConsumerInfo getConsumerInfo() {
return consumerInfo;
}
/**
* @return the {@link FederationReceiveFromQueuePolicy} that initiated this consumer.
*/
@ -151,82 +115,6 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal {
return policy;
}
@Override
public synchronized void start() {
if (!started && !closed) {
started = true;
asyncCreateReceiver();
}
}
@Override
public synchronized void close() {
if (!closed) {
closed = true;
if (started) {
started = false;
connection.runLater(() -> {
federation.removeLinkClosedInterceptor(consumerInfo.getId());
if (receiver != null) {
try {
receiver.close(false);
} catch (ActiveMQAMQPException e) {
} finally {
receiver = null;
}
}
// Need to track the proton receiver and close it here as the default
// context implementation doesn't do that and could result in no detach
// being sent in some cases and possible resources leaks.
if (protonReceiver != null) {
try {
protonReceiver.close();
} finally {
protonReceiver = null;
}
}
connection.flush();
});
}
}
}
@Override
public synchronized AMQPFederationQueueConsumer setRemoteClosedHandler(Consumer<FederationConsumerInternal> handler) {
if (started) {
throw new IllegalStateException("Cannot set a remote close handler after the consumer is started");
}
this.remoteCloseHandler = handler;
return this;
}
protected boolean remoteLinkClosedIntercepter(Link link) {
if (link == protonReceiver && link.getRemoteCondition() != null && link.getRemoteCondition().getCondition() != null) {
final Symbol errorCondition = link.getRemoteCondition().getCondition();
// Cases where remote link close is not considered terminal, additional checks
// should be added as needed for cases where the remote has closed the link either
// during the attach or at some point later.
if (RESOURCE_DELETED.equals(errorCondition)) {
// Remote side manually deleted this queue.
return true;
} else if (NOT_FOUND.equals(errorCondition)) {
// Remote did not have a queue that matched.
return true;
} else if (DETACH_FORCED.equals(errorCondition)) {
// Remote operator forced the link to detach.
return true;
}
}
return false;
}
private void signalBeforeFederationConsumerMessageHandled(Message message) throws ActiveMQException {
try {
federation.getServer().callBrokerAMQPFederationPlugins((plugin) -> {
@ -258,7 +146,8 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal {
LINK_SEQUENCE_ID.getAndIncrement();
}
private void asyncCreateReceiver() {
@Override
protected void asyncCreateReceiver() {
connection.runLater(() -> {
if (closed) {
return;
@ -277,7 +166,8 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal {
source.setCapabilities(AmqpSupport.TOPIC_CAPABILITY);
}
source.setOutcomes(Arrays.copyOf(DEFAULT_OUTCOMES, DEFAULT_OUTCOMES.length));
source.setOutcomes(Arrays.copyOf(OUTCOMES, OUTCOMES.length));
source.setDefaultOutcome(DEFAULT_OUTCOME);
source.setDurable(TerminusDurability.NONE);
source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
source.setAddress(address);
@ -346,7 +236,7 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal {
// Intercept remote close and check for valid reasons for remote closure such as
// the remote peer not having a matching queue for this subscription or from an
// operator manually closing the link.
federation.addLinkClosedInterceptor(consumerInfo.getId(), remoteCloseIntercepter);
federation.addLinkClosedInterceptor(consumerInfo.getId(), remoteCloseInterceptor);
receiver = new AMQPFederatedQueueDeliveryReceiver(localQueue, protonReceiver);
@ -371,6 +261,35 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal {
});
}
@Override
protected final void asyncCloseReceiver() {
connection.runLater(() -> {
federation.removeLinkClosedInterceptor(consumerInfo.getId());
if (receiver != null) {
try {
receiver.close(false);
} catch (ActiveMQAMQPException e) {
} finally {
receiver = null;
}
}
// Need to track the proton receiver and close it here as the default
// context implementation doesn't do that and could result in no detach
// being sent in some cases and possible resources leaks.
if (protonReceiver != null) {
try {
protonReceiver.close();
} finally {
protonReceiver = null;
}
}
connection.flush();
});
}
private static int caclulateNextDelay(int lastDelay, int backoffMultiplier, int maxDelay) {
final int nextDelay;
@ -390,12 +309,11 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal {
private class AMQPFederatedQueueDeliveryReceiver extends ProtonServerReceiverContext {
private final SimpleString cachedFqqn;
private final Queue localQueue;
private MessageReader coreMessageReader;
private MessageReader coreLargeMessageReader;
private boolean closed;
/**
* Creates the federation receiver instance.
@ -418,15 +336,25 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal {
@Override
public void close(boolean remoteLinkClose) throws ActiveMQAMQPException {
super.close(remoteLinkClose);
if (!closed) {
closed = true;
super.close(remoteLinkClose);
if (remoteLinkClose && remoteCloseHandler != null) {
try {
remoteCloseHandler.accept(AMQPFederationQueueConsumer.this);
federation.unregisterQueueConsumerManagement(manager, AMQPFederationQueueConsumer.this);
} catch (Exception e) {
logger.debug("User remote closed handler threw error: ", e);
} finally {
remoteCloseHandler = null;
logger.trace("Error thrown when unregistering federation queue consumer from management", e);
}
if (remoteLinkClose && remoteCloseHandler != null) {
try {
remoteCloseHandler.accept(AMQPFederationQueueConsumer.this);
} catch (Exception e) {
logger.debug("User remote closed handler threw error: ", e);
} finally {
remoteCloseHandler = null;
}
}
}
}
@ -467,6 +395,12 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal {
throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e);
}
try {
federation.registerQueueConsumerManagement(manager, AMQPFederationQueueConsumer.this);
} catch (Exception e) {
logger.debug("Error caught when trying to add federation queue consumer to management", e);
}
flow();
}
@ -504,6 +438,8 @@ public class AMQPFederationQueueConsumer implements FederationConsumerInternal {
} catch (Exception e) {
logger.warn("Inbound delivery for {} encountered an error: {}", consumerInfo, e.getMessage(), e);
deliveryFailed(delivery, receiver, e);
} finally {
recordFederatedMessageReceived(message);
}
}

View File

@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.connect.federation;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import javax.management.NotCompliantMBeanException;
import org.apache.activemq.artemis.core.management.impl.AbstractControl;
import org.apache.activemq.artemis.core.management.impl.MBeanInfoHelper;
import org.apache.activemq.artemis.logs.AuditLogger;
/**
* Management service control for an AMQP Federation queue policy manager instance.
*/
public class AMQPFederationQueuePolicyControl extends AbstractControl implements AMQPFederationPolicyControl {
private final AMQPFederationQueuePolicyManager policyManager;
public AMQPFederationQueuePolicyControl(AMQPFederationQueuePolicyManager policyManager) throws NotCompliantMBeanException {
super(AMQPFederationPolicyControl.class, policyManager.getFederation().getServer().getStorageManager());
this.policyManager = policyManager;
}
@Override
public String getType() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getType(this.policyManager);
}
clearIO();
try {
return policyManager.getPolicyType().toString();
} finally {
blockOnIO();
}
}
@Override
public String getName() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getName(this.policyManager);
}
clearIO();
try {
return policyManager.getPolicyName();
} finally {
blockOnIO();
}
}
@Override
public long getMessagesReceived() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getMessagesReceived(this.policyManager);
}
clearIO();
try {
return policyManager.getMessagesReceived();
} finally {
blockOnIO();
}
}
@Override
protected MBeanOperationInfo[] fillMBeanOperationInfo() {
return MBeanInfoHelper.getMBeanOperationsInfo(AMQPFederationPolicyControl.class);
}
@Override
protected MBeanAttributeInfo[] fillMBeanAttributeInfo() {
return MBeanInfoHelper.getMBeanAttributesInfo(AMQPFederationPolicyControl.class);
}
}

View File

@ -20,9 +20,11 @@ package org.apache.activemq.artemis.protocol.amqp.connect.federation;
import java.lang.invoke.MethodHandles;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Queue;
@ -31,6 +33,7 @@ import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromQueuePolicy;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo.Role;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromResourcePolicy;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationConsumerInternal;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationGenericConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.internal.FederationQueuePolicyManager;
@ -46,6 +49,8 @@ public class AMQPFederationQueuePolicyManager extends FederationQueuePolicyManag
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final AtomicLong messageCount = new AtomicLong();
protected final BiConsumer<FederationConsumerInfo, Message> messageObserver = (i, m) -> messageCount.incrementAndGet();
protected final AMQPFederation federation;
protected volatile AMQPFederationConsumerConfiguration configuration;
@ -58,7 +63,19 @@ public class AMQPFederationQueuePolicyManager extends FederationQueuePolicyManag
}
@Override
protected void handlePolicyManagerStarted(FederationReceiveFromQueuePolicy policy) {
public AMQPFederation getFederation() {
return federation;
}
/**
* @return the number of messages that all federation consumer of this policy have received from the remote.
*/
public long getMessagesReceived() {
return messageCount.get();
}
@Override
protected void handlePolicyManagerStarted(FederationReceiveFromResourcePolicy policy) {
// Capture state for the current connection on each start of the policy manager.
configuration = new AMQPFederationConsumerConfiguration(federation.getConfiguration(), policy.getProperties());
session = federation.getSessionContext();
@ -96,7 +113,7 @@ public class AMQPFederationQueuePolicyManager extends FederationQueuePolicyManag
// Don't initiate anything yet as the caller might need to register error handlers etc
// before the attach is sent otherwise they could miss the failure case.
return new AMQPFederationQueueConsumer(federation, configuration, session, consumerInfo, policy);
return new AMQPFederationQueueConsumer(this, configuration, session, consumerInfo, messageObserver);
}
@Override

View File

@ -104,7 +104,7 @@ public class AMQPFederationSource extends AMQPFederation {
*/
@SuppressWarnings("unchecked")
public AMQPFederationSource(String name, Map<String, Object> properties, AMQPBrokerConnection connection) {
super(name, connection.getServer());
super(connection.getName(), name, connection.getServer());
if (properties == null || properties.isEmpty()) {
this.properties = Collections.EMPTY_MAP;
@ -268,6 +268,54 @@ public class AMQPFederationSource extends AMQPFederation {
asyncCreateControlLink();
}
@Override
protected void handleFederationStarted() throws ActiveMQException {
// These should move to the policy manager themselves eventually once
// a better lifetime management is worked out for these, right now the
// managers are started and stopped on connect and disconnect
addressMatchPolicies.forEach((nname, policy) -> {
try {
registerAddressPolicyManagement(policy);
} catch (Exception e) {
logger.warn("Error while attempting to add address policy control to management", e);
}
});
queueMatchPolicies.forEach((nname, policy) -> {
try {
registerQueuePolicyManagement(policy);
} catch (Exception e) {
logger.warn("Error while attempting to add queue policy control to management", e);
}
});
super.handleFederationStarted();
}
@Override
protected void handleFederationStopped() throws ActiveMQException {
// These should move to the policy manager themselves eventually once
// a better lifetime management is worked out for these, right now the
// managers are started and stopped on connect and disconnect
addressMatchPolicies.forEach((nname, policy) -> {
try {
unregisterAddressPolicyManagement(policy);
} catch (Exception e) {
logger.warn("Error while attempting to remote address policy control to management", e);
}
});
queueMatchPolicies.forEach((nname, policy) -> {
try {
unregisterQueuePolicyManagement(policy);
} catch (Exception e) {
logger.warn("Error while attempting to remote queue policy control to management", e);
}
});
super.handleFederationStopped();
}
@Override
protected void signalResourceCreateError(Exception cause) {
brokerConnection.connectError(cause);

View File

@ -46,7 +46,7 @@ public class AMQPFederationTarget extends AMQPFederation {
private final AMQPFederationConfiguration configuration;
public AMQPFederationTarget(String name, AMQPFederationConfiguration configuration, AMQPSessionContext session, ActiveMQServer server) {
super(name, server);
super(null, name, server);
Objects.requireNonNull(session, "Provided session instance cannot be null");

View File

@ -38,7 +38,7 @@ import org.apache.activemq.artemis.core.settings.impl.Match;
* Policy used to provide federation of remote to local broker addresses, once created the policy
* configuration is immutable.
*/
public class FederationReceiveFromAddressPolicy implements BiPredicate<String, RoutingType> {
public class FederationReceiveFromAddressPolicy implements FederationReceiveFromResourcePolicy, BiPredicate<String, RoutingType> {
private final Set<AddressMatcher> includesMatchers = new LinkedHashSet<>();
private final Set<AddressMatcher> excludesMatchers = new LinkedHashSet<>();
@ -85,6 +85,12 @@ public class FederationReceiveFromAddressPolicy implements BiPredicate<String, R
excludes.forEach((address) -> excludesMatchers.add(new AddressMatcher(address, wildcardConfig)));
}
@Override
public FederationType getPolicyType() {
return FederationType.ADDRESS_FEDERATION;
}
@Override
public String getPolicyName() {
return policyName;
}
@ -117,10 +123,12 @@ public class FederationReceiveFromAddressPolicy implements BiPredicate<String, R
return excludes;
}
@Override
public Map<String, Object> getProperties() {
return properties;
}
@Override
public TransformerConfiguration getTransformerConfiguration() {
return transformerConfig;
}

View File

@ -35,7 +35,7 @@ import org.apache.activemq.artemis.core.settings.impl.Match;
* Policy used to provide federation of remote to local broker queues, once created the policy
* configuration is immutable.
*/
public class FederationReceiveFromQueuePolicy implements BiPredicate<String, String> {
public class FederationReceiveFromQueuePolicy implements FederationReceiveFromResourcePolicy, BiPredicate<String, String> {
private final Set<QueueMatcher> includeMatchers = new LinkedHashSet<>();
private final Set<QueueMatcher> excludeMatchers = new LinkedHashSet<>();
@ -76,6 +76,12 @@ public class FederationReceiveFromQueuePolicy implements BiPredicate<String, Str
excludes.forEach((entry) -> excludeMatchers.add(new QueueMatcher(entry.getKey(), entry.getValue(), wildcardConfig)));
}
@Override
public FederationType getPolicyType() {
return FederationType.QUEUE_FEDERATION;
}
@Override
public String getPolicyName() {
return policyName;
}
@ -96,10 +102,12 @@ public class FederationReceiveFromQueuePolicy implements BiPredicate<String, Str
return excludes;
}
@Override
public Map<String, Object> getProperties() {
return properties;
}
@Override
public TransformerConfiguration getTransformerConfiguration() {
return transformerConfig;
}

View File

@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.federation;
import java.util.Map;
import org.apache.activemq.artemis.core.config.TransformerConfiguration;
/**
* Interface that a Federation receive from (address or queue) policy should implement
* and provides some common APIs that each should share.
*/
public interface FederationReceiveFromResourcePolicy {
/**
* @return the federation type this policy configuration defines.
*/
FederationType getPolicyType();
/**
* @return the name assigned to this federation policy.
*/
String getPolicyName();
/**
* @return a {@link Map} of properties that were used in the policy configuration.
*/
Map<String, Object> getProperties();
/**
* @return the {@link TransformerConfiguration} that was specified in the policy configuration.
*/
TransformerConfiguration getTransformerConfiguration();
}

View File

@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.federation;
/**
* Enumeration that define the type of federation a policy or resource implements.
*/
public enum FederationType {
/**
* Indicates a resource that is handling Address federation
*/
ADDRESS_FEDERATION("address-federation"),
/**
* Indicates a resource that is handling Queue federation
*/
QUEUE_FEDERATION("queue-federation");
private final String typeName;
FederationType(String typeName) {
this.typeName = typeName;
}
@Override
public String toString() {
return typeName;
}
}

View File

@ -29,15 +29,14 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.postoffice.impl.DivertBinding;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.federation.Federation;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.protocol.amqp.federation.Federation;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromAddressPolicy;
@ -57,25 +56,28 @@ import org.slf4j.LoggerFactory;
* which can federate all messages to the local side where the existing queues can apply
* any filtering they have in place.
*/
public abstract class FederationAddressPolicyManager implements ActiveMQServerBindingPlugin, ActiveMQServerAddressPlugin {
public abstract class FederationAddressPolicyManager extends FederationPolicyManager implements ActiveMQServerBindingPlugin, ActiveMQServerAddressPlugin {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final ActiveMQServer server;
protected final FederationInternal federation;
protected final FederationReceiveFromAddressPolicy policy;
protected final Map<String, FederationAddressEntry> demandTracking = new HashMap<>();
protected final Map<DivertBinding, Set<QueueBinding>> divertsTracking = new HashMap<>();
private volatile boolean started;
public FederationAddressPolicyManager(FederationInternal federation, FederationReceiveFromAddressPolicy addressPolicy) throws ActiveMQException {
Objects.requireNonNull(federation, "The Federation instance cannot be null");
super(federation);
Objects.requireNonNull(addressPolicy, "The Address match policy cannot be null");
this.federation = federation;
this.policy = addressPolicy;
this.server = federation.getServer();
}
/**
* @return the receive from address policy that backs the address policy manager.
*/
@Override
public FederationReceiveFromAddressPolicy getPolicy() {
return policy;
}
/**
@ -85,6 +87,10 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
* federation connection has been established.
*/
public synchronized void start() {
if (!federation.isStarted()) {
throw new IllegalStateException("Cannot start a federation policy manager when the federation is stopped.");
}
if (!started) {
started = true;
handlePolicyManagerStarted(policy);
@ -478,16 +484,6 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
return policy.test(address, type);
}
/**
* Called on start of the manager before any other actions are taken to allow the subclass time
* to configure itself and prepare any needed state prior to starting management of federated
* resources.
*
* @param policy
* The policy configuration for this policy manager.
*/
protected abstract void handlePolicyManagerStarted(FederationReceiveFromAddressPolicy policy);
/**
* Create a new {@link FederationConsumerInfo} based on the given {@link AddressInfo}
* and the configured {@link FederationReceiveFromAddressPolicy}. A subclass must override this
@ -515,59 +511,9 @@ public abstract class FederationAddressPolicyManager implements ActiveMQServerBi
return new FederationAddressEntry(addressInfo);
}
/**
* Create a new {@link FederationConsumerInternal} instance using the consumer information
* given. This is called when local demand for a matched queue requires a new consumer to
* be created. This method by default will call the configured consumer factory function that
* was provided when the manager was created, a subclass can override this to perform additional
* actions for the create operation.
*
* @param consumerInfo
* The {@link FederationConsumerInfo} that defines the consumer to be created.
*
* @return a new {@link FederationConsumerInternal} instance that will reside in this manager.
*/
protected abstract FederationConsumerInternal createFederationConsumer(FederationConsumerInfo consumerInfo);
/**
* Signal any registered plugins for this federation instance that a remote Address consumer
* is being created.
*
* @param info
* The {@link FederationConsumerInfo} that describes the remote Address consumer
*/
protected abstract void signalBeforeCreateFederationConsumer(FederationConsumerInfo info);
/**
* Signal any registered plugins for this federation instance that a remote Address consumer
* has been created.
*
* @param consumer
* The {@link FederationConsumerInfo} that describes the remote Address consumer
*/
protected abstract void signalAfterCreateFederationConsumer(FederationConsumer consumer);
/**
* Signal any registered plugins for this federation instance that a remote Address consumer
* is about to be closed.
*
* @param consumer
* The {@link FederationConsumer} that that is about to be closed.
*/
protected abstract void signalBeforeCloseFederationConsumer(FederationConsumer consumer);
/**
* Signal any registered plugins for this federation instance that a remote Address consumer
* has now been closed.
*
* @param consumer
* The {@link FederationConsumer} that that has been closed.
*/
protected abstract void signalAfterCloseFederationConsumer(FederationConsumer consumer);
/**
* Query all registered plugins for this federation instance to determine if any wish to
* prevent a federation consumer from being created for the given Queue.
* prevent a federation consumer from being created for the given resource.
*
* @param address
* The address on which the manager is intending to create a remote consumer for.

View File

@ -0,0 +1,140 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.protocol.amqp.federation.internal;
import java.util.Objects;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerBindingPlugin;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumer;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationConsumerInfo;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationReceiveFromResourcePolicy;
import org.apache.activemq.artemis.protocol.amqp.federation.FederationType;
/**
* Base Federation policy manager that declares some common APIs that address or queue policy
* managers must provide implementations for.
*/
public abstract class FederationPolicyManager implements ActiveMQServerBindingPlugin {
protected final ActiveMQServer server;
protected final FederationInternal federation;
protected volatile boolean started;
public FederationPolicyManager(FederationInternal federation) throws ActiveMQException {
Objects.requireNonNull(federation, "The Federation instance cannot be null");
this.federation = federation;
this.server = federation.getServer();
}
/**
* @return the immutable federation policy configuration that backs this manager.
*/
public abstract FederationReceiveFromResourcePolicy getPolicy();
/**
* @return the federation type this policy manager implements.
*/
public final FederationType getPolicyType() {
return getPolicy().getPolicyType();
}
/**
* @return the assigned name of the policy that is being managed.
*/
public final String getPolicyName() {
return getPolicy().getPolicyName();
}
/**
* @return <code>true</code> if the policy is started at the time this method was called.
*/
public final boolean isStarted() {
return started;
}
/**
* @return the {@link FederationInternal} instance that owns this policy manager.
*/
public FederationInternal getFederation() {
return federation;
}
/**
* Called on start of the manager before any other actions are taken to allow the subclass time
* to configure itself and prepare any needed state prior to starting management of federated
* resources.
*
* @param policy
* The policy configuration for this policy manager.
*/
protected abstract void handlePolicyManagerStarted(FederationReceiveFromResourcePolicy policy);
/**
* Create a new {@link FederationConsumerInternal} instance using the consumer information
* given. This is called when local demand for a matched resource requires a new consumer to
* be created. This method by default will call the configured consumer factory function that
* was provided when the manager was created, a subclass can override this to perform additional
* actions for the create operation.
*
* @param consumerInfo
* The {@link FederationConsumerInfo} that defines the consumer to be created.
*
* @return a new {@link FederationConsumerInternal} instance that will reside in this manager.
*/
protected abstract FederationConsumerInternal createFederationConsumer(FederationConsumerInfo consumerInfo);
/**
* Signal any registered plugins for this federation instance that a remote consumer
* is being created.
*
* @param info
* The {@link FederationConsumerInfo} that describes the remote federation consumer
*/
protected abstract void signalBeforeCreateFederationConsumer(FederationConsumerInfo info);
/**
* Signal any registered plugins for this federation instance that a remote consumer
* has been created.
*
* @param consumer
* The {@link FederationConsumerInfo} that describes the remote consumer
*/
protected abstract void signalAfterCreateFederationConsumer(FederationConsumer consumer);
/**
* Signal any registered plugins for this federation instance that a remote consumer
* is about to be closed.
*
* @param consumer
* The {@link FederationConsumer} that that is about to be closed.
*/
protected abstract void signalBeforeCloseFederationConsumer(FederationConsumer consumer);
/**
* Signal any registered plugins for this federation instance that a remote consumer
* has now been closed.
*
* @param consumer
* The {@link FederationConsumer} that that has been closed.
*/
protected abstract void signalAfterCloseFederationConsumer(FederationConsumer consumer);
}

View File

@ -49,28 +49,31 @@ import org.slf4j.LoggerFactory;
* monitoring broker queues for demand and creating a consumer for on the remote side
* to federate messages back to this peer.
*/
public abstract class FederationQueuePolicyManager implements ActiveMQServerConsumerPlugin, ActiveMQServerBindingPlugin {
public abstract class FederationQueuePolicyManager extends FederationPolicyManager implements ActiveMQServerConsumerPlugin, ActiveMQServerBindingPlugin {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
protected final ActiveMQServer server;
protected final Predicate<ServerConsumer> federationConsumerMatcher;
protected final FederationReceiveFromQueuePolicy policy;
protected final Map<FederationConsumerInfo, FederationQueueEntry> demandTracking = new HashMap<>();
protected final FederationInternal federation;
private volatile boolean started;
public FederationQueuePolicyManager(FederationInternal federation, FederationReceiveFromQueuePolicy queuePolicy) throws ActiveMQException {
Objects.requireNonNull(federation, "The Federation instance cannot be null");
super(federation);
Objects.requireNonNull(queuePolicy, "The Queue match policy cannot be null");
this.federation = federation;
this.policy = queuePolicy;
this.server = federation.getServer();
this.federationConsumerMatcher = createFederationConsumerMatcher(server, queuePolicy);
}
/**
* @return the receive from queue policy that backs the queue policy manager.
*/
@Override
public FederationReceiveFromQueuePolicy getPolicy() {
return policy;
}
/**
* Start the queue policy manager which will initiate a scan of all broker queue
* bindings and create and matching remote receivers. Start on a policy manager
@ -78,6 +81,10 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
* federation connection has been established.
*/
public synchronized void start() {
if (!federation.isStarted()) {
throw new IllegalStateException("Cannot start a federation policy manager when the federation is stopped.");
}
if (!started) {
started = true;
handlePolicyManagerStarted(policy);
@ -304,16 +311,6 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
return policy.testQueue(queueName);
}
/**
* Called on start of the manager before any other actions are taken to allow the subclass time
* to configure itself and prepare any needed state prior to starting management of federated
* resources.
*
* @param policy
* The policy configuration for this policy manager.
*/
protected abstract void handlePolicyManagerStarted(FederationReceiveFromQueuePolicy policy);
/**
* Create a new {@link FederationConsumerInfo} based on the given {@link ServerConsumer}
* and the configured {@link FederationReceiveFromQueuePolicy}. A subclass must override this
@ -341,18 +338,6 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
return new FederationQueueEntry(consumerInfo);
}
/**
* Create a new {@link FederationConsumerInternal} instance using the consumer information
* given. This is called when local demand for a matched queue requires a new consumer to
* be created. A subclass must override this to perform the creation of the remote consumer.
*
* @param consumerInfo
* The {@link FederationConsumerInfo} that defines the consumer to be created.
*
* @return a new {@link FederationConsumerInternal} instance that will reside in this manager.
*/
protected abstract FederationConsumerInternal createFederationConsumer(FederationConsumerInfo consumerInfo);
/**
* Creates a {@link Predicate} that should return true if the given consumer is a federation
* created consumer which should not be further federated.
@ -390,42 +375,6 @@ public abstract class FederationQueuePolicyManager implements ActiveMQServerCons
}
}
/**
* Signal any registered plugins for this federation instance that a remote Queue consumer
* is being created.
*
* @param info
* The {@link FederationConsumerInfo} that describes the remote Queue consumer
*/
protected abstract void signalBeforeCreateFederationConsumer(FederationConsumerInfo info);
/**
* Signal any registered plugins for this federation instance that a remote Queue consumer
* has been created.
*
* @param consumer
* The {@link FederationConsumerInfo} that describes the remote Queue consumer
*/
protected abstract void signalAfterCreateFederationConsumer(FederationConsumer consumer);
/**
* Signal any registered plugins for this federation instance that a remote Queue consumer
* is about to be closed.
*
* @param consumer
* The {@link FederationConsumer} that that is about to be closed.
*/
protected abstract void signalBeforeCloseFederationConsumer(FederationConsumer consumer);
/**
* Signal any registered plugins for this federation instance that a remote Queue consumer
* has now been closed.
*
* @param consumer
* The {@link FederationConsumer} that that has been closed.
*/
protected abstract void signalAfterCloseFederationConsumer(FederationConsumer consumer);
/**
* Query all registered plugins for this federation instance to determine if any wish to
* prevent a federation consumer from being created for the given Queue.

View File

@ -4020,6 +4020,8 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
obj.add("name", brokerConnection.getName());
obj.add("protocol", brokerConnection.getProtocol());
obj.add("started", brokerConnection.isStarted());
obj.add("uri", brokerConnection.getConfiguration().getUri());
obj.add("connected", brokerConnection.isConnected());
connections.add(obj.build());
}
return connections.build().toString();

View File

@ -0,0 +1,177 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.management.impl;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanOperationInfo;
import javax.management.NotCompliantMBeanException;
import org.apache.activemq.artemis.api.core.management.BrokerConnectionControl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.BrokerConnection;
import org.apache.activemq.artemis.logs.AuditLogger;
public class BrokerConnectionControlImpl extends AbstractControl implements BrokerConnectionControl {
private final BrokerConnection brokerConnection;
public BrokerConnectionControlImpl(BrokerConnection brokerConnection,
StorageManager storageManager) throws NotCompliantMBeanException {
super(BrokerConnectionControl.class, storageManager);
this.brokerConnection = brokerConnection;
}
@Override
public boolean isStarted() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.isStarted(brokerConnection);
}
clearIO();
try {
return brokerConnection.isStarted();
} finally {
blockOnIO();
}
}
@Override
public boolean isConnected() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.isConnected(brokerConnection);
}
clearIO();
try {
return brokerConnection.isConnected();
} finally {
blockOnIO();
}
}
@Override
public void start() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.startBrokerConnection(brokerConnection.getName());
}
clearIO();
try {
brokerConnection.start();
} finally {
blockOnIO();
}
}
@Override
public void stop() throws Exception {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.stopBrokerConnection(brokerConnection.getName());
}
clearIO();
try {
brokerConnection.stop();
} finally {
blockOnIO();
}
}
@Override
public String getName() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getName(brokerConnection);
}
clearIO();
try {
return brokerConnection.getName();
} finally {
blockOnIO();
}
}
@Override
public String getUri() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getUri(brokerConnection);
}
clearIO();
try {
return brokerConnection.getConfiguration().getUri();
} finally {
blockOnIO();
}
}
@Override
public String getUser() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getUser(brokerConnection);
}
clearIO();
try {
return brokerConnection.getConfiguration().getUser();
} finally {
blockOnIO();
}
}
@Override
public String getProtocol() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getProtocol(brokerConnection);
}
clearIO();
try {
return brokerConnection.getProtocol();
} finally {
blockOnIO();
}
}
@Override
public long getRetryInterval() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getRetryInterval(brokerConnection);
}
clearIO();
try {
return brokerConnection.getConfiguration().getRetryInterval();
} finally {
blockOnIO();
}
}
@Override
public int getReconnectAttempts() {
if (AuditLogger.isBaseLoggingEnabled()) {
AuditLogger.getReconnectAttempts(brokerConnection);
}
clearIO();
try {
return brokerConnection.getConfiguration().getReconnectAttempts();
} finally {
blockOnIO();
}
}
@Override
protected MBeanOperationInfo[] fillMBeanOperationInfo() {
return MBeanInfoHelper.getMBeanOperationsInfo(BrokerConnectionControl.class);
}
@Override
protected MBeanAttributeInfo[] fillMBeanAttributeInfo() {
return MBeanInfoHelper.getMBeanAttributesInfo(BrokerConnectionControl.class);
}
}

View File

@ -30,6 +30,11 @@ public interface BrokerConnection extends ActiveMQComponent {
*/
String getProtocol();
/**
* @return <code>true</code> if the broker connection is currently connected to the remote.
*/
boolean isConnected();
/**
* @return the configuration that was used to create this broker connection.
*/

View File

@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.BrokerConnection;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
@ -132,6 +133,10 @@ public interface ManagementService extends NotificationService, ActiveMQComponen
void unregisterConnectionRouter(String name) throws Exception;
void registerBrokerConnection(BrokerConnection brokerConnection) throws Exception;
void unregisterBrokerConnection(String name) throws Exception;
Object getResource(String resourceName);
Object[] getResources(Class<?> resourceType);
@ -145,4 +150,5 @@ public interface ManagementService extends NotificationService, ActiveMQComponen
Object getAttribute(String resourceName, String attribute, SecurityAuth auth);
Object invokeOperation(String resourceName, String operation, Object[] params, SecurityAuth auth) throws Exception;
}

View File

@ -52,6 +52,7 @@ import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.BaseBroadcastGroupControl;
import org.apache.activemq.artemis.api.core.management.BridgeControl;
import org.apache.activemq.artemis.api.core.management.BrokerConnectionControl;
import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl;
import org.apache.activemq.artemis.api.core.management.ConnectionRouterControl;
import org.apache.activemq.artemis.api.core.management.DivertControl;
@ -67,6 +68,7 @@ import org.apache.activemq.artemis.core.management.impl.AddressControlImpl;
import org.apache.activemq.artemis.core.management.impl.BaseBroadcastGroupControlImpl;
import org.apache.activemq.artemis.core.management.impl.BridgeControlImpl;
import org.apache.activemq.artemis.core.management.impl.BroadcastGroupControlImpl;
import org.apache.activemq.artemis.core.management.impl.BrokerConnectionControlImpl;
import org.apache.activemq.artemis.core.management.impl.ClusterConnectionControlImpl;
import org.apache.activemq.artemis.core.management.impl.ConnectionRouterControlImpl;
import org.apache.activemq.artemis.core.management.impl.DivertControlImpl;
@ -88,6 +90,7 @@ import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.BrokerConnection;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
@ -418,6 +421,19 @@ public class ManagementServiceImpl implements ManagementService {
unregisterFromRegistry(ResourceNames.BROADCAST_GROUP + name);
}
@Override
public void registerBrokerConnection(BrokerConnection brokerConnection) throws Exception {
BrokerConnectionControl control = new BrokerConnectionControlImpl(brokerConnection, storageManager);
registerInJMX(objectNameBuilder.getBrokerConnectionObjectName(brokerConnection.getName()), control);
registerInRegistry(ResourceNames.BROKER_CONNECTION + brokerConnection.getName(), control);
}
@Override
public void unregisterBrokerConnection(String name) throws Exception {
unregisterFromJMX(objectNameBuilder.getBrokerConnectionObjectName(name));
unregisterFromRegistry(ResourceNames.BROKER_CONNECTION + name);
}
@Override
public void registerBridge(final Bridge bridge) throws Exception {
bridge.setNotificationService(this);

View File

@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.security.SecurityAuth;
import org.apache.activemq.artemis.core.security.SecurityStore;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.BrokerConnection;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
@ -372,6 +373,16 @@ public class ClusteredResetMockTest extends ServerTestBase {
return null;
}
@Override
public void registerBrokerConnection(BrokerConnection brokerConnection) {
}
@Override
public void unregisterBrokerConnection(String name) {
}
@Override
public void start() throws Exception {
@ -408,5 +419,4 @@ public class ClusteredResetMockTest extends ServerTestBase {
}
}
}

View File

@ -48,6 +48,7 @@ import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPF
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.hamcrest.CoreMatchers.containsString;
@ -70,6 +71,8 @@ import javax.jms.Session;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.BrokerConnectionControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
@ -156,6 +159,75 @@ public class AMQPFederationConnectTest extends AmqpClientTestSupport {
}
}
@Test
@Timeout(20)
public void testBrokerConnectsAndCreateManagementResource() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLPlainConnect("testUser", "pass", "PLAIN", "ANONYMOUS");
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Connect test started, peer listening on: {}", remoteURI);
// No user or pass given, it will have to select ANONYMOUS even though PLAIN also offered
AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);// No reconnects
amqpConnection.setRetryInterval(100);
amqpConnection.setUser("testUser");
amqpConnection.setPassword("pass");
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
final BrokerConnectionControl brokerConnection = (BrokerConnectionControl)
server.getManagementService().getResource(ResourceNames.BROKER_CONNECTION + getTestName());
assertNotNull(brokerConnection);
assertTrue(brokerConnection.isConnected());
assertTrue(brokerConnection.isStarted());
assertNotNull(brokerConnection.getUri());
assertEquals(getTestName(), brokerConnection.getName());
assertEquals("testUser", brokerConnection.getUser());
assertEquals(0, brokerConnection.getReconnectAttempts());
assertEquals(100, brokerConnection.getRetryInterval());
assertEquals("AMQP", brokerConnection.getProtocol());
peer.expectClose().optional();
peer.expectConnectionToDrop();
brokerConnection.stop();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Reflects state change but no NPE when connection is stopped.
assertFalse(brokerConnection.isConnected());
assertFalse(brokerConnection.isStarted());
assertNotNull(brokerConnection.getUri());
assertEquals(getTestName(), brokerConnection.getName());
assertEquals("testUser", brokerConnection.getUser());
assertEquals(0, brokerConnection.getReconnectAttempts());
assertEquals(100, brokerConnection.getRetryInterval());
assertEquals("AMQP", brokerConnection.getProtocol());
peer.expectSASLPlainConnect("testUser", "pass", "PLAIN", "ANONYMOUS");
peer.expectOpen().respond();
peer.expectBegin().respond();
brokerConnection.start();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
assertTrue(brokerConnection.isConnected());
assertTrue(brokerConnection.isStarted());
peer.close();
}
}
@Test
@Timeout(20)
public void testFederationConfiguredCreatesControlLink() throws Exception {

View File

@ -0,0 +1,559 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.connect;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_ADDRESS_RECEIVER;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_CONTROL_LINK;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_EVENT_LINK;
import static org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConstants.FEDERATION_QUEUE_RECEIVER;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.lang.invoke.MethodHandles;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.BrokerConnectionControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederatedBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationAddressPolicyElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPFederationQueuePolicyElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationAddressPolicyControl;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationConsumerControl;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationManagementSupport;
import org.apache.activemq.artemis.protocol.amqp.connect.federation.AMQPFederationQueuePolicyControl;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.qpid.protonj2.test.driver.ProtonTestServer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Tests that the broker create management objects for federation configurations.
*/
class AMQPFederationManagementTest extends AmqpClientTestSupport {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Override
protected String getConfiguredProtocols() {
return "AMQP,CORE";
}
@Override
protected ActiveMQServer createServer() throws Exception {
// Creates the broker used to make the outgoing connection. The port passed is for
// that brokers acceptor. The test server connected to by the broker binds to a random port.
return createServer(AMQP_PORT, false);
}
@Test
@Timeout(20)
public void testFederationCreatesManagementResourcesForAddressPolicyConfigurations() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
.respond()
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
peer.expectAttach().ofReceiver()
.withSenderSettleModeSettled()
.withSource().withDynamic(true)
.and()
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
.respondInKind()
.withTarget().withAddress("test-dynamic-events");
peer.expectFlow().withLinkCredit(10);
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Test started, peer listening on: {}", remoteURI);
final AMQPFederationAddressPolicyElement receiveFromAddress = new AMQPFederationAddressPolicyElement();
receiveFromAddress.setName("address-policy");
receiveFromAddress.addToIncludes(getTestName());
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName(getTestName());
element.addLocalAddressPolicy(receiveFromAddress);
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);// No reconnects
amqpConnection.addElement(element);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
.withName(allOf(containsString(getTestName()),
containsString("address-receiver"),
containsString(server.getNodeID().toString())))
.respond()
.withOfferedCapabilities(FEDERATION_ADDRESS_RECEIVER.toString());
peer.expectFlow().withLinkCredit(1000);
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.MULTICAST)
.setAddress(getTestName())
.setAutoCreated(false));
Wait.assertTrue(() -> server.queueQuery(SimpleString.of(getTestName())).isExists());
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
final BrokerConnectionControl brokerConnection = (BrokerConnectionControl)
server.getManagementService().getResource(ResourceNames.BROKER_CONNECTION + getTestName());
assertNotNull(brokerConnection);
assertTrue(brokerConnection.isConnected());
final String policyResourceName = AMQPFederationManagementSupport.getFederationPolicyResourceName("address-policy");
final String consumerResourceName = AMQPFederationManagementSupport.getFederationAddressConsumerResourceName("address-policy", getTestName());
final AMQPFederationAddressPolicyControl addressPolicyControl =
(AMQPFederationAddressPolicyControl) server.getManagementService().getResource(policyResourceName);
assertNotNull(addressPolicyControl);
assertEquals("address-policy", addressPolicyControl.getName());
assertEquals(0, addressPolicyControl.getMessagesReceived());
final AMQPFederationConsumerControl consumerControl = (AMQPFederationConsumerControl)
server.getManagementService().getResource(consumerResourceName);
assertNotNull(consumerControl);
assertEquals(getTestName(), consumerControl.getAddress());
assertEquals(0, consumerControl.getMessagesReceived());
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
brokerConnection.stop();
// Stopping the connection should remove the federation management objects.
Wait.assertTrue(() -> server.getManagementService().getResource(policyResourceName) == null, 5_000, 100);
Wait.assertTrue(() -> server.getManagementService().getResource(consumerResourceName) == null, 5_000, 100);
peer.close();
}
}
@Test
@Timeout(20)
public void testFederationCreatesManagementResourcesForQueuePolicyConfigurations() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
.respond()
.withOfferedCapabilities(FEDERATION_CONTROL_LINK.toString());
peer.expectAttach().ofReceiver()
.withSenderSettleModeSettled()
.withSource().withDynamic(true)
.and()
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
.respondInKind()
.withTarget().withAddress("test-dynamic-events");
peer.expectFlow().withLinkCredit(10);
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Test started, peer listening on: {}", remoteURI);
final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement();
receiveFromQueue.setName("queue-policy");
receiveFromQueue.addToIncludes("#", getTestName());
receiveFromQueue.setPriorityAdjustment(1);
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName(getTestName());
element.addLocalQueuePolicy(receiveFromQueue);
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);// No reconnects
amqpConnection.addElement(element);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
.withName(allOf(containsString(getTestName()),
containsString("queue-receiver"),
containsString(server.getNodeID().toString())))
.respond()
.withOfferedCapabilities(FEDERATION_QUEUE_RECEIVER.toString());
peer.expectFlow().withLinkCredit(1000);
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST)
.setAddress(getTestName())
.setAutoCreated(false));
Wait.assertTrue(() -> server.queueQuery(SimpleString.of(getTestName())).isExists());
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
session.createConsumer(session.createQueue(getTestName()));
connection.start();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
final String policyResourceName = AMQPFederationManagementSupport.getFederationPolicyResourceName("queue-policy");
final String consumerResourceName = AMQPFederationManagementSupport.getFederationAddressConsumerResourceName("queue-policy", getTestName() + "::" + getTestName());
final BrokerConnectionControl brokerConnection = (BrokerConnectionControl)
server.getManagementService().getResource(ResourceNames.BROKER_CONNECTION + getTestName());
assertNotNull(brokerConnection);
assertTrue(brokerConnection.isConnected());
final AMQPFederationQueuePolicyControl queuePolicyControl =
(AMQPFederationQueuePolicyControl) server.getManagementService().getResource(policyResourceName);
assertNotNull(queuePolicyControl);
assertEquals("queue-policy", queuePolicyControl.getName());
assertEquals(0, queuePolicyControl.getMessagesReceived());
final AMQPFederationConsumerControl consumerControl =
(AMQPFederationConsumerControl) server.getManagementService().getResource(consumerResourceName);
assertNotNull(consumerControl);
assertEquals(getTestName(), consumerControl.getAddress());
assertEquals(getTestName(), consumerControl.getQueueName());
assertEquals(getTestName() + "::" + getTestName(), consumerControl.getFqqn());
assertEquals(0, consumerControl.getMessagesReceived());
assertEquals(1, consumerControl.getPriority());
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
brokerConnection.stop();
// Stopping the connection should remove the federation management objects.
Wait.assertTrue(() -> server.getManagementService().getResource(policyResourceName) == null, 5_000, 100);
Wait.assertTrue(() -> server.getManagementService().getResource(consumerResourceName) == null, 5_000, 100);
peer.close();
}
}
}
@Test
@Timeout(20)
public void testAddressManagementTracksMessagesAtPolicyAndConsumerLevels() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
.respondInKind();
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
.respondInKind();
peer.expectFlow().withLinkCredit(10);
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Test started, peer listening on: {}", remoteURI);
final AMQPFederationAddressPolicyElement receiveFromAddress = new AMQPFederationAddressPolicyElement();
receiveFromAddress.setName("address-policy");
receiveFromAddress.addToIncludes(getTestName());
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName(getTestName());
element.addLocalAddressPolicy(receiveFromAddress);
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);// No reconnects
amqpConnection.addElement(element);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
server.addAddressInfo(new AddressInfo(SimpleString.of(getTestName()), RoutingType.MULTICAST));
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
.withName(allOf(containsString(getTestName()),
containsString("address-receiver"),
containsString(server.getNodeID().toString())))
.respondInKind();
peer.expectFlow().withLinkCredit(1000);
peer.remoteTransfer().withBody().withString("test-message")
.also()
.withDeliveryId(0)
.queue();
peer.expectDisposition().withSettled(true).withState().accepted();
final String policyResourceName = AMQPFederationManagementSupport.getFederationPolicyResourceName("address-policy");
final String consumerResourceName = AMQPFederationManagementSupport.getFederationAddressConsumerResourceName("address-policy", getTestName());
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = session.createConsumer(session.createTopic(getTestName()));
connection.start();
final Message message = consumer.receive(5_000);
assertNotNull(message);
assertTrue(message instanceof TextMessage);
assertEquals("test-message", ((TextMessage) message).getText());
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
final AMQPFederationAddressPolicyControl addressPolicyControl = (AMQPFederationAddressPolicyControl)
server.getManagementService().getResource(policyResourceName);
final AMQPFederationConsumerControl consumerControl = (AMQPFederationConsumerControl)
server.getManagementService().getResource(consumerResourceName);
assertEquals(1, addressPolicyControl.getMessagesReceived());
assertEquals(1, consumerControl.getMessagesReceived());
peer.expectDetach().respond(); // demand will be gone and receiver link should close.
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Policy bean should still be active but consumer bean should be unregistered
{
final AMQPFederationAddressPolicyControl addressPolicyControl = (AMQPFederationAddressPolicyControl)
server.getManagementService().getResource(policyResourceName);
final AMQPFederationConsumerControl consumerControl = (AMQPFederationConsumerControl)
server.getManagementService().getResource(consumerResourceName);
assertNotNull(addressPolicyControl);
assertNull(consumerControl);
}
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_ADDRESS_RECEIVER.toString())
.withName(allOf(containsString(getTestName()),
containsString("address-receiver"),
containsString(server.getNodeID().toString())))
.respondInKind();
peer.expectFlow().withLinkCredit(1000);
peer.remoteTransfer().withBody().withString("test-message")
.also()
.withDeliveryId(1)
.queue();
peer.expectDisposition().withSettled(true).withState().accepted();
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = session.createConsumer(session.createTopic(getTestName()));
connection.start();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
final Message message = consumer.receive(5_000);
assertNotNull(message);
assertTrue(message instanceof TextMessage);
assertEquals("test-message", ((TextMessage) message).getText());
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
final AMQPFederationAddressPolicyControl addressPolicyControl = (AMQPFederationAddressPolicyControl)
server.getManagementService().getResource(policyResourceName);
final AMQPFederationConsumerControl consumerControl = (AMQPFederationConsumerControl)
server.getManagementService().getResource(consumerResourceName);
assertEquals(2, addressPolicyControl.getMessagesReceived());
assertEquals(1, consumerControl.getMessagesReceived());
peer.expectDetach().respond(); // demand will be gone and receiver link should close.
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
}
}
@Test
@Timeout(20)
public void testQueueManagementTracksMessagesAtPolicyAndConsumerLevels() throws Exception {
try (ProtonTestServer peer = new ProtonTestServer()) {
peer.expectSASLAnonymousConnect();
peer.expectOpen().respond();
peer.expectBegin().respond();
peer.expectAttach().ofSender()
.withDesiredCapability(FEDERATION_CONTROL_LINK.toString())
.respondInKind();
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_EVENT_LINK.toString())
.respondInKind();
peer.expectFlow().withLinkCredit(10);
peer.start();
final URI remoteURI = peer.getServerURI();
logger.info("Test started, peer listening on: {}", remoteURI);
final AMQPFederationQueuePolicyElement receiveFromQueue = new AMQPFederationQueuePolicyElement();
receiveFromQueue.setName("queue-policy");
receiveFromQueue.addToIncludes(getTestName(), getTestName());
final AMQPFederatedBrokerConnectionElement element = new AMQPFederatedBrokerConnectionElement();
element.setName(getTestName());
element.addLocalQueuePolicy(receiveFromQueue);
final AMQPBrokerConnectConfiguration amqpConnection =
new AMQPBrokerConnectConfiguration(getTestName(), "tcp://" + remoteURI.getHost() + ":" + remoteURI.getPort());
amqpConnection.setReconnectAttempts(0);// No reconnects
amqpConnection.addElement(element);
server.getConfiguration().addAMQPConnection(amqpConnection);
server.start();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
.withName(allOf(containsString(getTestName()),
containsString("queue-receiver"),
containsString(server.getNodeID().toString())))
.respondInKind();
peer.expectFlow().withLinkCredit(1000);
peer.remoteTransfer().withBody().withString("test-message")
.also()
.withDeliveryId(0)
.queue();
peer.expectDisposition().withSettled(true).withState().accepted();
server.createQueue(QueueConfiguration.of(getTestName()).setRoutingType(RoutingType.ANYCAST)
.setAddress(getTestName())
.setAutoCreated(false));
final String policyResourceName = AMQPFederationManagementSupport.getFederationPolicyResourceName("queue-policy");
final String consumerResourceName = AMQPFederationManagementSupport.getFederationAddressConsumerResourceName("queue-policy", getTestName() + "::" + getTestName());
final ConnectionFactory factory = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = session.createConsumer(session.createQueue(getTestName()));
connection.start();
final Message message = consumer.receive(5_000);
assertNotNull(message);
assertTrue(message instanceof TextMessage);
assertEquals("test-message", ((TextMessage) message).getText());
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
final AMQPFederationQueuePolicyControl queuePolicyControl = (AMQPFederationQueuePolicyControl)
server.getManagementService().getResource(policyResourceName);
final AMQPFederationConsumerControl consumerControl = (AMQPFederationConsumerControl)
server.getManagementService().getResource(consumerResourceName);
assertEquals(1, queuePolicyControl.getMessagesReceived());
assertEquals(1, consumerControl.getMessagesReceived());
peer.expectDetach().respond(); // demand will be gone and receiver link should close.
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
// Policy bean should still be active but consumer bean should be unregistered
{
final AMQPFederationQueuePolicyControl queuePolicyControl = (AMQPFederationQueuePolicyControl)
server.getManagementService().getResource(policyResourceName);
final AMQPFederationConsumerControl consumerControl = (AMQPFederationConsumerControl)
server.getManagementService().getResource(consumerResourceName);
assertNotNull(queuePolicyControl);
assertNull(consumerControl);
}
peer.expectAttach().ofReceiver()
.withDesiredCapability(FEDERATION_QUEUE_RECEIVER.toString())
.withName(allOf(containsString(getTestName()),
containsString("queue-receiver"),
containsString(server.getNodeID().toString())))
.respondInKind();
peer.expectFlow().withLinkCredit(1000);
peer.remoteTransfer().withBody().withString("test-message")
.also()
.withDeliveryId(1)
.queue();
peer.expectDisposition().withSettled(true).withState().accepted();
try (Connection connection = factory.createConnection()) {
final Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = session.createConsumer(session.createQueue(getTestName()));
connection.start();
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
final Message message = consumer.receive(5_000);
assertNotNull(message);
assertTrue(message instanceof TextMessage);
assertEquals("test-message", ((TextMessage) message).getText());
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
final AMQPFederationQueuePolicyControl queuePolicyControl = (AMQPFederationQueuePolicyControl)
server.getManagementService().getResource(policyResourceName);
final AMQPFederationConsumerControl consumerControl = (AMQPFederationConsumerControl)
server.getManagementService().getResource(consumerResourceName);
assertEquals(2, queuePolicyControl.getMessagesReceived());
assertEquals(1, consumerControl.getMessagesReceived());
peer.expectDetach().respond(); // demand will be gone and receiver link should close.
}
peer.waitForScriptToComplete(5, TimeUnit.SECONDS);
peer.close();
}
}
}

View File

@ -6151,6 +6151,12 @@ public class ActiveMQServerControlTest extends ManagementTestBase {
public BrokerConnectConfiguration getConfiguration() {
return null;
}
@Override
public boolean isConnected() {
return false;
}
}
Fake fake = new Fake("fake" + UUIDGenerator.getInstance().generateStringUUID());
server.registerBrokerConnection(fake);