Fix temp queue creation.
This commit is contained in:
Timothy Bish 2014-10-15 10:35:36 -04:00
parent 78cb1120b7
commit e90f1decb2
1 changed files with 11 additions and 17 deletions

View File

@ -392,8 +392,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
if (rh != null) { if (rh != null) {
rh.onResponse(this, response); rh.onResponse(this, response);
} else { } else {
// Pass down any unexpected errors. Should this close the // Pass down any unexpected errors. Should this close the connection?
// connection?
if (response.isException()) { if (response.isException()) {
Throwable exception = ((ExceptionResponse) response).getException(); Throwable exception = ((ExceptionResponse) response).getException();
handleException(exception); handleException(exception);
@ -415,8 +414,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
} }
} }
} else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
// Pass down any unexpected async errors. Should this close the // Pass down any unexpected async errors. Should this close the connection?
// connection?
Throwable exception = ((ConnectionError) command).getException(); Throwable exception = ((ConnectionError) command).getException();
handleException(exception); handleException(exception);
} else if (command.isBrokerInfo()) { } else if (command.isBrokerInfo()) {
@ -623,9 +621,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
} }
message.setProducerId(producerId); message.setProducerId(producerId);
// Always override the AMQP client's MessageId with our own. // Always override the AMQP client's MessageId with our own. Preserve
// Preserve the // the original in the TextView property for later Ack.
// original in the TextView property for later Ack.
MessageId messageId = new MessageId(producerId, messageIdGenerator.getNextSequenceId()); MessageId messageId = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
MessageId amqpMessageId = message.getMessageId(); MessageId amqpMessageId = message.getMessageId();
@ -648,11 +645,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
message.setTransactionId(new LocalTransactionId(connectionId, txid)); message.setTransactionId(new LocalTransactionId(connectionId, txid));
} }
// Lets handle the case where the expiration was set, but the // Lets handle the case where the expiration was set, but the timestamp
// timestamp // was not set by the client. Lets assign the timestamp now, and adjust
// was not set by the client. Lets assign the timestamp now, and // the expiration.
// adjust the
// expiration.
if (message.getExpiration() != 0) { if (message.getExpiration() != 0) {
if (message.getTimestamp() == 0) { if (message.getTimestamp() == 0) {
message.setTimestamp(System.currentTimeMillis()); message.setTimestamp(System.currentTimeMillis());
@ -840,8 +835,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++); ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
ActiveMQDestination dest = null; ActiveMQDestination dest = null;
boolean anonymous = false; boolean anonymous = false;
String targetNodeName = target.getAddress();
if (target.getAddress().equals(amqpTransport.getWireFormat().getAnonymousNodeName())) { if (targetNodeName != null && targetNodeName.equals(amqpTransport.getWireFormat().getAnonymousNodeName())) {
anonymous = true; anonymous = true;
} else if (target.getDynamic()) { } else if (target.getDynamic()) {
dest = createTempQueue(); dest = createTempQueue();
@ -1014,10 +1010,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
ActiveMQMessage temp = null; ActiveMQMessage temp = null;
if (md.getMessage() != null) { if (md.getMessage() != null) {
// Topics can dispatch the same Message to more than one // Topics can dispatch the same Message to more than one consumer
// consumer // so we must copy to prevent concurrent read / write to the same
// so we must copy to prevent concurrent read / write to
// the same
// message object. // message object.
if (md.getDestination().isTopic()) { if (md.getDestination().isTopic()) {
synchronized (md.getMessage()) { synchronized (md.getMessage()) {