ARTEMIS-3660 - rename broker-balancer to connection-router

This commit is contained in:
gtully 2022-01-28 13:47:53 +00:00 committed by Bruscino Domenico Francesco
parent 6438d6a7a8
commit dcaebfb24e
149 changed files with 899 additions and 896 deletions

View File

@ -268,10 +268,10 @@ public enum ActiveMQExceptionType {
return new ActiveMQDivertDoesNotExistException(msg);
}
},
REDIRECTED(222) {
ROUTING_EXCEPTION(222) {
@Override
public ActiveMQException createException(String msg) {
return new ActiveMQRedirectedException(msg);
return new ActiveMQRoutingException(msg);
}
};
private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;

View File

@ -19,16 +19,16 @@ package org.apache.activemq.artemis.api.core;
/**
* A client was redirected.
*/
public final class ActiveMQRedirectedException extends ActiveMQException {
public final class ActiveMQRoutingException extends ActiveMQException {
private static final long serialVersionUID = 7414966383933311627L;
public ActiveMQRedirectedException() {
super(ActiveMQExceptionType.REDIRECTED);
public ActiveMQRoutingException() {
super(ActiveMQExceptionType.ROUTING_EXCEPTION);
}
public ActiveMQRedirectedException(String message) {
super(ActiveMQExceptionType.REDIRECTED, message);
public ActiveMQRoutingException(String message) {
super(ActiveMQExceptionType.ROUTING_EXCEPTION, message);
}
}

View File

@ -20,9 +20,9 @@ import javax.management.MBeanOperationInfo;
import javax.management.openmbean.CompositeData;
/**
* A BrokerBalancerControl is used to manage a BrokerBalancer.
* A ConnectionRouterControl is used to manage a ConnectionRouter
*/
public interface BrokerBalancerControl {
public interface ConnectionRouterControl {
@Operation(desc = "Get the target associated with key", impact = MBeanOperationInfo.INFO)
CompositeData getTarget(@Parameter(desc = "a key", name = "key") String key) throws Exception;

View File

@ -145,12 +145,12 @@ public final class ObjectNameBuilder {
}
/**
* Returns the ObjectName used by BrokerBalancerControl.
* Returns the ObjectName used by ConnectionRouterControl.
*
* @see BrokerBalancerControl
* @see ConnectionRouterControl
*/
public ObjectName getBrokerBalancerObjectName(final String name) throws Exception {
return createObjectName("broker-balancer", name);
public ObjectName getConnectionRouterObjectName(final String name) throws Exception {
return createObjectName("connection-router", name);
}
private ObjectName createObjectName(final String type, final String name) throws Exception {

View File

@ -44,7 +44,7 @@ public final class ResourceNames {
public static final String BROADCAST_GROUP = "broadcastgroup.";
public static final String BROKER_BALANCER = "brokerbalancer.";
public static final String CONNECTION_ROUTER = "connectionrouter.";
public static final String RETROACTIVE_SUFFIX = "retro";

View File

@ -27,7 +27,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQLargeMessageException;
import org.apache.activemq.artemis.api.core.ActiveMQLargeMessageInterruptedException;
import org.apache.activemq.artemis.api.core.ActiveMQNotConnectedException;
import org.apache.activemq.artemis.api.core.ActiveMQObjectClosedException;
import org.apache.activemq.artemis.api.core.ActiveMQRedirectedException;
import org.apache.activemq.artemis.api.core.ActiveMQRoutingException;
import org.apache.activemq.artemis.api.core.ActiveMQTransactionOutcomeUnknownException;
import org.apache.activemq.artemis.api.core.ActiveMQTransactionRolledBackException;
import org.apache.activemq.artemis.api.core.ActiveMQUnBlockedException;
@ -240,5 +240,5 @@ public interface ActiveMQClientMessageBundle {
RuntimeException failedToHandlePacket(@Cause Exception e);
@Message(id = 219066, value = "The connection was redirected")
ActiveMQRedirectedException redirected();
ActiveMQRoutingException redirected();
}

View File

@ -33,7 +33,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQRedirectedException;
import org.apache.activemq.artemis.api.core.ActiveMQRoutingException;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.QueueAttributes;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
@ -1461,8 +1461,8 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
sessionContext.returnBlocking(cause);
}
} catch (ActiveMQRedirectedException e) {
logger.info("failedToHandleFailover.ActiveMQRedirectedException");
} catch (ActiveMQRoutingException e) {
logger.info("failedToHandleFailover.ActiveMQRoutingException");
suc = false;
} catch (Throwable t) {
ActiveMQClientLogger.LOGGER.failedToHandleFailover(t);

View File

@ -56,7 +56,7 @@ public interface CoreRemotingConnection extends RemotingConnection {
return version >= PacketImpl.ARTEMIS_2_18_0_VERSION;
}
default boolean isVersionSupportRedirect() {
default boolean isVersionSupportRouting() {
int version = getChannelVersion();
return version >= PacketImpl.ARTEMIS_2_18_0_VERSION;
}

View File

@ -309,7 +309,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager {
throw cause;
if (cause.getType() == ActiveMQExceptionType.UNBLOCKED ||
cause.getType() == ActiveMQExceptionType.REDIRECTED) {
cause.getType() == ActiveMQExceptionType.ROUTING_EXCEPTION) {
// This means the thread was blocked on create session and failover unblocked it
// so failover could occur

View File

@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQRedirectedException;
import org.apache.activemq.artemis.api.core.ActiveMQRoutingException;
import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
import org.apache.activemq.artemis.api.core.DisconnectReason;
import org.apache.activemq.artemis.api.core.Interceptor;
@ -208,7 +208,7 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement
destroyed = true;
}
if (!(me instanceof ActiveMQRemoteDisconnectException) && !(me instanceof ActiveMQRedirectedException)) {
if (!(me instanceof ActiveMQRemoteDisconnectException) && !(me instanceof ActiveMQRoutingException)) {
ActiveMQClientLogger.LOGGER.connectionFailureDetected(transportConnection.getRemoteAddress(), me.getMessage(), me.getType());
}

View File

@ -363,9 +363,9 @@ public class TransportConstants {
public static final boolean DEFAULT_PROXY_REMOTE_DNS = false;
public static final String REDIRECT_TO = "redirect-to";
public static final String ROUTER = "router";
public static final String DEFAULT_REDIRECT_TO = null;
public static final String DEFAULT_ROUTER = null;
private static int parseDefaultVariable(String variableName, int defaultValue) {
try {
@ -444,7 +444,7 @@ public class TransportConstants {
allowableAcceptorKeys.add(TransportConstants.QUIET_PERIOD);
allowableAcceptorKeys.add(TransportConstants.DISABLE_STOMP_SERVER_HEADER);
allowableAcceptorKeys.add(TransportConstants.AUTO_START);
allowableAcceptorKeys.add(TransportConstants.REDIRECT_TO);
allowableAcceptorKeys.add(TransportConstants.ROUTER);
ALLOWABLE_ACCEPTOR_KEYS = Collections.unmodifiableSet(allowableAcceptorKeys);

View File

@ -177,7 +177,7 @@ public interface Connection {
return null;
}
default String getRedirectTo() {
default String getRouter() {
return null;
}
}

View File

@ -43,7 +43,7 @@ public class OperationAnnotationTest {
{AcceptorControl.class},
{ClusterConnectionControl.class},
{BroadcastGroupControl.class},
{BrokerBalancerControl.class}});
{ConnectionRouterControl.class}});
}
private Class<?> managementClass;

View File

@ -36,7 +36,7 @@ import org.apache.activemq.artemis.protocol.amqp.client.ProtonClientProtocolMana
import org.apache.activemq.artemis.protocol.amqp.connect.mirror.ReferenceNodeStore;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConnectionContext;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPConstants;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRedirectHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRoutingHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.AmqpSupport;
import org.apache.activemq.artemis.protocol.amqp.sasl.ClientSASLFactory;
import org.apache.activemq.artemis.protocol.amqp.sasl.MechanismFinder;
@ -54,7 +54,7 @@ import org.jboss.logging.Logger;
/**
* A proton protocol manager, basically reads the Proton Input and maps proton resources to ActiveMQ Artemis resources
*/
public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage, AmqpInterceptor, ActiveMQProtonRemotingConnection, AMQPRedirectHandler> implements NotificationListener {
public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage, AmqpInterceptor, ActiveMQProtonRemotingConnection, AMQPRoutingHandler> implements NotificationListener {
private static final Logger logger = Logger.getLogger(ProtonProtocolManager.class);
@ -106,7 +106,7 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
private boolean directDeliver = true;
private final AMQPRedirectHandler redirectHandler;
private final AMQPRoutingHandler routingHandler;
/*
* used when you want to treat senders as a subscription on an address rather than consuming from the actual queue for
@ -120,7 +120,7 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
this.factory = factory;
this.server = server;
this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
redirectHandler = new AMQPRedirectHandler(server);
routingHandler = new AMQPRoutingHandler(server);
}
public synchronized ReferenceNodeStore getReferenceIDSupplier() {
@ -348,8 +348,8 @@ public class ProtonProtocolManager extends AbstractProtocolManager<AMQPMessage,
}
@Override
public AMQPRedirectHandler getRedirectHandler() {
return redirectHandler;
public AMQPRoutingHandler getRoutingHandler() {
return routingHandler;
}
public String invokeIncoming(AMQPMessage message, ActiveMQProtonRemotingConnection connection) {

View File

@ -548,8 +548,8 @@ public class AMQPConnectionContext extends ProtonInitializable implements EventH
log.error("Error init connection", e);
}
if (!validateUser(connection) || (connectionCallback.getTransportConnection().getRedirectTo() != null
&& protocolManager.getRedirectHandler().redirect(this, connection)) || !validateConnection(connection)) {
if (!validateUser(connection) || (connectionCallback.getTransportConnection().getRouter() != null
&& protocolManager.getRoutingHandler().route(this, connection)) || !validateConnection(connection)) {
connection.close();
} else {
connection.setContext(AMQPConnectionContext.this);

View File

@ -17,10 +17,10 @@
package org.apache.activemq.artemis.protocol.amqp.proton;
import org.apache.activemq.artemis.core.server.balancing.RedirectContext;
import org.apache.activemq.artemis.core.server.routing.RoutingContext;
import org.apache.qpid.proton.engine.Connection;
public class AMQPRedirectContext extends RedirectContext {
public class AMQPRoutingContext extends RoutingContext {
private final Connection protonConnection;
@ -29,7 +29,7 @@ public class AMQPRedirectContext extends RedirectContext {
}
public AMQPRedirectContext(AMQPConnectionContext connectionContext, Connection protonConnection) {
public AMQPRoutingContext(AMQPConnectionContext connectionContext, Connection protonConnection) {
super(connectionContext.getConnectionCallback().getProtonConnectionDelegate(), connectionContext.getRemoteContainer(),
connectionContext.getSASLResult() != null ? connectionContext.getSASLResult().getUser() : null);
this.protonConnection = protonConnection;

View File

@ -19,53 +19,54 @@ package org.apache.activemq.artemis.protocol.amqp.proton;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.balancing.RedirectHandler;
import org.apache.activemq.artemis.core.server.routing.RoutingHandler;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.ConnectionError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
import org.apache.qpid.proton.engine.Connection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
public class AMQPRedirectHandler extends RedirectHandler<AMQPRedirectContext> {
public class AMQPRoutingHandler extends RoutingHandler<AMQPRoutingContext> {
public AMQPRedirectHandler(ActiveMQServer server) {
public AMQPRoutingHandler(ActiveMQServer server) {
super(server);
}
public boolean redirect(AMQPConnectionContext connectionContext, Connection protonConnection) throws Exception {
return redirect(new AMQPRedirectContext(connectionContext, protonConnection));
public boolean route(AMQPConnectionContext connectionContext, Connection protonConnection) throws Exception {
return route(new AMQPRoutingContext(connectionContext, protonConnection));
}
@Override
protected void cannotRedirect(AMQPRedirectContext context) {
protected void refuse(AMQPRoutingContext context) {
ErrorCondition error = new ErrorCondition();
error.setCondition(ConnectionError.CONNECTION_FORCED);
switch (context.getResult().getStatus()) {
case REFUSED_USE_ANOTHER:
error.setDescription(String.format("Broker balancer %s, rejected this connection", context.getConnection().getTransportConnection().getRedirectTo()));
error.setDescription(String.format("Connection router %s rejected this connection", context.getRouter()));
break;
case REFUSED_UNAVAILABLE:
error.setDescription(String.format("Broker balancer %s is not ready to redirect", context.getConnection().getTransportConnection().getRedirectTo()));
error.setDescription(String.format("Connection router %s is not ready", context.getRouter()));
break;
}
Connection protonConnection = context.getProtonConnection();
protonConnection.setCondition(error);
addConnectionOpenFailureHint(protonConnection);
protonConnection.setProperties(Collections.singletonMap(AmqpSupport.CONNECTION_OPEN_FAILED, true));
}
@Override
protected void redirectTo(AMQPRedirectContext context) {
protected void redirect(AMQPRoutingContext context) {
String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, context.getTarget().getConnector().getParams());
int port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, context.getTarget().getConnector().getParams());
ErrorCondition error = new ErrorCondition();
error.setCondition(ConnectionError.REDIRECT);
error.setDescription(String.format("Connection redirected to %s:%d by broker balancer %s", host, port, context.getConnection().getTransportConnection().getRedirectTo()));
error.setDescription(String.format("Connection router %s redirected this connection to %s:%d", context.getRouter(), host, port));
Map<Symbol, Object> info = new HashMap<>();
info.put(AmqpSupport.NETWORK_HOST, host);
info.put(AmqpSupport.PORT, port);
@ -73,13 +74,6 @@ public class AMQPRedirectHandler extends RedirectHandler<AMQPRedirectContext> {
Connection protonConnection = context.getProtonConnection();
protonConnection.setCondition(error);
addConnectionOpenFailureHint(protonConnection);
}
private void addConnectionOpenFailureHint(Connection connection) {
Map<Symbol, Object> connProps = new HashMap<>();
connProps.put(AmqpSupport.CONNECTION_OPEN_FAILED, true);
connection.setProperties(connProps);
protonConnection.setProperties(Collections.singletonMap(AmqpSupport.CONNECTION_OPEN_FAILED, true));
}
}

View File

@ -18,7 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.sasl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRedirectHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRoutingHandler;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
@ -31,7 +31,7 @@ public class AnonymousServerSASLFactory implements ServerSASLFactory {
}
@Override
public ServerSASL create(ActiveMQServer server, ProtocolManager<AmqpInterceptor, AMQPRedirectHandler> manager, Connection connection,
public ServerSASL create(ActiveMQServer server, ProtocolManager<AmqpInterceptor, AMQPRoutingHandler> manager, Connection connection,
RemotingConnection remotingConnection) {
return new AnonymousServerSASL();
}

View File

@ -21,7 +21,7 @@ import java.security.Principal;
import org.apache.activemq.artemis.core.remoting.CertificateUtil;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRedirectHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRoutingHandler;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
@ -40,7 +40,7 @@ public class ExternalServerSASLFactory implements ServerSASLFactory {
}
@Override
public ServerSASL create(ActiveMQServer server, ProtocolManager<AmqpInterceptor, AMQPRedirectHandler> manager, Connection connection,
public ServerSASL create(ActiveMQServer server, ProtocolManager<AmqpInterceptor, AMQPRoutingHandler> manager, Connection connection,
RemotingConnection remotingConnection) {
// validate ssl cert present
Principal principal = CertificateUtil.getPeerPrincipalFromConnection(remotingConnection);

View File

@ -19,7 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.sasl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRedirectHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRoutingHandler;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
@ -35,7 +35,7 @@ public class GSSAPIServerSASLFactory implements ServerSASLFactory {
}
@Override
public ServerSASL create(ActiveMQServer server, ProtocolManager<AmqpInterceptor, AMQPRedirectHandler> manager, Connection connection,
public ServerSASL create(ActiveMQServer server, ProtocolManager<AmqpInterceptor, AMQPRoutingHandler> manager, Connection connection,
RemotingConnection remotingConnection) {
if (manager instanceof ProtonProtocolManager) {
GSSAPIServerSASL gssapiServerSASL = new GSSAPIServerSASL();

View File

@ -18,7 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.sasl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRedirectHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRoutingHandler;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
@ -31,7 +31,7 @@ public class PlainServerSASLFactory implements ServerSASLFactory {
}
@Override
public ServerSASL create(ActiveMQServer server, ProtocolManager<AmqpInterceptor, AMQPRedirectHandler> manager, Connection connection,
public ServerSASL create(ActiveMQServer server, ProtocolManager<AmqpInterceptor, AMQPRoutingHandler> manager, Connection connection,
RemotingConnection remotingConnection) {
return new PlainSASL(server.getSecurityStore(), manager.getSecurityDomain(), connection.getProtocolConnection());
}

View File

@ -18,7 +18,7 @@ package org.apache.activemq.artemis.protocol.amqp.sasl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRedirectHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRoutingHandler;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
@ -41,7 +41,7 @@ public interface ServerSASLFactory {
* @param remotingConnection
* @return a new instance of {@link ServerSASL} that implements the provided mechanism
*/
ServerSASL create(ActiveMQServer server, ProtocolManager<AmqpInterceptor, AMQPRedirectHandler> manager, Connection connection,
ServerSASL create(ActiveMQServer server, ProtocolManager<AmqpInterceptor, AMQPRoutingHandler> manager, Connection connection,
RemotingConnection remotingConnection);
/**

View File

@ -31,7 +31,7 @@ import javax.security.auth.login.LoginException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.protocol.amqp.broker.AmqpInterceptor;
import org.apache.activemq.artemis.protocol.amqp.broker.ProtonProtocolManager;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRedirectHandler;
import org.apache.activemq.artemis.protocol.amqp.proton.AMQPRoutingHandler;
import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASL;
import org.apache.activemq.artemis.protocol.amqp.sasl.ServerSASLFactory;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
@ -68,7 +68,7 @@ public abstract class SCRAMServerSASLFactory implements ServerSASLFactory {
}
@Override
public ServerSASL create(ActiveMQServer server, ProtocolManager<AmqpInterceptor, AMQPRedirectHandler> manager, Connection connection,
public ServerSASL create(ActiveMQServer server, ProtocolManager<AmqpInterceptor, AMQPRoutingHandler> manager, Connection connection,
RemotingConnection remotingConnection) {
try {
if (manager instanceof ProtonProtocolManager) {

View File

@ -241,7 +241,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
return;
}
if (connection.getTransportConnection().getRedirectTo() == null || !protocolManager.getRedirectHandler().redirect(connection, session, connect)) {
if (connection.getTransportConnection().getRouter() == null || !protocolManager.getRoutingHandler().route(connection, session, connect)) {
/* [MQTT-3.1.2-2] Reject unsupported clients. */
int packetVersion = connect.variableHeader().version();
if (packetVersion != MqttVersion.MQTT_3_1.protocolLevel() &&

View File

@ -47,7 +47,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.jboss.logging.Logger;
public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInterceptor, MQTTConnection, MQTTRedirectHandler> implements NotificationListener {
public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQTTInterceptor, MQTTConnection, MQTTRoutingHandler> implements NotificationListener {
private static final Logger logger = Logger.getLogger(MQTTProtocolManager.class);
@ -72,7 +72,7 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
private int maximumPacketSize = MQTTUtil.DEFAULT_MAXIMUM_PACKET_SIZE;
private final MQTTRedirectHandler redirectHandler;
private final MQTTRoutingHandler routingHandler;
MQTTProtocolManager(ActiveMQServer server,
List<BaseInterceptor> incomingInterceptors,
@ -80,7 +80,7 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
this.server = server;
this.updateInterceptors(incomingInterceptors, outgoingInterceptors);
server.getManagementService().addNotificationListener(this);
redirectHandler = new MQTTRedirectHandler(server);
routingHandler = new MQTTRoutingHandler(server);
}
public int getDefaultMqttSessionExpiryInterval() {
@ -315,8 +315,8 @@ public class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage, MQ
}
@Override
public MQTTRedirectHandler getRedirectHandler() {
return redirectHandler;
public MQTTRoutingHandler getRoutingHandler() {
return routingHandler;
}
public String invokeIncoming(MqttMessage mqttMessage, MQTTConnection connection) {

View File

@ -18,9 +18,9 @@
package org.apache.activemq.artemis.core.protocol.mqtt;
import io.netty.handler.codec.mqtt.MqttConnectMessage;
import org.apache.activemq.artemis.core.server.balancing.RedirectContext;
import org.apache.activemq.artemis.core.server.routing.RoutingContext;
public class MQTTRedirectContext extends RedirectContext {
public class MQTTRoutingContext extends RoutingContext {
private final MQTTSession mqttSession;
@ -30,7 +30,7 @@ public class MQTTRedirectContext extends RedirectContext {
}
public MQTTRedirectContext(MQTTConnection mqttConnection, MQTTSession mqttSession, MqttConnectMessage connect) {
public MQTTRoutingContext(MQTTConnection mqttConnection, MQTTSession mqttSession, MqttConnectMessage connect) {
super(mqttConnection, connect.payload().clientIdentifier(), connect.payload().userName());
this.mqttSession = mqttSession;
}

View File

@ -21,23 +21,23 @@ import io.netty.handler.codec.mqtt.MqttConnectMessage;
import io.netty.handler.codec.mqtt.MqttProperties;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.balancing.RedirectHandler;
import org.apache.activemq.artemis.core.server.routing.RoutingHandler;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import static io.netty.handler.codec.mqtt.MqttProperties.MqttPropertyType.SERVER_REFERENCE;
public class MQTTRedirectHandler extends RedirectHandler<MQTTRedirectContext> {
public class MQTTRoutingHandler extends RoutingHandler<MQTTRoutingContext> {
protected MQTTRedirectHandler(ActiveMQServer server) {
protected MQTTRoutingHandler(ActiveMQServer server) {
super(server);
}
public boolean redirect(MQTTConnection mqttConnection, MQTTSession mqttSession, MqttConnectMessage connect) throws Exception {
return redirect(new MQTTRedirectContext(mqttConnection, mqttSession, connect));
public boolean route(MQTTConnection mqttConnection, MQTTSession mqttSession, MqttConnectMessage connect) throws Exception {
return route(new MQTTRoutingContext(mqttConnection, mqttSession, connect));
}
@Override
protected void cannotRedirect(MQTTRedirectContext context) {
protected void refuse(MQTTRoutingContext context) {
switch (context.getResult().getStatus()) {
case REFUSED_USE_ANOTHER:
context.getMQTTSession().getProtocolHandler().sendConnack(MQTTReasonCodes.USE_ANOTHER_SERVER);
@ -50,7 +50,7 @@ public class MQTTRedirectHandler extends RedirectHandler<MQTTRedirectContext> {
}
@Override
protected void redirectTo(MQTTRedirectContext context) {
protected void redirect(MQTTRoutingContext context) {
String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, context.getTarget().getConnector().getParams());
int port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, context.getTarget().getConnector().getParams());

View File

@ -1189,8 +1189,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processAddConnection(ConnectionInfo info) throws Exception {
try {
protocolManager.validateUser(OpenWireConnection.this, info);
if (transportConnection.getRedirectTo() != null) {
if (protocolManager.getRedirectHandler().redirect(OpenWireConnection.this, info)) {
if (transportConnection.getRouter() != null) {
if (protocolManager.getRoutingHandler().route(OpenWireConnection.this, info)) {
shutdown(true);
return null;
}

View File

@ -83,7 +83,7 @@ import org.apache.activemq.util.LongSequenceGenerator;
import static org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil.SELECTOR_AWARE_OPTION;
public class OpenWireProtocolManager extends AbstractProtocolManager<Command, OpenWireInterceptor, OpenWireConnection, OpenWireRedirectHandler> implements ClusterTopologyListener {
public class OpenWireProtocolManager extends AbstractProtocolManager<Command, OpenWireInterceptor, OpenWireConnection, OpenWireRoutingHandler> implements ClusterTopologyListener {
private static final List<String> websocketRegistryNames = Collections.EMPTY_LIST;
@ -143,7 +143,7 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
private final List<OpenWireInterceptor> incomingInterceptors = new ArrayList<>();
private final List<OpenWireInterceptor> outgoingInterceptors = new ArrayList<>();
private final OpenWireRedirectHandler redirectHandler;
private final OpenWireRoutingHandler routingHandler;
protected static class VirtualTopicConfig {
public int filterPathTerminus;
@ -195,7 +195,7 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
//make sure we don't cluster advisories
clusterManager.addProtocolIgnoredAddress(AdvisorySupport.ADVISORY_TOPIC_PREFIX);
redirectHandler = new OpenWireRedirectHandler(server, this);
routingHandler = new OpenWireRoutingHandler(server, this);
}
/** Is Duplicate detection enabled when used with failover clients. */
@ -681,8 +681,8 @@ public class OpenWireProtocolManager extends AbstractProtocolManager<Command, O
}
@Override
public OpenWireRedirectHandler getRedirectHandler() {
return redirectHandler;
public OpenWireRoutingHandler getRoutingHandler() {
return routingHandler;
}
@Override

View File

@ -17,10 +17,10 @@
package org.apache.activemq.artemis.core.protocol.openwire;
import org.apache.activemq.artemis.core.server.balancing.RedirectContext;
import org.apache.activemq.artemis.core.server.routing.RoutingContext;
import org.apache.activemq.command.ConnectionInfo;
public class OpenWireRedirectContext extends RedirectContext {
public class OpenWireRoutingContext extends RoutingContext {
private final OpenWireConnection openWireConnection;
@ -30,7 +30,7 @@ public class OpenWireRedirectContext extends RedirectContext {
}
public OpenWireRedirectContext(OpenWireConnection openWireConnection, ConnectionInfo connectionInfo) {
public OpenWireRoutingContext(OpenWireConnection openWireConnection, ConnectionInfo connectionInfo) {
super(openWireConnection.getRemotingConnection(), connectionInfo.getClientId(), connectionInfo.getUserName());
this.openWireConnection = openWireConnection;
}

View File

@ -19,34 +19,34 @@ package org.apache.activemq.artemis.core.protocol.openwire;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.balancing.RedirectHandler;
import org.apache.activemq.artemis.core.server.routing.RoutingHandler;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.ConnectionInfo;
public class OpenWireRedirectHandler extends RedirectHandler<OpenWireRedirectContext> {
public class OpenWireRoutingHandler extends RoutingHandler<OpenWireRoutingContext> {
private final OpenWireProtocolManager protocolManager;
protected OpenWireRedirectHandler(ActiveMQServer server, OpenWireProtocolManager protocolManager) {
protected OpenWireRoutingHandler(ActiveMQServer server, OpenWireProtocolManager protocolManager) {
super(server);
this.protocolManager = protocolManager;
}
public boolean redirect(OpenWireConnection openWireConnection, ConnectionInfo connectionInfo) throws Exception {
public boolean route(OpenWireConnection openWireConnection, ConnectionInfo connectionInfo) throws Exception {
if (!connectionInfo.isFaultTolerant()) {
throw new java.lang.IllegalStateException("Client not fault tolerant");
}
return redirect(new OpenWireRedirectContext(openWireConnection, connectionInfo));
return route(new OpenWireRoutingContext(openWireConnection, connectionInfo));
}
@Override
protected void cannotRedirect(OpenWireRedirectContext context) throws Exception {
protected void refuse(OpenWireRoutingContext context) throws Exception {
}
@Override
protected void redirectTo(OpenWireRedirectContext context) throws Exception {
protected void redirect(OpenWireRoutingContext context) throws Exception {
String host = ConfigurationHelper.getStringProperty(TransportConstants.HOST_PROP_NAME, TransportConstants.DEFAULT_HOST, context.getTarget().getConnector().getParams());
int port = ConfigurationHelper.getIntProperty(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_PORT, context.getTarget().getConnector().getParams());

View File

@ -38,7 +38,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.balancing.RedirectHandler;
import org.apache.activemq.artemis.core.server.routing.RoutingHandler;
import org.apache.activemq.artemis.logs.AuditLogger;
import org.apache.activemq.artemis.spi.core.protocol.AbstractProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
@ -53,7 +53,7 @@ import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProto
/**
* StompProtocolManager
*/
public class StompProtocolManager extends AbstractProtocolManager<StompFrame, StompFrameInterceptor, StompConnection, RedirectHandler> {
public class StompProtocolManager extends AbstractProtocolManager<StompFrame, StompFrameInterceptor, StompConnection, RoutingHandler> {
private static final List<String> websocketRegistryNames = Arrays.asList("v10.stomp", "v11.stomp", "v12.stomp");
@ -186,7 +186,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
}
@Override
public RedirectHandler getRedirectHandler() {
public RoutingHandler getRoutingHandler() {
return null;
}

View File

@ -25,7 +25,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
import org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration;
import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerFederationPlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerAddressPlugin;
@ -467,14 +467,14 @@ public interface Configuration {
/**
* Returns the redirects configured for this server.
*/
List<BrokerBalancerConfiguration> getBalancerConfigurations();
List<ConnectionRouterConfiguration> getConnectionRouters();
/**
* Sets the redirects configured for this server.
*/
Configuration setBalancerConfigurations(List<BrokerBalancerConfiguration> configs);
Configuration setConnectionRouters(List<ConnectionRouterConfiguration> configs);
Configuration addBalancerConfiguration(BrokerBalancerConfiguration config);
Configuration addConnectionRouter(ConnectionRouterConfiguration config);
/**
* Returns the cluster connections configured for this server.

View File

@ -55,7 +55,7 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
import org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
@ -182,7 +182,7 @@ public class ConfigurationImpl implements Configuration, Serializable {
protected List<DivertConfiguration> divertConfigurations = new ArrayList<>();
protected List<BrokerBalancerConfiguration> brokerBalancerConfigurations = new ArrayList<>();
protected List<ConnectionRouterConfiguration> connectionRouters = new ArrayList<>();
protected List<ClusterConnectionConfiguration> clusterConfigurations = new ArrayList<>();
@ -901,19 +901,19 @@ public class ConfigurationImpl implements Configuration, Serializable {
}
@Override
public List<BrokerBalancerConfiguration> getBalancerConfigurations() {
return brokerBalancerConfigurations;
public List<ConnectionRouterConfiguration> getConnectionRouters() {
return connectionRouters;
}
@Override
public ConfigurationImpl setBalancerConfigurations(final List<BrokerBalancerConfiguration> configs) {
brokerBalancerConfigurations = configs;
public ConfigurationImpl setConnectionRouters(final List<ConnectionRouterConfiguration> configs) {
connectionRouters = configs;
return this;
}
@Override
public ConfigurationImpl addBalancerConfiguration(final BrokerBalancerConfiguration config) {
brokerBalancerConfigurations.add(config);
public ConfigurationImpl addConnectionRouter(final ConnectionRouterConfiguration config) {
connectionRouters.add(config);
return this;
}

View File

@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.config.impl;
import java.util.EnumSet;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.core.server.routing.KeyType;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.JournalType;
@ -275,12 +275,12 @@ public final class Validators {
}
};
public static final Validator TARGET_KEY = new Validator() {
public static final Validator KEY_TYPE = new Validator() {
@Override
public void validate(final String name, final Object value) {
String val = (String) value;
if (val == null || !EnumSet.allOf(TargetKey.class).contains(TargetKey.valueOf(val))) {
throw ActiveMQMessageBundle.BUNDLE.invalidTargetKey(val);
if (val == null || !EnumSet.allOf(KeyType.class).contains(KeyType.valueOf(val))) {
throw ActiveMQMessageBundle.BUNDLE.invalidConnectionRouterKey(val);
}
}
};

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.config.balancing;
package org.apache.activemq.artemis.core.config.routing;
import java.io.Serializable;

View File

@ -14,17 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.config.balancing;
package org.apache.activemq.artemis.core.config.routing;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.core.server.routing.KeyType;
import java.io.Serializable;
public class BrokerBalancerConfiguration implements Serializable {
public class ConnectionRouterConfiguration implements Serializable {
private String name = null;
private TargetKey targetKey = TargetKey.SOURCE_IP;
private String targetKeyFilter = null;
private KeyType keyType = KeyType.SOURCE_IP;
private String keyFilter = null;
private String localTargetFilter = null;
private CacheConfiguration cacheConfiguration = null;
private PoolConfiguration poolConfiguration = null;
@ -35,26 +35,26 @@ public class BrokerBalancerConfiguration implements Serializable {
return name;
}
public BrokerBalancerConfiguration setName(String name) {
public ConnectionRouterConfiguration setName(String name) {
this.name = name;
return this;
}
public TargetKey getTargetKey() {
return targetKey;
public KeyType getKeyType() {
return keyType;
}
public BrokerBalancerConfiguration setTargetKey(TargetKey targetKey) {
this.targetKey = targetKey;
public ConnectionRouterConfiguration setKeyType(KeyType keyType) {
this.keyType = keyType;
return this;
}
public String getTargetKeyFilter() {
return targetKeyFilter;
public String getKeyFilter() {
return keyFilter;
}
public BrokerBalancerConfiguration setTargetKeyFilter(String targetKeyFilter) {
this.targetKeyFilter = targetKeyFilter;
public ConnectionRouterConfiguration setKeyFilter(String keyFilter) {
this.keyFilter = keyFilter;
return this;
}
@ -62,7 +62,7 @@ public class BrokerBalancerConfiguration implements Serializable {
return localTargetFilter;
}
public BrokerBalancerConfiguration setLocalTargetFilter(String localTargetFilter) {
public ConnectionRouterConfiguration setLocalTargetFilter(String localTargetFilter) {
this.localTargetFilter = localTargetFilter;
return this;
}
@ -71,7 +71,7 @@ public class BrokerBalancerConfiguration implements Serializable {
return cacheConfiguration;
}
public BrokerBalancerConfiguration setCacheConfiguration(CacheConfiguration cacheConfiguration) {
public ConnectionRouterConfiguration setCacheConfiguration(CacheConfiguration cacheConfiguration) {
this.cacheConfiguration = cacheConfiguration;
return this;
}
@ -80,7 +80,7 @@ public class BrokerBalancerConfiguration implements Serializable {
return policyConfiguration;
}
public BrokerBalancerConfiguration setPolicyConfiguration(NamedPropertyConfiguration policyConfiguration) {
public ConnectionRouterConfiguration setPolicyConfiguration(NamedPropertyConfiguration policyConfiguration) {
this.policyConfiguration = policyConfiguration;
return this;
}
@ -89,7 +89,7 @@ public class BrokerBalancerConfiguration implements Serializable {
return poolConfiguration;
}
public BrokerBalancerConfiguration setPoolConfiguration(PoolConfiguration poolConfiguration) {
public ConnectionRouterConfiguration setPoolConfiguration(PoolConfiguration poolConfiguration) {
this.poolConfiguration = poolConfiguration;
return this;
}

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.config.balancing;
package org.apache.activemq.artemis.core.config.routing;
import java.io.Serializable;
import java.util.Map;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.config.balancing;
package org.apache.activemq.artemis.core.config.routing;
import java.io.Serializable;
import java.util.List;

View File

@ -46,9 +46,9 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
import org.apache.activemq.artemis.core.config.balancing.CacheConfiguration;
import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration;
import org.apache.activemq.artemis.core.config.routing.CacheConfiguration;
import org.apache.activemq.artemis.core.config.routing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
@ -65,7 +65,7 @@ import org.apache.activemq.artemis.core.config.WildcardConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
import org.apache.activemq.artemis.core.config.routing.PoolConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationAddressPolicyConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationDownstreamConfiguration;
import org.apache.activemq.artemis.core.config.federation.FederationPolicySet;
@ -92,9 +92,9 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactoryResolver;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.core.server.balancing.transformer.TransformerFactoryResolver;
import org.apache.activemq.artemis.core.server.routing.policies.PolicyFactoryResolver;
import org.apache.activemq.artemis.core.server.routing.KeyType;
import org.apache.activemq.artemis.core.server.routing.transformer.TransformerFactoryResolver;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
import org.apache.activemq.artemis.core.server.metrics.ActiveMQMetricsPlugin;
@ -634,16 +634,16 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
parseDivertConfiguration(dvNode, config);
}
NodeList ccBalancers = e.getElementsByTagName("broker-balancers");
NodeList ccConnectionRouters = e.getElementsByTagName("connection-routers");
if (ccBalancers != null) {
NodeList ccBalancer = e.getElementsByTagName("broker-balancer");
if (ccConnectionRouters != null) {
NodeList ccConnectionRouter = e.getElementsByTagName("connection-router");
if (ccBalancer != null) {
for (int i = 0; i < ccBalancer.getLength(); i++) {
Element ccNode = (Element) ccBalancer.item(i);
if (ccConnectionRouter != null) {
for (int i = 0; i < ccConnectionRouter.getLength(); i++) {
Element ccNode = (Element) ccConnectionRouter.item(i);
parseBalancerConfiguration(ccNode, config);
parseConnectionRouterConfiguration(ccNode, config);
}
}
}
@ -2646,16 +2646,16 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
mainConfig.getDivertConfigurations().add(config);
}
private void parseBalancerConfiguration(final Element e, final Configuration config) throws Exception {
BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration();
private void parseConnectionRouterConfiguration(final Element e, final Configuration config) throws Exception {
ConnectionRouterConfiguration connectionRouterConfiguration = new ConnectionRouterConfiguration();
brokerBalancerConfiguration.setName(e.getAttribute("name"));
connectionRouterConfiguration.setName(e.getAttribute("name"));
brokerBalancerConfiguration.setTargetKey(TargetKey.valueOf(getString(e, "target-key", brokerBalancerConfiguration.getTargetKey().name(), Validators.TARGET_KEY)));
connectionRouterConfiguration.setKeyType(KeyType.valueOf(getString(e, "key-type", connectionRouterConfiguration.getKeyType().name(), Validators.KEY_TYPE)));
brokerBalancerConfiguration.setTargetKeyFilter(getString(e, "target-key-filter", brokerBalancerConfiguration.getTargetKeyFilter(), Validators.NO_CHECK));
connectionRouterConfiguration.setKeyFilter(getString(e, "key-filter", connectionRouterConfiguration.getKeyFilter(), Validators.NO_CHECK));
brokerBalancerConfiguration.setLocalTargetFilter(getString(e, "local-target-filter", brokerBalancerConfiguration.getLocalTargetFilter(), Validators.NO_CHECK));
connectionRouterConfiguration.setLocalTargetFilter(getString(e, "local-target-filter", connectionRouterConfiguration.getLocalTargetFilter(), Validators.NO_CHECK));
NamedPropertyConfiguration policyConfiguration = null;
PoolConfiguration poolConfiguration = null;
@ -2667,23 +2667,23 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
if (child.getNodeName().equals("cache")) {
CacheConfiguration cacheConfiguration = new CacheConfiguration();
parseCacheConfiguration((Element) child, cacheConfiguration);
brokerBalancerConfiguration.setCacheConfiguration(cacheConfiguration);
connectionRouterConfiguration.setCacheConfiguration(cacheConfiguration);
} else if (child.getNodeName().equals("policy")) {
policyConfiguration = new NamedPropertyConfiguration();
parsePolicyConfiguration((Element) child, policyConfiguration);
brokerBalancerConfiguration.setPolicyConfiguration(policyConfiguration);
connectionRouterConfiguration.setPolicyConfiguration(policyConfiguration);
} else if (child.getNodeName().equals("pool")) {
poolConfiguration = new PoolConfiguration();
parsePoolConfiguration((Element) child, config, poolConfiguration);
brokerBalancerConfiguration.setPoolConfiguration(poolConfiguration);
connectionRouterConfiguration.setPoolConfiguration(poolConfiguration);
} else if (child.getNodeName().equals("local-target-key-transformer")) {
policyConfiguration = new NamedPropertyConfiguration();
parseTransformerConfiguration((Element) child, policyConfiguration);
brokerBalancerConfiguration.setTransformerConfiguration(policyConfiguration);
connectionRouterConfiguration.setTransformerConfiguration(policyConfiguration);
}
}
config.getBalancerConfigurations().add(brokerBalancerConfiguration);
config.getConnectionRouters().add(connectionRouterConfiguration);
}
private void parseCacheConfiguration(final Element e, final CacheConfiguration cacheConfiguration) throws ClassNotFoundException {

View File

@ -18,11 +18,11 @@
package org.apache.activemq.artemis.core.management.impl;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.management.BrokerBalancerControl;
import org.apache.activemq.artemis.api.core.management.ConnectionRouterControl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.balancing.BrokerBalancer;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
import org.apache.activemq.artemis.core.server.routing.ConnectionRouter;
import org.apache.activemq.artemis.core.server.routing.targets.Target;
import org.apache.activemq.artemis.core.server.routing.targets.TargetResult;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.json.JsonObjectBuilder;
@ -40,8 +40,8 @@ import javax.management.openmbean.TabularDataSupport;
import javax.management.openmbean.TabularType;
import java.util.Map;
public class BrokerBalancerControlImpl extends AbstractControl implements BrokerBalancerControl {
private final BrokerBalancer balancer;
public class ConnectionRouterControlImpl extends AbstractControl implements ConnectionRouterControl {
private final ConnectionRouter connectionRouter;
private static CompositeType parameterType;
@ -53,14 +53,14 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker
private static CompositeType targetType;
public BrokerBalancerControlImpl(final BrokerBalancer balancer, final StorageManager storageManager) throws NotCompliantMBeanException {
super(BrokerBalancerControl.class, storageManager);
this.balancer = balancer;
public ConnectionRouterControlImpl(final ConnectionRouter connectionRouter, final StorageManager storageManager) throws NotCompliantMBeanException {
super(ConnectionRouterControl.class, storageManager);
this.connectionRouter = connectionRouter;
}
@Override
public CompositeData getTarget(String key) throws Exception {
TargetResult result = balancer.getTarget(key);
TargetResult result = connectionRouter.getTarget(key);
if (TargetResult.Status.OK == result.getStatus()) {
CompositeData connectorData = null;
TransportConfiguration connector = result.getTarget().getConnector();
@ -87,7 +87,7 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker
@Override
public String getTargetAsJSON(String key) {
TargetResult result = balancer.getTarget(key);
TargetResult result = connectionRouter.getTarget(key);
if (TargetResult.Status.OK == result.getStatus()) {
TransportConfiguration connector = result.getTarget().getConnector();
@ -109,32 +109,32 @@ public class BrokerBalancerControlImpl extends AbstractControl implements Broker
@Override
public void setLocalTargetFilter(String regExp) {
balancer.setLocalTargetFilter(regExp);
connectionRouter.setLocalTargetFilter(regExp);
}
@Override
public String getLocalTargetFilter() {
return balancer.getLocalTargetFilter();
return connectionRouter.getLocalTargetFilter();
}
@Override
public void setTargetKeyFilter(String regExp) {
balancer.getTargetKeyResolver().setKeyFilter(regExp);
connectionRouter.getTargetKeyResolver().setKeyFilter(regExp);
}
@Override
public String getTargetKeyFilter() {
return balancer.getTargetKeyResolver().getKeyFilter();
return connectionRouter.getTargetKeyResolver().getKeyFilter();
}
@Override
protected MBeanOperationInfo[] fillMBeanOperationInfo() {
return MBeanInfoHelper.getMBeanOperationsInfo(BrokerBalancerControl.class);
return MBeanInfoHelper.getMBeanOperationsInfo(ConnectionRouterControl.class);
}
@Override
protected MBeanAttributeInfo[] fillMBeanAttributeInfo() {
return MBeanInfoHelper.getMBeanAttributesInfo(BrokerBalancerControl.class);
return MBeanInfoHelper.getMBeanAttributesInfo(ConnectionRouterControl.class);
}

View File

@ -172,8 +172,8 @@ public class ActiveMQPacketHandler implements ChannelHandler {
}
final String validatedUser = server.validateUser(activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), connection, protocolManager.getSecurityDomain());
if (connection.getTransportConnection().getRedirectTo() != null) {
protocolManager.getRedirectHandler().redirect(connection, request);
if (connection.getTransportConnection().getRouter() != null) {
protocolManager.getRoutingHandler().route(connection, request);
}
OperationContext sessionOperationContext = server.newOperationContext();
@ -198,8 +198,8 @@ public class ActiveMQPacketHandler implements ChannelHandler {
if (e.getType() == ActiveMQExceptionType.INCOMPATIBLE_CLIENT_SERVER_VERSIONS) {
incompatibleVersion = true;
logger.debug("Sending ActiveMQException after Incompatible client", e);
} else if (e.getType() == ActiveMQExceptionType.REDIRECTED) {
logger.debug("Sending ActiveMQException after redirected client", e);
} else if (e.getType() == ActiveMQExceptionType.ROUTING_EXCEPTION) {
logger.debug("Sending ActiveMQException after routing client", e);
} else {
ActiveMQServerLogger.LOGGER.failedToCreateSession(e);
}

View File

@ -19,10 +19,10 @@ package org.apache.activemq.artemis.core.protocol.core.impl;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.apache.activemq.artemis.core.server.balancing.RedirectContext;
import org.apache.activemq.artemis.core.server.routing.RoutingContext;
public class ActiveMQRedirectContext extends RedirectContext {
public ActiveMQRedirectContext(CoreRemotingConnection connection, CreateSessionMessage message) {
public class ActiveMQRoutingContext extends RoutingContext {
public ActiveMQRoutingContext(CoreRemotingConnection connection, CreateSessionMessage message) {
super(connection, connection.getClientID(), message.getUsername());
}
}

View File

@ -22,36 +22,36 @@ import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.CreateSessionMessage;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.balancing.RedirectHandler;
import org.apache.activemq.artemis.core.server.routing.RoutingHandler;
public class ActiveMQRedirectHandler extends RedirectHandler<ActiveMQRedirectContext> {
public class ActiveMQRoutingHandler extends RoutingHandler<ActiveMQRoutingContext> {
public ActiveMQRedirectHandler(ActiveMQServer server) {
public ActiveMQRoutingHandler(ActiveMQServer server) {
super(server);
}
public boolean redirect(CoreRemotingConnection connection, CreateSessionMessage message) throws Exception {
if (!connection.isVersionSupportRedirect()) {
public boolean route(CoreRemotingConnection connection, CreateSessionMessage message) throws Exception {
if (!connection.isVersionSupportRouting()) {
throw ActiveMQMessageBundle.BUNDLE.incompatibleClientServer();
}
return redirect(new ActiveMQRedirectContext(connection, message));
return route(new ActiveMQRoutingContext(connection, message));
}
@Override
public void cannotRedirect(ActiveMQRedirectContext context) throws Exception {
public void refuse(ActiveMQRoutingContext context) throws Exception {
switch (context.getResult().getStatus()) {
case REFUSED_UNAVAILABLE:
throw ActiveMQMessageBundle.BUNDLE.cannotRedirect();
throw ActiveMQMessageBundle.BUNDLE.connectionRouterNotReady(context.getRouter());
case REFUSED_USE_ANOTHER:
throw ActiveMQMessageBundle.BUNDLE.balancerReject();
throw ActiveMQMessageBundle.BUNDLE.connectionRejected(context.getRouter());
}
}
@Override
public void redirectTo(ActiveMQRedirectContext context) throws Exception {
public void redirect(ActiveMQRoutingContext context) throws Exception {
context.getConnection().disconnect(DisconnectReason.REDIRECT, context.getTarget().getNodeID(), context.getTarget().getConnector());
throw ActiveMQMessageBundle.BUNDLE.redirectConnection(context.getTarget().getConnector());
throw ActiveMQMessageBundle.BUNDLE.connectionRedirected(context.getRouter(), context.getTarget().getConnector());
}
}

View File

@ -73,7 +73,7 @@ import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.jboss.logging.Logger;
public class CoreProtocolManager implements ProtocolManager<Interceptor, ActiveMQRedirectHandler> {
public class CoreProtocolManager implements ProtocolManager<Interceptor, ActiveMQRoutingHandler> {
private static final Logger logger = Logger.getLogger(CoreProtocolManager.class);
@ -91,7 +91,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor, ActiveM
private String securityDomain;
private final ActiveMQRedirectHandler redirectHandler;
private final ActiveMQRoutingHandler routingHandler;
public CoreProtocolManager(final CoreProtocolManagerFactory factory,
final ActiveMQServer server,
@ -105,7 +105,7 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor, ActiveM
this.outgoingInterceptors = outgoingInterceptors;
this.redirectHandler = new ActiveMQRedirectHandler(server);
this.routingHandler = new ActiveMQRoutingHandler(server);
}
@Override
@ -239,8 +239,8 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor, ActiveM
}
@Override
public ActiveMQRedirectHandler getRedirectHandler() {
return redirectHandler;
public ActiveMQRoutingHandler getRoutingHandler() {
return routingHandler;
}
private boolean isArtemis(ActiveMQBuffer buffer) {

View File

@ -233,7 +233,7 @@ public class NettyAcceptor extends AbstractAcceptor {
private final boolean autoStart;
private final String redirectTo;
private final String router;
final AtomicBoolean warningPrinted = new AtomicBoolean(false);
@ -375,7 +375,7 @@ public class NettyAcceptor extends AbstractAcceptor {
autoStart = ConfigurationHelper.getBooleanProperty(TransportConstants.AUTO_START, TransportConstants.DEFAULT_AUTO_START, configuration);
redirectTo = ConfigurationHelper.getStringProperty(TransportConstants.REDIRECT_TO, TransportConstants.DEFAULT_REDIRECT_TO, configuration);
router = ConfigurationHelper.getStringProperty(TransportConstants.ROUTER, TransportConstants.DEFAULT_ROUTER, configuration);
}
private Object loadSSLContext() {
@ -911,7 +911,7 @@ public class NettyAcceptor extends AbstractAcceptor {
super.channelActive(ctx);
Listener connectionListener = new Listener();
NettyServerConnection nc = new NettyServerConnection(configuration, ctx.channel(), connectionListener, !httpEnabled && batchDelay > 0, directDeliver, redirectTo);
NettyServerConnection nc = new NettyServerConnection(configuration, ctx.channel(), connectionListener, !httpEnabled && batchDelay > 0, directDeliver, router);
connectionListener.connectionCreated(NettyAcceptor.this, nc, protocolHandler.getProtocol(protocol));

View File

@ -25,17 +25,17 @@ public class NettyServerConnection extends NettyConnection {
private String sniHostname;
private final String redirectTo;
private final String router;
public NettyServerConnection(Map<String, Object> configuration,
Channel channel,
ServerConnectionLifeCycleListener listener,
boolean batchingEnabled,
boolean directDeliver,
String redirectTo) {
String router) {
super(configuration, channel, listener, batchingEnabled, directDeliver);
this.redirectTo = redirectTo;
this.router = router;
}
@Override
@ -48,7 +48,7 @@ public class NettyServerConnection extends NettyConnection {
}
@Override
public String getRedirectTo() {
return redirectTo;
public String getRouter() {
return router;
}
}

View File

@ -39,7 +39,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQInvalidTransientQueueUseExce
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
import org.apache.activemq.artemis.api.core.ActiveMQRedirectedException;
import org.apache.activemq.artemis.api.core.ActiveMQRoutingException;
import org.apache.activemq.artemis.api.core.ActiveMQRemoteDisconnectException;
import org.apache.activemq.artemis.api.core.ActiveMQReplicationTimeooutException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
@ -508,18 +508,18 @@ public interface ActiveMQMessageBundle {
@Message(id = 229235, value = "Incompatible binding with name {0} already exists: {1}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQIllegalStateException bindingAlreadyExists(String name, String binding);
@Message(id = 229236, value = "Invalid target key {0}", format = Message.Format.MESSAGE_FORMAT)
IllegalArgumentException invalidTargetKey(String val);
@Message(id = 229236, value = "Invalid connection router key {0}", format = Message.Format.MESSAGE_FORMAT)
IllegalArgumentException invalidConnectionRouterKey(String val);
@Message(id = 229237, value = "Connection redirected to {0}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQRedirectedException redirectConnection(TransportConfiguration connector);
@Message(id = 229237, value = "Connection router {0} redirected the connection to {1}", format = Message.Format.MESSAGE_FORMAT)
ActiveMQRoutingException connectionRedirected(String connectionRouter, TransportConfiguration connector);
@Message(id = 229238, value = "No target to redirect the connection")
ActiveMQRedirectedException cannotRedirect();
@Message(id = 229238, value = "Connection router {0} not ready", format = Message.Format.MESSAGE_FORMAT)
ActiveMQRoutingException connectionRouterNotReady(String connectionRouter);
@Message(id = 229239, value = "There is no retention configured. In order to use the replay method you must specify journal-retention-directory element on the broker.xml")
IllegalArgumentException noRetention();
@Message(id = 229240, value = "Balancer rejected the connection")
ActiveMQRemoteDisconnectException balancerReject();
@Message(id = 229240, value = "Connection router {0} rejected the connection", format = Message.Format.MESSAGE_FORMAT)
ActiveMQRemoteDisconnectException connectionRejected(String connectionRouter);
}

View File

@ -68,7 +68,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugi
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerResourcePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
import org.apache.activemq.artemis.core.server.balancing.BrokerBalancerManager;
import org.apache.activemq.artemis.core.server.routing.ConnectionRouterManager;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@ -958,7 +958,7 @@ public interface ActiveMQServer extends ServiceComponent {
void reloadConfigurationFile() throws Exception;
BrokerBalancerManager getBalancerManager();
ConnectionRouterManager getConnectionRouterManager();
String validateUser(String username, String password, RemotingConnection connection, String securityDomain) throws Exception;

View File

@ -40,7 +40,7 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.protocol.core.Packet;
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.BackupReplicationStartFailedMessage;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.routing.targets.Target;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.impl.BridgeImpl;
import org.apache.activemq.artemis.core.server.cluster.impl.ClusterConnectionImpl;
@ -454,12 +454,12 @@ public interface ActiveMQServerLogger extends BasicLogger {
void requestedQuorumVotes(int vote);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 221085, value = "Redirect {0} to {1}", format = Message.Format.MESSAGE_FORMAT)
void redirectClientConnection(Connection connection, Target target);
@Message(id = 221085, value = "Route {0} to {1}", format = Message.Format.MESSAGE_FORMAT)
void routeClientConnection(Connection connection, Target target);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 221086, value = "Cannot redirect {0}", format = Message.Format.MESSAGE_FORMAT)
void cannotRedirectClientConnection(Connection connection);
@Message(id = 221086, value = "Cannot route {0}", format = Message.Format.MESSAGE_FORMAT)
void cannotRouteClientConnection(Connection connection);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 222000, value = "ActiveMQServer is being finalized and has not been stopped. Please remember to stop the server before letting it go out of scope",
@ -2189,8 +2189,8 @@ public interface ActiveMQServerLogger extends BasicLogger {
void pageStoreStop(SimpleString storeName, long addressSize, long maxSize, long globalMaxSize);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 224109, value = "BrokerBalancer {0} not found", format = Message.Format.MESSAGE_FORMAT)
void brokerBalancerNotFound(String name);
@Message(id = 224109, value = "ConnectionRouter {0} not found", format = Message.Format.MESSAGE_FORMAT)
void connectionRouterNotFound(String name);
@LogMessage(level = Logger.Level.WARN)
@Message(id = 224110, value = "Configuration 'whitelist' is deprecated, please use the 'allowlist' configuration", format = Message.Format.MESSAGE_FORMAT)

View File

@ -143,7 +143,6 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.core.server.ServiceComponent;
import org.apache.activemq.artemis.core.server.ServiceRegistry;
import org.apache.activemq.artemis.core.server.balancing.BrokerBalancerManager;
import org.apache.activemq.artemis.core.server.cluster.BackupManager;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@ -174,6 +173,7 @@ import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerMessagePlugi
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerQueuePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerResourcePlugin;
import org.apache.activemq.artemis.core.server.plugin.ActiveMQServerSessionPlugin;
import org.apache.activemq.artemis.core.server.routing.ConnectionRouterManager;
import org.apache.activemq.artemis.core.server.reload.ReloadManager;
import org.apache.activemq.artemis.core.server.reload.ReloadManagerImpl;
import org.apache.activemq.artemis.core.server.replay.ReplayManager;
@ -294,7 +294,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private volatile RemotingService remotingService;
private volatile BrokerBalancerManager balancerManager;
private volatile ConnectionRouterManager connectionRouterManager;
private final List<ProtocolManagerFactory> protocolManagerFactories = new ArrayList<>();
@ -1236,7 +1236,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
stopComponent(balancerManager);
stopComponent(connectionRouterManager);
stopComponent(connectorsService);
@ -1677,8 +1677,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
@Override
public BrokerBalancerManager getBalancerManager() {
return balancerManager;
public ConnectionRouterManager getConnectionRouterManager() {
return connectionRouterManager;
}
public BackupManager getBackupManager() {
@ -3165,9 +3165,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
federationManager.deploy();
balancerManager = new BrokerBalancerManager(configuration, this, scheduledPool);
connectionRouterManager = new ConnectionRouterManager(configuration, this, scheduledPool);
balancerManager.deploy();
connectionRouterManager.deploy();
remotingService = new RemotingServiceImpl(clusterManager, configuration, this, managementService, scheduledPool, protocolManagerFactories, executorFactory.getExecutor(), serviceRegistry);
@ -3332,7 +3332,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
federationManager.start();
}
balancerManager.start();
connectionRouterManager.start();
startProtocolServices();

View File

@ -43,7 +43,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.balancing.BrokerBalancer;
import org.apache.activemq.artemis.core.server.routing.ConnectionRouter;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@ -127,9 +127,9 @@ public interface ManagementService extends NotificationService, ActiveMQComponen
void unregisterCluster(String name) throws Exception;
void registerBrokerBalancer(BrokerBalancer balancer) throws Exception;
void registerConnectionRouter(ConnectionRouter router) throws Exception;
void unregisterBrokerBalancer(String name) throws Exception;
void unregisterConnectionRouter(String name) throws Exception;
Object getResource(String resourceName);

View File

@ -48,7 +48,7 @@ import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.AddressControl;
import org.apache.activemq.artemis.api.core.management.BaseBroadcastGroupControl;
import org.apache.activemq.artemis.api.core.management.BridgeControl;
import org.apache.activemq.artemis.api.core.management.BrokerBalancerControl;
import org.apache.activemq.artemis.api.core.management.ConnectionRouterControl;
import org.apache.activemq.artemis.api.core.management.ClusterConnectionControl;
import org.apache.activemq.artemis.api.core.management.DivertControl;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
@ -63,7 +63,7 @@ import org.apache.activemq.artemis.core.management.impl.AddressControlImpl;
import org.apache.activemq.artemis.core.management.impl.BaseBroadcastGroupControlImpl;
import org.apache.activemq.artemis.core.management.impl.BridgeControlImpl;
import org.apache.activemq.artemis.core.management.impl.BroadcastGroupControlImpl;
import org.apache.activemq.artemis.core.management.impl.BrokerBalancerControlImpl;
import org.apache.activemq.artemis.core.management.impl.ConnectionRouterControlImpl;
import org.apache.activemq.artemis.core.management.impl.ClusterConnectionControlImpl;
import org.apache.activemq.artemis.core.management.impl.DivertControlImpl;
import org.apache.activemq.artemis.core.management.impl.JGroupsChannelBroadcastGroupControlImpl;
@ -85,7 +85,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.balancing.BrokerBalancer;
import org.apache.activemq.artemis.core.server.routing.ConnectionRouter;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@ -486,22 +486,22 @@ public class ManagementServiceImpl implements ManagementService {
}
@Override
public synchronized void registerBrokerBalancer(final BrokerBalancer balancer) throws Exception {
ObjectName objectName = objectNameBuilder.getBrokerBalancerObjectName(balancer.getName());
BrokerBalancerControl brokerBalancerControl = new BrokerBalancerControlImpl(balancer, storageManager);
registerInJMX(objectName, brokerBalancerControl);
registerInRegistry(ResourceNames.BROKER_BALANCER + balancer.getName(), brokerBalancerControl);
public synchronized void registerConnectionRouter(final ConnectionRouter router) throws Exception {
ObjectName objectName = objectNameBuilder.getConnectionRouterObjectName(router.getName());
ConnectionRouterControl connectionRouterControl = new ConnectionRouterControlImpl(router, storageManager);
registerInJMX(objectName, connectionRouterControl);
registerInRegistry(ResourceNames.CONNECTION_ROUTER + router.getName(), connectionRouterControl);
if (logger.isDebugEnabled()) {
logger.debug("registered broker balancer " + objectName);
logger.debug("registered connection router " + objectName);
}
}
@Override
public synchronized void unregisterBrokerBalancer(final String name) throws Exception {
ObjectName objectName = objectNameBuilder.getBrokerBalancerObjectName(name);
public synchronized void unregisterConnectionRouter(final String name) throws Exception {
ObjectName objectName = objectNameBuilder.getConnectionRouterObjectName(name);
unregisterFromJMX(objectName);
unregisterFromRegistry(ResourceNames.BROKER_BALANCER + name);
unregisterFromRegistry(ResourceNames.CONNECTION_ROUTER + name);
}
@Override

View File

@ -15,35 +15,33 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing;
package org.apache.activemq.artemis.core.server.routing;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.balancing.caches.Cache;
import org.apache.activemq.artemis.core.server.balancing.policies.Policy;
import org.apache.activemq.artemis.core.server.balancing.pools.Pool;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
import org.apache.activemq.artemis.core.server.balancing.transformer.KeyTransformer;
import org.apache.activemq.artemis.core.server.routing.caches.Cache;
import org.apache.activemq.artemis.core.server.routing.policies.Policy;
import org.apache.activemq.artemis.core.server.routing.pools.Pool;
import org.apache.activemq.artemis.core.server.routing.targets.Target;
import org.apache.activemq.artemis.core.server.routing.targets.TargetResult;
import org.apache.activemq.artemis.core.server.routing.transformer.KeyTransformer;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.jboss.logging.Logger;
import java.util.List;
import java.util.regex.Pattern;
public class BrokerBalancer implements ActiveMQComponent {
private static final Logger logger = Logger.getLogger(BrokerBalancer.class);
public class ConnectionRouter implements ActiveMQComponent {
private static final Logger logger = Logger.getLogger(ConnectionRouter.class);
public static final String CLIENT_ID_PREFIX = ActiveMQDefaultConfiguration.DEFAULT_INTERNAL_NAMING_PREFIX + "balancer.client.";
public static final String CLIENT_ID_PREFIX = ActiveMQDefaultConfiguration.DEFAULT_INTERNAL_NAMING_PREFIX + "router.client.";
private final String name;
private final TargetKey targetKey;
private final KeyType keyType;
private final TargetKeyResolver targetKeyResolver;
private final KeyResolver keyResolver;
private final TargetResult localTarget;
@ -63,8 +61,8 @@ public class BrokerBalancer implements ActiveMQComponent {
return name;
}
public TargetKey getTargetKey() {
return targetKey;
public KeyType getTargetKey() {
return keyType;
}
public Target getLocalTarget() {
@ -93,22 +91,22 @@ public class BrokerBalancer implements ActiveMQComponent {
}
public BrokerBalancer(final String name,
final TargetKey targetKey,
final String targetKeyFilter,
final Target localTarget,
final String localTargetFilter,
final Cache cache,
final Pool pool,
final Policy policy,
KeyTransformer transformer) {
public ConnectionRouter(final String name,
final KeyType keyType,
final String targetKeyFilter,
final Target localTarget,
final String localTargetFilter,
final Cache cache,
final Pool pool,
final Policy policy,
KeyTransformer transformer) {
this.name = name;
this.targetKey = targetKey;
this.keyType = keyType;
this.transformer = transformer;
this.targetKeyResolver = new TargetKeyResolver(targetKey, targetKeyFilter);
this.keyResolver = new KeyResolver(keyType, targetKeyFilter);
this.localTarget = new TargetResult(localTarget);
@ -148,22 +146,22 @@ public class BrokerBalancer implements ActiveMQComponent {
}
public TargetResult getTarget(Connection connection, String clientID, String username) {
if (clientID != null && clientID.startsWith(BrokerBalancer.CLIENT_ID_PREFIX)) {
if (clientID != null && clientID.startsWith(ConnectionRouter.CLIENT_ID_PREFIX)) {
if (logger.isDebugEnabled()) {
logger.debug("The clientID [" + clientID + "] starts with BrokerBalancer.CLIENT_ID_PREFIX");
logger.debug("The clientID [" + clientID + "] starts with ConnectionRouter.CLIENT_ID_PREFIX");
}
return localTarget;
}
return getTarget(targetKeyResolver.resolve(connection, clientID, username));
return getTarget(keyResolver.resolve(connection, clientID, username));
}
public TargetResult getTarget(String key) {
if (this.localTargetFilter != null && this.localTargetFilter.matcher(transform(key)).matches()) {
if (logger.isDebugEnabled()) {
logger.debug("The " + targetKey + "[" + key + "] matches the localTargetFilter " + localTargetFilter.pattern());
logger.debug("The " + keyType + "[" + key + "] matches the localTargetFilter " + localTargetFilter.pattern());
}
return localTarget;
@ -179,21 +177,21 @@ public class BrokerBalancer implements ActiveMQComponent {
String nodeId = cache.get(key);
if (logger.isDebugEnabled()) {
logger.debug("The cache returns target [" + nodeId + "] for " + targetKey + "[" + key + "]");
logger.debug("The cache returns target [" + nodeId + "] for " + keyType + "[" + key + "]");
}
if (nodeId != null) {
Target target = pool.getReadyTarget(nodeId);
if (target != null) {
if (logger.isDebugEnabled()) {
logger.debug("The target [" + nodeId + "] is ready for " + targetKey + "[" + key + "]");
logger.debug("The target [" + nodeId + "] is ready for " + keyType + "[" + key + "]");
}
return new TargetResult(target);
}
if (logger.isDebugEnabled()) {
logger.debug("The target [" + nodeId + "] is not ready for " + targetKey + "[" + key + "]");
logger.debug("The target [" + nodeId + "] is not ready for " + keyType + "[" + key + "]");
}
}
}
@ -203,14 +201,14 @@ public class BrokerBalancer implements ActiveMQComponent {
Target target = policy.selectTarget(targets, key);
if (logger.isDebugEnabled()) {
logger.debug("The policy selects [" + target + "] from " + targets + " for " + targetKey + "[" + key + "]");
logger.debug("The policy selects [" + target + "] from " + targets + " for " + keyType + "[" + key + "]");
}
if (target != null) {
result = new TargetResult(target);
if (cache != null) {
if (logger.isDebugEnabled()) {
logger.debug("Caching " + targetKey + "[" + key + "] for [" + target + "]");
logger.debug("Caching " + keyType + "[" + key + "] for [" + target + "]");
}
cache.put(key, target.getNodeID());
}
@ -227,8 +225,8 @@ public class BrokerBalancer implements ActiveMQComponent {
}
}
public TargetKeyResolver getTargetKeyResolver() {
return targetKeyResolver;
public KeyResolver getTargetKeyResolver() {
return keyResolver;
}
private String transform(String key) {

View File

@ -15,36 +15,36 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing;
package org.apache.activemq.artemis.core.server.routing;
import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.cluster.DiscoveryGroup;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
import org.apache.activemq.artemis.core.config.balancing.CacheConfiguration;
import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
import org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration;
import org.apache.activemq.artemis.core.config.routing.CacheConfiguration;
import org.apache.activemq.artemis.core.config.routing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.routing.PoolConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.balancing.caches.Cache;
import org.apache.activemq.artemis.core.server.balancing.caches.LocalCache;
import org.apache.activemq.artemis.core.server.balancing.policies.Policy;
import org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactory;
import org.apache.activemq.artemis.core.server.balancing.policies.PolicyFactoryResolver;
import org.apache.activemq.artemis.core.server.balancing.pools.ClusterPool;
import org.apache.activemq.artemis.core.server.balancing.pools.DiscoveryGroupService;
import org.apache.activemq.artemis.core.server.balancing.pools.DiscoveryPool;
import org.apache.activemq.artemis.core.server.balancing.pools.DiscoveryService;
import org.apache.activemq.artemis.core.server.balancing.pools.Pool;
import org.apache.activemq.artemis.core.server.balancing.pools.StaticPool;
import org.apache.activemq.artemis.core.server.balancing.targets.ActiveMQTargetFactory;
import org.apache.activemq.artemis.core.server.balancing.targets.LocalTarget;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetFactory;
import org.apache.activemq.artemis.core.server.balancing.transformer.KeyTransformer;
import org.apache.activemq.artemis.core.server.balancing.transformer.TransformerFactory;
import org.apache.activemq.artemis.core.server.balancing.transformer.TransformerFactoryResolver;
import org.apache.activemq.artemis.core.server.routing.caches.Cache;
import org.apache.activemq.artemis.core.server.routing.caches.LocalCache;
import org.apache.activemq.artemis.core.server.routing.policies.Policy;
import org.apache.activemq.artemis.core.server.routing.policies.PolicyFactory;
import org.apache.activemq.artemis.core.server.routing.policies.PolicyFactoryResolver;
import org.apache.activemq.artemis.core.server.routing.pools.ClusterPool;
import org.apache.activemq.artemis.core.server.routing.pools.DiscoveryGroupService;
import org.apache.activemq.artemis.core.server.routing.pools.DiscoveryPool;
import org.apache.activemq.artemis.core.server.routing.pools.DiscoveryService;
import org.apache.activemq.artemis.core.server.routing.pools.Pool;
import org.apache.activemq.artemis.core.server.routing.pools.StaticPool;
import org.apache.activemq.artemis.core.server.routing.targets.ActiveMQTargetFactory;
import org.apache.activemq.artemis.core.server.routing.targets.LocalTarget;
import org.apache.activemq.artemis.core.server.routing.targets.Target;
import org.apache.activemq.artemis.core.server.routing.targets.TargetFactory;
import org.apache.activemq.artemis.core.server.routing.transformer.KeyTransformer;
import org.apache.activemq.artemis.core.server.routing.transformer.TransformerFactory;
import org.apache.activemq.artemis.core.server.routing.transformer.TransformerFactoryResolver;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import org.jboss.logging.Logger;
@ -54,8 +54,8 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
public final class BrokerBalancerManager implements ActiveMQComponent {
private static final Logger logger = Logger.getLogger(BrokerBalancerManager.class);
public final class ConnectionRouterManager implements ActiveMQComponent {
private static final Logger logger = Logger.getLogger(ConnectionRouterManager.class);
public static final String CACHE_ID_PREFIX = "$.BC.";
@ -68,7 +68,7 @@ public final class BrokerBalancerManager implements ActiveMQComponent {
private volatile boolean started = false;
private Map<String, BrokerBalancer> balancerControllers = new HashMap<>();
private Map<String, ConnectionRouter> connectionRouters = new HashMap<>();
@Override
@ -77,21 +77,21 @@ public final class BrokerBalancerManager implements ActiveMQComponent {
}
public BrokerBalancerManager(final Configuration config, final ActiveMQServer server, ScheduledExecutorService scheduledExecutor) {
public ConnectionRouterManager(final Configuration config, final ActiveMQServer server, ScheduledExecutorService scheduledExecutor) {
this.config = config;
this.server = server;
this.scheduledExecutor = scheduledExecutor;
}
public void deploy() throws Exception {
for (BrokerBalancerConfiguration balancerConfig : config.getBalancerConfigurations()) {
deployBrokerBalancer(balancerConfig);
for (ConnectionRouterConfiguration connectionRouterConfig : config.getConnectionRouters()) {
deployConnectionRouter(connectionRouterConfig);
}
}
public void deployBrokerBalancer(BrokerBalancerConfiguration config) throws Exception {
public void deployConnectionRouter(ConnectionRouterConfiguration config) throws Exception {
if (logger.isDebugEnabled()) {
logger.debugf("Deploying BrokerBalancer " + config.getName());
logger.debugf("Deploying ConnectionRouter " + config.getName());
}
Target localTarget = new LocalTarget(null, server);
@ -121,12 +121,12 @@ public final class BrokerBalancerManager implements ActiveMQComponent {
transformer = deployTransformer(transformerConfiguration);
}
BrokerBalancer balancer = new BrokerBalancer(config.getName(), config.getTargetKey(), config.getTargetKeyFilter(),
localTarget, config.getLocalTargetFilter(), cache, pool, policy, transformer);
ConnectionRouter connectionRouter = new ConnectionRouter(config.getName(), config.getKeyType(),
config.getKeyFilter(), localTarget, config.getLocalTargetFilter(), cache, pool, policy, transformer);
balancerControllers.put(balancer.getName(), balancer);
connectionRouters.put(connectionRouter.getName(), connectionRouter);
server.getManagementService().registerBrokerBalancer(balancer);
server.getManagementService().registerConnectionRouter(connectionRouter);
}
private Cache deployCache(CacheConfiguration configuration, String name) throws ClassNotFoundException {
@ -212,14 +212,14 @@ public final class BrokerBalancerManager implements ActiveMQComponent {
return transformer;
}
public BrokerBalancer getBalancer(String name) {
return balancerControllers.get(name);
public ConnectionRouter getRouter(String name) {
return connectionRouters.get(name);
}
@Override
public void start() throws Exception {
for (BrokerBalancer brokerBalancer : balancerControllers.values()) {
brokerBalancer.start();
for (ConnectionRouter connectionRouter : connectionRouters.values()) {
connectionRouter.start();
}
started = true;
@ -229,9 +229,9 @@ public final class BrokerBalancerManager implements ActiveMQComponent {
public void stop() throws Exception {
started = false;
for (BrokerBalancer balancer : balancerControllers.values()) {
balancer.stop();
server.getManagementService().unregisterBrokerBalancer(balancer.getName());
for (ConnectionRouter connectionRouter : connectionRouters.values()) {
connectionRouter.stop();
server.getManagementService().unregisterConnectionRouter(connectionRouter.getName());
}
}
}

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.targets;
package org.apache.activemq.artemis.core.server.routing;
import javax.security.auth.Subject;
@ -26,22 +26,22 @@ import org.jboss.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class TargetKeyResolver {
public class KeyResolver {
public static final String DEFAULT_KEY_VALUE = "DEFAULT";
private static final Logger logger = Logger.getLogger(TargetKeyResolver.class);
private static final Logger logger = Logger.getLogger(KeyResolver.class);
private static final char SOCKET_ADDRESS_DELIMITER = ':';
private static final String SOCKET_ADDRESS_PREFIX = "/";
private final TargetKey key;
private final KeyType key;
private volatile Pattern keyFilter;
public TargetKey getKey() {
public KeyType getKey() {
return key;
}
@ -49,7 +49,7 @@ public class TargetKeyResolver {
return keyFilter != null ? keyFilter.pattern() : null;
}
public TargetKeyResolver(TargetKey key, String keyFilter) {
public KeyResolver(KeyType key, String keyFilter) {
this.key = key;
setKeyFilter(keyFilter);
}

View File

@ -15,16 +15,16 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.targets;
package org.apache.activemq.artemis.core.server.routing;
public enum TargetKey {
public enum KeyType {
CLIENT_ID, SNI_HOST, SOURCE_IP, USER_NAME, ROLE_NAME;
public static final String validValues;
static {
StringBuffer stringBuffer = new StringBuffer();
for (TargetKey type : TargetKey.values()) {
for (KeyType type : KeyType.values()) {
if (stringBuffer.length() != 0) {
stringBuffer.append(",");
@ -36,7 +36,7 @@ public enum TargetKey {
validValues = stringBuffer.toString();
}
public static TargetKey getType(String type) {
public static KeyType getType(String type) {
switch (type) {
case "CLIENT_ID":
return CLIENT_ID;

View File

@ -15,13 +15,14 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing;
package org.apache.activemq.artemis.core.server.routing;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
import org.apache.activemq.artemis.core.server.routing.targets.Target;
import org.apache.activemq.artemis.core.server.routing.targets.TargetResult;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
public class RedirectContext {
public class RoutingContext {
private final RemotingConnection connection;
private final String clientID;
@ -34,6 +35,14 @@ public class RedirectContext {
return connection;
}
public String getRouter() {
return connection.getTransportConnection().getRouter();
}
public Connection getTransportConnection() {
return connection.getTransportConnection();
}
public String getClientID() {
return clientID;
}
@ -54,7 +63,7 @@ public class RedirectContext {
this.result = result;
}
public RedirectContext(RemotingConnection connection, String clientID, String username) {
public RoutingContext(RemotingConnection connection, String clientID, String username) {
this.connection = connection;
this.clientID = clientID;
this.username = username;

View File

@ -15,14 +15,13 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing;
package org.apache.activemq.artemis.core.server.routing;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.core.server.routing.targets.TargetResult;
public abstract class RedirectHandler<T extends RedirectContext> {
public abstract class RoutingHandler<T extends RoutingContext> {
private final ActiveMQServer server;
@ -31,41 +30,39 @@ public abstract class RedirectHandler<T extends RedirectContext> {
}
protected RedirectHandler(ActiveMQServer server) {
protected RoutingHandler(ActiveMQServer server) {
this.server = server;
}
protected abstract void cannotRedirect(T context) throws Exception;
protected abstract void refuse(T context) throws Exception;
protected abstract void redirectTo(T context) throws Exception;
protected abstract void redirect(T context) throws Exception;
protected boolean redirect(T context) throws Exception {
Connection transportConnection = context.getConnection().getTransportConnection();
protected boolean route(T context) throws Exception {
ConnectionRouter connectionRouter = getServer().getConnectionRouterManager().getRouter(context.getRouter());
BrokerBalancer brokerBalancer = getServer().getBalancerManager().getBalancer(transportConnection.getRedirectTo());
if (connectionRouter == null) {
ActiveMQServerLogger.LOGGER.connectionRouterNotFound(context.getRouter());
if (brokerBalancer == null) {
ActiveMQServerLogger.LOGGER.brokerBalancerNotFound(transportConnection.getRedirectTo());
cannotRedirect(context);
refuse(context);
return true;
}
context.setResult(brokerBalancer.getTarget(transportConnection, context.getClientID(), context.getUsername()));
context.setResult(connectionRouter.getTarget(context.getTransportConnection(), context.getClientID(), context.getUsername()));
if (TargetResult.Status.OK != context.getResult().getStatus()) {
ActiveMQServerLogger.LOGGER.cannotRedirectClientConnection(transportConnection);
ActiveMQServerLogger.LOGGER.cannotRouteClientConnection(context.getTransportConnection());
cannotRedirect(context);
refuse(context);
return true;
}
ActiveMQServerLogger.LOGGER.redirectClientConnection(transportConnection, context.getTarget());
ActiveMQServerLogger.LOGGER.routeClientConnection(context.getTransportConnection(), context.getTarget());
if (!context.getTarget().isLocal()) {
redirectTo(context);
redirect(context);
return true;
}

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.caches;
package org.apache.activemq.artemis.core.server.routing.caches;
public interface Cache {

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.caches;
package org.apache.activemq.artemis.core.server.routing.caches;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;

View File

@ -15,9 +15,9 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.policies;
package org.apache.activemq.artemis.core.server.routing.policies;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetProbe;
import org.apache.activemq.artemis.core.server.routing.targets.TargetProbe;
import java.util.Map;

View File

@ -15,9 +15,9 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.policies;
package org.apache.activemq.artemis.core.server.routing.policies;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.routing.targets.Target;
import java.util.List;
import java.util.Map;

View File

@ -15,9 +15,9 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.policies;
package org.apache.activemq.artemis.core.server.routing.policies;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.routing.targets.Target;
import java.util.List;

View File

@ -15,10 +15,10 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.policies;
package org.apache.activemq.artemis.core.server.routing.policies;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetProbe;
import org.apache.activemq.artemis.core.server.routing.targets.Target;
import org.apache.activemq.artemis.core.server.routing.targets.TargetProbe;
import org.jboss.logging.Logger;
import java.util.ArrayList;

View File

@ -15,10 +15,10 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.policies;
package org.apache.activemq.artemis.core.server.routing.policies;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetProbe;
import org.apache.activemq.artemis.core.server.routing.targets.Target;
import org.apache.activemq.artemis.core.server.routing.targets.TargetProbe;
import java.util.List;
import java.util.Map;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.policies;
package org.apache.activemq.artemis.core.server.routing.policies;
public interface PolicyFactory {
Policy create();

View File

@ -15,9 +15,9 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.policies;
package org.apache.activemq.artemis.core.server.routing.policies;
import org.apache.activemq.artemis.core.server.balancing.BrokerBalancer;
import org.apache.activemq.artemis.core.server.routing.ConnectionRouter;
import java.util.HashMap;
import java.util.Map;
@ -56,7 +56,7 @@ public class PolicyFactoryResolver {
private void loadPolicyFactories() {
ServiceLoader<PolicyFactory> serviceLoader = ServiceLoader.load(
PolicyFactory.class, BrokerBalancer.class.getClassLoader());
PolicyFactory.class, ConnectionRouter.class.getClassLoader());
for (PolicyFactory policyFactory : serviceLoader) {
policyFactories.put(keyFromClassName(policyFactory.getClass().getName()), policyFactory);

View File

@ -15,9 +15,9 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.policies;
package org.apache.activemq.artemis.core.server.routing.policies;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.routing.targets.Target;
import org.apache.activemq.artemis.utils.RandomUtil;
import java.util.List;

View File

@ -15,13 +15,13 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.pools;
package org.apache.activemq.artemis.core.server.routing.pools;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetFactory;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetMonitor;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetProbe;
import org.apache.activemq.artemis.core.server.routing.targets.Target;
import org.apache.activemq.artemis.core.server.routing.targets.TargetFactory;
import org.apache.activemq.artemis.core.server.routing.targets.TargetMonitor;
import org.apache.activemq.artemis.core.server.routing.targets.TargetProbe;
import org.jboss.logging.Logger;
import java.util.ArrayList;

View File

@ -15,11 +15,11 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.pools;
package org.apache.activemq.artemis.core.server.routing.pools;
import org.apache.activemq.artemis.api.core.client.ClusterTopologyListener;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetFactory;
import org.apache.activemq.artemis.core.server.routing.targets.TargetFactory;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
import java.util.Map;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.pools;
package org.apache.activemq.artemis.core.server.routing.pools;
import org.apache.activemq.artemis.core.cluster.DiscoveryEntry;
import org.apache.activemq.artemis.core.cluster.DiscoveryGroup;

View File

@ -15,9 +15,9 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.pools;
package org.apache.activemq.artemis.core.server.routing.pools;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetFactory;
import org.apache.activemq.artemis.core.server.routing.targets.TargetFactory;
import java.util.concurrent.ScheduledExecutorService;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.pools;
package org.apache.activemq.artemis.core.server.routing.pools;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;

View File

@ -15,11 +15,11 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.pools;
package org.apache.activemq.artemis.core.server.routing.pools;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetProbe;
import org.apache.activemq.artemis.core.server.routing.targets.Target;
import org.apache.activemq.artemis.core.server.routing.targets.TargetProbe;
import java.util.List;

View File

@ -15,10 +15,10 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.pools;
package org.apache.activemq.artemis.core.server.routing.pools;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetFactory;
import org.apache.activemq.artemis.core.server.routing.targets.TargetFactory;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.targets;
package org.apache.activemq.artemis.core.server.routing.targets;
import org.apache.activemq.artemis.api.core.TransportConfiguration;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.targets;
package org.apache.activemq.artemis.core.server.routing.targets;
public abstract class AbstractTargetFactory implements TargetFactory {

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.targets;
package org.apache.activemq.artemis.core.server.routing.targets;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -25,7 +25,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.ActiveMQManagementProxy;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.core.server.balancing.BrokerBalancer;
import org.apache.activemq.artemis.core.server.routing.ConnectionRouter;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.jboss.logging.Logger;
@ -67,7 +67,7 @@ public class ActiveMQTarget extends AbstractTarget implements FailureListener {
managementProxy = new ActiveMQManagementProxy(sessionFactory.createSession(getUsername(), getPassword(),
false, true, true, false, ActiveMQClient.DEFAULT_ACK_BATCH_SIZE,
BrokerBalancer.CLIENT_ID_PREFIX + UUIDGenerator.getInstance().generateStringUUID()).start());
ConnectionRouter.CLIENT_ID_PREFIX + UUIDGenerator.getInstance().generateStringUUID()).start());
if (getNodeID() == null) {
setNodeID(getAttribute(ResourceNames.BROKER, "NodeID", String.class, 3000));

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.targets;
package org.apache.activemq.artemis.core.server.routing.targets;
import org.apache.activemq.artemis.api.core.TransportConfiguration;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.targets;
package org.apache.activemq.artemis.core.server.routing.targets;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.targets;
package org.apache.activemq.artemis.core.server.routing.targets;
import org.apache.activemq.artemis.api.core.TransportConfiguration;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.targets;
package org.apache.activemq.artemis.core.server.routing.targets;
import org.apache.activemq.artemis.api.core.TransportConfiguration;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.targets;
package org.apache.activemq.artemis.core.server.routing.targets;
public interface TargetListener {
void targetConnected();

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.targets;
package org.apache.activemq.artemis.core.server.routing.targets;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.jboss.logging.Logger;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.targets;
package org.apache.activemq.artemis.core.server.routing.targets;
public abstract class TargetProbe {
private final String name;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.targets;
package org.apache.activemq.artemis.core.server.routing.targets;
public class TargetResult {

View File

@ -15,12 +15,12 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.transformer;
package org.apache.activemq.artemis.core.server.routing.transformer;
import java.util.Map;
import org.apache.activemq.artemis.core.server.balancing.policies.ConsistentHashPolicy;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKeyResolver;
import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashPolicy;
import org.apache.activemq.artemis.core.server.routing.KeyResolver;
public class ConsistentHashModulo implements KeyTransformer {
public static final String NAME = "CONSISTENT_HASH_MODULO";
@ -29,7 +29,7 @@ public class ConsistentHashModulo implements KeyTransformer {
@Override
public String transform(String str) {
if (TargetKeyResolver.DEFAULT_KEY_VALUE.equals(str)) {
if (KeyResolver.DEFAULT_KEY_VALUE.equals(str)) {
// we only want to transform resolved keys
return str;
}

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.transformer;
package org.apache.activemq.artemis.core.server.routing.transformer;
import java.util.Map;

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.transformer;
package org.apache.activemq.artemis.core.server.routing.transformer;
public interface TransformerFactory {
KeyTransformer create();

View File

@ -15,13 +15,13 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.transformer;
package org.apache.activemq.artemis.core.server.routing.transformer;
import java.util.HashMap;
import java.util.Map;
import java.util.ServiceLoader;
import org.apache.activemq.artemis.core.server.balancing.BrokerBalancer;
import org.apache.activemq.artemis.core.server.routing.ConnectionRouter;
public class TransformerFactoryResolver {
private static TransformerFactoryResolver instance;
@ -50,7 +50,7 @@ public class TransformerFactoryResolver {
private void loadFactories() {
ServiceLoader<TransformerFactory> serviceLoader = ServiceLoader.load(
TransformerFactory.class, BrokerBalancer.class.getClassLoader());
TransformerFactory.class, ConnectionRouter.class.getClassLoader());
for (TransformerFactory factory : serviceLoader) {
factories.put(keyFromClassName(factory.getClass().getName()), factory);
}

View File

@ -27,9 +27,9 @@ import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.server.balancing.RedirectHandler;
import org.apache.activemq.artemis.core.server.routing.RoutingHandler;
public abstract class AbstractProtocolManager<P, I extends BaseInterceptor<P>, C extends RemotingConnection, R extends RedirectHandler> implements ProtocolManager<I, R> {
public abstract class AbstractProtocolManager<P, I extends BaseInterceptor<P>, C extends RemotingConnection, R extends RoutingHandler> implements ProtocolManager<I, R> {
private final Map<SimpleString, RoutingType> prefixes = new HashMap<>();

View File

@ -25,14 +25,14 @@ import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.server.balancing.RedirectHandler;
import org.apache.activemq.artemis.core.server.routing.RoutingHandler;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
/**
* Info: ProtocolManager is loaded by {@link org.apache.activemq.artemis.core.remoting.server.impl.RemotingServiceImpl#loadProtocolManagerFactories(Iterable)}
*/
public interface ProtocolManager<P extends BaseInterceptor, R extends RedirectHandler> {
public interface ProtocolManager<P extends BaseInterceptor, R extends RoutingHandler> {
ProtocolManagerFactory<P> getFactory();
@ -82,5 +82,5 @@ public interface ProtocolManager<P extends BaseInterceptor, R extends RedirectHa
String getSecurityDomain();
R getRedirectHandler();
R getRoutingHandler();
}

View File

@ -627,15 +627,15 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="broker-balancers" maxOccurs="1" minOccurs="0">
<xsd:element name="connection-routers" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
A list of balancers
A list of connection routers
</xsd:documentation>
</xsd:annotation>
<xsd:complexType>
<xsd:sequence>
<xsd:element name="broker-balancer" type="brokerBalancerType" maxOccurs="unbounded" minOccurs="0"/>
<xsd:element name="connection-router" type="connectionRouterType" maxOccurs="unbounded" minOccurs="0"/>
</xsd:sequence>
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>
@ -2110,16 +2110,16 @@
</xsd:sequence>
</xsd:complexType>
<xsd:complexType name="brokerBalancerType">
<xsd:complexType name="connectionRouterType">
<xsd:sequence maxOccurs="unbounded">
<xsd:element name="target-key" type="brokerBalancerTargetKeyType" maxOccurs="1" minOccurs="0">
<xsd:element name="key-type" type="connectionRouterKeyType" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the optional target key
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="target-key-filter" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:element name="key-filter" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the filter for the target key
@ -2133,28 +2133,28 @@
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="cache" type="brokerBalancerCacheType" maxOccurs="1" minOccurs="0">
<xsd:element name="cache" type="connectionRouterCacheType" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the time period for a cache entry to remain active
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="policy" type="brokerBalancerPolicyType" maxOccurs="1" minOccurs="0">
<xsd:element name="policy" type="connectionRouterPolicyType" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the policy configuration
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="pool" type="brokerBalancerPoolType" maxOccurs="1" minOccurs="0">
<xsd:element name="pool" type="connectionRouterPoolType" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the pool configuration
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="local-target-key-transformer" type="brokerBalancerKeyTransformerType" maxOccurs="1" minOccurs="0">
<xsd:element name="local-target-key-transformer" type="connectionRouterKeyTransformerType" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
the local target key transformer configuration
@ -2165,14 +2165,14 @@
<xsd:attribute name="name" type="xsd:string" use="required">
<xsd:annotation>
<xsd:documentation>
a unique name for the broker balancer
a unique name for the connection router
</xsd:documentation>
</xsd:annotation>
</xsd:attribute>
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>
<xsd:simpleType name="brokerBalancerTargetKeyType">
<xsd:simpleType name="connectionRouterKeyType">
<xsd:restriction base="xsd:string">
<xsd:enumeration value="CLIENT_ID"/>
<xsd:enumeration value="SNI_HOST"/>
@ -2181,7 +2181,7 @@
</xsd:restriction>
</xsd:simpleType>
<xsd:complexType name="brokerBalancerCacheType">
<xsd:complexType name="connectionRouterCacheType">
<xsd:sequence maxOccurs="unbounded">
<xsd:element name="persisted" type="xsd:boolean" default="false" maxOccurs="1" minOccurs="0">
<xsd:annotation>
@ -2201,7 +2201,7 @@
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>
<xsd:complexType name="brokerBalancerPolicyType">
<xsd:complexType name="connectionRouterPolicyType">
<xsd:sequence>
<xsd:element ref="property" maxOccurs="unbounded" minOccurs="0">
<xsd:annotation>
@ -2221,7 +2221,7 @@
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>
<xsd:complexType name="brokerBalancerKeyTransformerType">
<xsd:complexType name="connectionRouterKeyTransformerType">
<xsd:sequence>
<xsd:element ref="property" maxOccurs="unbounded" minOccurs="0">
<xsd:annotation>
@ -2241,7 +2241,7 @@
<xsd:attributeGroup ref="xml:specialAttrs"/>
</xsd:complexType>
<xsd:complexType name="brokerBalancerPoolType">
<xsd:complexType name="connectionRouterPoolType">
<xsd:sequence maxOccurs="unbounded">
<xsd:element name="username" type="xsd:string" maxOccurs="1" minOccurs="0">
<xsd:annotation>

View File

@ -599,8 +599,8 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
ConfigurationImpl configuration = new ConfigurationImpl();
Properties properties = new Properties();
properties.put("balancerConfigurations.joe.localTargetFilter", "LF");
properties.put("balancerConfigurations(joe).targetKeyFilter", "TF");
properties.put("connectionRouters.joe.localTargetFilter", "LF");
properties.put("connectionRouters.joe.keyFilter", "TF");
properties.put("acceptorConfigurations.tcp.params.HOST", "LOCALHOST");
properties.put("acceptorConfigurations.tcp.params.PORT", "61616");
@ -623,9 +623,9 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
configuration.parsePrefixedProperties(properties, null);
Assert.assertEquals(1, configuration.getBalancerConfigurations().size());
Assert.assertEquals("LF", configuration.getBalancerConfigurations().get(0).getLocalTargetFilter());
Assert.assertEquals("TF", configuration.getBalancerConfigurations().get(0).getTargetKeyFilter());
Assert.assertEquals(1, configuration.getConnectionRouters().size());
Assert.assertEquals("LF", configuration.getConnectionRouters().get(0).getLocalTargetFilter());
Assert.assertEquals("TF", configuration.getConnectionRouters().get(0).getKeyFilter());
Assert.assertEquals(2, configuration.getAcceptorConfigurations().size());
@ -659,16 +659,22 @@ public class ConfigurationImplTest extends ActiveMQTestBase {
ConfigurationImpl configuration = new ConfigurationImpl();
Properties properties = new Properties();
properties.put("balancerConfigurations.joe.localTargetFilter", "LF");
properties.put("connectionRouters.joe.localTargetFilter", "LF");
// does not exist, ignored
properties.put("balancerConfigurations(bob).targetKeyFilter", "TF");
properties.put("balancerConfigurations(joe).targetKeyFilter", "TF");
properties.put("connectionRouters(bob).keyFilter", "TF");
// apply twice b/c there is no guarantee of order, this may be a problem
configuration.parsePrefixedProperties(properties, null);
properties = new Properties();
// update existing
properties.put("connectionRouters(joe).keyFilter", "TF");
configuration.parsePrefixedProperties(properties, null);
Assert.assertEquals(1, configuration.getBalancerConfigurations().size());
Assert.assertEquals("LF", configuration.getBalancerConfigurations().get(0).getLocalTargetFilter());
Assert.assertEquals("TF", configuration.getBalancerConfigurations().get(0).getTargetKeyFilter());
Assert.assertEquals(1, configuration.getConnectionRouters().size());
Assert.assertEquals("LF", configuration.getConnectionRouters().get(0).getLocalTargetFilter());
Assert.assertEquals("TF", configuration.getConnectionRouters().get(0).getKeyFilter());
}
@Test

View File

@ -46,7 +46,7 @@ import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.MetricsConfiguration;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
import org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration;
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
@ -55,10 +55,10 @@ import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.SecuritySettingPlugin;
import org.apache.activemq.artemis.core.server.balancing.policies.ConsistentHashPolicy;
import org.apache.activemq.artemis.core.server.balancing.policies.FirstElementPolicy;
import org.apache.activemq.artemis.core.server.balancing.policies.LeastConnectionsPolicy;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashPolicy;
import org.apache.activemq.artemis.core.server.routing.policies.FirstElementPolicy;
import org.apache.activemq.artemis.core.server.routing.policies.LeastConnectionsPolicy;
import org.apache.activemq.artemis.core.server.routing.KeyType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.LegacyLDAPSecuritySettingPlugin;
@ -76,7 +76,7 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.apache.activemq.artemis.core.server.balancing.transformer.ConsistentHashModulo.MODULO;
import static org.apache.activemq.artemis.core.server.routing.transformer.ConsistentHashModulo.MODULO;
public class FileConfigurationTest extends ConfigurationImplTest {
@ -269,30 +269,30 @@ public class FileConfigurationTest extends ConfigurationImplTest {
}
}
Assert.assertEquals(5, conf.getBalancerConfigurations().size());
for (BrokerBalancerConfiguration bc : conf.getBalancerConfigurations()) {
Assert.assertEquals(5, conf.getConnectionRouters().size());
for (ConnectionRouterConfiguration bc : conf.getConnectionRouters()) {
if (bc.getName().equals("simple-local")) {
Assert.assertEquals(bc.getTargetKey(), TargetKey.CLIENT_ID);
Assert.assertEquals(bc.getKeyType(), KeyType.CLIENT_ID);
Assert.assertNotNull(bc.getLocalTargetFilter());
Assert.assertNotNull(bc.getTargetKeyFilter());
Assert.assertNotNull(bc.getKeyFilter());
Assert.assertNull(bc.getPolicyConfiguration());
} else if (bc.getName().equals("simple-local-with-transformer")) {
Assert.assertEquals(bc.getTargetKey(), TargetKey.CLIENT_ID);
Assert.assertEquals(bc.getKeyType(), KeyType.CLIENT_ID);
Assert.assertNotNull(bc.getLocalTargetFilter());
Assert.assertNotNull(bc.getTargetKeyFilter());
Assert.assertNotNull(bc.getKeyFilter());
Assert.assertNull(bc.getPolicyConfiguration());
Assert.assertNotNull(bc.getTransformerConfiguration());
Assert.assertNotNull(bc.getTransformerConfiguration().getProperties().get(MODULO));
} else if (bc.getName().equals("simple-balancer")) {
Assert.assertEquals(bc.getTargetKey(), TargetKey.USER_NAME);
} else if (bc.getName().equals("simple-router")) {
Assert.assertEquals(bc.getKeyType(), KeyType.USER_NAME);
Assert.assertNull(bc.getLocalTargetFilter());
Assert.assertEquals(bc.getPolicyConfiguration().getName(), FirstElementPolicy.NAME);
Assert.assertEquals(false, bc.getPoolConfiguration().isLocalTargetEnabled());
Assert.assertEquals("connector1", bc.getPoolConfiguration().getStaticConnectors().get(0));
Assert.assertEquals(null, bc.getPoolConfiguration().getDiscoveryGroupName());
} else if (bc.getName().equals("consistent-hash-balancer")) {
Assert.assertEquals(bc.getTargetKey(), TargetKey.SNI_HOST);
Assert.assertEquals(bc.getTargetKeyFilter(), "^[^.]+");
} else if (bc.getName().equals("consistent-hash-router")) {
Assert.assertEquals(bc.getKeyType(), KeyType.SNI_HOST);
Assert.assertEquals(bc.getKeyFilter(), "^[^.]+");
Assert.assertEquals(bc.getLocalTargetFilter(), "DEFAULT");
Assert.assertEquals(bc.getPolicyConfiguration().getName(), ConsistentHashPolicy.NAME);
Assert.assertEquals(1000, bc.getPoolConfiguration().getCheckPeriod());
@ -300,8 +300,8 @@ public class FileConfigurationTest extends ConfigurationImplTest {
Assert.assertEquals(null, bc.getPoolConfiguration().getStaticConnectors());
Assert.assertEquals("dg1", bc.getPoolConfiguration().getDiscoveryGroupName());
} else {
Assert.assertEquals(bc.getTargetKey(), TargetKey.SOURCE_IP);
Assert.assertEquals("least-connections-balancer", bc.getName());
Assert.assertEquals(bc.getKeyType(), KeyType.SOURCE_IP);
Assert.assertEquals("least-connections-router", bc.getName());
Assert.assertNotNull(bc.getCacheConfiguration());
Assert.assertEquals(true, bc.getCacheConfiguration().isPersisted());
Assert.assertEquals(60000, bc.getCacheConfiguration().getTimeout());

View File

@ -45,7 +45,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Divert;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.balancing.BrokerBalancer;
import org.apache.activemq.artemis.core.server.routing.ConnectionRouter;
import org.apache.activemq.artemis.core.server.cluster.Bridge;
import org.apache.activemq.artemis.core.server.cluster.BroadcastGroup;
import org.apache.activemq.artemis.core.server.cluster.ClusterConnection;
@ -326,12 +326,12 @@ public class ClusteredResetMockTest extends ActiveMQTestBase {
}
@Override
public void registerBrokerBalancer(BrokerBalancer balancer) throws Exception {
public void registerConnectionRouter(ConnectionRouter router) throws Exception {
}
@Override
public void unregisterBrokerBalancer(String name) throws Exception {
public void unregisterConnectionRouter(String name) throws Exception {
}

View File

@ -15,18 +15,17 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing;
package org.apache.activemq.artemis.core.server.routing;
import java.util.HashMap;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.balancing.BrokerBalancerConfiguration;
import org.apache.activemq.artemis.core.config.balancing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.balancing.PoolConfiguration;
import org.apache.activemq.artemis.core.config.routing.ConnectionRouterConfiguration;
import org.apache.activemq.artemis.core.config.routing.NamedPropertyConfiguration;
import org.apache.activemq.artemis.core.config.routing.PoolConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.balancing.policies.ConsistentHashPolicy;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.core.server.balancing.transformer.ConsistentHashModulo;
import org.apache.activemq.artemis.core.server.routing.policies.ConsistentHashPolicy;
import org.apache.activemq.artemis.core.server.routing.transformer.ConsistentHashModulo;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.junit.After;
import org.junit.Before;
@ -39,10 +38,10 @@ import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class BrokerBalancerManagerTest {
public class ConnectionRouterManagerTest {
ActiveMQServer mockServer;
BrokerBalancerManager underTest;
ConnectionRouterManager underTest;
@Before
public void setUp() throws Exception {
@ -50,7 +49,7 @@ public class BrokerBalancerManagerTest {
mockServer = mock(ActiveMQServer.class);
Mockito.when(mockServer.getNodeID()).thenReturn(SimpleString.toSimpleString("UUID"));
underTest = new BrokerBalancerManager(null, mockServer, null);
underTest = new ConnectionRouterManager(null, mockServer, null);
underTest.start();
}
@ -64,17 +63,17 @@ public class BrokerBalancerManagerTest {
@Test(expected = IllegalStateException.class)
public void deployLocalOnlyPoolInvalid() throws Exception {
BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration();
brokerBalancerConfiguration.setName("partition-local-pool");
ConnectionRouterConfiguration connectionRouterConfiguration = new ConnectionRouterConfiguration();
connectionRouterConfiguration.setName("partition-local-pool");
NamedPropertyConfiguration policyConfig = new NamedPropertyConfiguration();
policyConfig.setName(ConsistentHashPolicy.NAME);
brokerBalancerConfiguration.setPolicyConfiguration(policyConfig);
connectionRouterConfiguration.setPolicyConfiguration(policyConfig);
PoolConfiguration poolConfiguration = new PoolConfiguration();
poolConfiguration.setLocalTargetEnabled(true);
brokerBalancerConfiguration.setPoolConfiguration(poolConfiguration);
connectionRouterConfiguration.setPoolConfiguration(poolConfiguration);
underTest.deployBrokerBalancer(brokerBalancerConfiguration);
underTest.deployConnectionRouter(connectionRouterConfiguration);
}
@Test
@ -83,10 +82,10 @@ public class BrokerBalancerManagerTest {
ManagementService mockManagementService = Mockito.mock(ManagementService.class);
Mockito.when(mockServer.getManagementService()).thenReturn(mockManagementService);
BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration();
brokerBalancerConfiguration.setName("partition-local-pool");
ConnectionRouterConfiguration connectionRouterConfiguration = new ConnectionRouterConfiguration();
connectionRouterConfiguration.setName("partition-local-pool");
underTest.deployBrokerBalancer(brokerBalancerConfiguration);
underTest.deployConnectionRouter(connectionRouterConfiguration);
}
@Test()
@ -95,16 +94,16 @@ public class BrokerBalancerManagerTest {
ManagementService mockManagementService = Mockito.mock(ManagementService.class);
Mockito.when(mockServer.getManagementService()).thenReturn(mockManagementService);
BrokerBalancerConfiguration brokerBalancerConfiguration = new BrokerBalancerConfiguration();
brokerBalancerConfiguration.setName("partition-local-consistent-hash").setTargetKey(TargetKey.CLIENT_ID).setLocalTargetFilter(String.valueOf(2));
ConnectionRouterConfiguration connectionRouterConfiguration = new ConnectionRouterConfiguration();
connectionRouterConfiguration.setName("partition-local-consistent-hash").setKeyType(KeyType.CLIENT_ID).setLocalTargetFilter(String.valueOf(2));
NamedPropertyConfiguration policyConfig = new NamedPropertyConfiguration();
policyConfig.setName(ConsistentHashModulo.NAME);
HashMap<String, String> properties = new HashMap<>();
properties.put(ConsistentHashModulo.MODULO, String.valueOf(2));
policyConfig.setProperties(properties);
brokerBalancerConfiguration.setTransformerConfiguration(policyConfig);
connectionRouterConfiguration.setTransformerConfiguration(policyConfig);
underTest.deployBrokerBalancer(brokerBalancerConfiguration);
underTest.deployConnectionRouter(connectionRouterConfiguration);
}
}

View File

@ -15,20 +15,19 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing;
package org.apache.activemq.artemis.core.server.routing;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.balancing.policies.Policy;
import org.apache.activemq.artemis.core.server.balancing.pools.Pool;
import org.apache.activemq.artemis.core.server.balancing.targets.LocalTarget;
import org.apache.activemq.artemis.core.server.balancing.targets.Target;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetKey;
import org.apache.activemq.artemis.core.server.balancing.targets.TargetResult;
import org.apache.activemq.artemis.core.server.balancing.transformer.KeyTransformer;
import org.apache.activemq.artemis.core.server.routing.policies.Policy;
import org.apache.activemq.artemis.core.server.routing.pools.Pool;
import org.apache.activemq.artemis.core.server.routing.targets.LocalTarget;
import org.apache.activemq.artemis.core.server.routing.targets.Target;
import org.apache.activemq.artemis.core.server.routing.targets.TargetResult;
import org.apache.activemq.artemis.core.server.routing.transformer.KeyTransformer;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@ -36,10 +35,10 @@ import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class BrokerBalancerTest {
public class ConnectionRouterTest {
Target localTarget;
BrokerBalancer underTest;
ConnectionRouter underTest;
@Before
public void setUp() {
@ -52,7 +51,7 @@ public class BrokerBalancerTest {
public void getTarget() {
Pool pool = null;
Policy policy = null;
underTest = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}",
underTest = new ConnectionRouter("test", KeyType.CLIENT_ID, "^.{3}",
localTarget, "^FOO.*", null, pool, policy, null);
assertEquals( localTarget, underTest.getTarget("FOO_EE").getTarget());
assertEquals(TargetResult.REFUSED_USE_ANOTHER_RESULT, underTest.getTarget("BAR_EE"));
@ -68,7 +67,7 @@ public class BrokerBalancerTest {
return key.substring("TRANSFORM_TO".length() + 1);
}
};
underTest = new BrokerBalancer("test", TargetKey.CLIENT_ID, "^.{3}",
underTest = new ConnectionRouter("test", KeyType.CLIENT_ID, "^.{3}",
localTarget, "^FOO.*", null, pool, policy, keyTransformer);
assertEquals( localTarget, underTest.getTarget("TRANSFORM_TO_FOO_EE").getTarget());
}

View File

@ -15,7 +15,7 @@
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.balancing.caches;
package org.apache.activemq.artemis.core.server.routing.caches;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;

Some files were not shown because too many files have changed in this diff Show More