Use the type descriptor and not the key, also check object type since
Map from Proton is not enforcing.  Remove some dead code and uneeded
mutex locks for id generation.
This commit is contained in:
Timothy Bish 2015-03-05 12:40:47 -05:00
parent ab28b771e3
commit ace101a03a
1 changed files with 43 additions and 41 deletions

View File

@ -31,6 +31,9 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Destination; import javax.jms.Destination;
import javax.jms.InvalidClientIDException; import javax.jms.InvalidClientIDException;
@ -87,6 +90,7 @@ import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Modified; import org.apache.qpid.proton.amqp.messaging.Modified;
@ -136,8 +140,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix"); private static final Symbol QUEUE_PREFIX = Symbol.valueOf("queue-prefix");
private static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix"); private static final Symbol TOPIC_PREFIX = Symbol.valueOf("topic-prefix");
private static final Symbol COPY = Symbol.getSymbol("copy"); private static final Symbol COPY = Symbol.getSymbol("copy");
private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector"); private static final UnsignedLong JMS_SELECTOR = UnsignedLong.valueOf(0x0000468C00000004L);
private static final Symbol NO_LOCAL = Symbol.valueOf("no-local"); private static final UnsignedLong NO_LOCAL = UnsignedLong.valueOf(0x0000468C00000003L);
private static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue"); private static final Symbol TEMP_QUEUE_CAPABILITY = Symbol.valueOf("temporary-queue");
private static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic"); private static final Symbol TEMP_TOPIC_CAPABILITY = Symbol.valueOf("temporary-topic");
@ -844,12 +848,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
} }
} }
long nextTransactionId = 1; private final AtomicLong nextTransactionId = new AtomicLong();
class Transaction {
}
HashMap<Long, Transaction> transactions = new HashMap<Long, Transaction>();
public byte[] toBytes(long value) { public byte[] toBytes(long value) {
Buffer buffer = new Buffer(8); Buffer buffer = new Buffer(8);
@ -885,7 +884,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
throw new Exception("don't know how to handle a declare /w a set GlobalId"); throw new Exception("don't know how to handle a declare /w a set GlobalId");
} }
long txid = nextTransactionId++; long txid = nextTransactionId.incrementAndGet();
TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), TransactionInfo.BEGIN); TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), TransactionInfo.BEGIN);
sendToActiveMQ(txinfo, null); sendToActiveMQ(txinfo, null);
LOG.trace("started transaction {}", txid); LOG.trace("started transaction {}", txid);
@ -1404,7 +1403,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, ConsumerContext>(); private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, ConsumerContext>();
@SuppressWarnings("rawtypes") @SuppressWarnings("unchecked")
void onSenderOpen(final Sender sender, final AmqpSessionContext sessionContext) { void onSenderOpen(final Sender sender, final AmqpSessionContext sessionContext) {
org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) sender.getRemoteSource(); org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) sender.getRemoteSource();
@ -1415,21 +1414,18 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
String selector = null; String selector = null;
if (source != null) { if (source != null) {
Map filter = source.getFilter(); DescribedType filter = findFilter(source.getFilter(), JMS_SELECTOR);
if (filter != null) { if (filter != null) {
DescribedType value = (DescribedType) filter.get(JMS_SELECTOR); selector = filter.getDescribed().toString();
if (value != null) { // Validate the Selector.
selector = value.getDescribed().toString(); try {
// Validate the Selector. SelectorParser.parse(selector);
try { } catch (InvalidSelectorException e) {
SelectorParser.parse(selector); sender.setSource(null);
} catch (InvalidSelectorException e) { sender.setCondition(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage()));
sender.setSource(null); sender.close();
sender.setCondition(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage())); consumerContext.closed = true;
sender.close(); return;
consumerContext.closed = true;
return;
}
} }
} }
} }
@ -1507,12 +1503,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
consumerInfo.setSubscriptionName(sender.getName()); consumerInfo.setSubscriptionName(sender.getName());
} }
Map filter = source.getFilter(); DescribedType filter = findFilter(source.getFilter(), NO_LOCAL);
if (filter != null) { if (filter != null) {
DescribedType value = (DescribedType) filter.get(NO_LOCAL); consumerInfo.setNoLocal(true);
if (value != null) {
consumerInfo.setNoLocal(true);
}
} }
sendToActiveMQ(consumerInfo, new ResponseHandler() { sendToActiveMQ(consumerInfo, new ResponseHandler() {
@ -1623,7 +1616,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
} }
private boolean contains(Symbol[] symbols, Symbol key) { private boolean contains(Symbol[] symbols, Symbol key) {
if (symbols == null) { if (symbols == null || symbols.length == 0) {
return false; return false;
} }
@ -1636,25 +1629,34 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
return false; return false;
} }
private DescribedType findFilter(Map<Symbol, Object> filters, UnsignedLong filterId) {
if (filters == null || filters.isEmpty()) {
return null;
}
for (Object value : filters.values()) {
if (value instanceof DescribedType) {
DescribedType describedType = (DescribedType) value;
if (describedType.getDescriptor().equals(filterId)) {
return describedType;
}
}
}
return null;
}
// ////////////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////////////
// //
// Implementation methods // Implementation methods
// //
// ////////////////////////////////////////////////////////////////////////// // //////////////////////////////////////////////////////////////////////////
private final Object commnadIdMutex = new Object(); private final AtomicInteger lastCommandId = new AtomicInteger();
private int lastCommandId; private final ConcurrentMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
int generateCommandId() {
synchronized (commnadIdMutex) {
return lastCommandId++;
}
}
private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
void sendToActiveMQ(Command command, ResponseHandler handler) { void sendToActiveMQ(Command command, ResponseHandler handler) {
command.setCommandId(generateCommandId()); command.setCommandId(lastCommandId.incrementAndGet());
if (handler != null) { if (handler != null) {
command.setResponseRequired(true); command.setResponseRequired(true);
resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler); resposeHandlers.put(Integer.valueOf(command.getCommandId()), handler);