ARTEMIS-2421 periodic journal lock evaluation

If a broker loses its file lock on the journal and doesn't notice (e.g.
network connection failure to an NFS mount) then it can continue to run
after its backup activates resulting in split-brain.

This commit implements periodic journal lock evaluation so that if a live
server loses its lock it will automatically restart itself.
This commit is contained in:
Bas Elzinga 2019-02-08 15:20:26 +01:00 committed by Clebert Suconic
parent c2c0890178
commit e12f3ddc6f
12 changed files with 339 additions and 21 deletions

View File

@ -26,9 +26,8 @@ import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -62,6 +61,9 @@ import org.apache.activemq.artemis.utils.ActiveMQThreadFactory;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
@Command(name = "print", description = "Print data records information (WARNING: don't use while a production server is running)")
public class PrintData extends DBOption {
@ -129,7 +131,7 @@ public class PrintData extends DBOption {
if (serverLockFile.isFile()) {
try {
FileLockNodeManager fileLock = new FileLockNodeManager(messagesDirectory, false);
FileLockNodeManager fileLock = new FileLockNodeManager(messagesDirectory, false, new ScheduledThreadPoolExecutor(1));
fileLock.start();
printBanner(out, "Server's ID=" + fileLock.getNodeId().toString());
fileLock.stop();

View File

@ -505,12 +505,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
logger.debug("Detected no Shared Store HA options on JDBC store");
}
//LIVE_ONLY should be the default HA option when HA isn't configured
manager = new FileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout());
manager = new FileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout(), scheduledPool);
} else {
throw new IllegalArgumentException("JDBC persistence allows only Shared Store HA options");
}
} else {
manager = new FileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout());
manager = new FileLockNodeManager(directory, replicatingBackup, configuration.getJournalLockAcquisitionTimeout(), scheduledPool);
}
return manager;
}

View File

