This commit is contained in:
Clebert Suconic 2017-02-23 11:39:23 -05:00
commit 629aad19d0
2 changed files with 50 additions and 1 deletions

View File

@ -48,6 +48,7 @@ import org.apache.activemq.artemis.api.core.UDPBroadcastEndpointFactory;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.api.jms.JMSFactoryType;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.jms.referenceable.ConnectionFactoryObjectFactory;
@ -297,7 +298,9 @@ public class ActiveMQConnectionFactory implements ConnectionFactoryOptions, Exte
case JMSContext.AUTO_ACKNOWLEDGE:
case JMSContext.CLIENT_ACKNOWLEDGE:
case JMSContext.DUPS_OK_ACKNOWLEDGE:
case JMSContext.SESSION_TRANSACTED: {
case JMSContext.SESSION_TRANSACTED:
case ActiveMQJMSConstants.PRE_ACKNOWLEDGE:
case ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE: {
return;
}
default:

View File

@ -184,6 +184,52 @@ public class JmsConsumerTest extends JMSTestBase {
conn.close();
}
@Test
public void testIndividualACKJms2() throws Exception {
JMSContext context = cf.createContext(ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
jBossQueue = ActiveMQJMSClient.createQueue(JmsConsumerTest.Q_NAME);
JMSProducer producer = context.createProducer();
JMSConsumer consumer = context.createConsumer(jBossQueue);
int noOfMessages = 100;
for (int i = 0; i < noOfMessages; i++) {
producer.send(jBossQueue, context.createTextMessage("m" + i));
}
context.start();
// Consume even numbers first
for (int i = 0; i < noOfMessages; i++) {
Message m = consumer.receive(500);
Assert.assertNotNull(m);
if (i % 2 == 0) {
m.acknowledge();
}
}
context.close();
context = cf.createContext(ActiveMQJMSConstants.INDIVIDUAL_ACKNOWLEDGE);
consumer = context.createConsumer(jBossQueue);
// Consume odd numbers first
for (int i = 0; i < noOfMessages; i++) {
if (i % 2 == 0) {
continue;
}
TextMessage m = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals("m" + i, m.getText());
}
SimpleString queueName = new SimpleString(JmsConsumerTest.Q_NAME);
Assert.assertEquals(0, ((Queue) server.getPostOffice().getBinding(queueName).getBindable()).getDeliveringCount());
Assert.assertEquals(0, getMessageCount((Queue) server.getPostOffice().getBinding(queueName).getBindable()));
context.close();
}
@Test
public void testIndividualACKMessageConsumer() throws Exception {
Connection conn = cf.createConnection();