This closes #993

This commit is contained in:
Martyn Taylor 2017-02-07 14:46:35 +00:00
commit 8c77e25163
4 changed files with 56 additions and 2 deletions

View File

@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.protocol.openwire.util.OpenWireUtil;
import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.TypedProperties;
@ -772,7 +773,12 @@ public class OpenWireMessageConverter implements MessageConverter {
if (prop instanceof SimpleString) {
amqMsg.setObjectProperty(s.toString(), prop.toString());
} else {
amqMsg.setObjectProperty(s.toString(), prop);
if (keyStr.equals(MessageUtil.JMSXDELIVERYCOUNT) && prop instanceof Long) {
Long l = (Long) prop;
amqMsg.setObjectProperty(s.toString(), l.intValue());
} else {
amqMsg.setObjectProperty(s.toString(), prop);
}
}
} catch (JMSException e) {
throw new IOException("exception setting property " + s + " : " + prop, e);

View File

@ -34,6 +34,13 @@ public class QueueManagerImpl implements QueueManager {
@Override
public void run() {
Queue queue = server.locateQueue(queueName);
//the queue may already have been deleted and this is a result of that
if (queue == null) {
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug("pno queue to delete \"" + queueName + ".\"");
}
return;
}
SimpleString address = queue.getAddress();
AddressSettings settings = server.getAddressSettingsRepository().getMatch(address.toString());
long consumerCount = queue.getConsumerCount();

View File

@ -830,7 +830,7 @@ public class ProtonTest extends ProtonTestBase {
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisfied() throws Exception {
return amqpConnection.isClosed();
return receiver.isClosed();
}
});
assertTrue(receiver.isClosed());

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.crossprotocol;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
@ -25,6 +26,7 @@ import javax.jms.Queue;
import javax.jms.Session;
import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.net.URI;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
@ -37,7 +39,14 @@ import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -113,4 +122,36 @@ public class AMQPToOpenwireTest extends ActiveMQTestBase {
}
}
}
@Test
public void testDeliveryCountMessage() throws Exception {
AmqpClient client = new AmqpClient(new URI("tcp://127.0.0.1:61616"), null, null);
AmqpConnection amqpconnection = client.connect();
try {
AmqpSession session = amqpconnection.createSession();
AmqpSender sender = session.createSender(queueName);
AmqpMessage message = new AmqpMessage();
message.setMessageId("MessageID:" + 0);
message.getWrappedMessage().setHeader(new Header());
message.getWrappedMessage().getHeader().setDeliveryCount(new UnsignedInteger(2));
sender.send(message);
} finally {
amqpconnection.close();
}
Connection connection = null;
try {
connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
Message receive = consumer.receive(5000);
assertNotNull(receive);
} finally {
if (connection != null) {
connection.close();
}
}
}
}