diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index 60a8dca3f1..60876b9519 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -1371,6 +1371,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se session.getCoreSession().resetTX(tx); try { session.send(producerInfo, messageSend, sendProducerAck); + } catch (Exception e) { + if (tx != null) { + tx.markAsRollbackOnly(new ActiveMQException(e.getMessage())); + } + throw e; } finally { session.getCoreSession().resetTX(null); } @@ -1387,6 +1392,10 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se try { AMQConsumerBrokerExchange consumerBrokerExchange = consumerExchanges.get(ack.getConsumerId()); consumerBrokerExchange.acknowledge(ack); + } catch (Exception e) { + if (tx != null) { + tx.markAsRollbackOnly(new ActiveMQException(e.getMessage())); + } } finally { session.getCoreSession().resetTX(null); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlBaseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlBaseTest.java new file mode 100644 index 0000000000..2166201f1e --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlBaseTest.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.openwire.amq; + +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.junit.After; +import org.junit.Before; + +public class ProducerFlowControlBaseTest extends BasicOpenWireTest { + ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A"); + ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B"); + protected ActiveMQConnection flowControlConnection; + // used to test sendFailIfNoSpace on SystemUsage + protected final AtomicBoolean gotResourceException = new AtomicBoolean(false); + private Thread asyncThread = null; + + + protected void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException { + final AtomicBoolean done = new AtomicBoolean(true); + final AtomicBoolean keepGoing = new AtomicBoolean(true); + + try { + // Starts an async thread that every time it publishes it sets the done + // flag to false. + // Once the send starts to block it will not reset the done flag + // anymore. + asyncThread = new Thread("Fill thread.") { + @Override + public void run() { + Session session = null; + try { + session = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + while (keepGoing.get()) { + done.set(false); + producer.send(session.createTextMessage("Hello World")); + } + } catch (JMSException e) { + } finally { + safeClose(session); + } + } + }; + asyncThread.start(); + + waitForBlockedOrResourceLimit(done); + } finally { + keepGoing.set(false); + } + } + + protected void waitForBlockedOrResourceLimit(final AtomicBoolean done) throws InterruptedException { + while (true) { + Thread.sleep(100); + // the producer is blocked once the done flag stays true or there is a + // resource exception + if (done.get() || gotResourceException.get()) { + break; + } + done.set(true); + } + } + + protected CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws JMSException { + final CountDownLatch done = new CountDownLatch(1); + new Thread("Send thread.") { + @Override + public void run() { + Session session = null; + try { + session = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(queue); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + producer.send(session.createTextMessage(message)); + done.countDown(); + } catch (JMSException e) { + e.printStackTrace(); + } finally { + safeClose(session); + } + } + }.start(); + return done; + } + + @Override + protected void extraServerConfig(Configuration serverConfig) { + String match = "#"; + Map asMap = serverConfig.getAddressesSettings(); + asMap.get(match).setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + this.makeSureCoreQueueExist("QUEUE.A"); + this.makeSureCoreQueueExist("QUEUE.B"); + } + + @Override + @After + public void tearDown() throws Exception { + try { + if (flowControlConnection != null) { + TcpTransport t = flowControlConnection.getTransport().narrow(TcpTransport.class); + try { + flowControlConnection.getTransport().stop(); + flowControlConnection.close(); + } catch (Throwable ignored) { + // sometimes the disposed up can make the test to fail + // even worse I have seen this breaking every single test after this + // if not caught here + } + t.getTransportListener().onException(new IOException("Disposed.")); + } + if (asyncThread != null) { + asyncThread.join(); + asyncThread = null; + } + } finally { + super.tearDown(); + } + } + + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlSendFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlSendFailTest.java index baacd16b06..e03ae27409 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlSendFailTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlSendFailTest.java @@ -34,13 +34,14 @@ import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; /** * adapted from: org.apache.activemq.ProducerFlowControlSendFailTest */ -public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest { +public class ProducerFlowControlSendFailTest extends ProducerFlowControlBaseTest { @Override @Before @@ -61,20 +62,8 @@ public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest { asMap.get(match).setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.FAIL); } - @Override - public void test2ndPublisherWithStandardConnectionThatIsBlocked() throws Exception { - // with sendFailIfNoSpace set, there is no blocking of the connection - } - - @Override - public void testAsyncPublisherRecoverAfterBlock() throws Exception { - // sendFail means no flowControllwindow as there is no producer ack, just - // an exception - } - - @Override @Test - public void testPublisherRecoverAfterBlock() throws Exception { + public void testPublishWithTX() throws Exception { ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) getConnectionFactory(); // with sendFail, there must be no flowControllwindow // sendFail is an alternative flow control mechanism that does not block @@ -82,45 +71,38 @@ public class ProducerFlowControlSendFailTest extends ProducerFlowControlTest { this.flowControlConnection = (ActiveMQConnection) factory.createConnection(); this.flowControlConnection.start(); - final Session session = this.flowControlConnection.createSession(false, Session.CLIENT_ACKNOWLEDGE); + final Session session = this.flowControlConnection.createSession(true, Session.SESSION_TRANSACTED); final MessageProducer producer = session.createProducer(queueA); - final AtomicBoolean keepGoing = new AtomicBoolean(true); - - Thread thread = new Thread("Filler") { - @Override - public void run() { - while (keepGoing.get()) { - try { - producer.send(session.createTextMessage("Test message")); - if (gotResourceException.get()) { - System.out.println("got exception"); - // do not flood the broker with requests when full as we - // are sending async and they - // will be limited by the network buffers - Thread.sleep(200); - } - } catch (Exception e) { - // with async send, there will be no exceptions - e.printStackTrace(); - } - } + int successSent = 0; + boolean exception = false; + try { + for (int i = 0; i < 5000; i++) { + producer.send(session.createTextMessage("Test message")); + session.commit(); + successSent++; } - }; - thread.start(); - waitForBlockedOrResourceLimit(new AtomicBoolean(false)); + } catch (Exception e) { + exception = true; + // with async send, there will be no exceptions + e.printStackTrace(); + } + + Assert.assertTrue(exception); // resourceException on second message, resumption if we // can receive 10 MessageConsumer consumer = session.createConsumer(queueA); TextMessage msg; - for (int idx = 0; idx < 10; ++idx) { + for (int idx = 0; idx < successSent; ++idx) { msg = (TextMessage) consumer.receive(1000); + Assert.assertNotNull(msg); + System.out.println("Received " + msg); if (msg != null) { msg.acknowledge(); } + session.commit(); } - keepGoing.set(false); consumer.close(); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlTest.java index c085d0f710..bde8b7909e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerFlowControlTest.java @@ -16,40 +16,22 @@ */ package org.apache.activemq.artemis.tests.integration.openwire.amq; -import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageConsumer; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; -import java.io.IOException; -import java.util.Map; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.ActiveMQConnection; -import org.apache.activemq.artemis.core.config.Configuration; -import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; -import org.apache.activemq.artemis.core.settings.impl.AddressSettings; -import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest; -import org.apache.activemq.command.ActiveMQQueue; -import org.apache.activemq.transport.tcp.TcpTransport; -import org.junit.After; -import org.junit.Before; import org.junit.Test; /** * adapted from: org.apache.activemq.ProducerFlowControlTest */ -public class ProducerFlowControlTest extends BasicOpenWireTest { - - ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A"); - ActiveMQQueue queueB = new ActiveMQQueue("QUEUE.B"); - protected ActiveMQConnection flowControlConnection; - // used to test sendFailIfNoSpace on SystemUsage - protected final AtomicBoolean gotResourceException = new AtomicBoolean(false); - private Thread asyncThread = null; +public class ProducerFlowControlTest extends ProducerFlowControlBaseTest { @Test public void test2ndPublisherWithProducerWindowSendConnectionThatIsBlocked() throws Exception { @@ -247,112 +229,4 @@ public class ProducerFlowControlTest extends BasicOpenWireTest { CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1"); assertFalse(pubishDoneToQeueuB.await(2, TimeUnit.SECONDS)); } - - private void fillQueue(final ActiveMQQueue queue) throws JMSException, InterruptedException { - final AtomicBoolean done = new AtomicBoolean(true); - final AtomicBoolean keepGoing = new AtomicBoolean(true); - - // Starts an async thread that every time it publishes it sets the done - // flag to false. - // Once the send starts to block it will not reset the done flag - // anymore. - asyncThread = new Thread("Fill thread.") { - @Override - public void run() { - Session session = null; - try { - session = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(queue); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - while (keepGoing.get()) { - done.set(false); - producer.send(session.createTextMessage("Hello World")); - } - } catch (JMSException e) { - } finally { - safeClose(session); - } - } - }; - asyncThread.start(); - - waitForBlockedOrResourceLimit(done); - keepGoing.set(false); - } - - protected void waitForBlockedOrResourceLimit(final AtomicBoolean done) throws InterruptedException { - while (true) { - Thread.sleep(100); - System.out.println("check done: " + done.get() + " ex: " + gotResourceException.get()); - // the producer is blocked once the done flag stays true or there is a - // resource exception - if (done.get() || gotResourceException.get()) { - break; - } - done.set(true); - } - } - - private CountDownLatch asyncSendTo(final ActiveMQQueue queue, final String message) throws JMSException { - final CountDownLatch done = new CountDownLatch(1); - new Thread("Send thread.") { - @Override - public void run() { - Session session = null; - try { - session = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); - MessageProducer producer = session.createProducer(queue); - producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); - producer.send(session.createTextMessage(message)); - done.countDown(); - } catch (JMSException e) { - e.printStackTrace(); - } finally { - safeClose(session); - } - } - }.start(); - return done; - } - - @Override - protected void extraServerConfig(Configuration serverConfig) { - String match = "#"; - Map asMap = serverConfig.getAddressesSettings(); - asMap.get(match).setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); - } - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - this.makeSureCoreQueueExist("QUEUE.A"); - this.makeSureCoreQueueExist("QUEUE.B"); - } - - @Override - @After - public void tearDown() throws Exception { - try { - if (flowControlConnection != null) { - TcpTransport t = flowControlConnection.getTransport().narrow(TcpTransport.class); - try { - flowControlConnection.getTransport().stop(); - flowControlConnection.close(); - } catch (Throwable ignored) { - // sometimes the disposed up can make the test to fail - // even worse I have seen this breaking every single test after this - // if not caught here - } - t.getTransportListener().onException(new IOException("Disposed.")); - } - if (asyncThread != null) { - asyncThread.join(); - asyncThread = null; - } - } finally { - super.tearDown(); - } - } - }