This commit is contained in:
Clebert Suconic 2017-05-25 10:54:04 -04:00
commit 36c0b3daf7
10 changed files with 81 additions and 26 deletions

View File

@ -129,8 +129,8 @@ public final class DiscoveryGroupConfiguration implements Serializable {
public String toString() { public String toString() {
return "DiscoveryGroupConfiguration{" + return "DiscoveryGroupConfiguration{" +
"name='" + name + '\'' + "name='" + name + '\'' +
",\n refreshTimeout=" + refreshTimeout + ", refreshTimeout=" + refreshTimeout +
",\n discoveryInitialWaitTimeout=" + discoveryInitialWaitTimeout + ", discoveryInitialWaitTimeout=" + discoveryInitialWaitTimeout +
"}"; '}';
} }
} }

View File

@ -252,16 +252,15 @@ public class TransportConfiguration implements Serializable {
public static String toStringParameters(Map<String, Object> params, Map<String, Object> extraProps) { public static String toStringParameters(Map<String, Object> params, Map<String, Object> extraProps) {
StringBuilder str = new StringBuilder(); StringBuilder str = new StringBuilder();
str.append("{");
if (params != null) { if (params != null) {
if (!params.isEmpty()) { if (!params.isEmpty()) {
str.append("\n\t\t\t"); str.append("?");
} }
boolean first = true; boolean first = true;
for (Map.Entry<String, Object> entry : params.entrySet()) { for (Map.Entry<String, Object> entry : params.entrySet()) {
if (!first) { if (!first) {
str.append("\n\t\t\t"); str.append("&");
} }
String key = entry.getKey(); String key = entry.getKey();
@ -281,7 +280,7 @@ public class TransportConfiguration implements Serializable {
if (extraProps != null) { if (extraProps != null) {
for (Map.Entry<String, Object> entry : extraProps.entrySet()) { for (Map.Entry<String, Object> entry : extraProps.entrySet()) {
if (!first) { if (!first) {
str.append("\n\t\t\t"); str.append("&");
} }
String key = entry.getKey(); String key = entry.getKey();
@ -293,7 +292,6 @@ public class TransportConfiguration implements Serializable {
} }
} }
} }
str.append("\n\t\t}");
return str.toString(); return str.toString();
} }

View File

@ -1476,12 +1476,12 @@ public final class ServerLocatorImpl implements ServerLocatorInternal, Discovery
return "ServerLocatorImpl (identity=" + identity + return "ServerLocatorImpl (identity=" + identity +
") [initialConnectors=" + ") [initialConnectors=" +
Arrays.toString(initialConnectors == null ? new TransportConfiguration[0] : initialConnectors) + Arrays.toString(initialConnectors == null ? new TransportConfiguration[0] : initialConnectors) +
",\n discoveryGroupConfiguration=" + ", discoveryGroupConfiguration=" +
discoveryGroupConfiguration + discoveryGroupConfiguration +
"]"; "]";
} }
return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors == null ? new TransportConfiguration[0] : initialConnectors) + return "ServerLocatorImpl [initialConnectors=" + Arrays.toString(initialConnectors == null ? new TransportConfiguration[0] : initialConnectors) +
",\n discoveryGroupConfiguration=" + ", discoveryGroupConfiguration=" +
discoveryGroupConfiguration + discoveryGroupConfiguration +
"]"; "]";
} }

View File

