From e90f1decb2bac727378b4754b101fa9f79faeed4 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 15 Oct 2014 10:35:36 -0400 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-5391 Fix temp queue creation. --- .../transport/amqp/AmqpProtocolConverter.java | 28 ++++++++----------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 472aeb92cf..f4df997322 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -392,8 +392,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { if (rh != null) { rh.onResponse(this, response); } else { - // Pass down any unexpected errors. Should this close the - // connection? + // Pass down any unexpected errors. Should this close the connection? if (response.isException()) { Throwable exception = ((ExceptionResponse) response).getException(); handleException(exception); @@ -415,8 +414,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } } } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) { - // Pass down any unexpected async errors. Should this close the - // connection? + // Pass down any unexpected async errors. Should this close the connection? Throwable exception = ((ConnectionError) command).getException(); handleException(exception); } else if (command.isBrokerInfo()) { @@ -623,9 +621,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } message.setProducerId(producerId); - // Always override the AMQP client's MessageId with our own. - // Preserve the - // original in the TextView property for later Ack. + // Always override the AMQP client's MessageId with our own. Preserve + // the original in the TextView property for later Ack. MessageId messageId = new MessageId(producerId, messageIdGenerator.getNextSequenceId()); MessageId amqpMessageId = message.getMessageId(); @@ -648,11 +645,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { message.setTransactionId(new LocalTransactionId(connectionId, txid)); } - // Lets handle the case where the expiration was set, but the - // timestamp - // was not set by the client. Lets assign the timestamp now, and - // adjust the - // expiration. + // Lets handle the case where the expiration was set, but the timestamp + // was not set by the client. Lets assign the timestamp now, and adjust + // the expiration. if (message.getExpiration() != 0) { if (message.getTimestamp() == 0) { message.setTimestamp(System.currentTimeMillis()); @@ -840,8 +835,9 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++); ActiveMQDestination dest = null; boolean anonymous = false; + String targetNodeName = target.getAddress(); - if (target.getAddress().equals(amqpTransport.getWireFormat().getAnonymousNodeName())) { + if (targetNodeName != null && targetNodeName.equals(amqpTransport.getWireFormat().getAnonymousNodeName())) { anonymous = true; } else if (target.getDynamic()) { dest = createTempQueue(); @@ -1014,10 +1010,8 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { ActiveMQMessage temp = null; if (md.getMessage() != null) { - // Topics can dispatch the same Message to more than one - // consumer - // so we must copy to prevent concurrent read / write to - // the same + // Topics can dispatch the same Message to more than one consumer + // so we must copy to prevent concurrent read / write to the same // message object. if (md.getDestination().isTopic()) { synchronized (md.getMessage()) {