This commit is contained in:
Clebert Suconic 2017-06-27 14:23:57 -04:00
commit c48d968931
2 changed files with 60 additions and 7 deletions

View File

@ -17,6 +17,10 @@
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
@ -54,6 +58,53 @@ public abstract class QueueAbstractPacket extends PacketImpl {
}
}
/**
* It converts the given {@code queueNames} using the JMS prefix found on {@link #address} when {@code clientVersion < }{@link #ADDRESSING_CHANGE_VERSION}.
* If no conversion has occurred, it returns {@code queueNames}.
*
* @param clientVersion version of the client
* @param queueNames names of the queues to be converted
* @return the converted queues names or {@code queueNames} when no conversion has occurred
*/
public final List<SimpleString> convertQueueNames(int clientVersion, List<SimpleString> queueNames) {
if (clientVersion < ADDRESSING_CHANGE_VERSION) {
return applyAddressPrefixTo(queueNames);
} else {
return queueNames;
}
}
private List<SimpleString> applyAddressPrefixTo(List<SimpleString> queueNames) {
final int names = queueNames.size();
if (names == 0) {
return Collections.emptyList();
} else {
final SimpleString address = this.address;
final SimpleString prefix = jmsPrefixOf(address);
if (prefix != null) {
final List<SimpleString> prefixedQueueNames = new ArrayList<>(names);
for (int i = 0; i < names; i++) {
final SimpleString oldQueueNames = queueNames.get(i);
final SimpleString prefixedQueueName = prefix.concat(oldQueueNames);
prefixedQueueNames.add(prefixedQueueName);
}
return prefixedQueueNames;
} else {
return queueNames;
}
}
}
private static SimpleString jmsPrefixOf(SimpleString address) {
if (address.startsWith(OLD_QUEUE_PREFIX)) {
return OLD_QUEUE_PREFIX;
} else if (address.startsWith(OLD_TOPIC_PREFIX)) {
return OLD_TOPIC_PREFIX;
} else {
return null;
}
}
public QueueAbstractPacket(byte type) {
super(type);
}

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.core.protocol.core;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.util.ArrayList;
import java.util.List;
import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -325,18 +324,21 @@ public class ServerSessionPacketHandler implements ChannelHandler {
case SESS_BINDINGQUERY: {
requiresResponse = true;
SessionBindingQueryMessage request = (SessionBindingQueryMessage) packet;
BindingQueryResult result = session.executeBindingQuery(request.getAddress(remotingConnection.getClientVersion()));
final int clientVersion = remotingConnection.getClientVersion();
BindingQueryResult result = session.executeBindingQuery(request.getAddress(clientVersion));
/* if the session is JMS and it's from an older client then we need to add the old prefix to the queue
* names otherwise the older client won't realize the queue exists and will try to create it and receive
* an error
*/
if (session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null && remotingConnection.getClientVersion() < PacketImpl.ADDRESSING_CHANGE_VERSION) {
List<SimpleString> queueNames = new ArrayList<>();
for (SimpleString queueName : result.getQueueNames()) {
queueNames.add(PacketImpl.OLD_QUEUE_PREFIX.concat(queueName));
if (clientVersion < PacketImpl.ADDRESSING_CHANGE_VERSION && session.getMetaData(ClientSession.JMS_SESSION_IDENTIFIER_PROPERTY) != null) {
final List<SimpleString> queueNames = result.getQueueNames();
if (!queueNames.isEmpty()) {
final List<SimpleString> convertedQueueNames = request.convertQueueNames(clientVersion, queueNames);
if (convertedQueueNames != queueNames) {
result = new BindingQueryResult(result.isExists(), convertedQueueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers());
}
}
result = new BindingQueryResult(result.isExists(), queueNames, result.isAutoCreateQueues(), result.isAutoCreateAddresses(), result.isDefaultPurgeOnNoConsumers(), result.getDefaultMaxConsumers());
}
if (channel.supports(PacketImpl.SESS_BINDINGQUERY_RESP_V4)) {