@ -149,6 +149,6 @@ public final class TopologyMemberImpl implements TopologyMember {
@Override @Override
public String toString() { public String toString() {
return "TopologyMember[\n\t\tid = " + nodeId + "\n\t\tconnector=" + connector + "\n\t\tbackupGroupName=" + backupGroupName + "\n\t\tscaleDownGroupName=" + scaleDownGroupName + "]"; return "TopologyMember[id = " + nodeId + ", connector=" + connector + ", backupGroupName=" + backupGroupName + ", scaleDownGroupName=" + scaleDownGroupName + "]";
} }
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.core;
import javax.transaction.xa.XAResource; import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid; import javax.transaction.xa.Xid;
import java.util.ArrayList;
import java.util.List; import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -27,6 +28,8 @@ import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached; import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType; import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.core.exception.ActiveMQXAException; import org.apache.activemq.artemis.core.exception.ActiveMQXAException;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
@ -323,6 +326,19 @@ public class ServerSessionPacketHandler implements ChannelHandler {
requiresResponse = true; requiresResponse = true;
SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet; SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet;
BindingQueryResult result = session.executeBindingQuery(request.getAddress(remotingConnection.getClientVersion())); BindingQueryResult result = session.executeBindingQuery(request.getAddress(remotingConnection.getClientVersion()));
/* if the session is JMS and it's from an older client then we need to add the old prefix to the queue
* names otherwise the older client won't realize the queue exists and will try to create it and receive
* an error
*/
if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && remotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
List<SimpleString> queueNames = new ArrayList<>();
for (SimpleString queueName : result.getQueueNames()) {
queueNames.add(PacketImpl.OLD_QUEUE_PREFIX.concat(queueName));
}
result = new BindingQueryResult(result.isExists(), queueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers());
}
if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) { if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) {
response = new SessionBindingQueryResponseMessage_V4(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers()); response = new SessionBindingQueryResponseMessage_V4(result.isExists(), result.getQueueNames(), result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers());
} else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) { } else if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V3)) {

View File

@ -742,13 +742,13 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
@Override @Override
public String toString() { public String toString() {
return "\n " + this.getClass().getSimpleName() + "@" + return this.getClass().getSimpleName() + "@" +
Integer.toHexString(System.identityHashCode(this)) + Integer.toHexString(System.identityHashCode(this)) +
"\n [name=" + " [name=" +
name + name +
",\n queue=" + ", queue=" +
queue + queue +
"\n targetConnector=" + " targetConnector=" +
this.serverLocator + this.serverLocator +
"]"; "]";
} }

View File

@ -1961,7 +1961,7 @@ public class QueueImpl implements Queue {
@Override @Override
public String toString() { public String toString() {
return "QueueImpl[name=" + name.toString() + ",\n postOffice=" + this.postOffice + ",\n temp=" + this.temporary + "]@" + Integer.toHexString(System.identityHashCode(this)); return "QueueImpl[name=" + name.toString() + ", postOffice=" + this.postOffice + ", temp=" + this.temporary + "]@" + Integer.toHexString(System.identityHashCode(this));
} }
private synchronized void internalAddTail(final MessageReference ref) { private synchronized void internalAddTail(final MessageReference ref) {

View File

@ -32,7 +32,9 @@ import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType; import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
import org.apache.activemq.artemis.api.core.management.ManagementHelper; import org.apache.activemq.artemis.api.core.management.ManagementHelper;
import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl; import org.apache.activemq.artemis.core.client.impl.ClientConsumerImpl;
@ -42,6 +44,8 @@ import org.apache.activemq.artemis.core.message.impl.CoreMessage;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.QueueBinding; import org.apache.activemq.artemis.core.postoffice.QueueBinding;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
@ -149,6 +153,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private long acks; private long acks;
private boolean requiresLegacyPrefix = false;
private boolean anycast = false;
// Constructors --------------------------------------------------------------------------------- // Constructors ---------------------------------------------------------------------------------
public ServerConsumerImpl(final long id, public ServerConsumerImpl(final long id,
@ -226,6 +234,16 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
this.server = server; this.server = server;
if (session.getRemotingConnection() instanceof CoreRemotingConnection) {
CoreRemotingConnection coreRemotingConnection = (CoreRemotingConnection) session.getRemotingConnection();
if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && coreRemotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
requiresLegacyPrefix = true;
if (getQueue().getRoutingType().equals(RoutingType.ANYCAST)) {
anycast = true;
}
}
}
} }
@Override @Override
@ -535,6 +553,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence); forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
forcedDeliveryMessage.setAddress(messageQueue.getName()); forcedDeliveryMessage.setAddress(messageQueue.getName());
applyPrefixForLegacyConsumer(forcedDeliveryMessage);
callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0); callback.sendMessage(null, forcedDeliveryMessage, ServerConsumerImpl.this, 0);
}); });
@ -1053,7 +1072,8 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
* @param ref * @param ref
* @param message * @param message
*/ */
private void deliverStandardMessage(final MessageReference ref, final Message message) throws ActiveMQException { private void deliverStandardMessage(final MessageReference ref, Message message) throws ActiveMQException {
applyPrefixForLegacyConsumer(message);
int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount()); int packetSize = callback.sendMessage(ref, message, ServerConsumerImpl.this, ref.getDeliveryCount());
if (availableCredits != null) { if (availableCredits != null) {
@ -1068,6 +1088,28 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
} }
} }
private void applyPrefixForLegacyConsumer(Message message) {
/**
* check to see if:
* 1) This is a "core" connection
* 2) The "core" connection belongs to a JMS client
* 3) The JMS client is an "old" client which needs address prefixes
*
* If 1, 2, & 3 are true then apply the "old" prefix for queues and topics as appropriate.
*/
if (requiresLegacyPrefix) {
if (anycast) {
if (!message.getAddress().startsWith(PacketImpl.OLD_QUEUE_PREFIX.toString())) {
message.setAddress(PacketImpl.OLD_QUEUE_PREFIX + message.getAddress());
}
} else {
if (!message.getAddress().startsWith(PacketImpl.OLD_TOPIC_PREFIX.toString())) {
message.setAddress(PacketImpl.OLD_TOPIC_PREFIX + message.getAddress());
}
}
}
}
// Inner classes // Inner classes
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------

