This commit is contained in:
Clebert Suconic 2019-11-26 21:05:01 -05:00
commit 1fe910f8ed
8 changed files with 360 additions and 14 deletions

View File

@ -75,6 +75,28 @@ public abstract class ActiveMQScheduledComponent implements ActiveMQComponent, R
this.onDemand = onDemand;
}
/**
 * Creates a scheduled component that is driven only by a {@link ScheduledExecutorService}.
 * <p>
 * This overload takes no executor and sets the {@code executor} field to {@code null}.
 * NOTE(review): presumably {@link #run()} is then invoked directly on the scheduler thread; confirm against
 * the scheduling code, which is not part of this constructor.
 *
 * @param scheduledExecutorService the {@link ScheduledExecutorService} that periodically triggers {@link #run()}
 * @param initialDelay the time to delay first execution
 * @param checkPeriod the delay between the termination of one execution and the start of the next
 * @param timeUnit the time unit of the {@code initialDelay} and {@code checkPeriod} parameters
 * @param onDemand if {@code true} the task won't be scheduled on {@link #start()}, {@code false} otherwise
 */
public ActiveMQScheduledComponent(ScheduledExecutorService scheduledExecutorService,
                                  long initialDelay,
                                  long checkPeriod,
                                  TimeUnit timeUnit,
                                  boolean onDemand) {
   // Timing configuration.
   this.timeUnit = timeUnit;
   this.initialDelay = initialDelay;
   this.period = checkPeriod;
   this.onDemand = onDemand;
   // Execution resources: only the scheduler is supplied, there is no separate executor.
   this.scheduledExecutorService = scheduledExecutorService;
   this.executor = null;
}
/**
* It creates a scheduled component that can trigger {@link #run()} with a fixed {@code checkPeriod} on a configured {@code executor}.
*

View File

@ -505,12 +505,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
logger.debug("Detected no Shared Store HA options on JDBC store");
}
//LIVE_ONLY should be the default HA option when HA isn't configured
manager = new FileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout());
manager = new FileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout(), scheduledPool);
} else {
throw new IllegalArgumentException("JDBC persistence allows only Shared Store HA options");
}
} else {
manager = new FileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout());
manager = new FileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout(), scheduledPool);
}
return manager;
}

View File

@ -22,11 +22,17 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.utils.UUID;
@ -54,7 +60,9 @@ public class FileLockNodeManager extends NodeManager {
private static final long LOCK_ACCESS_FAILURE_WAIT_TIME = 2000;
private FileLock liveLock;
private static final int LOCK_MONITOR_TIMEOUT_MILLIES = 2000;
private volatile FileLock liveLock;
private FileLock backupLock;
@ -64,13 +72,23 @@ public class FileLockNodeManager extends NodeManager {
protected boolean interrupted = false;
public FileLockNodeManager(final File directory, boolean replicatedBackup) {
private ScheduledExecutorService scheduledPool;
/**
 * Creates a node manager that keeps the given scheduled pool for its lock monitor.
 *
 * @param directory        forwarded to the super constructor
 * @param replicatedBackup forwarded to the super constructor
 * @param scheduledPool    scheduler passed to the lock monitor when monitoring is started; stored as given
 */
