ARTEMIS-880 use built-in prefixing for STOMP

This commit is contained in:
jbertram 2016-12-13 15:05:09 -06:00
parent 59bba916a8
commit 3150759806
8 changed files with 79 additions and 79 deletions

View File

@ -203,14 +203,6 @@ public class TransportConstants {
public static final String STOMP_MIN_LARGE_MESSAGE_SIZE = "stomp-min-large-message-size";
public static final String STOMP_ANYCAST_PREFIX = "stompAnycastPrefix";
public static final String DEFAULT_STOMP_ANYCAST_PREFIX = "";
public static final String STOMP_MULTICAST_PREFIX = "stompMulticastPrefix";
public static final String DEFAULT_STOMP_MULTICAST_PREFIX = "";
public static final String NETTY_CONNECT_TIMEOUT = "connect-timeout-millis";
public static final int DEFAULT_NETTY_CONNECT_TIMEOUT = -1;
@ -250,8 +242,6 @@ public class TransportConstants {
allowableAcceptorKeys.add(TransportConstants.CLUSTER_CONNECTION);
allowableAcceptorKeys.add(TransportConstants.STOMP_CONSUMERS_CREDIT);
allowableAcceptorKeys.add(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE);
allowableAcceptorKeys.add(TransportConstants.STOMP_ANYCAST_PREFIX);
allowableAcceptorKeys.add(TransportConstants.STOMP_MULTICAST_PREFIX);
allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL);
allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL_MAX);
allowableAcceptorKeys.add(TransportConstants.CONNECTION_TTL_MIN);

View File

