diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java index 9ec5e8f48d..aa2b7c1dea 100644 --- a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/AtomicRunnable.java @@ -18,9 +18,13 @@ package org.apache.activemq.artemis.utils.runnables; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.function.Consumer; public abstract class AtomicRunnable implements Runnable { + public static AtomicRunnable delegate(Runnable runnable) { + return new AtomicRunnableWithDelegate(runnable); + } public static AtomicRunnable checkAtomic(Runnable run) { if (run instanceof AtomicRunnable) { @@ -30,6 +34,27 @@ public abstract class AtomicRunnable implements Runnable { } } + private RunnableList acceptedList; + private Consumer cancelTask; + + public RunnableList getAcceptedList() { + return acceptedList; + } + + public AtomicRunnable setAcceptedList(RunnableList acceptedList) { + this.acceptedList = acceptedList; + return this; + } + + public Consumer getCancel() { + return cancelTask; + } + + public AtomicRunnable setCancel(Consumer cancelTask) { + this.cancelTask = cancelTask; + return this; + } + private volatile int ran; private static final AtomicIntegerFieldUpdater RAN_UPDATE = @@ -52,7 +77,21 @@ public abstract class AtomicRunnable implements Runnable { @Override public void run() { if (RAN_UPDATE.compareAndSet(this, 0, 1)) { - atomicRun(); + try { + atomicRun(); + } finally { + if (acceptedList != null) { + acceptedList.remove(AtomicRunnable.this); + } + } + } + } + + public void cancel() { + if (RAN_UPDATE.compareAndSet(this, 0, 1)) { + if (cancelTask != null) { + cancelTask.accept(AtomicRunnable.this); + } } } diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/RunnableList.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/RunnableList.java new file mode 100644 index 0000000000..a2bef5eec6 --- /dev/null +++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/utils/runnables/RunnableList.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils.runnables; + +import java.util.HashSet; +import java.util.function.Consumer; + +public class RunnableList { + + private final HashSet list = new HashSet<>(); + + public RunnableList() { + } + + public synchronized void add(AtomicRunnable runnable) { + runnable.setAcceptedList(this); + list.add(runnable); + } + + public int size() { + return list.size(); + } + + public synchronized void remove(AtomicRunnable runnable) { + list.remove(runnable); + } + + public synchronized void cancel() { + list.forEach(this::cancel); + list.clear(); + } + + private void cancel(AtomicRunnable atomicRunnable) { + atomicRunnable.cancel(); + } + + public void forEach(Consumer consumerRunnable) { + list.forEach(consumerRunnable); + } + +} diff --git a/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/runnables/RunnableListTest.java b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/runnables/RunnableListTest.java new file mode 100644 index 0000000000..8d21fd58d6 --- /dev/null +++ b/artemis-commons/src/test/java/org/apache/activemq/artemis/utils/runnables/RunnableListTest.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.utils.runnables; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Assert; +import org.junit.Test; + +public class RunnableListTest { + + HashSet masterList = new HashSet<>(); + + @Test + public void testRunning() { + AtomicInteger result = new AtomicInteger(); + + RunnableList listA = new RunnableList(); + RunnableList listB = new RunnableList(); + RunnableList listC = new RunnableList(); + + RunnableList[] lists = new RunnableList[]{listA, listB, listC}; + for (RunnableList l : lists) { + for (int i = 0; i < 10; i++) { + AtomicRunnable runnable = new AtomicRunnable() { + @Override + public void atomicRun() { + result.incrementAndGet(); + masterList.remove(this); + } + }; + addItem(l, runnable); + } + } + + Assert.assertEquals(30, masterList.size()); + + runList(listA); + + Assert.assertEquals(10, result.get()); + + Assert.assertEquals(20, masterList.size()); + Assert.assertEquals(0, listA.size()); + Assert.assertEquals(10, listB.size()); + Assert.assertEquals(10, listC.size()); + + HashSet copyList = new HashSet<>(); + copyList.addAll(masterList); + + copyList.forEach(r -> r.run()); + + for (RunnableList l : lists) { + Assert.assertEquals(0, l.size()); + } + + Assert.assertEquals(30, result.get()); + } + + @Test + public void testCancel() { + AtomicInteger result = new AtomicInteger(); + + RunnableList listA = new RunnableList(); + RunnableList listB = new RunnableList(); + RunnableList listC = new RunnableList(); + + RunnableList[] lists = new RunnableList[]{listA, listB, listC}; + for (RunnableList l : lists) { + for (int i = 0; i < 10; i++) { + AtomicRunnable runnable = new AtomicRunnable() { + @Override + public void atomicRun() { + result.incrementAndGet(); + masterList.remove(this); + } + }; + addItem(l, runnable); + } + } + + Assert.assertEquals(30, masterList.size()); + + listA.cancel(); + + Assert.assertEquals(0, result.get()); + + Assert.assertEquals(20, masterList.size()); + Assert.assertEquals(0, listA.size()); + Assert.assertEquals(10, listB.size()); + Assert.assertEquals(10, listC.size()); + + listB.cancel(); + listC.cancel(); + + for (RunnableList l : lists) { + Assert.assertEquals(0, l.size()); + } + + Assert.assertEquals(0, masterList.size()); + } + + // runs all AtomicRunnables inside the list + private void runList(RunnableList list) { + // make a copy of all the tasks to a new list + ArrayList toRun = new ArrayList<>(); + list.forEach(toRun::add); + + // run all the elements + toRun.forEach(r -> r.run()); + } + + private void addItem(RunnableList list, AtomicRunnable runnable) { + list.add(runnable); + runnable.setCancel(masterList::remove); + masterList.add(runnable); + } + +} \ No newline at end of file diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 47941c8621..c8f081b30b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -64,6 +64,7 @@ import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.SelectorTranslator; import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.apache.activemq.artemis.utils.runnables.RunnableList; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Accepted; @@ -112,6 +113,8 @@ public class AMQPSessionCallback implements SessionCallback { private ProtonTransactionHandler transactionHandler; + private final RunnableList blockedRunnables = new RunnableList(); + public AMQPSessionCallback(AMQPConnectionCallback protonSPI, ProtonProtocolManager manager, AMQPConnectionContext connection, @@ -384,6 +387,7 @@ public class AMQPSessionCallback implements SessionCallback { } public void close() throws Exception { + blockedRunnables.cancel(); //need to check here as this can be called if init fails if (serverSession != null) { // we cannot hold the nettyExecutor on this rollback here, otherwise other connections will be waiting @@ -610,7 +614,7 @@ public class AMQPSessionCallback implements SessionCallback { } else { final PagingStore store = manager.getServer().getPagingManager().getPageStore(address); if (store != null) { - store.checkMemory(runnable); + store.checkMemory(runnable, blockedRunnables::add); } else { runnable.run(); } diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java index a902a26894..244a65baaf 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallbackTest.java @@ -17,6 +17,7 @@ package org.apache.activemq.artemis.protocol.amqp.broker; import java.util.concurrent.Executor; +import java.util.function.Consumer; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.paging.PagingManager; @@ -157,7 +158,7 @@ public class AMQPSessionCallbackTest { session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection)); // Run the credit refill code. - Mockito.verify(pagingStore).checkMemory(argument.capture()); + Mockito.verify(pagingStore).checkMemory(argument.capture(), Mockito.isA(Consumer.class)); assertNotNull(argument.getValue()); argument.getValue().run(); @@ -188,7 +189,7 @@ public class AMQPSessionCallbackTest { session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection)); // Run the credit refill code. - Mockito.verify(pagingStore).checkMemory(argument.capture()); + Mockito.verify(pagingStore).checkMemory(argument.capture(), Mockito.isA(Consumer.class)); assertNotNull(argument.getValue()); argument.getValue().run(); @@ -249,7 +250,7 @@ public class AMQPSessionCallbackTest { session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(1, AMQP_LOW_CREDITS_DEFAULT, receiver, connection)); // Run the credit refill code. - Mockito.verify(pagingStore).checkMemory(argument.capture()); + Mockito.verify(pagingStore).checkMemory(argument.capture(), Mockito.isA(Consumer.class)); assertNotNull(argument.getValue()); argument.getValue().run(); diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java index 9c096fe545..1e267d27c8 100644 --- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java +++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQSession.java @@ -51,6 +51,8 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.activemq.artemis.utils.IDGenerator; import org.apache.activemq.artemis.utils.SimpleIDGenerator; +import org.apache.activemq.artemis.utils.runnables.AtomicRunnable; +import org.apache.activemq.artemis.utils.runnables.RunnableList; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ConnectionInfo; import org.apache.activemq.command.ConsumerInfo; @@ -78,6 +80,8 @@ public class AMQSession implements SessionCallback { private final ActiveMQServer server; private final OpenWireConnection connection; + private final RunnableList blockedRunnables = new RunnableList(); + private final AtomicBoolean started = new AtomicBoolean(false); private final ScheduledExecutorService scheduledPool; @@ -320,8 +324,7 @@ public class AMQSession implements SessionCallback { @Override public void closed() { - // TODO Auto-generated method stub - + blockedRunnables.cancel(); } @Override @@ -338,6 +341,7 @@ public class AMQSession implements SessionCallback { @Override public void disconnect(ServerConsumer serverConsumer, String errorMessage) { + blockedRunnables.cancel(); // for an openwire consumer this is fatal because unlike with activemq5 sending // to the address will not auto create the consumer binding and it will be in limbo. // forcing disconnect allows it to failover and recreate its binding. @@ -412,7 +416,7 @@ public class AMQSession implements SessionCallback { sendShouldBlockProducer(producerInfo, messageSend, sendProducerAck, store, dest, count, coreMsg, address); } else { if (store != null) { - if (!store.checkMemory(true, this::restoreAutoRead, this::blockConnection)) { + if (!store.checkMemory(true, AtomicRunnable.delegate(this::restoreAutoRead), AtomicRunnable.delegate(this::blockConnection), this.blockedRunnables::add)) { restoreAutoRead(); throw new ResourceAllocationException("Queue is full " + address); } @@ -440,62 +444,65 @@ public class AMQSession implements SessionCallback { final AtomicInteger count, final org.apache.activemq.artemis.api.core.Message coreMsg, final SimpleString address) throws ResourceAllocationException { - final Runnable task = () -> { - Exception exceptionToSend = null; - try { - getCoreSession().send(coreMsg, false, producerInfo.getProducerId().toString(), dest.isTemporary()); - } catch (Exception e) { - logger.debug("Sending exception to the client", e); - exceptionToSend = e; - } - connection.enableTtl(); - if (count == null || count.decrementAndGet() == 0) { - if (exceptionToSend != null) { - this.connection.getContext().setDontSendReponse(false); - connection.sendException(exceptionToSend); - } else { - server.getStorageManager().afterCompleteOperations(new IOCallback() { - @Override - public void done() { - if (sendProducerAck) { - try { - ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); - connection.dispatchAsync(ack); - } catch (Exception e) { + final AtomicRunnable task = new AtomicRunnable() { + @Override + public void atomicRun() { + Exception exceptionToSend = null; + try { + getCoreSession().send(coreMsg, false, producerInfo.getProducerId().toString(), dest.isTemporary()); + } catch (Exception e) { + logger.debug("Sending exception to the client", e); + exceptionToSend = e; + } + connection.enableTtl(); + if (count == null || count.decrementAndGet() == 0) { + if (exceptionToSend != null) { + connection.getContext().setDontSendReponse(false); + connection.sendException(exceptionToSend); + } else { + server.getStorageManager().afterCompleteOperations(new IOCallback() { + @Override + public void done() { + if (sendProducerAck) { + try { + ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize()); + connection.dispatchAsync(ack); + } catch (Exception e) { + connection.getContext().setDontSendReponse(false); + logger.warn(e.getMessage(), e); + connection.sendException(e); + } + } else { connection.getContext().setDontSendReponse(false); - logger.warn(e.getMessage(), e); - connection.sendException(e); - } - } else { - connection.getContext().setDontSendReponse(false); - try { - Response response = new Response(); - response.setCorrelationId(messageSend.getCommandId()); - connection.dispatchAsync(response); - } catch (Exception e) { - logger.warn(e.getMessage(), e); - connection.sendException(e); + try { + Response response = new Response(); + response.setCorrelationId(messageSend.getCommandId()); + connection.dispatchAsync(response); + } catch (Exception e) { + logger.warn(e.getMessage(), e); + connection.sendException(e); + } } } - } - @Override - public void onError(int errorCode, String errorMessage) { - try { - final IOException e = new IOException(errorMessage); - logger.warn(errorMessage); - connection.serviceException(e); - } catch (Exception ex) { - logger.debug(ex.getMessage(), ex); + @Override + public void onError(int errorCode, String errorMessage) { + try { + final IOException e = new IOException(errorMessage); + logger.warn(errorMessage); + connection.serviceException(e); + } catch (Exception ex) { + logger.debug(ex.getMessage(), ex); + } } - } - }); + }); + } } } }; if (store != null) { - if (!store.checkMemory(false, task, null)) { + if (!store.checkMemory(false, task, null, blockedRunnables::add)) { this.connection.getContext().setDontSendReponse(false); connection.enableTtl(); throw new ResourceAllocationException("Queue is full " + address); @@ -542,11 +549,12 @@ public class AMQSession implements SessionCallback { } public void close() throws Exception { - this.coreSession.close(false); + this.close(false); } @Override public void close(boolean failed) { + blockedRunnables.cancel(); try { this.coreSession.close(failed); } catch (Exception bestEffort) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java index b9221ba186..6ee69e07bd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/PagingStore.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging; import java.io.File; import java.util.Collection; +import java.util.function.Consumer; import org.apache.activemq.artemis.api.core.Message; import org.apache.activemq.artemis.api.core.RefCountMessageListener; @@ -35,6 +36,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings; import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; +import org.apache.activemq.artemis.utils.runnables.AtomicRunnable; /** *

@@ -175,9 +177,9 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener addSize(size, false); } - boolean checkMemory(Runnable runnable); + boolean checkMemory(Runnable runnable, Consumer blockedCallback); - boolean checkMemory(boolean runOnFailure, Runnable runnable, Runnable runWhenBlocking); + boolean checkMemory(boolean runOnFailure, Runnable runnable, Runnable runWhenBlocking, Consumer blockedCallback); boolean isFull(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java index adbc1028a6..0fedf9a647 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/paging/impl/PagingStoreImpl.java @@ -1030,16 +1030,25 @@ public class PagingStoreImpl implements PagingStore { } @Override - public boolean checkMemory(final Runnable runWhenAvailable) { - return checkMemory(true, runWhenAvailable, null); + public boolean checkMemory(final Runnable runWhenAvailable, Consumer blockedCallback) { + return checkMemory(true, runWhenAvailable, null, blockedCallback); + } + + private void addToBlockList(AtomicRunnable atomicRunnable, Consumer accepted) { + onMemoryFreedRunnables.add(atomicRunnable); + atomicRunnable.setCancel(onMemoryFreedRunnables::remove); + if (accepted != null) { + accepted.accept(atomicRunnable); + } } @Override - public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable, Runnable runWhenBlocking) { + public boolean checkMemory(boolean runOnFailure, Runnable runWhenAvailableParameter, Runnable runWhenBlocking, Consumer blockedCallback) { + AtomicRunnable runWhenAvailable = AtomicRunnable.checkAtomic(runWhenAvailableParameter); if (blockedViaAddressControl) { if (runWhenAvailable != null) { - onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable)); + addToBlockList(runWhenAvailable, blockedCallback); } return false; } @@ -1047,7 +1056,7 @@ public class PagingStoreImpl implements PagingStore { if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && (maxSize != -1 || maxMessages != -1 || usingGlobalMaxSize || pagingManager.isDiskFull())) { if (isFull()) { if (runOnFailure && runWhenAvailable != null) { - onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable)); + addToBlockList(runWhenAvailable, blockedCallback); } return false; } @@ -1057,8 +1066,7 @@ public class PagingStoreImpl implements PagingStore { runWhenBlocking.run(); } - AtomicRunnable atomicRunWhenAvailable = AtomicRunnable.checkAtomic(runWhenAvailable); - onMemoryFreedRunnables.add(atomicRunWhenAvailable); + addToBlockList(runWhenAvailable, blockedCallback); // We check again to avoid a race condition where the size can come down just after the element // has been added, but the check to execute was done before the element was added @@ -1066,7 +1074,8 @@ public class PagingStoreImpl implements PagingStore { // MUCH better performance in a highly concurrent environment if (!pagingManager.isGlobalFull() && !full) { // run it now - atomicRunWhenAvailable.run(); + runWhenAvailable.run(); + onMemoryFreedRunnables.remove(runWhenAvailable); } else { if (usingGlobalMaxSize || pagingManager.isDiskFull()) { pagingManager.addBlockedStore(this); @@ -1122,13 +1131,11 @@ public class PagingStoreImpl implements PagingStore { @Override public boolean checkReleasedMemory() { if (!blockedViaAddressControl && !pagingManager.isGlobalFull() && !full) { - if (!onMemoryFreedRunnables.isEmpty()) { - executor.execute(this::memoryReleased); - if (blocking) { - ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, getPageInfo()); - blocking = false; - return true; - } + executor.execute(this::memoryReleased); + if (blocking) { + ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, getPageInfo()); + blocking = false; + return true; } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java index d2f7eba3bc..55cf83596a 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java @@ -106,6 +106,8 @@ import org.apache.activemq.artemis.utils.CompositeAddress; import org.apache.activemq.artemis.utils.JsonLoader; import org.apache.activemq.artemis.utils.PrefixUtil; import org.apache.activemq.artemis.utils.collections.TypedProperties; +import org.apache.activemq.artemis.utils.runnables.AtomicRunnable; +import org.apache.activemq.artemis.utils.runnables.RunnableList; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,6 +142,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { protected final Map consumers = new ConcurrentHashMap<>(); + private final RunnableList blockedRunnables = new RunnableList(); + protected final ServerProducers serverProducers; protected volatile Transaction tx; @@ -391,6 +395,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener { } protected void doClose(final boolean failed) throws Exception { + blockedRunnables.cancel(); + if (callback != null) { callback.close(failed); } @@ -2007,12 +2013,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener { if (store == null) { callback.sendProducerCreditsMessage(credits, address); - } else if (!store.checkMemory(new Runnable() { + } else if (!store.checkMemory(new AtomicRunnable() { @Override - public void run() { + public void atomicRun() { callback.sendProducerCreditsMessage(credits, address); } - })) { + }, blockedRunnables::add)) { callback.sendProducerCreditsFailMessage(credits, address); } } diff --git a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java index 0345e1c7b2..937387241c 100644 --- a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java +++ b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/MemoryAssertions.java @@ -39,8 +39,12 @@ public class MemoryAssertions { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - /** most tests should have these as 0 after execution. */ public static void basicMemoryAsserts() throws Exception { + basicMemoryAsserts(true); + } + + /** most tests should have these as 0 after execution. */ + public static void basicMemoryAsserts(boolean validateMessages) throws Exception { CheckLeak checkLeak = new CheckLeak(); assertMemory(checkLeak, 0, OpenWireConnection.class.getName()); assertMemory(checkLeak, 0, ProtonServerSenderContext.class.getName()); @@ -53,7 +57,9 @@ public class MemoryAssertions { assertMemory(checkLeak, 0, AMQPSessionContext.class.getName()); assertMemory(checkLeak, 0, ServerConsumerImpl.class.getName()); assertMemory(checkLeak, 0, RoutingContextImpl.class.getName()); - assertMemory(checkLeak, 0, MessageReferenceImpl.class.getName()); + if (validateMessages) { + assertMemory(checkLeak, 0, MessageReferenceImpl.class.getName()); + } } public static void assertMemory(CheckLeak checkLeak, int maxExpected, String clazz) throws Exception { diff --git a/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ProducerBlockedLeakTest.java b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ProducerBlockedLeakTest.java new file mode 100644 index 0000000000..3785a7705b --- /dev/null +++ b/tests/leak-tests/src/test/java/org/apache/activemq/artemis/tests/leak/ProducerBlockedLeakTest.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.leak; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import java.lang.invoke.MethodHandles; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import io.github.checkleak.core.CheckLeak; +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.RoutingType; +import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.Queue; +import org.apache.activemq.artemis.core.server.impl.AddressInfo; +import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.logs.AssertionLoggerHandler; +import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.tests.util.CFUtil; +import org.apache.activemq.artemis.utils.SpawnedVMSupport; +import org.apache.activemq.artemis.utils.Wait; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ProducerBlockedLeakTest extends ActiveMQTestBase { + + private static final int OK = 100; + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private static final String QUEUE_NAME = "TEST_BLOCKED_QUEUE"; + + ActiveMQServer server; + + @BeforeClass + public static void beforeClass() throws Exception { + Assume.assumeTrue(CheckLeak.isLoaded()); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + server = createServer(true, createDefaultConfig(1, true)); + server.getConfiguration().getAddressSettings().clear(); + server.getConfiguration().getAddressSettings().put("#", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK).setMaxSizeMessages(10)); + server.start(); + } + + @Test + public void testOPENWIRE() throws Exception { + testBlocked("OPENWIRE"); + } + + @Test + public void testCORE() throws Exception { + testBlocked("CORE"); + } + + @Test + public void testAMQP() throws Exception { + testBlocked("AMQP"); + } + + private void testBlocked(String protocol) throws Exception { + testBody(protocol); + MemoryAssertions.basicMemoryAsserts(false); + Queue queue = server.locateQueue(QUEUE_NAME); + queue.deleteAllReferences(); + MemoryAssertions.basicMemoryAsserts(true); + server.stop(); + } + + // separating the test into a sub-method just to allow removing local references + // so they would be gone when basicMemoryAsserts is called + private void testBody(String protocol) throws Exception { + try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) { + AtomicInteger messagesSent = new AtomicInteger(0); + + server.addAddressInfo(new AddressInfo(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST)); + server.createQueue(new QueueConfiguration(QUEUE_NAME).setAddress(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST).setDurable(true)); + + // clients need to be disconnected while blocked. For that reason a new VM is being spawned + Process process = SpawnedVMSupport.spawnVM(ProducerBlockedLeakTest.class.getName(), protocol, "10"); + + // checking the logs that the destination is blocked... + Wait.assertTrue(() -> loggerHandler.findText("AMQ222183"), 5000, 10); + + process.destroyForcibly(); + Assert.assertTrue(process.waitFor(10, TimeUnit.SECONDS)); + + // Making sure there are no connections anywhere in Acceptors or RemotingService. + // Just to speed up the test especially in OpenWire + server.getRemotingService().getConnections().forEach(c -> c.fail(new ActiveMQException("this is it!"))); + Wait.assertEquals(0, () -> server.getRemotingService().getConnectionCount()); + server.getRemotingService().getAcceptors().forEach((a, b) -> { + if (b instanceof NettyAcceptor) { + ((NettyAcceptor) b).getConnections().clear(); + } + }); + } + } + + public static void main(String[] arg) { + String protocol = arg[0]; + int threads = Integer.parseInt(arg[1]); + ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616"); + ExecutorService executorService = Executors.newFixedThreadPool(threads); + + for (int i = 0; i < threads; i++) { + executorService.execute(() -> { + try { + Connection connection = factory.createConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED); + MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME)); + for (int send = 0; send < 100; send++) { + producer.send(session.createTextMessage("hello")); + session.commit(); + } + } catch (Exception e) { + logger.warn(e.getMessage(), e); + Runtime.getRuntime().halt(-1); + } + }); + } + try { + while (true) { + Thread.sleep(1000); + } + } catch (Throwable e) { + logger.warn(e.getMessage(), e); + Runtime.getRuntime().halt(-1); + } + } + +} \ No newline at end of file diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java index 3ab5633355..49722908fa 100644 --- a/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java +++ b/tests/performance-tests/src/test/java/org/apache/activemq/artemis/tests/performance/storage/PersistMultiThreadTest.java @@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.core.Message; @@ -44,6 +45,7 @@ import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; +import org.apache.activemq.artemis.utils.runnables.AtomicRunnable; import org.junit.Assert; import org.junit.Test; @@ -493,12 +495,12 @@ public class PersistMultiThreadTest extends ActiveMQTestBase { } @Override - public boolean checkMemory(boolean runOnFailure, Runnable runnable, Runnable ignoredRunnable) { + public boolean checkMemory(boolean runOnFailure, Runnable runnable, Runnable ignoredRunnable, Consumer accepted) { return false; } @Override - public boolean checkMemory(Runnable runnable) { + public boolean checkMemory(Runnable runnable, Consumer accepted) { return false; } diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java index 013f241cce..3c0835fa3c 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/paging/impl/PagingStoreImplTest.java @@ -1252,11 +1252,11 @@ public class PagingStoreImplTest extends ActiveMQTestBase { }; store.applySetting(new AddressSettings().setMaxSizeBytes(1000).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK)); store.addSize(100); - store.checkMemory(trackMemoryChecks); + store.checkMemory(trackMemoryChecks, null); assertEquals(1, calls.get()); store.block(); - store.checkMemory(trackMemoryChecks); + store.checkMemory(trackMemoryChecks, null); assertEquals(1, calls.get()); store.unblock(); @@ -1272,7 +1272,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase { assertEquals(100, store.getAddressLimitPercent()); // address full blocks - store.checkMemory(trackMemoryChecks); + store.checkMemory(trackMemoryChecks, null); assertEquals(2, calls.get()); store.block(); @@ -1300,7 +1300,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase { store.addSize(900); assertEquals(100, store.getAddressLimitPercent()); - store.checkMemory(trackMemoryChecks); + store.checkMemory(trackMemoryChecks, null); assertEquals("no change", 3, calls.get()); assertEquals("no change to be sure to be sure!", 3, calls.get()); @@ -1493,7 +1493,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase { // Do an initial check final CountingRunnable trackMemoryCheck1 = new CountingRunnable(); assertEquals(0, trackMemoryCheck1.getCount()); - store.checkMemory(trackMemoryCheck1); + store.checkMemory(trackMemoryCheck1, null); assertEquals(1, trackMemoryCheck1.getCount()); // Do another check, this time indicate the disk is full during the first couple @@ -1501,7 +1501,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase { final CountingRunnable trackMemoryCheck2 = new CountingRunnable(); Mockito.when(mockManager.isDiskFull()).thenReturn(true, true, false); assertEquals(0, trackMemoryCheck2.getCount()); - store.checkMemory(trackMemoryCheck2); + store.checkMemory(trackMemoryCheck2, null); assertEquals(1, trackMemoryCheck2.getCount()); // Now run the released memory checks. The task should NOT execute again, verify it doesnt.