ARTEMIS-2421 Using ActiveMQScheduledComponent

This commit is contained in:
Clebert Suconic 2019-11-26 09:04:20 -05:00
parent e12f3ddc6f
commit 0b1afd35dc
8 changed files with 53 additions and 25 deletions

View File

@ -26,8 +26,9 @@ import java.util.Set;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers; import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
@ -61,9 +62,6 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ExecutorFactory; import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
@Command(name = "print", description = "Print data records information (WARNING: don't use while a production server is running)") @Command(name = "print", description = "Print data records information (WARNING: don't use while a production server is running)")
public class PrintData extends DBOption { public class PrintData extends DBOption {
@ -131,7 +129,7 @@ public class PrintData extends DBOption {
if (serverLockFile.isFile()) { if (serverLockFile.isFile()) {
try { try {
FileLockNodeManager fileLock = new FileLockNodeManager(messagesDirectory, false, new ScheduledThreadPoolExecutor(1)); FileLockNodeManager fileLock = new FileLockNodeManager(messagesDirectory, false);
fileLock.start(); fileLock.start();
printBanner(out, "Server's ID=" + fileLock.getNodeId().toString()); printBanner(out, "Server's ID=" + fileLock.getNodeId().toString());
fileLock.stop(); fileLock.stop();

View File

@ -75,6 +75,28 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
this.onDemand = onDemand; this.onDemand = onDemand;
} }
/**
* It creates a scheduled component that can trigger {@link #run()} with a fixed {@code checkPeriod} on a configured {@code executor}.
*
* @param scheduledExecutorService the {@link ScheduledExecutorService} that periodically trigger {@link #run()} on the configured {@code executor}
* @param initialDelay the time to delay first execution
* @param checkPeriod the delay between the termination of one execution and the start of the next
* @param timeUnit the time unit of the {@code initialDelay} and {@code checkPeriod} parameters
* @param onDemand if {@code true} the task won't be scheduled on {@link #start()}, {@code false} otherwise
*/
public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorService,
long initialDelay,
long checkPeriod,
TimeUnit timeUnit,
boolean onDemand) {
this.executor = null;
this.scheduledExecutorService = scheduledExecutorService;
this.initialDelay = initialDelay;
this.period = checkPeriod;
this.timeUnit = timeUnit;
this.onDemand = onDemand;
}
/** /**
* It creates a scheduled component that can trigger {@link #run()} with a fixed {@code checkPeriod} on a configured {@code executor}. * It creates a scheduled component that can trigger {@link #run()} with a fixed {@code checkPeriod} on a configured {@code executor}.
* *

View File

@ -26,13 +26,13 @@ import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActivateCallback; import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException; import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUID;
@ -79,6 +79,11 @@ public class FileLockNodeManager extends NodeManager {
this.scheduledPool = scheduledPool; this.scheduledPool = scheduledPool;
} }
public FileLockNodeManager(final File directory, boolean replicatedBackup) {
super(replicatedBackup, directory);
this.scheduledPool = null;
}
public FileLockNodeManager(final File directory, boolean replicatedBackup, long lockAcquisitionTimeout, public FileLockNodeManager(final File directory, boolean replicatedBackup, long lockAcquisitionTimeout,
ScheduledExecutorService scheduledPool) { ScheduledExecutorService scheduledPool) {
super(replicatedBackup, directory); super(replicatedBackup, directory);
@ -406,10 +411,9 @@ public class FileLockNodeManager extends NodeManager {
private synchronized void startLockMonitoring() { private synchronized void startLockMonitoring() {
logger.debug("Starting the lock monitor"); logger.debug("Starting the lock monitor");
if (scheduledLockMonitor == null) { if (monitorLock == null) {
MonitorLock monitorLock = new MonitorLock(); monitorLock = new MonitorLock(scheduledPool, LOCK_MONITOR_TIMEOUT_MILLIES, LOCK_MONITOR_TIMEOUT_MILLIES, TimeUnit.MILLISECONDS, false);
scheduledLockMonitor = scheduledPool.scheduleAtFixedRate(monitorLock, LOCK_MONITOR_TIMEOUT_MILLIES, monitorLock.start();
LOCK_MONITOR_TIMEOUT_MILLIES, TimeUnit.MILLISECONDS);
} else { } else {
logger.debug("Lock monitor was already started"); logger.debug("Lock monitor was already started");
} }
@ -417,9 +421,9 @@ public class FileLockNodeManager extends NodeManager {
private synchronized void stopLockMonitoring() { private synchronized void stopLockMonitoring() {
logger.debug("Stopping the lock monitor"); logger.debug("Stopping the lock monitor");
if (scheduledLockMonitor != null) { if (monitorLock != null) {
scheduledLockMonitor.cancel(true); monitorLock.stop();
scheduledLockMonitor = null; monitorLock = null;
} else { } else {
logger.debug("The lock monitor was already stopped"); logger.debug("The lock monitor was already stopped");
} }
@ -457,7 +461,7 @@ public class FileLockNodeManager extends NodeManager {
protected final Set<LockListener> lockListeners = Collections.synchronizedSet(new HashSet<LockListener>()); protected final Set<LockListener> lockListeners = Collections.synchronizedSet(new HashSet<LockListener>());
private ScheduledFuture<?> scheduledLockMonitor; private MonitorLock monitorLock;
public abstract class LockListener { public abstract class LockListener {
protected abstract void lostLock() throws Exception; protected abstract void lostLock() throws Exception;
@ -467,7 +471,16 @@ public class FileLockNodeManager extends NodeManager {
} }
} }
public class MonitorLock implements Runnable {
public class MonitorLock extends ActiveMQScheduledComponent {
public MonitorLock(ScheduledExecutorService scheduledExecutorService,
long initialDelay,
long checkPeriod,
TimeUnit timeUnit,
boolean onDemand) {
super(scheduledExecutorService, initialDelay, checkPeriod, timeUnit, onDemand);
}
@Override @Override
public void run() { public void run() {

View File

@ -18,7 +18,6 @@ package org.apache.activemq.artemis.tests.util;
import javax.management.MBeanServer; import javax.management.MBeanServer;
import java.io.File; import java.io.File;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.FileConfiguration; import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
@ -66,7 +65,7 @@ public class ColocatedActiveMQServer extends ActiveMQServerImpl {
@Override @Override
protected NodeManager createNodeManager(final File directory, boolean replicatingBackup) { protected NodeManager createNodeManager(final File directory, boolean replicatingBackup) {
if (replicatingBackup) { if (replicatingBackup) {
return new FileLockNodeManager(directory, replicatingBackup, getConfiguration().getJournalLockAcquisitionTimeout(), new ScheduledThreadPoolExecutor(1)); return new FileLockNodeManager(directory, replicatingBackup, getConfiguration().getJournalLockAcquisitionTimeout(), null);
} else { } else {
if (backup) { if (backup) {
return nodeManagerBackup; return nodeManagerBackup;

View File

@ -17,7 +17,6 @@
package org.apache.activemq.artemis.tests.integration.cluster; package org.apache.activemq.artemis.tests.integration.cluster;
import java.io.File; import java.io.File;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager; import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
@ -118,7 +117,7 @@ public class NodeManagerAction {
} }
NodeManagerAction nodeManagerAction = new NodeManagerAction(work1); NodeManagerAction nodeManagerAction = new NodeManagerAction(work1);
FileLockNodeManager nodeManager = new FileLockNodeManager(new File("."), false, new ScheduledThreadPoolExecutor(1)); FileLockNodeManager nodeManager = new FileLockNodeManager(new File("."), false);
nodeManager.start(); nodeManager.start();
try { try {
nodeManagerAction.performWork(nodeManager); nodeManagerAction.performWork(nodeManager);

View File

@ -19,7 +19,6 @@ package org.apache.activemq.artemis.tests.integration.cluster;
import java.io.File; import java.io.File;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager; import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
@ -33,7 +32,7 @@ public class RealNodeManagerTest extends NodeManagerTest {
@Test @Test
public void testId() throws Exception { public void testId() throws Exception {
NodeManager nodeManager = new FileLockNodeManager(new File(getTemporaryDir()), false, new ScheduledThreadPoolExecutor(1)); NodeManager nodeManager = new FileLockNodeManager(new File(getTemporaryDir()), false);
nodeManager.start(); nodeManager.start();
UUID id1 = nodeManager.getUUID(); UUID id1 = nodeManager.getUUID();
nodeManager.stop(); nodeManager.stop();

View File

@ -24,7 +24,6 @@ import java.util.Map;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -133,7 +132,7 @@ public class NettyFailoverTest extends FailoverTest {
if (useSeparateLockFolder) { if (useSeparateLockFolder) {
config.getNodeManagerLockLocation().mkdirs(); config.getNodeManagerLockLocation().mkdirs();
} }
return new FileLockNodeManager(config.getNodeManagerLockLocation(), false, new ScheduledThreadPoolExecutor(1)); return new FileLockNodeManager(config.getNodeManagerLockLocation(), false);
default: default:
throw new AssertionError("enum type not supported!"); throw new AssertionError("enum type not supported!");

View File

@ -17,7 +17,6 @@
package org.apache.activemq.artemis.tests.unit.core.server.impl; package org.apache.activemq.artemis.tests.unit.core.server.impl;
import java.io.File; import java.io.File;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager; import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -36,7 +35,7 @@ public class FileLockTest extends ActiveMQTestBase {
@Test @Test
public void testNIOLock() throws Exception { public void testNIOLock() throws Exception {
doTestLock(new FileLockNodeManager(getTestDirfile(), false, new ScheduledThreadPoolExecutor(1)), new FileLockNodeManager(getTestDirfile(), false, new ScheduledThreadPoolExecutor(1))); doTestLock(new FileLockNodeManager(getTestDirfile(), false), new FileLockNodeManager(getTestDirfile(), false));
} }