NO-JIRA: Fixing test hunging on OpenWire

This commit is contained in:
Clebert Suconic 2017-03-30 21:56:46 -04:00
parent 690b8d24d7
commit d779afe874
4 changed files with 190 additions and 167 deletions

View File

@ -1371,6 +1371,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
session.getCoreSession().resetTX(tx); session.getCoreSession().resetTX(tx);
try { try {
session.send(producerInfo, messageSend, sendProducerAck); session.send(producerInfo, messageSend, sendProducerAck);
} catch (Exception e) {
if (tx != null) {
tx.markAsRollbackOnly(new ActiveMQException(e.getMessage()));
}
throw e;
} finally { } finally {
session.getCoreSession().resetTX(null); session.getCoreSession().resetTX(null);
} }
@ -1387,6 +1392,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
try { try {
AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId()); AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId());
consumerBrokerExchange.acknowledge(ack); consumerBrokerExchange.acknowledge(ack);
} catch (Exception e) {
if (tx != null) {
tx.markAsRollbackOnly(new ActiveMQException(e.getMessage()));
}
} finally { } finally {
session.getCoreSession().resetTX(null); session.getCoreSession().resetTX(null);
} }

View File

@ -0,0 +1,158 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.openwire.amq;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.junit.After;
import org.junit.Before;
public class ProducerFlowControlBaseTest extends BasicOpenWireTest {
ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
protected ActiveMQConnection flowControlConnection;
// used to test sendFailIfNoSpace on SystemUsage
protected final AtomicBoolean gotResourceException = new AtomicBoolean(false);
private Thread asyncThread = null;
protected void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException {
final AtomicBoolean done = new AtomicBoolean(true);
final AtomicBoolean keepGoing = new AtomicBoolean(true);
try {
// Starts an async thread that every time it publishes it sets the done
// flag to false.
// Once the send starts to block it will not reset the done flag
// anymore.
asyncThread = new Thread("Fill thread.") {
@Override
public void run() {
Session session = null;
try {
session = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
while (keepGoing.get()) {
done.set(false);
producer.send(session.createTextMessage("Hello World"));
}
} catch (JMSException e) {
} finally {
safeClose(session);
}
}
};
asyncThread.start();
waitForBlockedOrResourceLimit(done);
} finally {
keepGoing.set(false);
}
}
protected void waitForBlockedOrResourceLimit(final AtomicBoolean done) throws InterruptedException {
while (true) {
Thread.sleep(100);
// the producer is blocked once the done flag stays true or there is a
// resource exception
if (done.get() || gotResourceException.get()) {
break;
}
done.set(true);
}
}
protected CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws JMSException {
final CountDownLatch done = new CountDownLatch(1);
new Thread("Send thread.") {
@Override
public void run() {
Session session = null;
try {
session = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(session.createTextMessage(message));
done.countDown();
} catch (JMSException e) {
e.printStackTrace();
} finally {
safeClose(session);
}
}
}.start();
return done;
}
@Override
protected void extraServerConfig(Configuration serverConfig) {
String match = "#";
Map<String, AddressSettings> asMap = serverConfig.getAddressesSettings();
asMap.get(match).setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
this.makeSureCoreQueueExist("QUEUE.A");
this.makeSureCoreQueueExist("QUEUE.B");
}
@Override
@After
public void tearDown() throws Exception {
try {
if (flowControlConnection != null) {
TcpTransport t = flowControlConnection.getTransport().narrow(TcpTransport.class);
try {
flowControlConnection.getTransport().stop();
flowControlConnection.close();
} catch (Throwable ignored) {
// sometimes the disposed up can make the test to fail
// even worse I have seen this breaking every single test after this
// if not caught here
}
t.getTransportListener().onException(new IOException("Disposed."));
}
if (asyncThread != null) {
asyncThread.join();
asyncThread = null;
}
} finally {
super.tearDown();
}
}
}

View File

@ -34,13 +34,14 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
/** /**
* adapted from: org.apache.activemq.ProducerFlowControlSendFailTest * adapted from: org.apache.activemq.ProducerFlowControlSendFailTest
*/ */
public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest { public class ProducerFlowControlSendFailTest extends ProducerFlowControlBaseTest {
@Override @Override
@Before @Before
@ -61,20 +62,8 @@ public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest {
asMap.get(match).setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); asMap.get(match).setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL);
} }
@Override
public void test2ndPublisherWithStandardConnectionThatIsBlocked() throws Exception {
// with sendFailIfNoSpace set, there is no blocking of the connection
}
@Override
public void testAsyncPublisherRecoverAfterBlock() throws Exception {
// sendFail means no flowControllwindow as there is no producer ack, just
// an exception
}
@Override
@Test @Test
public void testPublisherRecoverAfterBlock() throws Exception { public void testPublishWithTX() throws Exception {
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) getConnectionFactory(); ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) getConnectionFactory();
// with sendFail, there must be no flowControllwindow // with sendFail, there must be no flowControllwindow
// sendFail is an alternative flow control mechanism that does not block // sendFail is an alternative flow control mechanism that does not block
@ -82,45 +71,38 @@ public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest {
this.flowControlConnection = (ActiveMQConnection) factory.createConnection(); this.flowControlConnection = (ActiveMQConnection) factory.createConnection();
this.flowControlConnection.start(); this.flowControlConnection.start();
final Session session = this.flowControlConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); final Session session = this.flowControlConnection.createSession(true, Session.SESSION_TRANSACTED);
final MessageProducer producer = session.createProducer(queueA); final MessageProducer producer = session.createProducer(queueA);
final AtomicBoolean keepGoing = new AtomicBoolean(true); int successSent = 0;
boolean exception = false;
Thread thread = new Thread("Filler") { try {
@Override for (int i = 0; i < 5000; i++) {
public void run() { producer.send(session.createTextMessage("Test message"));
while (keepGoing.get()) { session.commit();
try { successSent++;
producer.send(session.createTextMessage("Test message"));
if (gotResourceException.get()) {
System.out.println("got exception");
// do not flood the broker with requests when full as we
// are sending async and they
// will be limited by the network buffers
Thread.sleep(200);
}
} catch (Exception e) {
// with async send, there will be no exceptions
e.printStackTrace();
}
}
} }
}; } catch (Exception e) {
thread.start(); exception = true;
waitForBlockedOrResourceLimit(new AtomicBoolean(false)); // with async send, there will be no exceptions
e.printStackTrace();
}
Assert.assertTrue(exception);
// resourceException on second message, resumption if we // resourceException on second message, resumption if we
// can receive 10 // can receive 10
MessageConsumer consumer = session.createConsumer(queueA); MessageConsumer consumer = session.createConsumer(queueA);
TextMessage msg; TextMessage msg;
for (int idx = 0; idx < 10; ++idx) { for (int idx = 0; idx < successSent; ++idx) {
msg = (TextMessage) consumer.receive(1000); msg = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(msg);
System.out.println("Received " + msg);
if (msg != null) { if (msg != null) {
msg.acknowledge(); msg.acknowledge();
} }
session.commit();
} }
keepGoing.set(false);
consumer.close(); consumer.close();
} }