View File

@ -334,12 +334,11 @@ public class AddressControlTest extends ManagementTestBase {
session.createQueue(address, RoutingType.ANYCAST, address); session.createQueue(address, RoutingType.ANYCAST, address);
producer.send(session.createMessage(false)); producer.send(session.createMessage(false));
assertEquals(1, addressControl.getMessageCount()); assertTrue(Wait.waitFor(() -> addressControl.getMessageCount() == 1, 2000, 100));
session.createQueue(address, RoutingType.ANYCAST, address.concat('2')); session.createQueue(address, RoutingType.ANYCAST, address.concat('2'));
producer.send(session.createMessage(false)); producer.send(session.createMessage(false));
Wait.waitFor(() -> addressControl.getMessageCount() == 2); assertTrue(Wait.waitFor(() -> addressControl.getMessageCount() == 2, 2000, 100));
assertEquals(2, addressControl.getMessageCount());
} }
@Test @Test

View File

@ -1495,8 +1495,8 @@ public class StompTest extends StompTestBase {
send(conn, addressA, null, "Hello World!", true, RoutingType.ANYCAST); send(conn, addressA, null, "Hello World!", true, RoutingType.ANYCAST);
assertEquals(1, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount() == 1, 2000, 100));
assertEquals(0, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount()); assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() == 0, 2000, 100));
} }
@Test @Test
@ -1517,8 +1517,8 @@ public class StompTest extends StompTestBase {
send(conn, addressA, null, "Hello World!", true, RoutingType.MULTICAST); send(conn, addressA, null, "Hello World!", true, RoutingType.MULTICAST);
assertEquals(0, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount()); assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() == 0, 2000, 100));
assertEquals(2, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount() == 2, 2000, 100));
} }
@Test @Test
@ -1541,7 +1541,7 @@ public class StompTest extends StompTestBase {
send(conn, addressA, null, "Hello World!", true); send(conn, addressA, null, "Hello World!", true);
assertEquals(1, activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount()); assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueA)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueB)).getMessageCount() == 1, 2000, 100));
assertEquals(2, activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount()); assertTrue(Wait.waitFor(() -> activeMQServer.locateQueue(SimpleString.toSimpleString(queueC)).getMessageCount() + activeMQServer.locateQueue(SimpleString.toSimpleString(queueD)).getMessageCount() == 2, 2000, 100));
} }
} }