diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index fa0e024861..816008fc9e 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -31,6 +31,9 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import javax.jms.Destination; import javax.jms.InvalidClientIDException; @@ -87,6 +90,7 @@ import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedLong; import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.Modified; @@ -136,8 +140,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { private static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix"); private static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix"); private static final Symbol COPY = Symbol.getSymbol("copy"); - private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector"); - private static final Symbol NO_LOCAL = Symbol.valueOf("no-local"); + private static final UnsignedLong JMS_SELECTOR = UnsignedLong.valueOf(0x0000468C00000004L); + private static final UnsignedLong NO_LOCAL = UnsignedLong.valueOf(0x0000468C00000003L); private static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue"); private static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic"); @@ -844,12 +848,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } } - long nextTransactionId = 1; - - class Transaction { - } - - HashMap transactions = new HashMap(); + private final AtomicLong nextTransactionId = new AtomicLong(); public byte[] toBytes(long value) { Buffer buffer = new Buffer(8); @@ -885,7 +884,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { throw new Exception("don't know how to handle a declare /w a set GlobalId"); } - long txid = nextTransactionId++; + long txid = nextTransactionId.incrementAndGet(); TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), TransactionInfo.BEGIN); sendToActiveMQ(txinfo, null); LOG.trace("started transaction {}", txid); @@ -1404,7 +1403,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { private final ConcurrentHashMap subscriptionsByConsumerId = new ConcurrentHashMap(); - @SuppressWarnings("rawtypes") + @SuppressWarnings("unchecked") void onSenderOpen(final Sender sender, final AmqpSessionContext sessionContext) { org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) sender.getRemoteSource(); @@ -1415,21 +1414,18 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { String selector = null; if (source != null) { - Map filter = source.getFilter(); + DescribedType filter = findFilter(source.getFilter(), JMS_SELECTOR); if (filter != null) { - DescribedType value = (DescribedType) filter.get(JMS_SELECTOR); - if (value != null) { - selector = value.getDescribed().toString(); - // Validate the Selector. - try { - SelectorParser.parse(selector); - } catch (InvalidSelectorException e) { - sender.setSource(null); - sender.setCondition(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage())); - sender.close(); - consumerContext.closed = true; - return; - } + selector = filter.getDescribed().toString(); + // Validate the Selector. + try { + SelectorParser.parse(selector); + } catch (InvalidSelectorException e) { + sender.setSource(null); + sender.setCondition(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage())); + sender.close(); + consumerContext.closed = true; + return; } } } @@ -1507,12 +1503,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { consumerInfo.setSubscriptionName(sender.getName()); } - Map filter = source.getFilter(); + DescribedType filter = findFilter(source.getFilter(), NO_LOCAL); if (filter != null) { - DescribedType value = (DescribedType) filter.get(NO_LOCAL); - if (value != null) { - consumerInfo.setNoLocal(true); - } + consumerInfo.setNoLocal(true); } sendToActiveMQ(consumerInfo, new ResponseHandler() { @@ -1623,7 +1616,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } private boolean contains(Symbol[] symbols, Symbol key) { - if (symbols == null) { + if (symbols == null || symbols.length == 0) { return false; } @@ -1636,25 +1629,34 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { return false; } + private DescribedType findFilter(Map filters, UnsignedLong filterId) { + if (filters == null || filters.isEmpty()) { + return null; + } + + for (Object value : filters.values()) { + if (value instanceof DescribedType) { + DescribedType describedType = (DescribedType) value; + if (describedType.getDescriptor().equals(filterId)) { + return describedType; + } + } + } + + return null; + } + // ////////////////////////////////////////////////////////////////////////// // // Implementation methods // // ////////////////////////////////////////////////////////////////////////// - private final Object commnadIdMutex = new Object(); - private int lastCommandId; - - int generateCommandId() { - synchronized (commnadIdMutex) { - return lastCommandId++; - } - } - - private final ConcurrentHashMap resposeHandlers = new ConcurrentHashMap(); + private final AtomicInteger lastCommandId = new AtomicInteger(); + private final ConcurrentMap resposeHandlers = new ConcurrentHashMap(); void sendToActiveMQ(Command command, ResponseHandler handler) { - command.setCommandId(generateCommandId()); + command.setCommandId(lastCommandId.incrementAndGet()); if (handler != null) { command.setResponseRequired(true); resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);