View File

@ -16,40 +16,22 @@
*/ */
package org.apache.activemq.artemis.tests.integration.openwire.amq; package org.apache.activemq.artemis.tests.integration.openwire.amq;
import javax.jms.DeliveryMode;
import javax.jms.JMSException; import javax.jms.JMSException;
import javax.jms.MessageConsumer; import javax.jms.MessageConsumer;
import javax.jms.MessageProducer; import javax.jms.MessageProducer;
import javax.jms.Session; import javax.jms.Session;
import javax.jms.TextMessage; import javax.jms.TextMessage;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.ActiveMQConnection; import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.junit.After;
import org.junit.Before;
import org.junit.Test; import org.junit.Test;
/** /**
* adapted from: org.apache.activemq.ProducerFlowControlTest * adapted from: org.apache.activemq.ProducerFlowControlTest
*/ */
public class ProducerFlowControlTest extends BasicOpenWireTest { public class ProducerFlowControlTest extends ProducerFlowControlBaseTest {
ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A");
ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B");
protected ActiveMQConnection flowControlConnection;
// used to test sendFailIfNoSpace on SystemUsage
protected final AtomicBoolean gotResourceException = new AtomicBoolean(false);
private Thread asyncThread = null;
@Test @Test
public void test2ndPublisherWithProducerWindowSendConnectionThatIsBlocked() throws Exception { public void test2ndPublisherWithProducerWindowSendConnectionThatIsBlocked() throws Exception {
@ -247,112 +229,4 @@ public class ProducerFlowControlTest extends BasicOpenWireTest {
CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1"); CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
assertFalse(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS)); assertFalse(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS));
} }
private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException {
final AtomicBoolean done = new AtomicBoolean(true);
final AtomicBoolean keepGoing = new AtomicBoolean(true);
// Starts an async thread that every time it publishes it sets the done
// flag to false.
// Once the send starts to block it will not reset the done flag
// anymore.
asyncThread = new Thread("Fill thread.") {
@Override
public void run() {
Session session = null;
try {
session = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
while (keepGoing.get()) {
done.set(false);
producer.send(session.createTextMessage("Hello World"));
}
} catch (JMSException e) {
} finally {
safeClose(session);
}
}
};
asyncThread.start();
waitForBlockedOrResourceLimit(done);
keepGoing.set(false);
}
protected void waitForBlockedOrResourceLimit(final AtomicBoolean done) throws InterruptedException {
while (true) {
Thread.sleep(100);
System.out.println("check done: " + done.get() + " ex: " + gotResourceException.get());
// the producer is blocked once the done flag stays true or there is a
// resource exception
if (done.get() || gotResourceException.get()) {
break;
}
done.set(true);
}
}
private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws JMSException {
final CountDownLatch done = new CountDownLatch(1);
new Thread("Send thread.") {
@Override
public void run() {
Session session = null;
try {
session = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
producer.send(session.createTextMessage(message));
done.countDown();
} catch (JMSException e) {
e.printStackTrace();
} finally {
safeClose(session);
}
}
}.start();
return done;
}
@Override
protected void extraServerConfig(Configuration serverConfig) {
String match = "#";
Map<String, AddressSettings> asMap = serverConfig.getAddressesSettings();
asMap.get(match).setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK);
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
this.makeSureCoreQueueExist("QUEUE.A");
this.makeSureCoreQueueExist("QUEUE.B");
}
@Override
@After
public void tearDown() throws Exception {
try {
if (flowControlConnection != null) {
TcpTransport t = flowControlConnection.getTransport().narrow(TcpTransport.class);
try {
flowControlConnection.getTransport().stop();
flowControlConnection.close();
} catch (Throwable ignored) {
// sometimes the disposed up can make the test to fail
// even worse I have seen this breaking every single test after this
// if not caught here
}
t.getTransportListener().onException(new IOException("Disposed."));
}
if (asyncThread != null) {
asyncThread.join();
asyncThread = null;
}
} finally {
super.tearDown();
}
}
} }