From efc51fa4487f80027a1ff95de7e5bd7479eaab47 Mon Sep 17 00:00:00 2001 From: Timothy Bish Date: Wed, 8 Jan 2014 17:28:39 -0500 Subject: [PATCH] https://issues.apache.org/jira/browse/AMQ-4741 Set state to accepted if message received is properly handled. --- .../transport/amqp/AmqpProtocolConverter.java | 1 + .../transport/amqp/JMSClientTest.java | 33 +++++++++---------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index 1af64e39be..78ee912c55 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -579,6 +579,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } } receiver.flow(1); + delivery.disposition(Accepted.getInstance()); delivery.settle(); pumpProtonToSocket(); } diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java index 4813514056..4002ef20b7 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java @@ -47,7 +47,6 @@ import org.apache.activemq.util.Wait; import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl; import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; @@ -353,33 +352,22 @@ public class JMSClientTest extends AmqpTestSupport { assertNull(message); } - @Ignore @Test(timeout=30000) - public void testTTL() throws Exception { + public void testSyncSends() throws Exception { + ActiveMQAdmin.enableJMSFrameTracing(); Connection connection = null; try { QueueImpl queue = new QueueImpl("queue://" + name); - connection = createConnection(); + connection = createConnection(true); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); connection.start(); MessageProducer producer = session.createProducer(queue); - producer.setTimeToLive(1000); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); Message toSend = session.createTextMessage("Sample text"); producer.send(toSend); MessageConsumer consumer = session.createConsumer(queue); Message received = consumer.receive(5000); assertNotNull(received); - LOG.info("Message JMSExpiration = {}", received.getJMSExpiration()); - producer.setTimeToLive(100); - producer.send(toSend); - TimeUnit.SECONDS.sleep(2); - received = consumer.receive(5000); - if (received != null) { - LOG.info("Message JMSExpiration = {} JMSTimeStamp = {} TTL = {}", - new Object[] { received.getJMSExpiration(), received.getJMSTimestamp(), - received.getJMSExpiration() - received.getJMSTimestamp()}); - } - assertNull(received); } finally { connection.close(); } @@ -551,11 +539,22 @@ public class JMSClientTest extends AmqpTestSupport { } private Connection createConnection() throws JMSException { - return createConnection(name.toString()); + return createConnection(name.toString(), false); + } + + private Connection createConnection(boolean syncPublish) throws JMSException { + return createConnection(name.toString(), syncPublish); } private Connection createConnection(String clientId) throws JMSException { + return createConnection(clientId, false); + } + + private Connection createConnection(String clientId, boolean syncPublish) throws JMSException { + final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password"); + factory.setSyncPublish(syncPublish); + final Connection connection = factory.createConnection(); if (clientId != null && !clientId.isEmpty()) { connection.setClientID(clientId);