ARTEMIS-1059 option to monitor Paging counters

Adding System.property artemis.debug.paging.interval (in seconds)
to debug paging counters.
This commit is contained in:
Clebert Suconic 2017-03-22 12:17:20 -04:00 committed by Justin Bertram
parent 633b9c75dd
commit 1c88c06abb
6 changed files with 132 additions and 0 deletions
artemis-server/src/main/java/org/apache/activemq/artemis/core/paging
tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client

View File

@ -107,4 +107,8 @@ public interface PagingManager extends ActiveMQComponent, HierarchicalRepository
boolean isDiskFull();
default long getGlobalSize() {
return 0;
}
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.paging;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
@ -49,4 +50,14 @@ public interface PagingStoreFactory {
void injectMonitor(FileStoreMonitor monitor) throws Exception;
default ScheduledExecutorService getScheduledExecutor() {
return null;
}
default Executor newExecutor() {
return null;
}
}

View File

@ -23,6 +23,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -31,6 +32,7 @@ import org.apache.activemq.artemis.core.paging.PageTransactionInfo;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.PagingStoreFactory;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.files.FileStoreMonitor;
import org.apache.activemq.artemis.core.settings.HierarchicalRepository;
@ -40,6 +42,8 @@ import org.jboss.logging.Logger;
public final class PagingManagerImpl implements PagingManager {
private static final int ARTEMIS_DEBUG_PAGING_INTERVAL = Integer.valueOf(System.getProperty("artemis.debug.paging.interval", "0"));
private static final Logger logger = Logger.getLogger(PagingManagerImpl.class);
private volatile boolean started = false;
@ -62,6 +66,8 @@ public final class PagingManagerImpl implements PagingManager {
private final AtomicLong globalSizeBytes = new AtomicLong(0);
private final AtomicLong numberOfMessages = new AtomicLong(0);
private final long maxSize;
private volatile boolean cleanupEnabled = true;
@ -70,6 +76,8 @@ public final class PagingManagerImpl implements PagingManager {
private final ConcurrentMap</*TransactionID*/Long, PageTransactionInfo> transactions = new ConcurrentHashMap<>();
private ActiveMQScheduledComponent scheduledComponent = null;
// Static
// --------------------------------------------------------------------------------------------------------------------------
@ -109,6 +117,13 @@ public final class PagingManagerImpl implements PagingManager {
@Override
public PagingManagerImpl addSize(int size) {
if (size > 0) {
numberOfMessages.incrementAndGet();
} else {
numberOfMessages.decrementAndGet();
}
long newSize = globalSizeBytes.addAndGet(size);
if (newSize < 0) {
@ -121,6 +136,11 @@ public final class PagingManagerImpl implements PagingManager {
return this;
}
@Override
public long getGlobalSize() {
return globalSizeBytes.get();
}
protected void checkMemoryRelease() {
if (!diskFull && (maxSize < 0 || globalSizeBytes.get() < maxSize) && !blockedStored.isEmpty()) {
Iterator<PagingStore> storeIterator = blockedStored.iterator();
@ -314,12 +334,28 @@ public final class PagingManagerImpl implements PagingManager {
reloadStores();
if (ARTEMIS_DEBUG_PAGING_INTERVAL > 0) {
this.scheduledComponent = new ActiveMQScheduledComponent(pagingStoreFactory.getScheduledExecutor(), pagingStoreFactory.newExecutor(), ARTEMIS_DEBUG_PAGING_INTERVAL, TimeUnit.SECONDS, false) {
@Override
public void run() {
debug();
}
};
this.scheduledComponent.start();
}
started = true;
} finally {
unlock();
}
}
public void debug() {
logger.info("size = " + globalSizeBytes + " bytes, messages = " + numberOfMessages);
}
@Override
public synchronized void stop() throws Exception {
if (!started) {
@ -327,6 +363,11 @@ public final class PagingManagerImpl implements PagingManager {
}
started = false;
if (scheduledComponent != null) {
this.scheduledComponent.stop();
this.scheduledComponent = null;
}
lock();
try {

View File

@ -79,6 +79,16 @@ public class PagingStoreFactoryDatabase implements PagingStoreFactory {
private JDBCSequentialFile directoryList;
@Override
public ScheduledExecutorService getScheduledExecutor() {
return scheduledExecutor;
}
@Override
public Executor newExecutor() {
return executorFactory.getExecutor();
}
private boolean started = false;
public PagingStoreFactoryDatabase(final DatabaseStorageConfiguration dbConf,

View File

@ -92,6 +92,16 @@ public class PagingStoreFactoryNIO implements PagingStoreFactory {
// Public --------------------------------------------------------
@Override
public ScheduledExecutorService getScheduledExecutor() {
return scheduledExecutor;
}
@Override
public Executor newExecutor() {
return executorFactory.getExecutor();
}
@Override
public void stop() {
}

View File

@ -308,8 +308,51 @@ public class ConsumerTest extends ActiveMQTestBase {
}
public void internalSimpleSend(int protocolSender, int protocolConsumer) throws Throwable {
ConnectionFactory factorySend = createFactory(protocolSender);
ConnectionFactory factoryConsume = protocolConsumer == protocolSender ? factorySend : createFactory(protocolConsumer);
Connection connection = factorySend.createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(QUEUE.toString());
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage msg = session.createTextMessage("hello");
msg.setIntProperty("mycount", 0);
producer.send(msg);
connection.close();
connection = factoryConsume.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = session.createQueue(QUEUE.toString());
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals(0, message.getIntProperty("mycount"));
Assert.assertEquals("hello", message.getText());
Wait.waitFor(() -> server.getPagingManager().getGlobalSize() == 0, 5000, 100);
Assert.assertEquals(0, server.getPagingManager().getGlobalSize());
} finally {
connection.close();
}
}
public void internalSend(int protocolSender, int protocolConsumer) throws Throwable {
internalSimpleSend(protocolSender, protocolConsumer);
ConnectionFactory factorySend = createFactory(protocolSender);
ConnectionFactory factoryConsume = protocolConsumer == protocolSender ? factorySend : createFactory(protocolConsumer);
@ -414,6 +457,19 @@ public class ConsumerTest extends ActiveMQTestBase {
TextMessage msg = (TextMessage) consumer.receive(1000);
Assert.assertEquals("testSelectorExampleFromSpecs:2", msg.getText());
consumer.close();
consumer = session.createConsumer(queue);
msg = (TextMessage)consumer.receive(5000);
Assert.assertNotNull(msg);
Assert.assertNull(consumer.receiveNoWait());
Wait.waitFor(() -> server.getPagingManager().getGlobalSize() == 0, 5000, 100);
Assert.assertEquals(0, server.getPagingManager().getGlobalSize());
} finally {
connection.close();
}