public FileLockNodeManager(final File directory, boolean replicatedBackup, ScheduledExecutorService scheduledPool) {
super(replicatedBackup, directory);
this.scheduledPool = scheduledPool;
}
public FileLockNodeManager(final File directory, boolean replicatedBackup, long lockAcquisitionTimeout) {
/**
 * Creates a node manager without a scheduled pool; the {@code scheduledPool} field is left {@code null}.
 *
 * @param directory        forwarded to the super constructor
 * @param replicatedBackup forwarded to the super constructor
 */
public FileLockNodeManager(final File directory, boolean replicatedBackup) {
   // The cast pins the overload: the pool-accepting constructor, called with no pool.
   this(directory, replicatedBackup, (ScheduledExecutorService) null);
}
/**
 * Creates a node manager with an explicit lock acquisition timeout and a scheduled pool for the lock monitor.
 *
 * @param directory              forwarded to the super constructor
 * @param replicatedBackup       forwarded to the super constructor
 * @param lockAcquisitionTimeout value stored in {@code lockAcquisitionTimeout}
 * @param scheduledPool          scheduler passed to the lock monitor when monitoring is started; stored as given
 */
public FileLockNodeManager(final File directory,
                           boolean replicatedBackup,
                           long lockAcquisitionTimeout,
                           ScheduledExecutorService scheduledPool) {
   super(replicatedBackup, directory);
   this.lockAcquisitionTimeout = lockAcquisitionTimeout;
   this.scheduledPool = scheduledPool;
}
@ -215,6 +233,7 @@ public class FileLockNodeManager extends NodeManager {
public void activationComplete() {
try {
setLive();
startLockMonitoring();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
}
@ -224,6 +243,7 @@ public class FileLockNodeManager extends NodeManager {
@Override
public void pauseLiveServer() throws Exception {
stopLockMonitoring();
setPaused();
if (liveLock != null) {
liveLock.release();
@ -232,6 +252,7 @@ public class FileLockNodeManager extends NodeManager {
@Override
public void crashLiveServer() throws Exception {
stopLockMonitoring();
if (liveLock != null) {
liveLock.release();
liveLock = null;
@ -388,4 +409,117 @@ public class FileLockNodeManager extends NodeManager {
return null;
}
/**
 * Creates and starts the periodic lock monitor unless one is already registered.
 * The monitor uses {@code LOCK_MONITOR_TIMEOUT_MILLIES} both as initial delay and as check period.
 */
private synchronized void startLockMonitoring() {
   logger.debug("Starting the lock monitor");
   if (monitorLock != null) {
      logger.debug("Lock monitor was already started");
      return;
   }
   final MonitorLock monitor = new MonitorLock(scheduledPool, LOCK_MONITOR_TIMEOUT_MILLIES, LOCK_MONITOR_TIMEOUT_MILLIES, TimeUnit.MILLISECONDS, false);
   // Publish the monitor before starting it, so the field is set even if start() fails.
   monitorLock = monitor;
   monitor.start();
}
/**
 * Stops the periodic lock monitor and clears the reference to it; does nothing but log when no monitor is registered.
 */
private synchronized void stopLockMonitoring() {
   logger.debug("Stopping the lock monitor");
   if (monitorLock == null) {
      logger.debug("The lock monitor was already stopped");
      return;
   }
   // Clear the reference only after stop() returned normally.
   monitorLock.stop();
   monitorLock = null;
}
/**
 * Calls {@code lostLock()} on every registered {@link LockListener}.
 * <p>
 * Nothing is notified when the listener set is not available yet or when {@code liveLock} is already
 * {@code null} (the lock object is dropped on shutdown). A listener that throws does not prevent the
 * remaining listeners from being called; its exception is logged.
 */
private void notifyLostLock() {
   // Additional check we are not initializing or have no locking object anymore because of a shutdown
   if (lockListeners == null || liveLock == null) {
      return;
   }
   // Iterate over a copy: listeners may unregister themselves while being notified. The copy is taken
   // while holding the synchronized set's monitor because copying iterates the underlying set.
   final Set<LockListener> lockListenersSnapshot;
   synchronized (lockListeners) {
      lockListenersSnapshot = new HashSet<>(lockListeners);
   }
   for (LockListener lockListener : lockListenersSnapshot) {
      try {
         lockListener.lostLock();
      } catch (Exception e) {
         // Everyone needs to be notified, so keep going, but leave a trace of the failure
         logger.warn("Lock listener failed while handling the lost lock notification", e);
      }
   }
}
/**
 * Adds a listener to the set that is notified when the lock monitor reports the lock as lost.
 *
 * @param lockListener the listener to add
 */
public void registerLockListener(LockListener lockListener) {
lockListeners.add(lockListener);
}
/**
 * Removes a listener from the set that is notified when the lock monitor reports the lock as lost.
 *
 * @param lockListener the listener to remove
 */
public void unregisterLockListener(LockListener lockListener) {
lockListeners.remove(lockListener);
}
protected final Set<LockListener> lockListeners = Collections.synchronizedSet(new HashSet<LockListener>());
private MonitorLock monitorLock;
/**
 * Callback for components that need to react when the lock monitor reports the live lock as lost.
 * This is a non-static inner class, so every listener is tied to the enclosing manager instance
 * whose {@code lockListeners} set it can remove itself from.
 */
public abstract class LockListener {
/**
 * Invoked when the lock is reported as lost.
 *
 * @throws Exception on any failure; the notifying code catches it so that other listeners are still called
 */
protected abstract void lostLock() throws Exception;
/**
 * Removes this listener from the enclosing manager's listener set.
 */
protected void unregisterListener() {
lockListeners.remove(this);
}
}
/**
 * Periodic task that checks whether this node manager still holds the live lock and notifies the
 * registered lock listeners when it does not.
 */
public class MonitorLock extends ActiveMQScheduledComponent {

   /**
    * @param scheduledExecutorService scheduler that triggers {@link #run()}
    * @param initialDelay             delay before the first check
    * @param checkPeriod              delay between checks
    * @param timeUnit                 unit of {@code initialDelay} and {@code checkPeriod}
    * @param onDemand                 forwarded unchanged to the super constructor
    */
   public MonitorLock(ScheduledExecutorService scheduledExecutorService,
                      long initialDelay,
                      long checkPeriod,
                      TimeUnit timeUnit,
                      boolean onDemand) {
      super(scheduledExecutorService, initialDelay, checkPeriod, timeUnit, onDemand);
   }

   /**
    * Performs one check. The lock counts as lost when there is no lock object, when the lock object
    * reports itself invalid, or when any exception is raised while checking. A state other than
    * {@code LIVE} is only logged and does not count as a lost lock.
    */
   @Override
   public void run() {
      boolean lostLock = true;
      try {
         // Read the volatile field once: it can be set to null concurrently (e.g. when the live
         // server is crashed), and re-reading it between the null check and isValid() could throw
         // a NullPointerException that would be logged as an error.
         final FileLock currentLock = liveLock;
         if (currentLock == null) {
            logger.debug("Livelock is null");
         }
         lostLock = currentLock == null || !currentLock.isValid();
         if (!lostLock) {
            logger.debug("Server still has the lock, double check status is live");
            // Java keeps reporting the lock as valid even when the filesystem is gone, so the state
            // is read as a second check; reading it should succeed unless something is wrong.
            // Author's note: when EFS is gone this call blocks. That could be solved, but it needs
            // extra threading work to manage the timeout and interrupt the blocked thread.
            byte state = getState();
            if (state == LIVE) {
               logger.debug("Status is set to live");
            } else {
               logger.debug("Status is not live");
            }
         }
      } catch (Exception exception) {
         // If something went wrong we probably lost the lock
         logger.error(exception.getMessage(), exception);
         lostLock = true;
      }
      if (lostLock) {
         logger.warn("Lost the lock according to the monitor, notifying listeners");
         notifyLostLock();
      }
   }
}
}

View File

@ -16,20 +16,26 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager.LockListener;
import org.jboss.logging.Logger;
public final class SharedStoreLiveActivation extends LiveActivation {
private static final Logger logger = Logger.getLogger(SharedStoreLiveActivation.class);
//this is how we act when we initially start as live
// this is how we act when we initially start as live
private SharedStoreMasterPolicy sharedStoreMasterPolicy;
private ActiveMQServerImpl activeMQServer;
private volatile FileLockNodeManager.LockListener activeLockListener;
private volatile ActivateCallback nodeManagerActivateCallback;
public SharedStoreLiveActivation(ActiveMQServerImpl server, SharedStoreMasterPolicy sharedStoreMasterPolicy) {
this.activeMQServer = server;
this.sharedStoreMasterPolicy = sharedStoreMasterPolicy;
@ -51,8 +57,8 @@ public final class SharedStoreLiveActivation extends LiveActivation {
if (activeMQServer.getNodeManager().isBackupLive()) {
/*
* looks like we've failed over at some point need to inform that we are the backup
* so when the current live goes down they failover to us
* looks like we've failed over at some point need to inform that we are the
* backup so when the current live goes down they failover to us
*/
if (logger.isDebugEnabled()) {
logger.debug("announcing backup to the former live" + this);
@ -65,9 +71,12 @@ public final class SharedStoreLiveActivation extends LiveActivation {
activeMQServer.getBackupManager().announceBackup();
}
activeMQServer.registerActivateCallback(activeMQServer.getNodeManager().startLiveNode());
nodeManagerActivateCallback = activeMQServer.getNodeManager().startLiveNode();
activeMQServer.registerActivateCallback(nodeManagerActivateCallback);
addLockListener(activeMQServer, activeMQServer.getNodeManager());
if (activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPED || activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPING) {
if (activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPED
|| activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPING) {
return;
}
@ -82,17 +91,76 @@ public final class SharedStoreLiveActivation extends LiveActivation {
}
}
/**
 * Registers a lock listener that stops and restarts the server when the lock is lost.
 * Only a {@link FileLockNodeManager} supports lock listeners; for any other node manager this is a no-op.
 *
 * @param activeMQServer the server to stop and start again once the lock is lost
 * @param nodeManager    the node manager in use
 */
private void addLockListener(ActiveMQServerImpl activeMQServer, NodeManager nodeManager) {
   if (!(nodeManager instanceof FileLockNodeManager)) {
      // no business registering a listener
      return;
   }
   final FileLockNodeManager fileNodeManager = (FileLockNodeManager) nodeManager;
   final LockListener listener = fileNodeManager.new LockListener() {
      @Override
      public void lostLock() {
         stopStartServerInSeperateThread(activeMQServer);
      }
   };
   // Remember the listener so that close() can unregister it again.
   activeLockListener = listener;
   fileNodeManager.registerLockListener(listener);
}
/**
 * Stops the server and then starts it again on a newly created thread.
 * <p>
 * This is done in a new thread because it takes too long to finish in the scheduled thread, and it is
 * not the responsibility of the scheduled thread either. A failure to stop is logged and the start is
 * attempted regardless; a failure to start is logged as an error.
 *
 * @param activeMQServer the server to stop and start again
 */
private void stopStartServerInSeperateThread(ActiveMQServerImpl activeMQServer) {
   try {
      Thread startServer = new Thread(() -> {
         try {
            activeMQServer.stop(true, false);
         } catch (Exception e) {
            logger.warn("Failed to stop artemis server after loosing the lock", e);
         }

         try {
            activeMQServer.start();
         } catch (Exception e) {
            logger.error("Failed to start artemis server after recovering from loosing the lock", e);
         }
      });
      startServer.start();
   } catch (Exception e) {
      // Pass the throwable along so the stack trace is not lost
      logger.error(e.getMessage(), e);
   }
}
/**
 * Detaches this activation from the node manager and releases the live lock.
 * <p>
 * The lock listener and the activate callback registered by this activation are removed first. The node
 * manager is then crashed when failover on shutdown is configured or the close is permanent, and paused
 * otherwise. Nothing happens when the server has no node manager.
 *
 * @param permanently whether the close is permanent
 * @param restarting  not used by this implementation
 * @throws Exception if crashing or pausing the node manager fails
 */
@Override
public void close(boolean permanently, boolean restarting) throws Exception {
   final NodeManager nodeManagerInUse = activeMQServer.getNodeManager();
   if (nodeManagerInUse == null) {
      // TO avoid a NPE from stop
      return;
   }

   final LockListener listenerToRemove = activeLockListener;
   if (listenerToRemove != null) {
      listenerToRemove.unregisterListener();
   }

   final ActivateCallback callbackToRemove = nodeManagerActivateCallback;
   if (callbackToRemove != null) {
      activeMQServer.unregisterActivateCallback(callbackToRemove);
   }

   final boolean letBackupTakeOver = sharedStoreMasterPolicy.isFailoverOnServerShutdown() || permanently;
   if (letBackupTakeOver) {
      nodeManagerInUse.crashLiveServer();
   } else {
      nodeManagerInUse.pauseLiveServer();
   }
}
}

View File

@ -65,7 +65,7 @@ public class ColocatedActiveMQServer extends ActiveMQServerImpl {
@Override
protected NodeManager createNodeManager(final File directory, boolean replicatingBackup) {
if (replicatingBackup) {
return new FileLockNodeManager(directory, replicatingBackup, getConfiguration().getJournalLockAcquisitionTimeout());
return new FileLockNodeManager(directory, replicatingBackup, getConfiguration().getJournalLockAcquisitionTimeout(), null);
} else {
if (backup) {
return nodeManagerBackup;

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.byteman;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager.LockListener;
import org.apache.activemq.artemis.utils.Wait;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
/**
 * Tests the lock monitor of {@link FileLockNodeManager}: a registered {@link LockListener} must be notified
 * when the lock is reported invalid or the state cannot be read, and must not be notified otherwise.
 * Byteman rules are used to simulate the failure conditions.
 */
@RunWith(BMUnitRunner.class)
public class FileLockMonitorTest {

   private File sharedDir;

   // Set from the monitor's thread by the listener, read from the test thread
   private volatile boolean lostLock = false;

   private volatile FileLockNodeManager nodeManager;

   private ScheduledThreadPoolExecutor executor;

   @Before
   public void handleLockFile() throws IOException {
      // createTempFile is only used to obtain a unique name; the file is replaced by a directory
      sharedDir = File.createTempFile("shared-dir", "");
      sharedDir.delete();
      Assert.assertTrue(sharedDir.mkdir());
      lostLock = false;
   }

   /**
    * Shuts the scheduler down after every test, including tests that failed an assertion, so that the
    * periodic monitor task does not keep running into the next test.
    */
   @After
   public void shutdownExecutor() {
      if (executor != null) {
         executor.shutdown();
      }
   }

   @Test
   @BMRules(rules = {
      @BMRule(name = "lock is invalid", targetClass = "sun.nio.ch.FileLockImpl", targetMethod = "isValid", action = "return false;") })
   public void testLockMonitorInvalid() throws Exception {
      startServer();
      Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 5000, 100);
      nodeManager.crashLiveServer();
   }

   @Test
   @BMRules(rules = {
      @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "getState", action = "throw new java.io.IOException(\"EFS is disconnected\");") })
   public void testLockMonitorIOException() throws Exception {
      startServer();
      Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 5000, 100);
      nodeManager.crashLiveServer();
   }

   @Test
   public void testLockMonitorHasCorrectLockAndState() throws Exception {
      startServer();
      Assert.assertFalse("The FileLockNodeManager should not have lost the lock", Wait.waitFor(() -> lostLock, 5000, 100));
      nodeManager.crashLiveServer();
   }

   @Test
   @BMRules(rules = {
      @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "getState", action = "return 70;") })
   public void testLockMonitorHasLockWrongState() throws Exception {
      startServer();
      Assert.assertFalse("The FileLockNodeManager should not have lost the lock", Wait.waitFor(() -> lostLock, 5000, 100));
      nodeManager.crashLiveServer();
   }

   /**
    * Starts a node manager on {@link #sharedDir}, registers a listener that records the lost lock, and
    * completes activation so that lock monitoring begins.
    * <p>
    * Start-up failures are propagated instead of being printed and ignored: otherwise the tests that
    * expect the lock <em>not</em> to be lost would pass even though the monitor never ran.
    *
    * @return the registered listener
    * @throws Exception if the node manager cannot be started or the live lock cannot be obtained
    */
   public LockListener startServer() throws Exception {
      executor = new ScheduledThreadPoolExecutor(2);
      nodeManager = new FileLockNodeManager(sharedDir, false, executor);
      LockListener listener = nodeManager.new LockListener() {

         @Override
         protected void lostLock() throws Exception {
            lostLock = true;
            nodeManager.crashLiveServer();
         }

      };
      nodeManager.registerLockListener(listener);

      nodeManager.start();
      ActivateCallback startLiveNode = nodeManager.startLiveNode();
      startLiveNode.activationComplete();
      return listener;
   }
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.extras.byteman;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.jboss.byteman.contrib.bmunit.BMRule;
@ -66,7 +67,7 @@ public class FileLockNodeManagerTest {
}
private long measureLockAcquisisionTimeout(long lockAcquisitionTimeout) throws Exception {
FileLockNodeManager manager = new FileLockNodeManager(sharedDir, false, lockAcquisitionTimeout);
FileLockNodeManager manager = new FileLockNodeManager(sharedDir, false, lockAcquisitionTimeout, new ScheduledThreadPoolExecutor(1));
manager.start();
// try to lock and measure real timeout

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.extras.byteman;
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -123,8 +124,8 @@ public class SharedStoreBackupActivationTest extends FailoverTestBase {
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
// nodes must use separate FileLockNodeManager instances!
NodeManager liveNodeManager = new FileLockNodeManager(sharedDir, false);
NodeManager backupNodeManager = new FileLockNodeManager(sharedDir, false);
NodeManager liveNodeManager = new FileLockNodeManager(sharedDir, false, new ScheduledThreadPoolExecutor(1));
NodeManager backupNodeManager = new FileLockNodeManager(sharedDir, false, new ScheduledThreadPoolExecutor(1));
backupConfig = super.createDefaultConfig(false)
.clearAcceptorConfigurations()