This commit is contained in:
Kevin Earls 2014-03-28 17:33:41 +01:00
parent 5da7ab3c0e
commit 4faf11d821
1 changed files with 19 additions and 4 deletions

View File

@ -37,10 +37,17 @@ import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap; import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue; import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.util.Wait; import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.BlockJUnit4ClassRunner;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
// see: https://issues.apache.org/activemq/browse/AMQ-2651 // see: https://issues.apache.org/activemq/browse/AMQ-2651
@RunWith(BlockJUnit4ClassRunner.class)
public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport { public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(OnePrefetchAsyncConsumerTest.class); private static final Logger LOG = LoggerFactory.getLogger(OnePrefetchAsyncConsumerTest.class);
@ -50,6 +57,8 @@ public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport {
private final AtomicBoolean completed = new AtomicBoolean(); private final AtomicBoolean completed = new AtomicBoolean();
private final AtomicBoolean success = new AtomicBoolean(); private final AtomicBoolean success = new AtomicBoolean();
@Ignore("https://issues.apache.org/jira/browse/AMQ-5126")
@Test(timeout = 60 * 1000)
public void testPrefetchExtension() throws Exception { public void testPrefetchExtension() throws Exception {
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue); MessageProducer producer = session.createProducer(queue);
@ -83,8 +92,8 @@ public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport {
return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString()); return new ActiveMQConnectionFactory(broker.getTransportConnectors().get(0).getPublishableConnectString());
} }
@Override @Before
protected void setUp() throws Exception { public void setUp() throws Exception {
setAutoFail(true); setAutoFail(true);
bindAddress = "tcp://localhost:0"; bindAddress = "tcp://localhost:0";
super.setUp(); super.setUp();
@ -96,8 +105,8 @@ public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport {
connection.start(); connection.start();
} }
@Override @After
protected void tearDown() throws Exception { public void tearDown() throws Exception {
connectionConsumer.close(); connectionConsumer.close();
connection.close(); connection.close();
super.tearDown(); super.tearDown();
@ -169,10 +178,14 @@ public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport {
session.run(); session.run();
// commit the tx and return ServerSession to pool // commit the tx and return ServerSession to pool
LOG.debug("Waiting on pool");
synchronized (pool) { synchronized (pool) {
try { try {
LOG.debug("About to call session.commit");
session.commit(); session.commit();
LOG.debug("Commit completed");
} catch (JMSException e) { } catch (JMSException e) {
LOG.error("In start", e);
} }
pool.serverSessionInUse = false; pool.serverSessionInUse = false;
} }
@ -199,7 +212,9 @@ public class OnePrefetchAsyncConsumerTest extends EmbeddedBrokerTestSupport {
TimeUnit.SECONDS.sleep(4); TimeUnit.SECONDS.sleep(4);
} }
} catch (JMSException e) { } catch (JMSException e) {
LOG.error("in onMessage", e);
} catch (InterruptedException e) { } catch (InterruptedException e) {
LOG.error("in onMessage",e);
} }
} }
} }