From 4a6dc4f7bad09b098ebe24056b1c0da41490015c Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Tue, 23 May 2017 09:57:21 -0500 Subject: [PATCH 1/3] Fixing failing tests after sync changes --- .../integration/management/AddressControlTest.java | 5 ++--- .../artemis/tests/integration/stomp/StompTest.java | 12 ++++++------ 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java index 6c58a8a6bd..d466341d16 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/management/AddressControlTest.java @@ -334,12 +334,11 @@ public class AddressControlTest extends ManagementTestBase { session.createQueue(address, RoutingType.ANYCAST, address); producer.send(session.createMessage(false)); - assertEquals(1, addressControl.getMessageCount()); + assertTrue(Wait.waitFor(() -> addressControl.getMessageCount() == 1, 2000, 100)); session.createQueue(address, RoutingType.ANYCAST, address.concat('2')); producer.send(session.createMessage(false)); - Wait.waitFor(() -> addressControl.getMessageCount() == 2); - assertEquals(2, addressControl.getMessageCount()); + assertTrue(Wait.waitFor(() -> addressControl.getMessageCount() == 2, 2000, 100)); } @Test diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java index 194ad9c192..40dbd9583c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTest.java @@ -1495,8 +1495,8 @@ public class StompTest extends StompTestBase { send(conn, addressA, null, "Hello World!", true, RoutingType.ANYCAST); - assertEquals(1, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); - assertEquals(0, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount()); + assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount() == 1, 2000, 100)); + assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() == 0, 2000, 100)); } @Test @@ -1517,8 +1517,8 @@ public class StompTest extends StompTestBase { send(conn, addressA, null, "Hello World!", true, RoutingType.MULTICAST); - assertEquals(0, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount()); - assertEquals(2, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); + assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() == 0, 2000, 100)); + assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount() == 2, 2000, 100)); } @Test @@ -1541,7 +1541,7 @@ public class StompTest extends StompTestBase { send(conn, addressA, null, "Hello World!", true); - assertEquals(1, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); - assertEquals(2, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount()); + assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount() == 1, 2000, 100)); + assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount() == 2, 2000, 100)); } } From 5a9830af17581855f55be23c4446e2df5a590255 Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Wed, 24 May 2017 17:50:45 -0500 Subject: [PATCH 2/3] Revert "[ARTEMIS-1171] Improve log readability around clustering" This reverts commit 744221dcb5f250d724a72f0321144630ecb2eab7. Broke o.a.a.a.t.i.m.QueueControlUsingCoreTest.testListDeliveringMessages --- .../artemis/api/core/DiscoveryGroupConfiguration.java | 6 +++--- .../activemq/artemis/api/core/TransportConfiguration.java | 8 +++----- .../artemis/core/client/impl/ServerLocatorImpl.java | 4 ++-- .../artemis/core/client/impl/TopologyMemberImpl.java | 2 +- .../artemis/core/server/cluster/impl/BridgeImpl.java | 8 ++++---- .../activemq/artemis/core/server/impl/QueueImpl.java | 2 +- 6 files changed, 14 insertions(+), 16 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/DiscoveryGroupConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/DiscoveryGroupConfiguration.java index 347e0c9a3b..11985fb8a0 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/DiscoveryGroupConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/DiscoveryGroupConfiguration.java @@ -129,8 +129,8 @@ public final class DiscoveryGroupConfiguration implements Serializable { public String toString() { return "DiscoveryGroupConfiguration{" + "name='" + name + '\'' + - ",\n refreshTimeout=" + refreshTimeout + - ",\n discoveryInitialWaitTimeout=" + discoveryInitialWaitTimeout + - "}"; + ", refreshTimeout=" + refreshTimeout + + ", discoveryInitialWaitTimeout=" + discoveryInitialWaitTimeout + + '}'; } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java index d0197354ee..71679b5337 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/api/core/TransportConfiguration.java @@ -252,16 +252,15 @@ public class TransportConfiguration implements Serializable { public static String toStringParameters(Map params, Map extraProps) { StringBuilder str = new StringBuilder(); - str.append("{"); if (params != null) { if (!params.isEmpty()) { - str.append("\n\t\t\t"); + str.append("?"); } boolean first = true; for (Map.Entry entry : params.entrySet()) { if (!first) { - str.append("\n\t\t\t"); + str.append("&"); } String key = entry.getKey(); @@ -281,7 +280,7 @@ public class TransportConfiguration implements Serializable { if (extraProps != null) { for (Map.Entry entry : extraProps.entrySet()) { if (!first) { - str.append("\n\t\t\t"); + str.append("&"); } String key = entry.getKey(); @@ -293,7 +292,6 @@ public class TransportConfiguration implements Serializable { } } } - str.append("\n\t\t}"); return str.toString(); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java index c6bf59b76b..949ef90810 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/ServerLocatorImpl.java @@ -1476,12 +1476,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery return "ServerLocatorImpl (identity=" + identity + ") [initialConnectors=" + Arrays.toString(initialConnectors == null ? new TransportConfiguration[0] : initialConnectors) + - ",\n discoveryGroupConfiguration=" + + ", discoveryGroupConfiguration=" + discoveryGroupConfiguration + "]"; } return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors == null ? new TransportConfiguration[0] : initialConnectors) + - ",\n discoveryGroupConfiguration=" + + ", discoveryGroupConfiguration=" + discoveryGroupConfiguration + "]"; } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java index b28f78587c..c82a939ba7 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/client/impl/TopologyMemberImpl.java @@ -149,6 +149,6 @@ public final class TopologyMemberImpl implements TopologyMember { @Override public String toString() { - return "TopologyMember[\n\t\tid = " + nodeId + "\n\t\tconnector=" + connector + "\n\t\tbackupGroupName=" + backupGroupName + "\n\t\tscaleDownGroupName=" + scaleDownGroupName + "]"; + return "TopologyMember[id = " + nodeId + ", connector=" + connector + ", backupGroupName=" + backupGroupName + ", scaleDownGroupName=" + scaleDownGroupName + "]"; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java index 234b79950e..f0f5b3bb54 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/impl/BridgeImpl.java @@ -742,13 +742,13 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled @Override public String toString() { - return "\n " + this.getClass().getSimpleName() + "@" + + return this.getClass().getSimpleName() + "@" + Integer.toHexString(System.identityHashCode(this)) + - "\n [name=" + + " [name=" + name + - ",\n queue=" + + ", queue=" + queue + - "\n targetConnector=" + + " targetConnector=" + this.serverLocator + "]"; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java index 2c4f91f5be..002e511bed 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java @@ -1961,7 +1961,7 @@ public class QueueImpl implements Queue { @Override public String toString() { - return "QueueImpl[name=" + name.toString() + ",\n postOffice=" + this.postOffice + ",\n temp=" + this.temporary + "]@" + Integer.toHexString(System.identityHashCode(this)); + return "QueueImpl[name=" + name.toString() + ", postOffice=" + this.postOffice + ", temp=" + this.temporary + "]@" + Integer.toHexString(System.identityHashCode(this)); } private synchronized void internalAddTail(final MessageReference ref) { From b1ad8f3adf51fa96069e1eccd7b87e9dff85223d Mon Sep 17 00:00:00 2001 From: Justin Bertram Date: Mon, 22 May 2017 11:37:11 -0500 Subject: [PATCH 3/3] ARTEMIS-1178 backwards compatibility issues This commit has 2 changes for backwards compatibility with older clients: 1) A "bindings query" will now detect if the client is both JMS and "old" (i.e. pre-2.0) and will prefix the returned queue names with the old prefix (i.e. "jms.queue."). This will allow the old client to properly detect whether or not a queue exists in its auto-creation logic. 2) When messages are dispatched to a consumer there is logic to detect if the consumer is both JMS and "old" and will prefix the "address" on the message with "jms.queue." or "jms.topic." as appropriate (if it's not already prefixed). --- .../core/ServerSessionPacketHandler.java | 16 +++++++ .../core/server/impl/ServerConsumerImpl.java | 44 ++++++++++++++++++- 2 files changed, 59 insertions(+), 1 deletion(-) diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 385376e123..628312acef 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core; import javax.transaction.xa.XAResource; import javax.transaction.xa.Xid; +import java.util.ArrayList; import java.util.List; import org.apache.activemq.artemis.api.core.ActiveMQException; @@ -27,6 +28,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.core.exception.ActiveMQXAException; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.persistence.StorageManager; @@ -323,6 +326,19 @@ public class ServerSessionPacketHandler implements ChannelHandler { requiresResponse = true; SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet; BindingQueryResult result = session.executeBindingQuery(request.getAddress(remotingConnection.getClientVersion())); + + /* if the session is JMS and it's from an older client then we need to add the old prefix to the queue + * names otherwise the older client won't realize the queue exists and will try to create it and receive + * an error + */ + if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && remotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) { + List queueNames = new ArrayList<>(); + for (SimpleString queueName : result.getQueueNames()) { + queueNames.add(PacketImpl.OLD_QUEUE_PREFIX.concat(queueName)); + } + result = new BindingQueryResult(result.isExists(), queueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers()); + } + if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) { response = new SessionBindingQueryResponseMessage_V4(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers()); } else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index b18bb263c3..4ace4d4d40 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -32,7 +32,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.Message; +import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.api.core.client.ClientSession; import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; @@ -42,6 +44,8 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.QueueBinding; +import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; @@ -149,6 +153,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { private long acks; + private boolean requiresLegacyPrefix = false; + + private boolean anycast = false; + // Constructors --------------------------------------------------------------------------------- public ServerConsumerImpl(final long id, @@ -226,6 +234,16 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } this.server = server; + + if (session.getRemotingConnection() instanceof CoreRemotingConnection) { + CoreRemotingConnection coreRemotingConnection = (CoreRemotingConnection) session.getRemotingConnection(); + if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && coreRemotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) { + requiresLegacyPrefix = true; + if (getQueue().getRoutingType().equals(RoutingType.ANYCAST)) { + anycast = true; + } + } + } } @Override @@ -535,6 +553,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence); forcedDeliveryMessage.setAddress(messageQueue.getName()); + applyPrefixForLegacyConsumer(forcedDeliveryMessage); callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0); }); @@ -1053,7 +1072,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { * @param ref * @param message */ - private void deliverStandardMessage(final MessageReference ref, final Message message) throws ActiveMQException { + private void deliverStandardMessage(final MessageReference ref, Message message) throws ActiveMQException { + applyPrefixForLegacyConsumer(message); int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount()); if (availableCredits != null) { @@ -1068,6 +1088,28 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { } } + private void applyPrefixForLegacyConsumer(Message message) { + /** + * check to see if: + * 1) This is a "core" connection + * 2) The "core" connection belongs to a JMS client + * 3) The JMS client is an "old" client which needs address prefixes + * + * If 1, 2, & 3 are true then apply the "old" prefix for queues and topics as appropriate. + */ + if (requiresLegacyPrefix) { + if (anycast) { + if (!message.getAddress().startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) { + message.setAddress(PacketImpl.OLD_QUEUE_PREFIX + message.getAddress()); + } + } else { + if (!message.getAddress().startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) { + message.setAddress(PacketImpl.OLD_TOPIC_PREFIX + message.getAddress()); + } + } + } + } + // Inner classes // ------------------------------------------------------------------------