Set state to accepted if message received is properly handled.
This commit is contained in:
Timothy Bish 2014-01-08 17:28:39 -05:00
parent be8990e06d
commit efc51fa448
2 changed files with 17 additions and 17 deletions

View File

@ -579,6 +579,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter {
} }
} }
receiver.flow(1); receiver.flow(1);
delivery.disposition(Accepted.getInstance());
delivery.settle(); delivery.settle();
pumpProtonToSocket(); pumpProtonToSocket();
} }

View File

@ -47,7 +47,6 @@ import org.apache.activemq.util.Wait;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl; import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl; import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl; import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl;
import org.junit.Ignore;
import org.junit.Rule; import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.rules.TestName; import org.junit.rules.TestName;
@ -353,33 +352,22 @@ public class JMSClientTest extends AmqpTestSupport {
assertNull(message); assertNull(message);
} }
@Ignore
@Test(timeout=30000) @Test(timeout=30000)
public void testTTL() throws Exception { public void testSyncSends() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
Connection connection = null; Connection connection = null;
try { try {
QueueImpl queue = new QueueImpl("queue://" + name); QueueImpl queue = new QueueImpl("queue://" + name);
connection = createConnection(); connection = createConnection(true);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start(); connection.start();
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
producer.setTimeToLive(1000); producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message toSend = session.createTextMessage("Sample text"); Message toSend = session.createTextMessage("Sample text");
producer.send(toSend); producer.send(toSend);
MessageConsumer consumer = session.createConsumer(queue); MessageConsumer consumer = session.createConsumer(queue);
Message received = consumer.receive(5000); Message received = consumer.receive(5000);
assertNotNull(received); assertNotNull(received);
LOG.info("Message JMSExpiration = {}", received.getJMSExpiration());
producer.setTimeToLive(100);
producer.send(toSend);
TimeUnit.SECONDS.sleep(2);
received = consumer.receive(5000);
if (received != null) {
LOG.info("Message JMSExpiration = {} JMSTimeStamp = {} TTL = {}",
new Object[] { received.getJMSExpiration(), received.getJMSTimestamp(),
received.getJMSExpiration() - received.getJMSTimestamp()});
}
assertNull(received);
} finally { } finally {
connection.close(); connection.close();
} }
@ -551,11 +539,22 @@ public class JMSClientTest extends AmqpTestSupport {
} }
private Connection createConnection() throws JMSException { private Connection createConnection() throws JMSException {
return createConnection(name.toString()); return createConnection(name.toString(), false);
}
private Connection createConnection(boolean syncPublish) throws JMSException {
return createConnection(name.toString(), syncPublish);
} }
private Connection createConnection(String clientId) throws JMSException { private Connection createConnection(String clientId) throws JMSException {
return createConnection(clientId, false);
}
private Connection createConnection(String clientId, boolean syncPublish) throws JMSException {
final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password"); final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, "admin", "password");
factory.setSyncPublish(syncPublish);
final Connection connection = factory.createConnection(); final Connection connection = factory.createConnection();
if (clientId != null && !clientId.isEmpty()) { if (clientId != null && !clientId.isEmpty()) {
connection.setClientID(clientId); connection.setClientID(clientId);