@ -90,10 +90,6 @@ public final class StompConnection implements RemotingConnection {
private final int minLargeMessageSize;
private final String anycastPrefix;
private final String multicastPrefix;
private StompVersions version;
private VersionedStompFrameHandler frameHandler;
@ -168,8 +164,6 @@ public final class StompConnection implements RemotingConnection {
this.enableMessageID = ConfigurationHelper.getBooleanProperty(TransportConstants.STOMP_ENABLE_MESSAGE_ID, false, acceptorUsed.getConfiguration());
this.minLargeMessageSize = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, acceptorUsed.getConfiguration());
this.anycastPrefix = ConfigurationHelper.getStringProperty(TransportConstants.STOMP_ANYCAST_PREFIX, TransportConstants.DEFAULT_STOMP_ANYCAST_PREFIX, acceptorUsed.getConfiguration());
this.multicastPrefix = ConfigurationHelper.getStringProperty(TransportConstants.STOMP_MULTICAST_PREFIX, TransportConstants.DEFAULT_STOMP_MULTICAST_PREFIX, acceptorUsed.getConfiguration());
}
@Override
@ -255,14 +249,14 @@ public final class StompConnection implements RemotingConnection {
// TODO this should take a type - send or receive so it knows whether to check the address or the queue
public void checkDestination(String destination) throws ActiveMQStompException {
if (!manager.destinationExists(destination)) {
if (!manager.destinationExists(getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination)).toString())) {
throw BUNDLE.destinationNotExist(destination).setHandler(frameHandler);
}
}
public boolean autoCreateDestinationIfPossible(String queue, RoutingType routingType) throws ActiveMQStompException {
boolean result = false;
ServerSession session = getSession().getSession();
ServerSession session = getSession().getCoreSession();
try {
if (manager.getServer().getAddressInfo(SimpleString.toSimpleString(queue)) == null) {
@ -291,9 +285,9 @@ public final class StompConnection implements RemotingConnection {
}
public void checkRoutingSemantics(String destination, RoutingType routingType) throws ActiveMQStompException {
Set<RoutingType> actualDeliveryModesOfAddres = manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination)).getRoutingTypes();
if (routingType != null && !actualDeliveryModesOfAddres.contains(routingType)) {
throw BUNDLE.illegalSemantics(routingType.toString(), actualDeliveryModesOfAddres.toString());
Set<RoutingType> actualDeliveryModesOfAddress = manager.getServer().getAddressInfo(getSession().getCoreSession().removePrefix(SimpleString.toSimpleString(destination))).getRoutingTypes();
if (routingType != null && !actualDeliveryModesOfAddress.contains(routingType)) {
throw BUNDLE.illegalSemantics(routingType.toString(), actualDeliveryModesOfAddress.toString());
}
}
@ -757,14 +751,6 @@ public final class StompConnection implements RemotingConnection {
return minLargeMessageSize;
}
public String getAnycastPrefix() {
return anycastPrefix;
}
public String getMulticastPrefix() {
return multicastPrefix;
}
public StompProtocolManager getManager() {
return manager;
}

View File

@ -261,9 +261,9 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
StompSession session = sessions.remove(connection.getID());
if (session != null) {
try {
session.getSession().stop();
session.getSession().rollback(true);
session.getSession().close(false);
session.getCoreSession().stop();
session.getCoreSession().rollback(true);
session.getCoreSession().close(false);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.errorCleaningStompConn(e);
}
@ -274,7 +274,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
while (iterator.hasNext()) {
Map.Entry<String, StompSession> entry = iterator.next();
if (entry.getValue().getConnection() == connection) {
ServerSession serverSession = entry.getValue().getSession();
ServerSession serverSession = entry.getValue().getCoreSession();
try {
serverSession.rollback(true);
serverSession.close(false);
@ -355,7 +355,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
throw new ActiveMQStompException(connection, "No transaction started: " + txID);
}
transactedSessions.remove(txID);
session.getSession().commit();
session.getCoreSession().commit();
}
public void abortTransaction(StompConnection connection, String txID) throws Exception {
@ -364,7 +364,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
throw new ActiveMQStompException(connection, "No transaction started: " + txID);
}
transactedSessions.remove(txID);
session.getSession().rollback(false);
session.getCoreSession().rollback(false);
}
// Inner classes -------------------------------------------------

View File

@ -90,7 +90,7 @@ public class StompSession implements SessionCallback {
this.session = session;
}
public ServerSession getSession() {
public ServerSession getCoreSession() {
return session;
}
@ -287,7 +287,7 @@ public class StompSession implements SessionCallback {
receiveCredits = -1;
}
Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(SimpleString.toSimpleString(destination)).getRoutingTypes();
Set<RoutingType> routingTypes = manager.getServer().getAddressInfo(getCoreSession().removePrefix(SimpleString.toSimpleString(destination))).getRoutingTypes();
if (routingTypes.size() == 1 && routingTypes.contains(RoutingType.MULTICAST)) {
// subscribes to a topic
pubSub = true;

View File

@ -236,25 +236,25 @@ public abstract class VersionedStompFrameHandler {
return response;
}
public StompFrame onSubscribe(StompFrame request) {
public StompFrame onSubscribe(StompFrame frame) {
StompFrame response = null;
String destination = getDestination(request);
String selector = request.getHeader(Stomp.Headers.Subscribe.SELECTOR);
String ack = request.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
String id = request.getHeader(Stomp.Headers.Subscribe.ID);
String durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
if (durableSubscriptionName == null) {
durableSubscriptionName = request.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
}
RoutingType routingType = getRoutingType(request.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE), request.getHeader(Headers.Subscribe.DESTINATION));
boolean noLocal = false;
if (request.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) {
noLocal = Boolean.parseBoolean(request.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
}
try {
String destination = getDestination(frame);
String selector = frame.getHeader(Stomp.Headers.Subscribe.SELECTOR);
String ack = frame.getHeader(Stomp.Headers.Subscribe.ACK_MODE);
String id = frame.getHeader(Stomp.Headers.Subscribe.ID);
String durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIBER_NAME);
if (durableSubscriptionName == null) {
durableSubscriptionName = frame.getHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME);
}
RoutingType routingType = getRoutingType(frame.getHeader(Headers.Subscribe.SUBSCRIPTION_TYPE), frame.getHeader(Headers.Subscribe.DESTINATION));
boolean noLocal = false;
if (frame.hasHeader(Stomp.Headers.Subscribe.NO_LOCAL)) {
noLocal = Boolean.parseBoolean(frame.getHeader(Stomp.Headers.Subscribe.NO_LOCAL));
}
connection.subscribe(destination, selector, ack, id, durableSubscriptionName, noLocal, routingType);
} catch (ActiveMQStompException e) {
response = e.getFrame();
@ -264,14 +264,7 @@ public abstract class VersionedStompFrameHandler {
}
public String getDestination(StompFrame request) {
String destination = request.getHeader(Headers.Subscribe.DESTINATION);
if (connection.getMulticastPrefix().length() > 0 && destination.startsWith(connection.getMulticastPrefix())) {
destination = destination.substring(connection.getMulticastPrefix().length());
} else if (connection.getAnycastPrefix().length() > 0 && destination.startsWith(connection.getAnycastPrefix())) {
destination = destination.substring(connection.getAnycastPrefix().length());
}
return destination;
return request.getHeader(Headers.Subscribe.DESTINATION);
}
public StompFrame postprocess(StompFrame request) {
@ -344,17 +337,13 @@ public abstract class VersionedStompFrameHandler {
connection.destroy();
}
private RoutingType getRoutingType(String typeHeader, String destination) {
private RoutingType getRoutingType(String typeHeader, String destination) throws ActiveMQStompException {
// null is valid to return here so we know when the user didn't provide any routing info
RoutingType routingType = null;
RoutingType routingType;
if (typeHeader != null) {
routingType = RoutingType.valueOf(typeHeader);
} else if (destination != null && !connection.getAnycastPrefix().equals(connection.getMulticastPrefix())) {
if (connection.getMulticastPrefix().length() > 0 && destination.startsWith(connection.getMulticastPrefix())) {
routingType = RoutingType.MULTICAST;
} else if (connection.getAnycastPrefix().length() > 0 && destination.startsWith(connection.getAnycastPrefix())) {
routingType = RoutingType.ANYCAST;
}
} else {
routingType = connection.getSession().getCoreSession().getAddressAndRoutingType(SimpleString.toSimpleString(destination), null).getB();
}
return routingType;
}

View File

@ -21,6 +21,7 @@ import javax.transaction.xa.Xid;
import java.util.List;
import java.util.Set;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
import org.apache.activemq.artemis.core.persistence.OperationContext;
@ -250,4 +251,36 @@ public interface ServerSession extends SecurityAuth {
SimpleString getMatchingQueue(SimpleString address, SimpleString queueName, RoutingType routingType) throws Exception;
AddressInfo getAddress(SimpleString address);
/**
* Strip the prefix (if it exists) from the address based on the prefixes provided to the ServerSession constructor.
*
* @param address the address to inspect
* @return the canonical (i.e. non-prefixed) address name
*/
SimpleString removePrefix(SimpleString address);
/**
* Get the canonical (i.e. non-prefixed) address and the corresponding routing-type.
*
* @param address the address to inspect
* @param defaultRoutingType the {@code org.apache.activemq.artemis.core.server.RoutingType} to return if no prefix
* match is found.
* @return a {@code org.apache.activemq.artemis.api.core.Pair} representing the canonical (i.e. non-prefixed) address
* name and the {@code org.apache.activemq.artemis.core.server.RoutingType} corresponding to the that prefix.
*/
Pair<SimpleString, RoutingType> getAddressAndRoutingType(SimpleString address, RoutingType defaultRoutingType);
/**
* Get the canonical (i.e. non-prefixed) address and the corresponding routing-type.
*
* @param address the address to inspect
* @param defaultRoutingTypes a the {@code java.util.Set} of {@code org.apache.activemq.artemis.core.server.RoutingType}
* objects to return if no prefix match is found.
* @return a {@code org.apache.activemq.artemis.api.core.Pair} representing the canonical (i.e. non-prefixed) address
* name and the {@code java.util.Set} of {@code org.apache.activemq.artemis.core.server.RoutingType} objects
* corresponding to the that prefix.
*/
Pair<SimpleString, Set<RoutingType>> getAddressAndRoutingTypes(SimpleString address,
Set<RoutingType> defaultRoutingTypes);
}

View File

@ -1726,14 +1726,16 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
}
private SimpleString removePrefix(SimpleString address) {
@Override
public SimpleString removePrefix(SimpleString address) {
if (prefixEnabled) {
return PrefixUtil.getAddress(address, prefixes);
}
return address;
}
private Pair<SimpleString, RoutingType> getAddressAndRoutingType(SimpleString address,
@Override
public Pair<SimpleString, RoutingType> getAddressAndRoutingType(SimpleString address,
RoutingType defaultRoutingType) {
if (prefixEnabled) {
return PrefixUtil.getAddressAndRoutingType(address, defaultRoutingType, prefixes);
@ -1741,7 +1743,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
return new Pair<>(address, defaultRoutingType);
}
private Pair<SimpleString, Set<RoutingType>> getAddressAndRoutingTypes(SimpleString address,
@Override
public Pair<SimpleString, Set<RoutingType>> getAddressAndRoutingTypes(SimpleString address,
Set<RoutingType> defaultRoutingTypes) {
if (prefixEnabled) {
return PrefixUtil.getAddressAndRoutingTypes(address, defaultRoutingTypes, prefixes);

View File

@ -1306,7 +1306,7 @@ public class StompTest extends StompTestBase {
final String ADDRESS = UUID.randomUUID().toString();
final String PREFIXED_ADDRESS = prefix + ADDRESS;
String param = routingType.toString();
String urlParam = "stomp" + param.substring(0, 1) + param.substring(1).toLowerCase() + "Prefix";
String urlParam = param.toLowerCase() + "Prefix";
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + "&" + urlParam + "=" + prefix).start();
conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
conn.connect(defUser, defPass);
@ -1329,9 +1329,9 @@ public class StompTest extends StompTestBase {
AddressInfo addressInfo = server.getActiveMQServer().getAddressInfo(SimpleString.toSimpleString(ADDRESS));
assertNotNull("No address was created with the name " + ADDRESS, addressInfo);
Set<RoutingType> deliveryModest = new HashSet<>();
deliveryModest.add(RoutingType.valueOf(param));
assertEquals(deliveryModest, addressInfo.getRoutingTypes());
Set<RoutingType> routingTypes = new HashSet<>();
routingTypes.add(RoutingType.valueOf(param));
assertEquals(routingTypes, addressInfo.getRoutingTypes());
conn.disconnect();
}
@ -1360,8 +1360,7 @@ public class StompTest extends StompTestBase {
int port = 61614;
final String ADDRESS = UUID.randomUUID().toString();
final String PREFIXED_ADDRESS = prefix + ADDRESS;
String param = routingType.toString();
String urlParam = "stomp" + param.substring(0, 1) + param.substring(1).toLowerCase() + "Prefix";
String urlParam = routingType.toString().toLowerCase() + "Prefix";
server.getActiveMQServer().getRemotingService().createAcceptor("test", "tcp://" + hostname + ":" + port + "?protocols=" + StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + "&" + urlParam + "=" + prefix).start();
conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
conn.connect(defUser, defPass);