@ -22,6 +22,12 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.SimpleString;
@ -54,7 +60,9 @@ public class FileLockNodeManager extends NodeManager {
private static final long LOCK_ACCESS_FAILURE_WAIT_TIME = 2000;
private FileLock liveLock;
private static final int LOCK_MONITOR_TIMEOUT_MILLIES = 2000;
private volatile FileLock liveLock;
private FileLock backupLock;
@ -64,13 +72,18 @@ public class FileLockNodeManager extends NodeManager {
protected boolean interrupted = false;
public FileLockNodeManager(final File directory, boolean replicatedBackup) {
private ScheduledExecutorService scheduledPool;
public FileLockNodeManager(final File directory, boolean replicatedBackup, ScheduledExecutorService scheduledPool) {
super(replicatedBackup, directory);
this.scheduledPool = scheduledPool;
}
public FileLockNodeManager(final File directory, boolean replicatedBackup, long lockAcquisitionTimeout) {
public FileLockNodeManager(final File directory, boolean replicatedBackup, long lockAcquisitionTimeout,
ScheduledExecutorService scheduledPool) {
super(replicatedBackup, directory);
this.scheduledPool = scheduledPool;
this.lockAcquisitionTimeout = lockAcquisitionTimeout;
}
@ -215,6 +228,7 @@ public class FileLockNodeManager extends NodeManager {
public void activationComplete() {
try {
setLive();
startLockMonitoring();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
}
@ -224,6 +238,7 @@ public class FileLockNodeManager extends NodeManager {
@Override
public void pauseLiveServer() throws Exception {
stopLockMonitoring();
setPaused();
if (liveLock != null) {
liveLock.release();
@ -232,6 +247,7 @@ public class FileLockNodeManager extends NodeManager {
@Override
public void crashLiveServer() throws Exception {
stopLockMonitoring();
if (liveLock != null) {
liveLock.release();
liveLock = null;
@ -388,4 +404,109 @@ public class FileLockNodeManager extends NodeManager {
return null;
}
/**
 * Schedules the periodic {@link MonitorLock} check on the scheduled pool.
 * <p>
 * The check first runs after {@code LOCK_MONITOR_TIMEOUT_MILLIES} and is then repeated at that
 * same fixed rate. Calling this while a monitor is already scheduled only logs a debug message.
 */
private synchronized void startLockMonitoring() {
   logger.debug("Starting the lock monitor");
   if (scheduledLockMonitor != null) {
      logger.debug("Lock monitor was already started");
      return;
   }
   scheduledLockMonitor = scheduledPool.scheduleAtFixedRate(new MonitorLock(),
                                                            LOCK_MONITOR_TIMEOUT_MILLIES,
                                                            LOCK_MONITOR_TIMEOUT_MILLIES,
                                                            TimeUnit.MILLISECONDS);
}
/**
 * Cancels the scheduled lock monitor, interrupting it if it is currently running, and forgets
 * the handle so that monitoring can be started again later.
 * <p>
 * Calling this while no monitor is scheduled only logs a debug message.
 */
private synchronized void stopLockMonitoring() {
   logger.debug("Stopping the lock monitor");
   final ScheduledFuture<?> runningMonitor = scheduledLockMonitor;
   if (runningMonitor == null) {
      logger.debug("The lock monitor was already stopped");
      return;
   }
   runningMonitor.cancel(true);
   scheduledLockMonitor = null;
}
/**
 * Calls {@code lostLock()} on every registered {@link LockListener}.
 * <p>
 * Nothing is notified when there is no live lock object, which is the case after the lock has
 * been released and cleared (for example by {@code crashLiveServer()}).
 * <p>
 * A failure in one listener is logged and does not prevent the remaining listeners from being
 * notified.
 */
private void notifyLostLock() {
   // Additional check we are not initializing or have no locking object anymore
   // because of a shutdown
   if (lockListeners == null || liveLock == null) {
      return;
   }

   // lockListeners is a Collections.synchronizedSet: iterating it (which the copy constructor
   // does) must happen while holding its monitor. Working on a copy also lets a listener
   // unregister itself from inside lostLock() without disturbing this loop.
   final Set<LockListener> listenersSnapshot;
   synchronized (lockListeners) {
      listenersSnapshot = new HashSet<>(lockListeners);
   }

   for (LockListener lockListener : listenersSnapshot) {
      try {
         lockListener.lostLock();
      } catch (Exception e) {
         // Everyone needs to be notified, so keep going, but do not hide the failure
         logger.warn("Lock listener failed while handling the lost lock", e);
      }
   }
}
/**
 * Adds a listener to the set that is notified when the lock monitor concludes that the live
 * lock has been lost.
 *
 * @param lockListener the listener to add
 */
public void registerLockListener(LockListener lockListener) {
lockListeners.add(lockListener);
}
/**
 * Removes a listener from the set that is notified when the lock monitor concludes that the
 * live lock has been lost.
 *
 * @param lockListener the listener to remove
 */
public void unregisterLockListener(LockListener lockListener) {
lockListeners.remove(lockListener);
}
protected final Set<LockListener> lockListeners = Collections.synchronizedSet(new HashSet<LockListener>());
private ScheduledFuture<?> scheduledLockMonitor;
/**
 * Callback for components that need to react when this node manager's lock monitor reports the
 * live lock as lost. Instances are bound to the enclosing node manager (inner class) and take
 * effect once passed to {@code registerLockListener}.
 */
public abstract class LockListener {
/**
 * Invoked from the lock monitor when the lock is considered lost. Since the monitor is
 * scheduled at a fixed rate this may be invoked again on a later run for as long as monitoring
 * is active and the lock is still considered lost. Exceptions thrown here are caught by the
 * notifier so that the remaining listeners are still called.
 *
 * @throws Exception if the listener fails to handle the notification
 */
protected abstract void lostLock() throws Exception;
/**
 * Removes this listener from the enclosing node manager's listener set.
 */
protected void unregisterListener() {
lockListeners.remove(this);
}
}
/**
 * Periodic task that decides whether this node manager still owns the live lock and notifies
 * the registered lock listeners when it does not.
 * <p>
 * The lock is considered lost when there is no live lock object, when the lock reports itself
 * as invalid, or when anything in the check throws (including reading the state).
 * A state other than {@code LIVE} is only logged; it does not count as a lost lock.
 */
public class MonitorLock implements Runnable {

   @Override
   public void run() {
      // Pessimistic default: only a check that completes successfully clears it
      boolean lostLock = true;
      try {
         // liveLock is volatile and is set to null by crashLiveServer(); read it exactly once so
         // it cannot become null between the null check and the isValid() call
         final FileLock currentLock = liveLock;
         if (currentLock == null) {
            logger.debug("Livelock is null");
         }
         lostLock = currentLock == null || !currentLock.isValid();
         if (!lostLock) {
            logger.debug("Server still has the lock, double check status is live");
            // Java always thinks the lock is still valid even when there is no filesystem,
            // so we do another check: we should be able to retrieve the status unless
            // something is wrong.
            // Per the original author's note, this call blocks when the file system (EFS) is
            // gone. Solving that would require managing the timeout ourselves and
            // interrupting the thread used to claim the lock.
            byte state = getState();
            if (state == LIVE) {
               logger.debug("Status is set to live");
            } else {
               logger.debug("Status is not live");
            }
         }
      } catch (Exception exception) {
         // If something went wrong we probably lost the lock
         logger.error(exception.getMessage(), exception);
         lostLock = true;
      }

      if (lostLock) {
         logger.warn("Lost the lock according to the monitor, notifying listeners");
         notifyLostLock();
      }
   }
}
}

View File

@ -16,20 +16,26 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager.LockListener;
import org.jboss.logging.Logger;
public final class SharedStoreLiveActivation extends LiveActivation {
private static final Logger logger = Logger.getLogger(SharedStoreLiveActivation.class);
//this is how we act when we initially start as live
// this is how we act when we initially start as live
private SharedStoreMasterPolicy sharedStoreMasterPolicy;
private ActiveMQServerImpl activeMQServer;
private volatile FileLockNodeManager.LockListener activeLockListener;
private volatile ActivateCallback nodeManagerActivateCallback;
public SharedStoreLiveActivation(ActiveMQServerImpl server, SharedStoreMasterPolicy sharedStoreMasterPolicy) {
this.activeMQServer = server;
this.sharedStoreMasterPolicy = sharedStoreMasterPolicy;
@ -51,8 +57,8 @@ public final class SharedStoreLiveActivation extends LiveActivation {
if (activeMQServer.getNodeManager().isBackupLive()) {
/*
* looks like we've failed over at some point need to inform that we are the backup
* so when the current live goes down they failover to us
* looks like we've failed over at some point need to inform that we are the
* backup so when the current live goes down they failover to us
*/
if (logger.isDebugEnabled()) {
logger.debug("announcing backup to the former live" + this);
@ -65,9 +71,12 @@ public final class SharedStoreLiveActivation extends LiveActivation {
activeMQServer.getBackupManager().announceBackup();
}
activeMQServer.registerActivateCallback(activeMQServer.getNodeManager().startLiveNode());
nodeManagerActivateCallback = activeMQServer.getNodeManager().startLiveNode();
activeMQServer.registerActivateCallback(nodeManagerActivateCallback);
addLockListener(activeMQServer, activeMQServer.getNodeManager());
if (activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPED || activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPING) {
if (activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPED
|| activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPING) {
return;
}
@ -82,17 +91,76 @@ public final class SharedStoreLiveActivation extends LiveActivation {
}
}
/**
 * Registers a lock listener that stops and restarts the given server when the node manager
 * reports that the lock was lost. Only {@link FileLockNodeManager} supports such listeners; for
 * any other node manager this method does nothing.
 *
 * @param activeMQServer the server to restart when the lock is lost
 * @param nodeManager    the node manager currently used by the server
 */
private void addLockListener(ActiveMQServerImpl activeMQServer, NodeManager nodeManager) {
   if (!(nodeManager instanceof FileLockNodeManager)) {
      // no business registering a listener
      return;
   }

   final FileLockNodeManager fileLockManager = (FileLockNodeManager) nodeManager;
   final LockListener restartOnLostLock = fileLockManager.new LockListener() {
      @Override
      public void lostLock() {
         stopStartServerInSeperateThread(activeMQServer);
      }
   };

   // remembered so that close() can unregister it again
   activeLockListener = restartOnLostLock;
   fileLockManager.registerLockListener(restartOnLostLock);
}
/**
 * Stops the server and then starts it again on a newly created thread.
 * <p>
 * This is invoked from the lock listener, which runs on the lock monitor's scheduled thread.
 * The restart is done on a separate thread because it takes too long to finish on the
 * scheduled thread, and because it is not the responsibility of that thread.
 * <p>
 * A failure to stop is logged and the start is attempted regardless.
 *
 * @param activeMQServer the server to stop and start again
 */
private void stopStartServerInSeperateThread(ActiveMQServerImpl activeMQServer) {
   try {
      Thread startServer = new Thread(() -> {
         try {
            activeMQServer.stop(true, false);
         } catch (Exception e) {
            logger.warn("Failed to stop artemis server after loosing the lock", e);
         }

         try {
            activeMQServer.start();
         } catch (Exception e) {
            logger.error("Failed to start artemis server after recovering from loosing the lock", e);
         }
      });
      startServer.start();
   } catch (Exception e) {
      // keep the stack trace: the message alone may be null and hides where the failure came from
      logger.error(e.getMessage(), e);
   }
}
/**
 * Detaches this activation from the node manager and gives up the live role.
 * <p>
 * The lock listener and the activate callback registered by this activation are removed first.
 * The node manager is then told to crash the live server when the policy requests failover on
 * shutdown or the close is permanent, and to pause it otherwise.
 *
 * @param permanently whether the close is permanent
 * @param restarting  not used by this implementation
 * @throws Exception if the node manager fails to crash or pause the live server
 */
@Override
public void close(boolean permanently, boolean restarting) throws Exception {
   final NodeManager currentNodeManager = activeMQServer.getNodeManager();
   if (currentNodeManager == null) {
      // TO avoid a NPE from stop
      return;
   }

   // the fields are volatile: work on one consistent read of each
   final LockListener registeredListener = activeLockListener;
   if (registeredListener != null) {
      registeredListener.unregisterListener();
   }

   final ActivateCallback registeredCallback = nodeManagerActivateCallback;
   if (registeredCallback != null) {
      activeMQServer.unregisterActivateCallback(registeredCallback);
   }

   final boolean giveUpLiveRole = sharedStoreMasterPolicy.isFailoverOnServerShutdown() || permanently;
   if (!giveUpLiveRole) {
      currentNodeManager.pauseLiveServer();
   } else {
      currentNodeManager.crashLiveServer();
   }
}
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.util;
import javax.management.MBeanServer;
import java.io.File;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
@ -65,7 +66,7 @@ public class ColocatedActiveMQServer extends ActiveMQServerImpl {
@Override
protected NodeManager createNodeManager(final File directory, boolean replicatingBackup) {
if (replicatingBackup) {
return new FileLockNodeManager(directory, replicatingBackup, getConfiguration().getJournalLockAcquisitionTimeout());
return new FileLockNodeManager(directory, replicatingBackup, getConfiguration().getJournalLockAcquisitionTimeout(), new ScheduledThreadPoolExecutor(1));
} else {
if (backup) {
return nodeManagerBackup;

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.byteman;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager.LockListener;
import org.apache.activemq.artemis.utils.Wait;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
/**
 * Tests for the periodic lock monitor of {@link FileLockNodeManager}.
 * <p>
 * Each test starts a node manager on a fresh temporary directory with a listener that records
 * the lost-lock notification, optionally with a Byteman rule that simulates a failure, and then
 * checks whether the listener was (or was not) notified.
 */
@RunWith(BMUnitRunner.class)
public class FileLockMonitorTest {

   private File sharedDir;

   // written by the lock listener on the monitor thread, read by the test thread
   private volatile boolean lostLock = false;

   private volatile FileLockNodeManager nodeManager;

   private ScheduledThreadPoolExecutor executor;

   /**
    * Creates an empty temporary directory for the lock file and resets the notification flag.
    */
   @Before
   public void handleLockFile() throws IOException {
      // createTempFile is only used to obtain a unique name; the file is replaced by a directory
      sharedDir = File.createTempFile("shared-dir", "");
      sharedDir.delete();
      Assert.assertTrue(sharedDir.mkdir());
      lostLock = false;
   }

   /**
    * The file lock reports itself as invalid: the listener must be notified.
    */
   @Test
   @BMRules(rules = {
      @BMRule(name = "lock is invalid", targetClass = "sun.nio.ch.FileLockImpl", targetMethod = "isValid", action = "return false;") })
   public void testLockMonitorInvalid() throws Exception {
      lostLock = false;
      try {
         startServer();
         Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 5000, 100);
         // NOTE(review): the result is discarded; looks like an assertion was intended here - confirm
         nodeManager.isStarted();
         nodeManager.crashLiveServer();
      } finally {
         shutdownExecutor();
      }
   }

   /**
    * Reading the state fails with an IOException: the listener must be notified.
    */
   @Test
   @BMRules(rules = {
      @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "getState", action = "throw new java.io.IOException(\"EFS is disconnected\");") })
   public void testLockMonitorIOException() throws Exception {
      lostLock = false;
      try {
         startServer();
         Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 5000, 100);
         nodeManager.crashLiveServer();
      } finally {
         shutdownExecutor();
      }
   }

   /**
    * No failure is simulated: the listener must not be notified.
    */
   @Test
   public void testLockMonitorHasCorrectLockAndState() throws Exception {
      lostLock = false;
      try {
         startServer();
         Assert.assertFalse("The FileLockNodeManager should not have lost the lock", Wait.waitFor(() -> lostLock, 5000, 100));
         nodeManager.crashLiveServer();
      } finally {
         shutdownExecutor();
      }
   }

   /**
    * The lock is held but the state read back is not the live state: the listener must not be
    * notified, since an unexpected state alone does not count as a lost lock.
    */
   @Test
   @BMRules(rules = {
      @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "getState", action = "return 70;") })
   public void testLockMonitorHasLockWrongState() throws Exception {
      lostLock = false;
      try {
         startServer();
         Assert.assertFalse("The FileLockNodeManager should not have lost the lock", Wait.waitFor(() -> lostLock, 5000, 100));
         nodeManager.crashLiveServer();
      } finally {
         shutdownExecutor();
      }
   }

   /**
    * Creates the executor and node manager, registers a listener that records the lost-lock
    * notification and releases the live lock, then starts the node manager as live and completes
    * its activation (which starts the lock monitor).
    *
    * @return the registered listener
    */
   public LockListener startServer() throws Exception {
      executor = new ScheduledThreadPoolExecutor(2);
      nodeManager = new FileLockNodeManager(sharedDir, false, executor);
      LockListener listener = nodeManager.new LockListener() {

         @Override
         protected void lostLock() throws Exception {
            lostLock = true;
            nodeManager.crashLiveServer();
         }
      };
      nodeManager.registerLockListener(listener);

      try {
         nodeManager.start();
         ActivateCallback startLiveNode = nodeManager.startLiveNode();
         startLiveNode.activationComplete();
      } catch (Exception exception) {
         // NOTE(review): a start-up failure is only printed, so the test continues against a
         // node manager that may not be monitoring - confirm whether this should fail the test
         exception.printStackTrace();
      }

      return listener;
   }

   /**
    * Shuts the executor down if it was created. Called from {@code finally} so that the pool
    * threads are released even when an assertion fails or the node manager throws.
    */
   private void shutdownExecutor() {
      if (executor != null) {
         executor.shutdown();
      }
   }
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.extras.byteman;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.jboss.byteman.contrib.bmunit.BMRule;
@ -66,7 +67,7 @@ public class FileLockNodeManagerTest {
}
private long measureLockAcquisisionTimeout(long lockAcquisitionTimeout) throws Exception {
FileLockNodeManager manager = new FileLockNodeManager(sharedDir, false, lockAcquisitionTimeout);
FileLockNodeManager manager = new FileLockNodeManager(sharedDir, false, lockAcquisitionTimeout, new ScheduledThreadPoolExecutor(1));
manager.start();
// try to lock and measure real timeout

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.extras.byteman;
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -123,8 +124,8 @@ public class SharedStoreBackupActivationTest extends FailoverTestBase {
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
// nodes must use separate FileLockNodeManager instances!
NodeManager liveNodeManager = new FileLockNodeManager(sharedDir, false);
NodeManager backupNodeManager = new FileLockNodeManager(sharedDir, false);
NodeManager liveNodeManager = new FileLockNodeManager(sharedDir, false, new ScheduledThreadPoolExecutor(1));
NodeManager backupNodeManager = new FileLockNodeManager(sharedDir, false, new ScheduledThreadPoolExecutor(1));
backupConfig = super.createDefaultConfig(false)
.clearAcceptorConfigurations()

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.cluster;
import java.io.File;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
@ -117,7 +118,7 @@ public class NodeManagerAction {
}
NodeManagerAction nodeManagerAction = new NodeManagerAction(work1);
FileLockNodeManager nodeManager = new FileLockNodeManager(new File("."), false);
FileLockNodeManager nodeManager = new FileLockNodeManager(new File("."), false, new ScheduledThreadPoolExecutor(1));
nodeManager.start();
try {
nodeManagerAction.performWork(nodeManager);

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.cluster;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
@ -32,7 +33,7 @@ public class RealNodeManagerTest extends NodeManagerTest {
@Test
public void testId() throws Exception {
NodeManager nodeManager = new FileLockNodeManager(new File(getTemporaryDir()), false);
NodeManager nodeManager = new FileLockNodeManager(new File(getTemporaryDir()), false, new ScheduledThreadPoolExecutor(1));
nodeManager.start();
UUID id1 = nodeManager.getUUID();
nodeManager.stop();

View File

@ -24,6 +24,7 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
@ -132,7 +133,7 @@ public class NettyFailoverTest extends FailoverTest {
if (useSeparateLockFolder) {
config.getNodeManagerLockLocation().mkdirs();
}
return new FileLockNodeManager(config.getNodeManagerLockLocation(), false);
return new FileLockNodeManager(config.getNodeManagerLockLocation(), false, new ScheduledThreadPoolExecutor(1));
default:
throw new AssertionError("enum type not supported!");

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.unit.core.server.impl;
import java.io.File;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -35,7 +36,7 @@ public class FileLockTest extends ActiveMQTestBase {
@Test
public void testNIOLock() throws Exception {
doTestLock(new FileLockNodeManager(getTestDirfile(), false), new FileLockNodeManager(getTestDirfile(), false));
doTestLock(new FileLockNodeManager(getTestDirfile(), false, new ScheduledThreadPoolExecutor(1)), new FileLockNodeManager(getTestDirfile(), false, new ScheduledThreadPoolExecutor(1)));
}