diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/DiscoveryGroupConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/DiscoveryGroupConfiguration.java index 347e0c9a3b..11985fb8a0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/DiscoveryGroupConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/DiscoveryGroupConfiguration.java @@ -129,8 +129,8 @@ public final class DiscoveryGroupConfiguration implements Serializable { public String toString() { return "DiscoveryGroupConfiguration{" + "name='" + name + '\'' + - ",\n refreshTimeout=" + refreshTimeout + - ",\n discoveryInitialWaitTimeout=" + discoveryInitialWaitTimeout + - "}"; + ", refreshTimeout=" + refreshTimeout + + ", discoveryInitialWaitTimeout=" + discoveryInitialWaitTimeout + + '}'; } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java index d0197354ee..71679b5337 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java @@ -252,16 +252,15 @@ public class TransportConfiguration implements Serializable { public static String toStringParameters(Map params, Map extraProps) { StringBuilder str = new StringBuilder(); - str.append("{"); if (params != null) { if (!params.isEmpty()) { - str.append("\n\t\t\t"); + str.append("?"); } boolean first = true; for (Map.Entry entry : params.entrySet()) { if (!first) { - str.append("\n\t\t\t"); + str.append("&"); } String key = entry.getKey(); @@ -281,7 +280,7 @@ public class TransportConfiguration implements Serializable { if (extraProps != null) { for (Map.Entry entry : extraProps.entrySet()) { if (!first) { - str.append("\n\t\t\t"); + str.append("&"); } String key = entry.getKey(); @@ -293,7 +292,6 @@ public class TransportConfiguration implements Serializable { } } } - str.append("\n\t\t}"); return str.toString(); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index c6bf59b76b..949ef90810 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -1476,12 +1476,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return "ServerLocatorImpl (identity=" + identity + ") [initialConnectors=" + Arrays.toString(initialConnectors == null ? new TransportConfiguration[0] : initialConnectors) + - ",\n discoveryGroupConfiguration=" + + ", discoveryGroupConfiguration=" + discoveryGroupConfiguration + "]"; } return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors == null ? new TransportConfiguration[0] : initialConnectors) + - ",\n discoveryGroupConfiguration=" + + ", discoveryGroupConfiguration=" + discoveryGroupConfiguration + "]"; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java index b28f78587c..c82a939ba7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java @@ -149,6 +149,6 @@ public final class TopologyMemberImpl implements TopologyMember { @Override public String toString() { - return "TopologyMember[\n\t\tid = " + nodeId + "\n\t\tconnector=" + connector + "\n\t\tbackupGroupName=" + backupGroupName + "\n\t\tscaleDownGroupName=" + scaleDownGroupName + "]"; + return "TopologyMember[id = " + nodeId + ", connector=" + connector + ", backupGroupName=" + backupGroupName + ", scaleDownGroupName=" + scaleDownGroupName + "]"; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 385376e123..628312acef 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; +import java.util.ArrayList; import java.util.List; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -27,6 +28,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.core.exception.ActiveMQXAException; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -323,6 +326,19 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = true; SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet; BindingQueryResult result = session.executeBindingQuery(request.getAddress(remotingConnection.getClientVersion())); + + /* if the session is JMS and it's from an older client then we need to add the old prefix to the queue + * names otherwise the older client won't realize the queue exists and will try to create it and receive + * an error + */ + if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && remotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) { + List queueNames = new ArrayList<>(); + for (SimpleString queueName : result.getQueueNames()) { + queueNames.add(PacketImpl.OLD_QUEUE_PREFIX.concat(queueName)); + } + result = new BindingQueryResult(result.isExists(), queueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers()); + } + if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) { response = new SessionBindingQueryResponseMessage_V4(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers()); } else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 234b79950e..f0f5b3bb54 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -742,13 +742,13 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled @Override public String toString() { - return "\n " + this.getClass().getSimpleName() + "@" + + return this.getClass().getSimpleName() + "@" + Integer.toHexString(System.identityHashCode(this)) + - "\n [name=" + + " [name=" + name + - ",\n queue=" + + ", queue=" + queue + - "\n targetConnector=" + + " targetConnector=" + this.serverLocator + "]"; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 2c4f91f5be..002e511bed 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1961,7 +1961,7 @@ public class QueueImpl implements Queue { @Override public String toString() { - return "QueueImpl[name=" + name.toString() + ",\n postOffice=" + this.postOffice + ",\n temp=" + this.temporary + "]@" + Integer.toHexString(System.identityHashCode(this)); + return "QueueImpl[name=" + name.toString() + ", postOffice=" + this.postOffice + ", temp=" + this.temporary + "]@" + Integer.toHexString(System.identityHashCode(this)); } private synchronized void internalAddTail(final MessageReference ref) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index b18bb263c3..4ace4d4d40 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -32,7 +32,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; @@ -42,6 +44,8 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.QueueBinding; +import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; @@ -149,6 +153,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private long acks; + private boolean requiresLegacyPrefix = false; + + private boolean anycast = false; + // Constructors --------------------------------------------------------------------------------- public ServerConsumerImpl(final long id, @@ -226,6 +234,16 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } this.server = server; + + if (session.getRemotingConnection() instanceof CoreRemotingConnection) { + CoreRemotingConnection coreRemotingConnection = (CoreRemotingConnection) session.getRemotingConnection(); + if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && coreRemotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) { + requiresLegacyPrefix = true; + if (getQueue().getRoutingType().equals(RoutingType.ANYCAST)) { + anycast = true; + } + } + } } @Override @@ -535,6 +553,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence); forcedDeliveryMessage.setAddress(messageQueue.getName()); + applyPrefixForLegacyConsumer(forcedDeliveryMessage); callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0); }); @@ -1053,7 +1072,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { * @param ref * @param message */ - private void deliverStandardMessage(final MessageReference ref, final Message message) throws ActiveMQException { + private void deliverStandardMessage(final MessageReference ref, Message message) throws ActiveMQException { + applyPrefixForLegacyConsumer(message); int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount()); if (availableCredits != null) { @@ -1068,6 +1088,28 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } } + private void applyPrefixForLegacyConsumer(Message message) { + /** + * check to see if: + * 1) This is a "core" connection + * 2) The "core" connection belongs to a JMS client + * 3) The JMS client is an "old" client which needs address prefixes + * + * If 1, 2, & 3 are true then apply the "old" prefix for queues and topics as appropriate. + */ + if (requiresLegacyPrefix) { + if (anycast) { + if (!message.getAddress().startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) { + message.setAddress(PacketImpl.OLD_QUEUE_PREFIX + message.getAddress()); + } + } else { + if (!message.getAddress().startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) { + message.setAddress(PacketImpl.OLD_TOPIC_PREFIX + message.getAddress()); + } + } + } + } + // Inner classes // ------------------------------------------------------------------------ diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java index 6c58a8a6bd..d466341d16 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java @@ -334,12 +334,11 @@ public class AddressControlTest extends ManagementTestBase { session.createQueue(address, RoutingType.ANYCAST, address); producer.send(session.createMessage(false)); - assertEquals(1, addressControl.getMessageCount()); + assertTrue(Wait.waitFor(() -> addressControl.getMessageCount() == 1, 2000, 100)); session.createQueue(address, RoutingType.ANYCAST, address.concat('2')); producer.send(session.createMessage(false)); - Wait.waitFor(() -> addressControl.getMessageCount() == 2); - assertEquals(2, addressControl.getMessageCount()); + assertTrue(Wait.waitFor(() -> addressControl.getMessageCount() == 2, 2000, 100)); } @Test diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 194ad9c192..40dbd9583c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -1495,8 +1495,8 @@ public class StompTest extends StompTestBase { send(conn, addressA, null, "Hello World!", true, RoutingType.ANYCAST); - assertEquals(1, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); - assertEquals(0, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount()); + assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount() == 1, 2000, 100)); + assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() == 0, 2000, 100)); } @Test @@ -1517,8 +1517,8 @@ public class StompTest extends StompTestBase { send(conn, addressA, null, "Hello World!", true, RoutingType.MULTICAST); - assertEquals(0, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount()); - assertEquals(2, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); + assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() == 0, 2000, 100)); + assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount() == 2, 2000, 100)); } @Test @@ -1541,7 +1541,7 @@ public class StompTest extends StompTestBase { send(conn, addressA, null, "Hello World!", true); - assertEquals(1, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); - assertEquals(2, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount()); + assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount() == 1, 2000, 100)); + assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount() == 2, 2000, 100)); } }