ARTEMIS-4569 Blocked Producers will hold runnables until messages are consumed.

When initially developed the expectation was that no more producers would keep connecting but in a scenario like this
the consumers could actually give up and things will just accumulate on the server.

We should cleanup these upon disconnect.
This commit is contained in:
Clebert Suconic 2024-01-16 12:14:28 -05:00 committed by clebertsuconic
parent 1fef332bea
commit cedc050e03
13 changed files with 514 additions and 85 deletions

View File

@ -18,9 +18,13 @@
package org.apache.activemq.artemis.utils.runnables;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.Consumer;
public abstract class AtomicRunnable implements Runnable {
public static AtomicRunnable delegate(Runnable runnable) {
return new AtomicRunnableWithDelegate(runnable);
}
public static AtomicRunnable checkAtomic(Runnable run) {
if (run instanceof AtomicRunnable) {
@ -30,6 +34,27 @@ public abstract class AtomicRunnable implements Runnable {
}
}
private RunnableList acceptedList;
private Consumer<AtomicRunnable> cancelTask;
public RunnableList getAcceptedList() {
return acceptedList;
}
public AtomicRunnable setAcceptedList(RunnableList acceptedList) {
this.acceptedList = acceptedList;
return this;
}
public Consumer<AtomicRunnable> getCancel() {
return cancelTask;
}
public AtomicRunnable setCancel(Consumer<AtomicRunnable> cancelTask) {
this.cancelTask = cancelTask;
return this;
}
private volatile int ran;
private static final AtomicIntegerFieldUpdater<AtomicRunnable> RAN_UPDATE =
@ -52,7 +77,21 @@ public abstract class AtomicRunnable implements Runnable {
@Override
public void run() {
if (RAN_UPDATE.compareAndSet(this, 0, 1)) {
atomicRun();
try {
atomicRun();
} finally {
if (acceptedList != null) {
acceptedList.remove(AtomicRunnable.this);
}
}
}
}
public void cancel() {
if (RAN_UPDATE.compareAndSet(this, 0, 1)) {
if (cancelTask != null) {
cancelTask.accept(AtomicRunnable.this);
}
}
}

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.runnables;
import java.util.HashSet;
import java.util.function.Consumer;
public class RunnableList {
private final HashSet<AtomicRunnable> list = new HashSet<>();
public RunnableList() {
}
public synchronized void add(AtomicRunnable runnable) {
runnable.setAcceptedList(this);
list.add(runnable);
}
public int size() {
return list.size();
}
public synchronized void remove(AtomicRunnable runnable) {
list.remove(runnable);
}
public synchronized void cancel() {
list.forEach(this::cancel);
list.clear();
}
private void cancel(AtomicRunnable atomicRunnable) {
atomicRunnable.cancel();
}
public void forEach(Consumer<AtomicRunnable> consumerRunnable) {
list.forEach(consumerRunnable);
}
}

View File

@ -0,0 +1,135 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.utils.runnables;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Test;
public class RunnableListTest {
HashSet<AtomicRunnable> masterList = new HashSet<>();
@Test
public void testRunning() {
AtomicInteger result = new AtomicInteger();
RunnableList listA = new RunnableList();
RunnableList listB = new RunnableList();
RunnableList listC = new RunnableList();
RunnableList[] lists = new RunnableList[]{listA, listB, listC};
for (RunnableList l : lists) {
for (int i = 0; i < 10; i++) {
AtomicRunnable runnable = new AtomicRunnable() {
@Override
public void atomicRun() {
result.incrementAndGet();
masterList.remove(this);
}
};
addItem(l, runnable);
}
}
Assert.assertEquals(30, masterList.size());
runList(listA);
Assert.assertEquals(10, result.get());
Assert.assertEquals(20, masterList.size());
Assert.assertEquals(0, listA.size());
Assert.assertEquals(10, listB.size());
Assert.assertEquals(10, listC.size());
HashSet<AtomicRunnable> copyList = new HashSet<>();
copyList.addAll(masterList);
copyList.forEach(r -> r.run());
for (RunnableList l : lists) {
Assert.assertEquals(0, l.size());
}
Assert.assertEquals(30, result.get());
}
@Test
public void testCancel() {
AtomicInteger result = new AtomicInteger();
RunnableList listA = new RunnableList();
RunnableList listB = new RunnableList();
RunnableList listC = new RunnableList();
RunnableList[] lists = new RunnableList[]{listA, listB, listC};
for (RunnableList l : lists) {
for (int i = 0; i < 10; i++) {
AtomicRunnable runnable = new AtomicRunnable() {
@Override
public void atomicRun() {
result.incrementAndGet();
masterList.remove(this);
}
};
addItem(l, runnable);
}
}
Assert.assertEquals(30, masterList.size());
listA.cancel();
Assert.assertEquals(0, result.get());
Assert.assertEquals(20, masterList.size());
Assert.assertEquals(0, listA.size());
Assert.assertEquals(10, listB.size());
Assert.assertEquals(10, listC.size());
listB.cancel();
listC.cancel();
for (RunnableList l : lists) {
Assert.assertEquals(0, l.size());
}
Assert.assertEquals(0, masterList.size());
}
// runs all AtomicRunnables inside the list
private void runList(RunnableList list) {
// make a copy of all the tasks to a new list
ArrayList<AtomicRunnable> toRun = new ArrayList<>();
list.forEach(toRun::add);
// run all the elements
toRun.forEach(r -> r.run());
}
private void addItem(RunnableList list, AtomicRunnable runnable) {
list.add(runnable);
runnable.setCancel(masterList::remove);
masterList.add(runnable);
}
}

View File

@ -64,6 +64,7 @@ import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.SelectorTranslator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.artemis.utils.runnables.RunnableList;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
@ -112,6 +113,8 @@ public class AMQPSessionCallback implements SessionCallback {
private ProtonTransactionHandler transactionHandler;
private final RunnableList blockedRunnables = new RunnableList();
public AMQPSessionCallback(AMQPConnectionCallback protonSPI,
ProtonProtocolManager manager,
AMQPConnectionContext connection,
@ -384,6 +387,7 @@ public class AMQPSessionCallback implements SessionCallback {
}
public void close() throws Exception {
blockedRunnables.cancel();
//need to check here as this can be called if init fails
if (serverSession != null) {
// we cannot hold the nettyExecutor on this rollback here, otherwise other connections will be waiting
@ -610,7 +614,7 @@ public class AMQPSessionCallback implements SessionCallback {
} else {
final PagingStore store = manager.getServer().getPagingManager().getPageStore(address);
if (store != null) {
store.checkMemory(runnable);
store.checkMemory(runnable, blockedRunnables::add);
} else {
runnable.run();
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.protocol.amqp.broker;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingManager;
@ -157,7 +158,7 @@ public class AMQPSessionCallbackTest {
session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
// Run the credit refill code.
Mockito.verify(pagingStore).checkMemory(argument.capture());
Mockito.verify(pagingStore).checkMemory(argument.capture(), Mockito.isA(Consumer.class));
assertNotNull(argument.getValue());
argument.getValue().run();
@ -188,7 +189,7 @@ public class AMQPSessionCallbackTest {
session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(AMQP_CREDITS_DEFAULT, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
// Run the credit refill code.
Mockito.verify(pagingStore).checkMemory(argument.capture());
Mockito.verify(pagingStore).checkMemory(argument.capture(), Mockito.isA(Consumer.class));
assertNotNull(argument.getValue());
argument.getValue().run();
@ -249,7 +250,7 @@ public class AMQPSessionCallbackTest {
session.flow(new SimpleString("test"), ProtonServerReceiverContext.createCreditRunnable(1, AMQP_LOW_CREDITS_DEFAULT, receiver, connection));
// Run the credit refill code.
Mockito.verify(pagingStore).checkMemory(argument.capture());
Mockito.verify(pagingStore).checkMemory(argument.capture(), Mockito.isA(Consumer.class));
assertNotNull(argument.getValue());
argument.getValue().run();

View File

@ -51,6 +51,8 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.apache.activemq.artemis.utils.runnables.RunnableList;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
@ -78,6 +80,8 @@ public class AMQSession implements SessionCallback {
private final ActiveMQServer server;
private final OpenWireConnection connection;
private final RunnableList blockedRunnables = new RunnableList();
private final AtomicBoolean started = new AtomicBoolean(false);
private final ScheduledExecutorService scheduledPool;
@ -320,8 +324,7 @@ public class AMQSession implements SessionCallback {
@Override
public void closed() {
// TODO Auto-generated method stub
blockedRunnables.cancel();
}
@Override
@ -338,6 +341,7 @@ public class AMQSession implements SessionCallback {
@Override
public void disconnect(ServerConsumer serverConsumer, String errorMessage) {
blockedRunnables.cancel();
// for an openwire consumer this is fatal because unlike with activemq5 sending
// to the address will not auto create the consumer binding and it will be in limbo.
// forcing disconnect allows it to failover and recreate its binding.
@ -412,7 +416,7 @@ public class AMQSession implements SessionCallback {
sendShouldBlockProducer(producerInfo, messageSend, sendProducerAck, store, dest, count, coreMsg, address);
} else {
if (store != null) {
if (!store.checkMemory(true, this::restoreAutoRead, this::blockConnection)) {
if (!store.checkMemory(true, AtomicRunnable.delegate(this::restoreAutoRead), AtomicRunnable.delegate(this::blockConnection), this.blockedRunnables::add)) {
restoreAutoRead();
throw new ResourceAllocationException("Queue is full " + address);
}
@ -440,62 +444,65 @@ public class AMQSession implements SessionCallback {
final AtomicInteger count,
final org.apache.activemq.artemis.api.core.Message coreMsg,
final SimpleString address) throws ResourceAllocationException {
final Runnable task = () -> {
Exception exceptionToSend = null;
try {
getCoreSession().send(coreMsg, false, producerInfo.getProducerId().toString(), dest.isTemporary());
} catch (Exception e) {
logger.debug("Sending exception to the client", e);
exceptionToSend = e;
}
connection.enableTtl();
if (count == null || count.decrementAndGet() == 0) {
if (exceptionToSend != null) {
this.connection.getContext().setDontSendReponse(false);
connection.sendException(exceptionToSend);
} else {
server.getStorageManager().afterCompleteOperations(new IOCallback() {
@Override
public void done() {
if (sendProducerAck) {
try {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
connection.dispatchAsync(ack);
} catch (Exception e) {
final AtomicRunnable task = new AtomicRunnable() {
@Override
public void atomicRun() {
Exception exceptionToSend = null;
try {
getCoreSession().send(coreMsg, false, producerInfo.getProducerId().toString(), dest.isTemporary());
} catch (Exception e) {
logger.debug("Sending exception to the client", e);
exceptionToSend = e;
}
connection.enableTtl();
if (count == null || count.decrementAndGet() == 0) {
if (exceptionToSend != null) {
connection.getContext().setDontSendReponse(false);
connection.sendException(exceptionToSend);
} else {
server.getStorageManager().afterCompleteOperations(new IOCallback() {
@Override
public void done() {
if (sendProducerAck) {
try {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
connection.dispatchAsync(ack);
} catch (Exception e) {
connection.getContext().setDontSendReponse(false);
logger.warn(e.getMessage(), e);
connection.sendException(e);
}
} else {
connection.getContext().setDontSendReponse(false);
logger.warn(e.getMessage(), e);
connection.sendException(e);
}
} else {
connection.getContext().setDontSendReponse(false);
try {
Response response = new Response();
response.setCorrelationId(messageSend.getCommandId());
connection.dispatchAsync(response);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
connection.sendException(e);
try {
Response response = new Response();
response.setCorrelationId(messageSend.getCommandId());
connection.dispatchAsync(response);
} catch (Exception e) {
logger.warn(e.getMessage(), e);
connection.sendException(e);
}
}
}
}
@Override
public void onError(int errorCode, String errorMessage) {
try {
final IOException e = new IOException(errorMessage);
logger.warn(errorMessage);
connection.serviceException(e);
} catch (Exception ex) {
logger.debug(ex.getMessage(), ex);
@Override
public void onError(int errorCode, String errorMessage) {
try {
final IOException e = new IOException(errorMessage);
logger.warn(errorMessage);
connection.serviceException(e);
} catch (Exception ex) {
logger.debug(ex.getMessage(), ex);
}
}
}
});
});
}
}
}
};
if (store != null) {
if (!store.checkMemory(false, task, null)) {
if (!store.checkMemory(false, task, null, blockedRunnables::add)) {
this.connection.getContext().setDontSendReponse(false);
connection.enableTtl();
throw new ResourceAllocationException("Queue is full " + address);
@ -542,11 +549,12 @@ public class AMQSession implements SessionCallback {
}
public void close() throws Exception {
this.coreSession.close(false);
this.close(false);
}
@Override
public void close(boolean failed) {
blockedRunnables.cancel();
try {
this.coreSession.close(failed);
} catch (Exception bestEffort) {

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging;
import java.io.File;
import java.util.Collection;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RefCountMessageListener;
@ -35,6 +36,7 @@ import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
/**
* <p>
@ -175,9 +177,9 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
addSize(size, false);
}
boolean checkMemory(Runnable runnable);
boolean checkMemory(Runnable runnable, Consumer<AtomicRunnable> blockedCallback);
boolean checkMemory(boolean runOnFailure, Runnable runnable, Runnable runWhenBlocking);
boolean checkMemory(boolean runOnFailure, Runnable runnable, Runnable runWhenBlocking, Consumer<AtomicRunnable> blockedCallback);
boolean isFull();

View File

@ -1030,16 +1030,25 @@ public class PagingStoreImpl implements PagingStore {
}
@Override
public boolean checkMemory(final Runnable runWhenAvailable) {
return checkMemory(true, runWhenAvailable, null);
public boolean checkMemory(final Runnable runWhenAvailable, Consumer<AtomicRunnable> blockedCallback) {
return checkMemory(true, runWhenAvailable, null, blockedCallback);
}
private void addToBlockList(AtomicRunnable atomicRunnable, Consumer<AtomicRunnable> accepted) {
onMemoryFreedRunnables.add(atomicRunnable);
atomicRunnable.setCancel(onMemoryFreedRunnables::remove);
if (accepted != null) {
accepted.accept(atomicRunnable);
}
}
@Override
public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable, Runnable runWhenBlocking) {
public boolean checkMemory(boolean runOnFailure, Runnable runWhenAvailableParameter, Runnable runWhenBlocking, Consumer<AtomicRunnable> blockedCallback) {
AtomicRunnable runWhenAvailable = AtomicRunnable.checkAtomic(runWhenAvailableParameter);
if (blockedViaAddressControl) {
if (runWhenAvailable != null) {
onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
addToBlockList(runWhenAvailable, blockedCallback);
}
return false;
}
@ -1047,7 +1056,7 @@ public class PagingStoreImpl implements PagingStore {
if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && (maxSize != -1 || maxMessages != -1 || usingGlobalMaxSize || pagingManager.isDiskFull())) {
if (isFull()) {
if (runOnFailure && runWhenAvailable != null) {
onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
addToBlockList(runWhenAvailable, blockedCallback);
}
return false;
}
@ -1057,8 +1066,7 @@ public class PagingStoreImpl implements PagingStore {
runWhenBlocking.run();
}
AtomicRunnable atomicRunWhenAvailable = AtomicRunnable.checkAtomic(runWhenAvailable);
onMemoryFreedRunnables.add(atomicRunWhenAvailable);
addToBlockList(runWhenAvailable, blockedCallback);
// We check again to avoid a race condition where the size can come down just after the element
// has been added, but the check to execute was done before the element was added
@ -1066,7 +1074,8 @@ public class PagingStoreImpl implements PagingStore {
// MUCH better performance in a highly concurrent environment
if (!pagingManager.isGlobalFull() && !full) {
// run it now
atomicRunWhenAvailable.run();
runWhenAvailable.run();
onMemoryFreedRunnables.remove(runWhenAvailable);
} else {
if (usingGlobalMaxSize || pagingManager.isDiskFull()) {
pagingManager.addBlockedStore(this);
@ -1122,13 +1131,11 @@ public class PagingStoreImpl implements PagingStore {
@Override
public boolean checkReleasedMemory() {
if (!blockedViaAddressControl && !pagingManager.isGlobalFull() && !full) {
if (!onMemoryFreedRunnables.isEmpty()) {
executor.execute(this::memoryReleased);
if (blocking) {
ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, getPageInfo());
blocking = false;
return true;
}
executor.execute(this::memoryReleased);
if (blocking) {
ActiveMQServerLogger.LOGGER.unblockingMessageProduction(address, getPageInfo());
blocking = false;
return true;
}
}

View File

@ -106,6 +106,8 @@ import org.apache.activemq.artemis.utils.CompositeAddress;
import org.apache.activemq.artemis.utils.JsonLoader;
import org.apache.activemq.artemis.utils.PrefixUtil;
import org.apache.activemq.artemis.utils.collections.TypedProperties;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.apache.activemq.artemis.utils.runnables.RunnableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -140,6 +142,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
protected final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<>();
private final RunnableList blockedRunnables = new RunnableList();
protected final ServerProducers serverProducers;
protected volatile Transaction tx;
@ -391,6 +395,8 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
}
protected void doClose(final boolean failed) throws Exception {
blockedRunnables.cancel();
if (callback != null) {
callback.close(failed);
}
@ -2007,12 +2013,12 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
if (store == null) {
callback.sendProducerCreditsMessage(credits, address);
} else if (!store.checkMemory(new Runnable() {
} else if (!store.checkMemory(new AtomicRunnable() {
@Override
public void run() {
public void atomicRun() {
callback.sendProducerCreditsMessage(credits, address);
}
})) {
}, blockedRunnables::add)) {
callback.sendProducerCreditsFailMessage(credits, address);
}
}

View File

@ -39,8 +39,12 @@ public class MemoryAssertions {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
/** most tests should have these as 0 after execution. */
public static void basicMemoryAsserts() throws Exception {
basicMemoryAsserts(true);
}
/** most tests should have these as 0 after execution. */
public static void basicMemoryAsserts(boolean validateMessages) throws Exception {
CheckLeak checkLeak = new CheckLeak();
assertMemory(checkLeak, 0, OpenWireConnection.class.getName());
assertMemory(checkLeak, 0, ProtonServerSenderContext.class.getName());
@ -53,7 +57,9 @@ public class MemoryAssertions {
assertMemory(checkLeak, 0, AMQPSessionContext.class.getName());
assertMemory(checkLeak, 0, ServerConsumerImpl.class.getName());
assertMemory(checkLeak, 0, RoutingContextImpl.class.getName());
assertMemory(checkLeak, 0, MessageReferenceImpl.class.getName());
if (validateMessages) {
assertMemory(checkLeak, 0, MessageReferenceImpl.class.getName());
}
}
public static void assertMemory(CheckLeak checkLeak, int maxExpected, String clazz) throws Exception {

View File

@ -0,0 +1,163 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.leak;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.lang.invoke.MethodHandles;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import io.github.checkleak.core.CheckLeak;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptor;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.apache.activemq.artemis.utils.Wait;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerBlockedLeakTest extends ActiveMQTestBase {
private static final int OK = 100;
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String QUEUE_NAME = "TEST_BLOCKED_QUEUE";
ActiveMQServer server;
@BeforeClass
public static void beforeClass() throws Exception {
Assume.assumeTrue(CheckLeak.isLoaded());
}
@Override
@Before
public void setUp() throws Exception {
super.setUp();
server = createServer(true, createDefaultConfig(1, true));
server.getConfiguration().getAddressSettings().clear();
server.getConfiguration().getAddressSettings().put("#", new AddressSettings().setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK).setMaxSizeMessages(10));
server.start();
}
@Test
public void testOPENWIRE() throws Exception {
testBlocked("OPENWIRE");
}
@Test
public void testCORE() throws Exception {
testBlocked("CORE");
}
@Test
public void testAMQP() throws Exception {
testBlocked("AMQP");
}
private void testBlocked(String protocol) throws Exception {
testBody(protocol);
MemoryAssertions.basicMemoryAsserts(false);
Queue queue = server.locateQueue(QUEUE_NAME);
queue.deleteAllReferences();
MemoryAssertions.basicMemoryAsserts(true);
server.stop();
}
// separating the test into a sub-method just to allow removing local references
// so they would be gone when basicMemoryAsserts is called
private void testBody(String protocol) throws Exception {
try (AssertionLoggerHandler loggerHandler = new AssertionLoggerHandler()) {
AtomicInteger messagesSent = new AtomicInteger(0);
server.addAddressInfo(new AddressInfo(QUEUE_NAME).addRoutingType(RoutingType.ANYCAST));
server.createQueue(new QueueConfiguration(QUEUE_NAME).setAddress(QUEUE_NAME).setRoutingType(RoutingType.ANYCAST).setDurable(true));
// clients need to be disconnected while blocked. For that reason a new VM is being spawned
Process process = SpawnedVMSupport.spawnVM(ProducerBlockedLeakTest.class.getName(), protocol, "10");
// checking the logs that the destination is blocked...
Wait.assertTrue(() -> loggerHandler.findText("AMQ222183"), 5000, 10);
process.destroyForcibly();
Assert.assertTrue(process.waitFor(10, TimeUnit.SECONDS));
// Making sure there are no connections anywhere in Acceptors or RemotingService.
// Just to speed up the test especially in OpenWire
server.getRemotingService().getConnections().forEach(c -> c.fail(new ActiveMQException("this is it!")));
Wait.assertEquals(0, () -> server.getRemotingService().getConnectionCount());
server.getRemotingService().getAcceptors().forEach((a, b) -> {
if (b instanceof NettyAcceptor) {
((NettyAcceptor) b).getConnections().clear();
}
});
}
}
public static void main(String[] arg) {
String protocol = arg[0];
int threads = Integer.parseInt(arg[1]);
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
ExecutorService executorService = Executors.newFixedThreadPool(threads);
for (int i = 0; i < threads; i++) {
executorService.execute(() -> {
try {
Connection connection = factory.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
for (int send = 0; send < 100; send++) {
producer.send(session.createTextMessage("hello"));
session.commit();
}
} catch (Exception e) {
logger.warn(e.getMessage(), e);
Runtime.getRuntime().halt(-1);
}
});
}
try {
while (true) {
Thread.sleep(1000);
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
Runtime.getRuntime().halt(-1);
}
}
}

View File

@ -22,6 +22,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.Message;
@ -44,6 +45,7 @@ import org.apache.activemq.artemis.core.settings.impl.PageFullMessagePolicy;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.runnables.AtomicRunnable;
import org.junit.Assert;
import org.junit.Test;
@ -493,12 +495,12 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
}
@Override
public boolean checkMemory(boolean runOnFailure, Runnable runnable, Runnable ignoredRunnable) {
public boolean checkMemory(boolean runOnFailure, Runnable runnable, Runnable ignoredRunnable, Consumer<AtomicRunnable> accepted) {
return false;
}
@Override
public boolean checkMemory(Runnable runnable) {
public boolean checkMemory(Runnable runnable, Consumer<AtomicRunnable> accepted) {
return false;
}

View File

@ -1252,11 +1252,11 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
};
store.applySetting(new AddressSettings().setMaxSizeBytes(1000).setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK));
store.addSize(100);
store.checkMemory(trackMemoryChecks);
store.checkMemory(trackMemoryChecks, null);
assertEquals(1, calls.get());
store.block();
store.checkMemory(trackMemoryChecks);
store.checkMemory(trackMemoryChecks, null);
assertEquals(1, calls.get());
store.unblock();
@ -1272,7 +1272,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
assertEquals(100, store.getAddressLimitPercent());
// address full blocks
store.checkMemory(trackMemoryChecks);
store.checkMemory(trackMemoryChecks, null);
assertEquals(2, calls.get());
store.block();
@ -1300,7 +1300,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
store.addSize(900);
assertEquals(100, store.getAddressLimitPercent());
store.checkMemory(trackMemoryChecks);
store.checkMemory(trackMemoryChecks, null);
assertEquals("no change", 3, calls.get());
assertEquals("no change to be sure to be sure!", 3, calls.get());
@ -1493,7 +1493,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
// Do an initial check
final CountingRunnable trackMemoryCheck1 = new CountingRunnable();
assertEquals(0, trackMemoryCheck1.getCount());
store.checkMemory(trackMemoryCheck1);
store.checkMemory(trackMemoryCheck1, null);
assertEquals(1, trackMemoryCheck1.getCount());
// Do another check, this time indicate the disk is full during the first couple
@ -1501,7 +1501,7 @@ public class PagingStoreImplTest extends ActiveMQTestBase {
final CountingRunnable trackMemoryCheck2 = new CountingRunnable();
Mockito.when(mockManager.isDiskFull()).thenReturn(true, true, false);
assertEquals(0, trackMemoryCheck2.getCount());
store.checkMemory(trackMemoryCheck2);
store.checkMemory(trackMemoryCheck2, null);
assertEquals(1, trackMemoryCheck2.getCount());
// Now run the released memory checks. The task should NOT execute again, verify it doesnt.