diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java index f90c0b74e4..0bcff66e45 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java @@ -183,6 +183,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se private final Set knownDestinations = new ConcurrentHashSet<>(); + private AtomicBoolean disableTtl = new AtomicBoolean(false); + // TODO-NOW: check on why there are two connections created for every createConnection on the client. public OpenWireConnection(Connection connection, ActiveMQServer server, @@ -776,6 +778,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se this.connectionEntry = connectionEntry; } + @Override + public boolean checkDataReceived() { + if (disableTtl.get()) { + return true; + } + return super.checkDataReceived(); + } + public void setUpTtl(final long inactivityDuration, final long inactivityDurationInitialDelay, final boolean useKeepAlive) { @@ -818,6 +828,14 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se } } + public void disableTtl() { + disableTtl.set(true); + } + + public void enableTtl() { + disableTtl.set(false); + } + class SlowConsumerDetection implements SlowConsumerDetectionListener { @Override diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 9c592ca490..006f05e9a3 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -40,9 +40,7 @@ import org.apache.activemq.artemis.core.server.ServerConsumer; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.ServerSession; import org.apache.activemq.artemis.core.server.SlowConsumerDetectionListener; -import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.spi.core.protocol.SessionCallback; -import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.SimpleIDGenerator; @@ -53,12 +51,12 @@ import org.apache.activemq.command.Message; import org.apache.activemq.command.MessageDispatch; import org.apache.activemq.command.ProducerAck; import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.Response; import org.apache.activemq.command.SessionInfo; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.wireformat.WireFormat; public class AMQSession implements SessionCallback { - // ConsumerID is generated inside the session, 0, 1, 2, ... as many consumers as you have on the session protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0); @@ -303,108 +301,103 @@ public class AMQSession implements SessionCallback { originalCoreMsg.putStringProperty(org.apache.activemq.artemis.api.core.Message.HDR_DUPLICATE_DETECTION_ID.toString(), messageSend.getMessageId().toString()); } - Runnable runnable; + boolean shouldBlockProducer = producerInfo.getWindowSize() > 0 || messageSend.isResponseRequired(); - if (sendProducerAck) { - runnable = new Runnable() { - @Override - public void run() { - try { - ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); - connection.dispatchSync(ack); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); - connection.sendException(e); - } + final AtomicInteger count = new AtomicInteger(actualDestinations.length); - } - }; - } else { - final Connection transportConnection = connection.getTransportConnection(); + final Exception[] anyException = new Exception[] {null}; - if (transportConnection == null) { - // I don't think this could happen, but just in case, avoiding races - runnable = null; - } else { - runnable = new Runnable() { - @Override - public void run() { - transportConnection.setAutoRead(true); - } - }; - } + if (shouldBlockProducer) { + connection.getContext().setDontSendReponse(true); } - internalSend(actualDestinations, originalCoreMsg, runnable); - } - - private void internalSend(ActiveMQDestination[] actualDestinations, - ServerMessage originalCoreMsg, - final Runnable onComplete) throws Exception { - - Runnable runToUse; - - if (actualDestinations.length <= 1 || onComplete == null) { - // if onComplete is null, this will be null ;) - runToUse = onComplete; - } else { - final AtomicInteger count = new AtomicInteger(actualDestinations.length); - runToUse = new Runnable() { - @Override - public void run() { - if (count.decrementAndGet() == 0) { - onComplete.run(); - } - } - }; - } - - SimpleString[] addresses = new SimpleString[actualDestinations.length]; - PagingStore[] pagingStores = new PagingStore[actualDestinations.length]; - - // We fillup addresses, pagingStores and we will throw failure if that's the case for (int i = 0; i < actualDestinations.length; i++) { ActiveMQDestination dest = actualDestinations[i]; - addresses[i] = new SimpleString(dest.getPhysicalName()); - pagingStores[i] = server.getPagingManager().getPageStore(addresses[i]); - if (pagingStores[i].getAddressFullMessagePolicy() == AddressFullMessagePolicy.FAIL && pagingStores[i].isFull()) { - throw new ResourceAllocationException("Queue is full"); - } - } - - for (int i = 0; i < actualDestinations.length; i++) { - + SimpleString address = new SimpleString(dest.getPhysicalName()); ServerMessage coreMsg = originalCoreMsg.copy(); - - coreMsg.setAddress(addresses[i]); - - PagingStore store = pagingStores[i]; - - if (store.isFull()) { - connection.getTransportConnection().setAutoRead(false); - } + coreMsg.setAddress(address); if (actualDestinations[i].isQueue()) { checkAutoCreateQueue(new SimpleString(actualDestinations[i].getPhysicalName()), actualDestinations[i].isTemporary()); - } - - if (actualDestinations[i].isQueue()) { coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.ANYCAST.getType()); } else { coreMsg.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, RoutingType.MULTICAST.getType()); } - RoutingStatus result = getCoreSession().send(coreMsg, false, actualDestinations[i].isTemporary()); + PagingStore store = server.getPagingManager().getPageStore(address); - if (result == RoutingStatus.NO_BINDINGS && actualDestinations[i].isQueue()) { - throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + actualDestinations[i]); - } - if (runToUse != null) { - // if the timeout is >0, it will wait this much milliseconds - // before running the the runToUse - // this will eventually unblock blocked destinations - // playing flow control - store.checkMemory(runToUse); + this.connection.disableTtl(); + if (shouldBlockProducer) { + if (!store.checkMemory(() -> { + try { + RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary()); + + if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) { + throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest); + } + } catch (Exception e) { + if (anyException[0] == null) { + anyException[0] = e; + } + } + connection.enableTtl(); + if (count.decrementAndGet() == 0) { + if (anyException[0] != null) { + this.connection.getContext().setDontSendReponse(false); + ActiveMQServerLogger.LOGGER.warn(anyException[0].getMessage(), anyException[0]); + connection.sendException(anyException[0]); + } else { + if (sendProducerAck) { + try { + ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); + connection.dispatchAsync(ack); + } catch (Exception e) { + this.connection.getContext().setDontSendReponse(false); + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + connection.sendException(e); + } + } else { + connection.getContext().setDontSendReponse(false); + try { + Response response = new Response(); + response.setCorrelationId(messageSend.getCommandId()); + connection.dispatchAsync(response); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + connection.sendException(e); + } + } + } + } + })) { + this.connection.getContext().setDontSendReponse(false); + connection.enableTtl(); + throw new ResourceAllocationException("Queue is full " + address); + } + } else { + //non-persistent messages goes here, by default we stop reading from + //transport + connection.getTransportConnection().setAutoRead(false); + if (!store.checkMemory(() -> { + connection.getTransportConnection().setAutoRead(true); + connection.enableTtl(); + })) { + connection.getTransportConnection().setAutoRead(true); + connection.enableTtl(); + throw new ResourceAllocationException("Queue is full " + address); + } + + RoutingStatus result = getCoreSession().send(coreMsg, false, dest.isTemporary()); + if (result == RoutingStatus.NO_BINDINGS && dest.isQueue()) { + throw new InvalidDestinationException("Cannot publish to a non-existent Destination: " + dest); + } + + if (count.decrementAndGet() == 0) { + if (sendProducerAck) { + ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); + connection.dispatchAsync(ack); + } + } } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java index 6be92f88f5..d01e237bd6 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/BasicOpenWireTest.java @@ -45,8 +45,8 @@ public class BasicOpenWireTest extends OpenWireTestBase { public TestName name = new TestName(); protected static final String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true"; - protected ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(urlString); - protected ActiveMQXAConnectionFactory xaFactory = new ActiveMQXAConnectionFactory(urlString); + protected ActiveMQConnectionFactory factory; + protected ActiveMQXAConnectionFactory xaFactory; protected ActiveMQConnection connection; protected String topicName = "amqTestTopic1"; @@ -64,6 +64,9 @@ public class BasicOpenWireTest extends OpenWireTestBase { @Before public void setUp() throws Exception { super.setUp(); + System.setProperty("org.apache.activemq.transport.AbstractInactivityMonitor.keepAliveTime", "5"); + factory = new ActiveMQConnectionFactory(getConnectionUrl()); + xaFactory = new ActiveMQXAConnectionFactory(getConnectionUrl()); SimpleString coreQueue = new SimpleString(queueName); this.server.createQueue(coreQueue, RoutingType.ANYCAST, coreQueue, null, false, false, -1, false, true); testQueues.put(queueName, coreQueue); @@ -81,6 +84,10 @@ public class BasicOpenWireTest extends OpenWireTestBase { } } + protected String getConnectionUrl() { + return urlString; + } + @Override @After public void tearDown() throws Exception { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerBlockingTtlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerBlockingTtlTest.java new file mode 100644 index 0000000000..8473d676a4 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/amq/ProducerBlockingTtlTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.openwire.amq; + +import org.apache.activemq.ActiveMQConnection; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.transport.tcp.TcpTransport; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import javax.jms.Connection; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +public class ProducerBlockingTtlTest extends BasicOpenWireTest { + + ActiveMQQueue queueA = new ActiveMQQueue("QUEUE.A"); + protected ActiveMQConnection flowControlConnection; + + @Override + protected void extraServerConfig(Configuration serverConfig) { + String match = "#"; + Map asMap = serverConfig.getAddressesSettings(); + asMap.get(match).setMaxSizeBytes(1).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + this.makeSureCoreQueueExist("QUEUE.A"); + } + + @Override + @After + public void tearDown() throws Exception { + try { + if (flowControlConnection != null) { + TcpTransport t = flowControlConnection.getTransport().narrow(TcpTransport.class); + try { + flowControlConnection.getTransport().stop(); + flowControlConnection.close(); + } catch (Throwable ignored) { + } + } + } finally { + super.tearDown(); + } + } + + //set ttl to 1000 + @Override + protected String getConnectionUrl() { + return urlString + "&wireFormat.maxInactivityDuration=1000&wireFormat.maxInactivityDurationInitalDelay=1000"; + } + + @Test + public void testProducerBlockWontGetTimeout() throws Exception { + + flowControlConnection = (ActiveMQConnection) factory.createConnection(); + Connection consumerConnection = factory.createConnection(); + Thread fillThread = null; + AtomicBoolean keepGoing = new AtomicBoolean(true); + try { + flowControlConnection.start(); + + final Session session = flowControlConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + final MessageProducer producer = session.createProducer(queueA); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + final String text = "Hello World"; + final int num = 10; + + fillThread = new Thread("Fill thread.") { + @Override + public void run() { + try { + for (int i = 0; i < num && keepGoing.get(); i++) { + producer.send(session.createTextMessage(text + i)); + } + } catch (JMSException e) { + } + } + }; + + fillThread.start(); + + //longer enough than TTL (1000) + Thread.sleep(4000); + + //receive messages and unblock the producer + consumerConnection.start(); + Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = consumerSession.createConsumer(queueA); + + for (int i = 0; i < num; i++) { + TextMessage m = (TextMessage) consumer.receive(5000); + assertNotNull(m); + assertEquals("Hello World" + i, m.getText()); + } + assertNull(consumer.receive(3)); + + } catch (Exception e) { + e.printStackTrace(); + } finally { + + if (fillThread != null) { + keepGoing.set(false); + fillThread.interrupt(); + fillThread.join(); + } + try { + flowControlConnection.close(); + flowControlConnection = null; + } catch (Throwable t) { + } + try { + consumerConnection.close(); + } catch (Throwable t) { + } + } + } +}