This commit is contained in:
Clebert Suconic 2020-10-20 17:23:48 -04:00
commit 9a954188d8
37 changed files with 1387 additions and 928 deletions

View File

@ -16,52 +16,50 @@
*/ */
package org.apache.activemq.artemis.core.server; package org.apache.activemq.artemis.core.server;
import java.io.File; import java.util.HashSet;
import java.io.IOException; import java.util.Set;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.jboss.logging.Logger;
public abstract class NodeManager implements ActiveMQComponent { public abstract class NodeManager implements ActiveMQComponent {
protected static final byte FIRST_TIME_START = '0'; @FunctionalInterface
public static final String SERVER_LOCK_NAME = "server.lock"; public interface LockListener {
private static final String ACCESS_MODE = "rw";
void lostLock();
}
private static final Logger LOGGER = Logger.getLogger(NodeManager.class);
protected final boolean replicatedBackup; protected final boolean replicatedBackup;
private final File directory; protected final Object nodeIDGuard = new Object();
private final Object nodeIDGuard = new Object();
private SimpleString nodeID; private SimpleString nodeID;
private UUID uuid; private UUID uuid;
private boolean isStarted = false; private boolean isStarted = false;
private final Set<FileLockNodeManager.LockListener> lockListeners;
protected FileChannel channel; public NodeManager(final boolean replicatedBackup) {
public NodeManager(final boolean replicatedBackup, final File directory) {
this.directory = directory;
this.replicatedBackup = replicatedBackup; this.replicatedBackup = replicatedBackup;
this.lockListeners = new HashSet<>();
} }
// -------------------------------------------------------------------- // --------------------------------------------------------------------
public abstract void awaitLiveNode() throws Exception; public abstract void awaitLiveNode() throws NodeManagerException, InterruptedException;
public abstract void awaitLiveStatus() throws Exception; public abstract void awaitLiveStatus() throws NodeManagerException, InterruptedException;
public abstract void startBackup() throws Exception; public abstract void startBackup() throws NodeManagerException, InterruptedException;
public abstract ActivateCallback startLiveNode() throws Exception; public abstract ActivateCallback startLiveNode() throws NodeManagerException, InterruptedException;
public abstract void pauseLiveServer() throws Exception; public abstract void pauseLiveServer() throws NodeManagerException;
public abstract void crashLiveServer() throws Exception; public abstract void crashLiveServer() throws NodeManagerException;
public abstract void releaseBackup() throws Exception; public abstract void releaseBackup() throws NodeManagerException;
// -------------------------------------------------------------------- // --------------------------------------------------------------------
@ -81,7 +79,7 @@ public abstract class NodeManager implements ActiveMQComponent {
} }
} }
public abstract SimpleString readNodeId() throws ActiveMQIllegalStateException, IOException; public abstract SimpleString readNodeId() throws NodeManagerException;
public UUID getUUID() { public UUID getUUID() {
synchronized (nodeIDGuard) { synchronized (nodeIDGuard) {
@ -113,119 +111,63 @@ public abstract class NodeManager implements ActiveMQComponent {
} }
} }
public abstract boolean isAwaitingFailback() throws Exception; public abstract boolean isAwaitingFailback() throws NodeManagerException;
public abstract boolean isBackupLive() throws Exception; public abstract boolean isBackupLive() throws NodeManagerException;
public abstract void interrupt(); public abstract void interrupt();
@Override @Override
public synchronized void stop() throws Exception { public synchronized void stop() throws Exception {
FileChannel channelCopy = channel; // force any running threads on node manager to stop
if (channelCopy != null)
channelCopy.close();
isStarted = false; isStarted = false;
lockListeners.clear();
} }
public void stopBackup() throws Exception { public void stopBackup() throws NodeManagerException {
if (replicatedBackup && getNodeId() != null) {
setUpServerLockFile();
}
releaseBackup(); releaseBackup();
} }
/** protected synchronized void checkStarted() {
* Ensures existence of persistent information about the server's nodeID. if (!isStarted) {
* <p> throw new IllegalStateException("the node manager is supposed to be started");
* Roughly the different use cases are: }
* <ol> }
* <li>old live server restarts: a server.lock file already exists and contains a nodeID.
* <li>new live server starting for the first time: no file exists, and we just *create* a new
* UUID to use as nodeID
* <li>replicated backup received its nodeID from its live: no file exists, we need to persist
* the *current* nodeID
* </ol>
*/
protected synchronized void setUpServerLockFile() throws IOException {
File serverLockFile = newFile(SERVER_LOCK_NAME);
boolean fileCreated = false; protected synchronized void notifyLostLock() {
if (!isStarted) {
int count = 0; return;
while (!serverLockFile.exists()) { }
lockListeners.forEach(lockListener -> {
try { try {
fileCreated = serverLockFile.createNewFile(); lockListener.lostLock();
} catch (RuntimeException e) { } catch (Exception e) {
ActiveMQServerLogger.LOGGER.nodeManagerCantOpenFile(e, serverLockFile); LOGGER.warn("On notify lost lock", e);
throw e; // Need to notify everyone so ignore any exception
} catch (IOException e) {
/*
* on some OS's this may fail weirdly even tho the parent dir exists, retrying will work, some weird timing issue i think
* */
if (count < 5) {
try {
Thread.sleep(100);
} catch (InterruptedException e1) {
}
count++;
continue;
}
ActiveMQServerLogger.LOGGER.nodeManagerCantOpenFile(e, serverLockFile);
throw e;
} }
});
} }
@SuppressWarnings("resource") public synchronized void registerLockListener(FileLockNodeManager.LockListener lockListener) {
RandomAccessFile raFile = new RandomAccessFile(serverLockFile, ACCESS_MODE); lockListeners.add(lockListener);
channel = raFile.getChannel();
if (fileCreated) {
ByteBuffer id = ByteBuffer.allocateDirect(3);
byte[] bytes = new byte[3];
bytes[0] = FIRST_TIME_START;
bytes[1] = FIRST_TIME_START;
bytes[2] = FIRST_TIME_START;
id.put(bytes, 0, 3);
id.position(0);
channel.write(id, 0);
channel.force(true);
} }
createNodeId(); public synchronized void unregisterLockListener(FileLockNodeManager.LockListener lockListener) {
lockListeners.remove(lockListener);
} }
/** public static final class NodeManagerException extends RuntimeException {
* @return
*/ public NodeManagerException(String message) {
protected final File newFile(final String fileName) { super(message);
File file = new File(directory, fileName);
return file;
} }
protected final synchronized void createNodeId() throws IOException { public NodeManagerException(Throwable cause) {
synchronized (nodeIDGuard) { super(cause);
ByteBuffer id = ByteBuffer.allocateDirect(16);
int read = channel.read(id, 3);
if (replicatedBackup) {
id.position(0);
id.put(getUUID().asBytes(), 0, 16);
id.position(0);
channel.write(id, 3);
channel.force(true);
} else if (read != 16) {
setUUID(UUIDGenerator.getInstance().generateUUID());
id.put(getUUID().asBytes(), 0, 16);
id.position(0);
channel.write(id, 3);
channel.force(true);
} else {
byte[] bytes = new byte[16];
id.position(0);
id.get(bytes);
setUUID(new UUID(UUID.TYPE_TIME_BASED, bytes));
}
}
} }
public NodeManagerException(String message, Throwable cause) {
super(message, cause);
}
}
} }

View File

@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ColocatedActivation; import org.apache.activemq.artemis.core.server.impl.ColocatedActivation;
import org.apache.activemq.artemis.core.server.impl.LiveActivation; import org.apache.activemq.artemis.core.server.impl.LiveActivation;
@ -91,8 +92,8 @@ public class ColocatedPolicy implements HAPolicy<LiveActivation> {
public LiveActivation createActivation(ActiveMQServerImpl server, public LiveActivation createActivation(ActiveMQServerImpl server,
boolean wasLive, boolean wasLive,
Map<String, Object> activationParams, Map<String, Object> activationParams,
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception { IOCriticalErrorListener ioCriticalErrorListener) throws Exception {
return new ColocatedActivation(server, this, livePolicy.createActivation(server, wasLive, activationParams, shutdownOnCriticalIO)); return new ColocatedActivation(server, this, livePolicy.createActivation(server, wasLive, activationParams, ioCriticalErrorListener));
} }
@Override @Override

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@ -34,7 +35,7 @@ public interface HAPolicy<T extends Activation> {
T createActivation(ActiveMQServerImpl server, T createActivation(ActiveMQServerImpl server,
boolean wasLive, boolean wasLive,
Map<String, Object> activationParams, Map<String, Object> activationParams,
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception; IOCriticalErrorListener shutdownOnCriticalIO) throws Exception;
boolean isSharedStore(); boolean isSharedStore();

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.LiveOnlyActivation; import org.apache.activemq.artemis.core.server.impl.LiveOnlyActivation;
@ -37,7 +38,7 @@ public class LiveOnlyPolicy implements HAPolicy<Activation> {
public Activation createActivation(ActiveMQServerImpl server, public Activation createActivation(ActiveMQServerImpl server,
boolean wasLive, boolean wasLive,
Map<String, Object> activationParams, Map<String, Object> activationParams,
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) { IOCriticalErrorListener ioCriticalErrorListener) {
return new LiveOnlyActivation(server, this); return new LiveOnlyActivation(server, this);
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.NetworkHealthCheck; import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@ -211,8 +212,8 @@ public class ReplicaPolicy extends BackupPolicy {
public Activation createActivation(ActiveMQServerImpl server, public Activation createActivation(ActiveMQServerImpl server,
boolean wasLive, boolean wasLive,
Map<String, Object> activationParams, Map<String, Object> activationParams,
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception { IOCriticalErrorListener ioCriticalErrorListener) throws Exception {
SharedNothingBackupActivation backupActivation = new SharedNothingBackupActivation(server, wasLive, activationParams, shutdownOnCriticalIO, this, networkHealthCheck); SharedNothingBackupActivation backupActivation = new SharedNothingBackupActivation(server, wasLive, activationParams, ioCriticalErrorListener, this, networkHealthCheck);
backupActivation.init(); backupActivation.init();
return backupActivation; return backupActivation;
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.NetworkHealthCheck; import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.LiveActivation; import org.apache.activemq.artemis.core.server.impl.LiveActivation;
@ -231,7 +232,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
public LiveActivation createActivation(ActiveMQServerImpl server, public LiveActivation createActivation(ActiveMQServerImpl server,
boolean wasLive, boolean wasLive,
Map<String, Object> activationParams, Map<String, Object> activationParams,
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) { IOCriticalErrorListener ioCriticalErrorListener) {
return new SharedNothingLiveActivation(server, this); return new SharedNothingLiveActivation(server, this);
} }

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.LiveActivation; import org.apache.activemq.artemis.core.server.impl.LiveActivation;
import org.apache.activemq.artemis.core.server.impl.SharedStoreLiveActivation; import org.apache.activemq.artemis.core.server.impl.SharedStoreLiveActivation;
@ -91,8 +92,8 @@ public class SharedStoreMasterPolicy implements HAPolicy<LiveActivation> {
public LiveActivation createActivation(ActiveMQServerImpl server, public LiveActivation createActivation(ActiveMQServerImpl server,
boolean wasLive, boolean wasLive,
Map<String, Object> activationParams, Map<String, Object> activationParams,
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) { IOCriticalErrorListener ioCriticalErrorListener) {
return new SharedStoreLiveActivation(server, this); return new SharedStoreLiveActivation(server, this, ioCriticalErrorListener);
} }
@Override @Override

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map; import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.SharedStoreBackupActivation; import org.apache.activemq.artemis.core.server.impl.SharedStoreBackupActivation;
@ -101,8 +102,8 @@ public class SharedStoreSlavePolicy extends BackupPolicy {
public Activation createActivation(ActiveMQServerImpl server, public Activation createActivation(ActiveMQServerImpl server,
boolean wasLive, boolean wasLive,
Map<String, Object> activationParams, Map<String, Object> activationParams,
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) { IOCriticalErrorListener ioCriticalErrorListener) {
return new SharedStoreBackupActivation(server, this); return new SharedStoreBackupActivation(server, this, ioCriticalErrorListener);
} }
@Override @Override

View File

@ -329,7 +329,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private final Map<String, Object> activationParams = new HashMap<>(); private final Map<String, Object> activationParams = new HashMap<>();
protected final ShutdownOnCriticalErrorListener shutdownOnCriticalIO = new ShutdownOnCriticalErrorListener(); protected final IOCriticalErrorListener ioCriticalErrorListener = new DefaultCriticalErrorListener();
private final ActiveMQServer parentServer; private final ActiveMQServer parentServer;
@ -522,7 +522,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw new IllegalArgumentException("replicatingBackup is not supported yet while using JDBC persistence"); throw new IllegalArgumentException("replicatingBackup is not supported yet while using JDBC persistence");
} }
final DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration(); final DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
manager = JdbcNodeManager.with(dbConf, scheduledPool, executorFactory, shutdownOnCriticalIO); manager = JdbcNodeManager.with(dbConf, scheduledPool, executorFactory);
} else if (haType == null || haType == HAPolicyConfiguration.TYPE.LIVE_ONLY) { } else if (haType == null || haType == HAPolicyConfiguration.TYPE.LIVE_ONLY) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Detected no Shared Store HA options on JDBC store"); logger.debug("Detected no Shared Store HA options on JDBC store");
@ -610,7 +610,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean wasLive = !haPolicy.isBackup(); final boolean wasLive = !haPolicy.isBackup();
if (!haPolicy.isBackup()) { if (!haPolicy.isBackup()) {
activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO); activation = haPolicy.createActivation(this, false, activationParams, ioCriticalErrorListener);
if (afterActivationCreated != null) { if (afterActivationCreated != null) {
try { try {
@ -636,9 +636,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// checking again here // checking again here
if (haPolicy.isBackup()) { if (haPolicy.isBackup()) {
if (haPolicy.isSharedStore()) { if (haPolicy.isSharedStore()) {
activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO); activation = haPolicy.createActivation(this, false, activationParams, ioCriticalErrorListener);
} else { } else {
activation = haPolicy.createActivation(this, wasLive, activationParams, shutdownOnCriticalIO); activation = haPolicy.createActivation(this, wasLive, activationParams, ioCriticalErrorListener);
} }
if (afterActivationCreated != null) { if (afterActivationCreated != null) {
@ -1117,12 +1117,23 @@ public class ActiveMQServerImpl implements ActiveMQServer {
this.stop(failoverOnServerShutdown, criticalIOError, restarting, false); this.stop(failoverOnServerShutdown, criticalIOError, restarting, false);
} }
private void stop(boolean failoverOnServerShutdown,
final boolean criticalIOError,
boolean restarting,
boolean isShutdown) {
stop(failoverOnServerShutdown, criticalIOError, isShutdown || criticalIOError, restarting, isShutdown);
}
/** /**
* Stops the server * Stops the server
* *
* @param criticalIOError whether we have encountered an IO error with the journal etc * @param criticalIOError whether we have encountered an IO error with the journal etc
*/ */
void stop(boolean failoverOnServerShutdown, final boolean criticalIOError, boolean restarting, boolean isShutdown) { private void stop(boolean failoverOnServerShutdown,
final boolean criticalIOError,
boolean shutdownExternalComponents,
boolean restarting,
boolean isShutdown) {
if (logger.isDebugEnabled()) { if (logger.isDebugEnabled()) {
logger.debug("Stopping server " + this); logger.debug("Stopping server " + this);
@ -1344,7 +1355,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
connectedClientIds.clear(); connectedClientIds.clear();
stopExternalComponents(isShutdown || criticalIOError); stopExternalComponents(shutdownExternalComponents);
try { try {
this.analyzer.clear(); this.analyzer.clear();
@ -2794,9 +2805,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
protected PagingStoreFactory getPagingStoreFactory() throws Exception { protected PagingStoreFactory getPagingStoreFactory() throws Exception {
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) { if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration(); DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, false, shutdownOnCriticalIO, configuration.isReadWholePage()); return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, false, ioCriticalErrorListener, configuration.isReadWholePage());
} }
return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO, configuration.isReadWholePage()); return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), ioCriticalErrorListener, configuration.isReadWholePage());
} }
/** /**
@ -2805,12 +2816,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
protected StorageManager createStorageManager() { protected StorageManager createStorageManager() {
if (configuration.isPersistenceEnabled()) { if (configuration.isPersistenceEnabled()) {
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) { if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
JDBCJournalStorageManager journal = new JDBCJournalStorageManager(configuration, getCriticalAnalyzer(), getScheduledPool(), executorFactory, ioExecutorFactory, shutdownOnCriticalIO); JDBCJournalStorageManager journal = new JDBCJournalStorageManager(configuration, getCriticalAnalyzer(), getScheduledPool(), executorFactory, ioExecutorFactory, ioCriticalErrorListener);
this.getCriticalAnalyzer().add(journal); this.getCriticalAnalyzer().add(journal);
return journal; return journal;
} else { } else {
// Default to File Based Storage Manager, (Legacy default configuration). // Default to File Based Storage Manager, (Legacy default configuration).
JournalStorageManager journal = new JournalStorageManager(configuration, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO); JournalStorageManager journal = new JournalStorageManager(configuration, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener);
this.getCriticalAnalyzer().add(journal); this.getCriticalAnalyzer().add(journal);
return journal; return journal;
} }
@ -3136,7 +3147,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (configuration.getMaxDiskUsage() != -1) { if (configuration.getMaxDiskUsage() != -1) {
try { try {
injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f, shutdownOnCriticalIO)); injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f, ioCriticalErrorListener));
} catch (Exception e) { } catch (Exception e) {
ActiveMQServerLogger.LOGGER.unableToInjectMonitor(e); ActiveMQServerLogger.LOGGER.unableToInjectMonitor(e);
} }
@ -3929,14 +3940,15 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// Inner classes // Inner classes
// -------------------------------------------------------------------------------- // --------------------------------------------------------------------------------
public final class ShutdownOnCriticalErrorListener implements IOCriticalErrorListener { public final class DefaultCriticalErrorListener implements IOCriticalErrorListener {
boolean failedAlready = false; private final AtomicBoolean failedAlready = new AtomicBoolean();
@Override @Override
public synchronized void onIOException(Throwable cause, String message, SequentialFile file) { public synchronized void onIOException(Throwable cause, String message, SequentialFile file) {
if (!failedAlready) { if (!failedAlready.compareAndSet(false, true)) {
failedAlready = true; return;
}
if (file == null) { if (file == null) {
ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, "NULL", cause); ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, "NULL", cause);
@ -3947,7 +3959,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
stopTheServer(true); stopTheServer(true);
} }
} }
}
@Override @Override
public void addProtocolManagerFactory(ProtocolManagerFactory factory) { public void addProtocolManagerFactory(ProtocolManagerFactory factory) {

View File

@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
/**
 * Base class for {@link NodeManager} implementations that persist the node ID in a
 * {@value #SERVER_LOCK_NAME} file located in a configured directory.
 * <p>
 * File layout as written by this class: 3 bytes at offset 0 (initialised to
 * {@link #FIRST_TIME_START} when the file is first created) followed by the 16 bytes of the
 * node UUID at offset 3.
 */
public abstract class FileBasedNodeManager extends NodeManager {

   /** Value written to each of the first three bytes of a freshly created lock file. */
   protected static final byte FIRST_TIME_START = '0';

   /** Name of the lock file created inside the node manager directory. */
   public static final String SERVER_LOCK_NAME = "server.lock";

   private static final String ACCESS_MODE = "rw";

   /** Size in bytes of the header that precedes the node UUID in the lock file. */
   private static final int STATE_HEADER_SIZE = 3;

   /** Size in bytes of the persisted node UUID. */
   private static final int UUID_SIZE = 16;

   /** Maximum number of retries when creating the lock file fails with an {@link IOException}. */
   private static final int CREATE_FILE_MAX_RETRIES = 5;

   /** Pause between two attempts to create the lock file. */
   private static final long CREATE_FILE_RETRY_DELAY_MILLIS = 100;

   private final File directory;

   /** Channel on the lock file; assigned by {@link #setUpServerLockFile()}. */
   protected FileChannel channel;

   /**
    * @param replicatedBackup forwarded to {@link NodeManager}
    * @param directory        directory in which the lock file is created
    */
   public FileBasedNodeManager(boolean replicatedBackup, File directory) {
      super(replicatedBackup);
      this.directory = directory;
   }

   /**
    * Ensures existence of persistent information about the server's nodeID.
    * <p>
    * Roughly the different use cases are:
    * <ol>
    * <li>old live server restarts: a server.lock file already exists and contains a nodeID.
    * <li>new live server starting for the first time: no file exists, and we just *create* a new
    * UUID to use as nodeID
    * <li>replicated backup received its nodeID from its live: no file exists, we need to persist
    * the *current* nodeID
    * </ol>
    *
    * @throws IOException if the lock file cannot be created (after retrying), opened or written
    */
   protected synchronized void setUpServerLockFile() throws IOException {
      File serverLockFile = newFile(SERVER_LOCK_NAME);

      boolean fileCreated = false;
      // Remember an interruption instead of swallowing it, so the retry delays keep working
      // and the caller still observes the interrupt once we are done.
      boolean interrupted = false;

      try {
         int count = 0;
         while (!serverLockFile.exists()) {
            try {
               fileCreated = serverLockFile.createNewFile();
            } catch (RuntimeException e) {
               ActiveMQServerLogger.LOGGER.nodeManagerCantOpenFile(e, serverLockFile);
               throw e;
            } catch (IOException e) {
               // On some operating systems this may fail even though the parent directory
               // exists; retrying usually works (apparently a timing issue).
               if (count < CREATE_FILE_MAX_RETRIES) {
                  try {
                     Thread.sleep(CREATE_FILE_RETRY_DELAY_MILLIS);
                  } catch (InterruptedException ie) {
                     interrupted = true;
                  }
                  count++;
                  continue;
               }
               ActiveMQServerLogger.LOGGER.nodeManagerCantOpenFile(e, serverLockFile);
               throw e;
            }
         }
      } finally {
         if (interrupted) {
            Thread.currentThread().interrupt();
         }
      }

      // The channel is kept open in a field and closed by stop(), hence no try-with-resources.
      @SuppressWarnings("resource")
      RandomAccessFile raFile = new RandomAccessFile(serverLockFile, ACCESS_MODE);

      channel = raFile.getChannel();

      if (fileCreated) {
         // Brand new file: initialise the header bytes before the node ID is written.
         ByteBuffer id = ByteBuffer.allocateDirect(STATE_HEADER_SIZE);
         byte[] bytes = new byte[STATE_HEADER_SIZE];
         bytes[0] = FIRST_TIME_START;
         bytes[1] = FIRST_TIME_START;
         bytes[2] = FIRST_TIME_START;
         id.put(bytes, 0, STATE_HEADER_SIZE);
         id.position(0);
         channel.write(id, 0);
         channel.force(true);
      }

      createNodeId();
   }

   /**
    * Resolves a file name against the node manager directory.
    *
    * @param fileName name of the file, relative to the directory given at construction
    * @return the corresponding {@link File}; the file itself is not created
    */
   protected final File newFile(final String fileName) {
      return new File(directory, fileName);
   }

   /**
    * Synchronises the in-memory node UUID with the one stored in the lock file.
    * <ul>
    * <li>replicated backup: the current UUID is written to the file;
    * <li>no complete UUID in the file: a new UUID is generated, set and written to the file;
    * <li>otherwise: the UUID found in the file is set as the current one.
    * </ul>
    *
    * @throws IOException if reading from or writing to the lock file fails
    */
   protected final synchronized void createNodeId() throws IOException {
      synchronized (nodeIDGuard) {
         ByteBuffer id = ByteBuffer.allocateDirect(UUID_SIZE);
         int read = channel.read(id, STATE_HEADER_SIZE);
         if (replicatedBackup) {
            id.position(0);
            id.put(getUUID().asBytes(), 0, UUID_SIZE);
            id.position(0);
            channel.write(id, STATE_HEADER_SIZE);
            channel.force(true);
         } else if (read != UUID_SIZE) {
            setUUID(UUIDGenerator.getInstance().generateUUID());
            // A partial read leaves the position past 0; reset the buffer so the 16 bytes
            // fit instead of failing with BufferOverflowException.
            id.clear();
            id.put(getUUID().asBytes(), 0, UUID_SIZE);
            id.position(0);
            channel.write(id, STATE_HEADER_SIZE);
            channel.force(true);
         } else {
            byte[] bytes = new byte[UUID_SIZE];
            id.position(0);
            id.get(bytes);
            setUUID(new UUID(UUID.TYPE_TIME_BASED, bytes));
         }
      }
   }

   /**
    * Closes the lock file channel, if one was opened, and then stops the base node manager.
    * The base {@code stop()} is invoked even when closing the channel fails.
    */
   @Override
   public synchronized void stop() throws Exception {
      try {
         FileChannel channelCopy = channel;
         if (channelCopy != null) {
            channelCopy.close();
         }
      } finally {
         super.stop();
      }
   }

   /**
    * For a replicated backup that already has a node ID, (re)creates the lock file so the
    * node ID is persisted, then delegates to the base implementation.
    *
    * @throws NodeManagerException wrapping any {@link IOException} raised while setting up
    *                              the lock file
    */
   @Override
   public void stopBackup() throws NodeManagerException {
      if (replicatedBackup && getNodeId() != null) {
         try {
            setUpServerLockFile();
         } catch (IOException e) {
            throw new NodeManagerException(e);
         }
      }
      super.stopBackup();
   }
}

View File

@ -22,23 +22,18 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.channels.FileLock; import java.nio.channels.FileLock;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActivateCallback; import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException; import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUID;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
public class FileLockNodeManager extends NodeManager { public class FileLockNodeManager extends FileBasedNodeManager {
private static final Logger logger = Logger.getLogger(FileLockNodeManager.class); private static final Logger logger = Logger.getLogger(FileLockNodeManager.class);
@ -58,9 +53,9 @@ public class FileLockNodeManager extends NodeManager {
private static final byte NOT_STARTED = 'N'; private static final byte NOT_STARTED = 'N';
private static final long LOCK_ACCESS_FAILURE_WAIT_TIME = 2000; private static final long LOCK_ACCESS_FAILURE_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(2);
private static final int LOCK_MONITOR_TIMEOUT_MILLIES = 2000; private static final long LOCK_MONITOR_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(2);
private volatile FileLock liveLock; private volatile FileLock liveLock;
@ -68,28 +63,31 @@ public class FileLockNodeManager extends NodeManager {
private final FileChannel[] lockChannels = new FileChannel[3]; private final FileChannel[] lockChannels = new FileChannel[3];
protected long lockAcquisitionTimeout = -1; private final long lockAcquisitionTimeoutNanos;
protected boolean interrupted = false; protected boolean interrupted = false;
private ScheduledExecutorService scheduledPool; private final ScheduledExecutorService scheduledPool;
public FileLockNodeManager(final File directory, boolean replicatedBackup, ScheduledExecutorService scheduledPool) { public FileLockNodeManager(final File directory, boolean replicatedBackup, ScheduledExecutorService scheduledPool) {
super(replicatedBackup, directory); super(replicatedBackup, directory);
this.scheduledPool = scheduledPool; this.scheduledPool = scheduledPool;
this.lockAcquisitionTimeoutNanos = -1;
} }
public FileLockNodeManager(final File directory, boolean replicatedBackup) { public FileLockNodeManager(final File directory, boolean replicatedBackup) {
super(replicatedBackup, directory); super(replicatedBackup, directory);
this.scheduledPool = null; this.scheduledPool = null;
this.lockAcquisitionTimeoutNanos = -1;
} }
public FileLockNodeManager(final File directory, boolean replicatedBackup, long lockAcquisitionTimeout, public FileLockNodeManager(final File directory,
boolean replicatedBackup,
long lockAcquisitionTimeout,
ScheduledExecutorService scheduledPool) { ScheduledExecutorService scheduledPool) {
super(replicatedBackup, directory); super(replicatedBackup, directory);
this.scheduledPool = scheduledPool; this.scheduledPool = scheduledPool;
this.lockAcquisitionTimeout = lockAcquisitionTimeout; this.lockAcquisitionTimeoutNanos = lockAcquisitionTimeout == -1 ? -1 : TimeUnit.MILLISECONDS.toNanos(lockAcquisitionTimeout);
} }
@Override @Override
@ -141,12 +139,13 @@ public class FileLockNodeManager extends NodeManager {
} }
@Override @Override
public boolean isAwaitingFailback() throws Exception { public boolean isAwaitingFailback() throws NodeManagerException {
return getState() == FileLockNodeManager.FAILINGBACK; return getState() == FileLockNodeManager.FAILINGBACK;
} }
@Override @Override
public boolean isBackupLive() throws Exception { public boolean isBackupLive() throws NodeManagerException {
try {
FileLock liveAttemptLock; FileLock liveAttemptLock;
liveAttemptLock = tryLock(FileLockNodeManager.LIVE_LOCK_POS); liveAttemptLock = tryLock(FileLockNodeManager.LIVE_LOCK_POS);
if (liveAttemptLock == null) { if (liveAttemptLock == null) {
@ -155,6 +154,9 @@ public class FileLockNodeManager extends NodeManager {
liveAttemptLock.release(); liveAttemptLock.release();
return false; return false;
} }
} catch (IOException e) {
throw new NodeManagerException(e);
}
} }
public boolean isLiveLocked() { public boolean isLiveLocked() {
@ -167,15 +169,20 @@ public class FileLockNodeManager extends NodeManager {
} }
@Override @Override
public final void releaseBackup() throws Exception { public final void releaseBackup() throws NodeManagerException {
try {
if (backupLock != null) { if (backupLock != null) {
backupLock.release(); backupLock.release();
backupLock = null; backupLock = null;
} }
} catch (IOException e) {
throw new NodeManagerException(e);
}
} }
@Override @Override
public void awaitLiveNode() throws Exception { public void awaitLiveNode() throws NodeManagerException, InterruptedException {
try {
logger.debug("awaiting live node..."); logger.debug("awaiting live node...");
do { do {
byte state = getState(); byte state = getState();
@ -205,24 +212,31 @@ public class FileLockNodeManager extends NodeManager {
} }
} }
while (true); while (true);
} catch (IOException | ActiveMQLockAcquisitionTimeoutException e) {
throw new NodeManagerException(e);
}
} }
@Override @Override
public void startBackup() throws Exception { public void startBackup() throws NodeManagerException {
assert !replicatedBackup; // should not be called if this is a replicating backup assert !replicatedBackup; // should not be called if this is a replicating backup
ActiveMQServerLogger.LOGGER.waitingToBecomeBackup(); ActiveMQServerLogger.LOGGER.waitingToBecomeBackup();
try {
backupLock = lock(FileLockNodeManager.BACKUP_LOCK_POS); backupLock = lock(FileLockNodeManager.BACKUP_LOCK_POS);
} catch (ActiveMQLockAcquisitionTimeoutException e) {
throw new NodeManagerException(e);
}
ActiveMQServerLogger.LOGGER.gotBackupLock(); ActiveMQServerLogger.LOGGER.gotBackupLock();
if (getUUID() == null) if (getUUID() == null)
readNodeId(); readNodeId();
} }
@Override @Override
public ActivateCallback startLiveNode() throws Exception { public ActivateCallback startLiveNode() throws NodeManagerException {
try {
setFailingBack(); setFailingBack();
String timeoutMessage = lockAcquisitionTimeout == -1 ? "indefinitely" : lockAcquisitionTimeout + " milliseconds"; String timeoutMessage = lockAcquisitionTimeoutNanos == -1 ? "indefinitely" : TimeUnit.NANOSECONDS.toMillis(lockAcquisitionTimeoutNanos) + " milliseconds";
ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage); ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage);
@ -238,59 +252,74 @@ public class FileLockNodeManager extends NodeManager {
startLockMonitoring(); startLockMonitoring();
} catch (Exception e) { } catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
// that allows to restart/stop the broker if needed
throw e;
} }
} }
}; };
} catch (ActiveMQLockAcquisitionTimeoutException e) {
throw new NodeManagerException(e);
}
} }
@Override @Override
public void pauseLiveServer() throws Exception { public void pauseLiveServer() throws NodeManagerException {
stopLockMonitoring(); stopLockMonitoring();
setPaused(); setPaused();
try {
if (liveLock != null) { if (liveLock != null) {
liveLock.release(); liveLock.release();
} }
} catch (IOException e) {
throw new NodeManagerException(e);
}
} }
@Override @Override
public void crashLiveServer() throws Exception { public void crashLiveServer() throws NodeManagerException {
stopLockMonitoring(); stopLockMonitoring();
if (liveLock != null) { if (liveLock != null) {
try {
liveLock.release(); liveLock.release();
} catch (IOException e) {
throw new NodeManagerException(e);
} finally {
liveLock = null; liveLock = null;
} }
} }
}
@Override @Override
public void awaitLiveStatus() throws Exception { public void awaitLiveStatus() throws NodeManagerException, InterruptedException {
while (getState() != LIVE) { while (getState() != LIVE) {
Thread.sleep(2000); Thread.sleep(2000);
} }
} }
private void setLive() throws Exception { private void setLive() throws NodeManagerException {
writeFileLockStatus(FileLockNodeManager.LIVE); writeFileLockStatus(FileLockNodeManager.LIVE);
} }
private void setFailingBack() throws Exception { private void setFailingBack() throws NodeManagerException {
writeFileLockStatus(FAILINGBACK); writeFileLockStatus(FAILINGBACK);
} }
private void setPaused() throws Exception { private void setPaused() throws NodeManagerException {
writeFileLockStatus(PAUSED); writeFileLockStatus(PAUSED);
} }
/** /**
* @param status * @param status
* @throws IOException * @throws ActiveMQLockAcquisitionTimeoutException,IOException
*/ */
private void writeFileLockStatus(byte status) throws Exception { private void writeFileLockStatus(byte status) throws NodeManagerException {
if (replicatedBackup && channel == null) if (replicatedBackup && channel == null)
return; return;
logger.debug("writing status: " + status); logger.debug("writing status: " + status);
ByteBuffer bb = ByteBuffer.allocateDirect(1); ByteBuffer bb = ByteBuffer.allocateDirect(1);
bb.put(status); bb.put(status);
bb.position(0); bb.position(0);
try {
if (!channel.isOpen()) { if (!channel.isOpen()) {
setUpServerLockFile(); setUpServerLockFile();
} }
@ -304,9 +333,13 @@ public class FileLockNodeManager extends NodeManager {
lock.release(); lock.release();
} }
} }
} catch (IOException | ActiveMQLockAcquisitionTimeoutException e) {
throw new NodeManagerException(e);
}
} }
private byte getState() throws Exception { private byte getState() throws NodeManagerException {
try {
byte result; byte result;
logger.debug("getting state..."); logger.debug("getting state...");
ByteBuffer bb = ByteBuffer.allocateDirect(1); ByteBuffer bb = ByteBuffer.allocateDirect(1);
@ -325,24 +358,29 @@ public class FileLockNodeManager extends NodeManager {
lock.release(); lock.release();
} }
} }
logger.debug("state: " + result); logger.debug("state: " + result);
return result; return result;
} catch (IOException | ActiveMQLockAcquisitionTimeoutException e) {
throw new NodeManagerException(e);
}
} }
@Override @Override
public final SimpleString readNodeId() throws ActiveMQIllegalStateException, IOException { public final SimpleString readNodeId() throws NodeManagerException {
try {
ByteBuffer id = ByteBuffer.allocateDirect(16); ByteBuffer id = ByteBuffer.allocateDirect(16);
int read = channel.read(id, 3); int read = channel.read(id, 3);
if (read != 16) { if (read != 16) {
throw new ActiveMQIllegalStateException("live server did not write id to file"); throw new IOException("live server did not write id to file");
} }
byte[] bytes = new byte[16]; byte[] bytes = new byte[16];
id.position(0); id.position(0);
id.get(bytes); id.get(bytes);
setUUID(new UUID(UUID.TYPE_TIME_BASED, bytes)); setUUID(new UUID(UUID.TYPE_TIME_BASED, bytes));
return getNodeId(); return getNodeId();
} catch (IOException e) {
throw new NodeManagerException(e);
}
} }
protected FileLock tryLock(final int lockPos) throws IOException { protected FileLock tryLock(final int lockPos) throws IOException {
@ -361,8 +399,8 @@ public class FileLockNodeManager extends NodeManager {
} }
} }
protected FileLock lock(final int lockPosition) throws Exception { protected FileLock lock(final int lockPosition) throws ActiveMQLockAcquisitionTimeoutException {
long start = System.currentTimeMillis(); long start = System.nanoTime();
boolean isRecurringFailure = false; boolean isRecurringFailure = false;
while (!interrupted) { while (!interrupted) {
@ -377,7 +415,7 @@ public class FileLockNodeManager extends NodeManager {
return null; return null;
} }
if (lockAcquisitionTimeout != -1 && (System.currentTimeMillis() - start) > lockAcquisitionTimeout) { if (this.lockAcquisitionTimeoutNanos != -1 && (System.nanoTime() - start) > this.lockAcquisitionTimeoutNanos) {
throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock"); throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock");
} }
} else { } else {
@ -390,9 +428,9 @@ public class FileLockNodeManager extends NodeManager {
"Failure when accessing a lock file", e); "Failure when accessing a lock file", e);
isRecurringFailure = true; isRecurringFailure = true;
long waitTime = LOCK_ACCESS_FAILURE_WAIT_TIME; long waitTime = LOCK_ACCESS_FAILURE_WAIT_TIME_NANOS;
if (lockAcquisitionTimeout != -1) { if (this.lockAcquisitionTimeoutNanos != -1) {
final long remainingTime = lockAcquisitionTimeout - (System.currentTimeMillis() - start); final long remainingTime = this.lockAcquisitionTimeoutNanos - (System.nanoTime() - start);
if (remainingTime <= 0) { if (remainingTime <= 0) {
throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock"); throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock");
} }
@ -400,7 +438,7 @@ public class FileLockNodeManager extends NodeManager {
} }
try { try {
Thread.sleep(waitTime); TimeUnit.NANOSECONDS.sleep(waitTime);
} catch (InterruptedException interrupt) { } catch (InterruptedException interrupt) {
return null; return null;
} }
@ -414,7 +452,7 @@ public class FileLockNodeManager extends NodeManager {
private synchronized void startLockMonitoring() { private synchronized void startLockMonitoring() {
logger.debug("Starting the lock monitor"); logger.debug("Starting the lock monitor");
if (monitorLock == null) { if (monitorLock == null) {
monitorLock = new MonitorLock(scheduledPool, LOCK_MONITOR_TIMEOUT_MILLIES, LOCK_MONITOR_TIMEOUT_MILLIES, TimeUnit.MILLISECONDS, false); monitorLock = new MonitorLock(scheduledPool, LOCK_MONITOR_TIMEOUT_NANOS, LOCK_MONITOR_TIMEOUT_NANOS, TimeUnit.NANOSECONDS, false);
monitorLock.start(); monitorLock.start();
} else { } else {
logger.debug("Lock monitor was already started"); logger.debug("Lock monitor was already started");
@ -431,49 +469,22 @@ public class FileLockNodeManager extends NodeManager {
} }
} }
private void notifyLostLock() { @Override
// Additional check we are not initializing or have no locking object anymore protected synchronized void notifyLostLock() {
// because of a shutdown if (liveLock != null) {
if (lockListeners != null && liveLock != null) { super.notifyLostLock();
Set<LockListener> lockListenersSnapshot = null;
// Snapshot of the set because I'm not sure if we can trigger concurrent
// modification exception here if we don't
synchronized (lockListeners) {
lockListenersSnapshot = new HashSet<>(lockListeners);
}
lockListenersSnapshot.forEach(lockListener -> {
try {
lockListener.lostLock();
} catch (Exception e) {
// Need to notify everyone so ignore any exception
}
});
} }
} }
public void registerLockListener(LockListener lockListener) { // This has been introduced to help ByteMan test testLockMonitorInvalid on JDK 11: sun.nio.ch.FileLockImpl::isValid
lockListeners.add(lockListener); // can affecting setLive, causing an infinite loop due to java.nio.channels.OverlappingFileLockException on tryLock
private boolean isLiveLockLost() {
final FileLock lock = this.liveLock;
return (lock != null && !lock.isValid()) || lock == null;
} }
public void unregisterLockListener(LockListener lockListener) {
lockListeners.remove(lockListener);
}
protected final Set<LockListener> lockListeners = Collections.synchronizedSet(new HashSet<LockListener>());
private MonitorLock monitorLock; private MonitorLock monitorLock;
public abstract class LockListener {
protected abstract void lostLock() throws Exception;
protected void unregisterListener() {
lockListeners.remove(this);
}
}
public class MonitorLock extends ActiveMQScheduledComponent { public class MonitorLock extends ActiveMQScheduledComponent {
public MonitorLock(ScheduledExecutorService scheduledExecutorService, public MonitorLock(ScheduledExecutorService scheduledExecutorService,
long initialDelay, long initialDelay,
@ -492,7 +503,7 @@ public class FileLockNodeManager extends NodeManager {
if (liveLock == null) { if (liveLock == null) {
logger.debug("Livelock is null"); logger.debug("Livelock is null");
} }
lostLock = (liveLock != null && !liveLock.isValid()) || liveLock == null; lostLock = isLiveLockLost();
if (!lostLock) { if (!lostLock) {
logger.debug("Server still has the lock, double check status is live"); logger.debug("Server still has the lock, double check status is live");
// Java always thinks the lock is still valid even when there is no filesystem // Java always thinks the lock is still valid even when there is no filesystem

View File

@ -17,14 +17,10 @@
package org.apache.activemq.artemis.core.server.impl; package org.apache.activemq.artemis.core.server.impl;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.concurrent.Semaphore; import java.util.concurrent.Semaphore;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActivateCallback; import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
import static org.apache.activemq.artemis.core.server.impl.InVMNodeManager.State.FAILING_BACK; import static org.apache.activemq.artemis.core.server.impl.InVMNodeManager.State.FAILING_BACK;
@ -39,7 +35,7 @@ import static org.apache.activemq.artemis.core.server.impl.InVMNodeManager.State
* multiple servers are run inside the same VM and File Locks can not be shared in the * multiple servers are run inside the same VM and File Locks can not be shared in the
* same VM (it would cause a shared lock violation). * same VM (it would cause a shared lock violation).
*/ */
public final class InVMNodeManager extends NodeManager { public final class InVMNodeManager extends FileBasedNodeManager {
private final Semaphore liveLock; private final Semaphore liveLock;
@ -67,7 +63,7 @@ public final class InVMNodeManager extends NodeManager {
} }
@Override @Override
public void awaitLiveNode() throws Exception { public void awaitLiveNode() throws InterruptedException {
do { do {
while (state == NOT_STARTED) { while (state == NOT_STARTED) {
Thread.sleep(10); Thread.sleep(10);
@ -92,51 +88,47 @@ public final class InVMNodeManager extends NodeManager {
} }
@Override @Override
public void awaitLiveStatus() throws Exception { public void awaitLiveStatus() throws InterruptedException {
while (state != LIVE) { while (state != LIVE) {
Thread.sleep(10); Thread.sleep(10);
} }
} }
@Override @Override
public void startBackup() throws Exception { public void startBackup() throws InterruptedException {
backupLock.acquire(); backupLock.acquire();
} }
@Override @Override
public ActivateCallback startLiveNode() throws Exception { public ActivateCallback startLiveNode() throws InterruptedException {
state = FAILING_BACK; state = FAILING_BACK;
liveLock.acquire(); liveLock.acquire();
return new CleaningActivateCallback() { return new CleaningActivateCallback() {
@Override @Override
public void activationComplete() { public void activationComplete() {
try {
state = LIVE; state = LIVE;
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
}
} }
}; };
} }
@Override @Override
public void pauseLiveServer() throws Exception { public void pauseLiveServer() {
state = PAUSED; state = PAUSED;
liveLock.release(); liveLock.release();
} }
@Override @Override
public void crashLiveServer() throws Exception { public void crashLiveServer() {
liveLock.release(); liveLock.release();
} }
@Override @Override
public boolean isAwaitingFailback() throws Exception { public boolean isAwaitingFailback() {
return state == FAILING_BACK; return state == FAILING_BACK;
} }
@Override @Override
public boolean isBackupLive() throws Exception { public boolean isBackupLive() {
return liveLock.availablePermits() == 0; return liveLock.availablePermits() == 0;
} }
@ -151,7 +143,7 @@ public final class InVMNodeManager extends NodeManager {
} }
@Override @Override
public SimpleString readNodeId() throws ActiveMQIllegalStateException, IOException { public SimpleString readNodeId() {
return getNodeId(); return getNodeId();
} }
} }

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.TopologyMember; import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.PostOffice;
@ -67,7 +68,7 @@ public final class SharedNothingBackupActivation extends Activation {
private SharedNothingBackupQuorum backupQuorum; private SharedNothingBackupQuorum backupQuorum;
private final boolean attemptFailBack; private final boolean attemptFailBack;
private final Map<String, Object> activationParams; private final Map<String, Object> activationParams;
private final ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO; private final IOCriticalErrorListener ioCriticalErrorListener;
private String nodeID; private String nodeID;
ClusterControl clusterControl; ClusterControl clusterControl;
private boolean closed; private boolean closed;
@ -79,13 +80,13 @@ public final class SharedNothingBackupActivation extends Activation {
public SharedNothingBackupActivation(ActiveMQServerImpl activeMQServer, public SharedNothingBackupActivation(ActiveMQServerImpl activeMQServer,
boolean attemptFailBack, boolean attemptFailBack,
Map<String, Object> activationParams, Map<String, Object> activationParams,
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO, IOCriticalErrorListener ioCriticalErrorListener,
ReplicaPolicy replicaPolicy, ReplicaPolicy replicaPolicy,
NetworkHealthCheck networkHealthCheck) { NetworkHealthCheck networkHealthCheck) {
this.activeMQServer = activeMQServer; this.activeMQServer = activeMQServer;
this.attemptFailBack = attemptFailBack; this.attemptFailBack = attemptFailBack;
this.activationParams = activationParams; this.activationParams = activationParams;
this.shutdownOnCriticalIO = shutdownOnCriticalIO; this.ioCriticalErrorListener = ioCriticalErrorListener;
this.replicaPolicy = replicaPolicy; this.replicaPolicy = replicaPolicy;
backupSyncLatch.setCount(1); backupSyncLatch.setCount(1);
this.networkHealthCheck = networkHealthCheck; this.networkHealthCheck = networkHealthCheck;
@ -95,7 +96,7 @@ public final class SharedNothingBackupActivation extends Activation {
assert replicationEndpoint == null; assert replicationEndpoint == null;
activeMQServer.resetNodeManager(); activeMQServer.resetNodeManager();
backupUpToDate = false; backupUpToDate = false;
replicationEndpoint = new ReplicationEndpoint(activeMQServer, shutdownOnCriticalIO, attemptFailBack, this); replicationEndpoint = new ReplicationEndpoint(activeMQServer, ioCriticalErrorListener, attemptFailBack, this);
} }
@Override @Override

View File

@ -317,7 +317,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
SimpleString nodeId0; SimpleString nodeId0;
try { try {
nodeId0 = activeMQServer.getNodeManager().readNodeId(); nodeId0 = activeMQServer.getNodeManager().readNodeId();
} catch (ActiveMQIllegalStateException e) { } catch (NodeManager.NodeManagerException e) {
nodeId0 = null; nodeId0 = null;
} }

View File

@ -16,18 +16,24 @@
*/ */
package org.apache.activemq.artemis.core.server.impl; package org.apache.activemq.artemis.core.server.impl;
import java.io.IOException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration; import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
import org.apache.activemq.artemis.core.server.NodeManager.NodeManagerException;
import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy; import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy;
@ -40,17 +46,24 @@ public final class SharedStoreBackupActivation extends Activation {
private static final Logger logger = Logger.getLogger(SharedStoreBackupActivation.class); private static final Logger logger = Logger.getLogger(SharedStoreBackupActivation.class);
//this is how we act as a backup //this is how we act as a backup
private SharedStoreSlavePolicy sharedStoreSlavePolicy; private final SharedStoreSlavePolicy sharedStoreSlavePolicy;
private ActiveMQServerImpl activeMQServer; private final ActiveMQServerImpl activeMQServer;
private final Object failbackCheckerGuard = new Object(); private final Object failbackCheckerGuard = new Object();
private boolean cancelFailBackChecker; private boolean cancelFailBackChecker;
public SharedStoreBackupActivation(ActiveMQServerImpl server, SharedStoreSlavePolicy sharedStoreSlavePolicy) { private LockListener activeLockListener;
private final IOCriticalErrorListener ioCriticalErrorListener;
public SharedStoreBackupActivation(ActiveMQServerImpl server,
SharedStoreSlavePolicy sharedStoreSlavePolicy,
IOCriticalErrorListener ioCriticalErrorListener) {
this.activeMQServer = server; this.activeMQServer = server;
this.sharedStoreSlavePolicy = sharedStoreSlavePolicy; this.sharedStoreSlavePolicy = sharedStoreSlavePolicy;
this.ioCriticalErrorListener = ioCriticalErrorListener;
synchronized (failbackCheckerGuard) { synchronized (failbackCheckerGuard) {
cancelFailBackChecker = false; cancelFailBackChecker = false;
} }
@ -59,6 +72,8 @@ public final class SharedStoreBackupActivation extends Activation {
@Override @Override
public void run() { public void run() {
try { try {
registerActiveLockListener(activeMQServer.getNodeManager());
activeMQServer.getNodeManager().startBackup(); activeMQServer.getNodeManager().startBackup();
ScaleDownPolicy scaleDownPolicy = sharedStoreSlavePolicy.getScaleDownPolicy(); ScaleDownPolicy scaleDownPolicy = sharedStoreSlavePolicy.getScaleDownPolicy();
@ -92,6 +107,11 @@ public final class SharedStoreBackupActivation extends Activation {
activeMQServer.completeActivation(false); activeMQServer.completeActivation(false);
if (scalingDown) { if (scalingDown) {
if (!restarting.compareAndSet(false, true)) {
return;
}
unregisterActiveLockListener(activeMQServer.getNodeManager());
ActiveMQServerLogger.LOGGER.backupServerScaledDown(); ActiveMQServerLogger.LOGGER.backupServerScaledDown();
Thread t = new Thread(new Runnable() { Thread t = new Thread(new Runnable() {
@Override @Override
@ -117,6 +137,17 @@ public final class SharedStoreBackupActivation extends Activation {
if (sharedStoreSlavePolicy.isAllowAutoFailBack() && ActiveMQServerImpl.SERVER_STATE.STOPPING != activeMQServer.getState() && ActiveMQServerImpl.SERVER_STATE.STOPPED != activeMQServer.getState()) { if (sharedStoreSlavePolicy.isAllowAutoFailBack() && ActiveMQServerImpl.SERVER_STATE.STOPPING != activeMQServer.getState() && ActiveMQServerImpl.SERVER_STATE.STOPPED != activeMQServer.getState()) {
startFailbackChecker(); startFailbackChecker();
} }
} catch (NodeManagerException nodeManagerException) {
if (nodeManagerException.getCause() instanceof ClosedChannelException) {
// this is ok, we are being stopped
return;
}
if (nodeManagerException.getCause() instanceof ActiveMQLockAcquisitionTimeoutException) {
ActiveMQServerLogger.LOGGER.initializationError(nodeManagerException.getCause());
return;
}
unregisterActiveLockListener(activeMQServer.getNodeManager());
ioCriticalErrorListener.onIOException(nodeManagerException, nodeManagerException.getMessage(), null);
} catch (ClosedChannelException | InterruptedException e) { } catch (ClosedChannelException | InterruptedException e) {
// these are ok, we are being stopped // these are ok, we are being stopped
} catch (Exception e) { } catch (Exception e) {
@ -144,16 +175,25 @@ public final class SharedStoreBackupActivation extends Activation {
activeMQServer.interruptActivationThread(nodeManagerInUse); activeMQServer.interruptActivationThread(nodeManagerInUse);
if (nodeManagerInUse != null) { if (nodeManagerInUse != null) {
unregisterActiveLockListener(nodeManagerInUse);
nodeManagerInUse.stopBackup(); nodeManagerInUse.stopBackup();
} }
} else { } else {
if (nodeManagerInUse != null) { if (nodeManagerInUse != null) {
unregisterActiveLockListener(nodeManagerInUse);
// if we are now live, behave as live // if we are now live, behave as live
// We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is // We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is
// started before the live // started before the live
if (sharedStoreSlavePolicy.isFailoverOnServerShutdown() || permanently) { if (sharedStoreSlavePolicy.isFailoverOnServerShutdown() || permanently) {
try {
nodeManagerInUse.crashLiveServer(); nodeManagerInUse.crashLiveServer();
} catch (Throwable t) {
if (!permanently) {
throw t;
}
logger.warn("Errored while closing activation: can be ignored because of permanent close", t);
}
} else { } else {
nodeManagerInUse.pauseLiveServer(); nodeManagerInUse.pauseLiveServer();
} }
@ -161,6 +201,27 @@ public final class SharedStoreBackupActivation extends Activation {
} }
} }
/**
 * Creates the lost-lock callback for this activation, stores it in {@code activeLockListener}
 * and registers it on the given node manager.
 * <p>
 * When the callback fires it first tries to flip {@code restarting} from {@code false} to {@code true}:
 * only the caller that wins this race unregisters the listener and reports the lost lock through
 * {@code ioCriticalErrorListener}; any other caller just logs a warning.
 *
 * @param nodeManager the node manager the listener is registered on
 */
private void registerActiveLockListener(NodeManager nodeManager) {
   final LockListener onLostLock = () -> {
      if (restarting.compareAndSet(false, true)) {
         unregisterActiveLockListener(nodeManager);
         ioCriticalErrorListener.onIOException(new IOException("lost lock"), "Lost NodeManager lock", null);
      } else {
         logger.warn("Restarting already happening on lost lock");
      }
   };
   // remember the listener before registering it, so it can be unregistered later
   activeLockListener = onLostLock;
   nodeManager.registerLockListener(onLostLock);
}
/**
 * Removes the currently remembered lock listener, if any, from the given node manager
 * and clears {@code activeLockListener}.
 * <p>
 * The field is read once into a local so the null check and the unregistration use the same instance.
 *
 * @param nodeManager the node manager the listener was registered on
 */
private void unregisterActiveLockListener(NodeManager nodeManager) {
   final LockListener registered = this.activeLockListener;
   if (registered == null) {
      // nothing registered (or already unregistered)
      return;
   }
   nodeManager.unregisterLockListener(registered);
   this.activeLockListener = null;
}
@Override @Override
public JournalLoader createJournalLoader(PostOffice postOffice, public JournalLoader createJournalLoader(PostOffice postOffice,
PagingManager pagingManager, PagingManager pagingManager,
@ -178,6 +239,8 @@ public final class SharedStoreBackupActivation extends Activation {
} }
} }
private final AtomicBoolean restarting = new AtomicBoolean(false);
/** /**
* To be called by backup trying to fail back the server * To be called by backup trying to fail back the server
*/ */
@ -195,15 +258,14 @@ public final class SharedStoreBackupActivation extends Activation {
activeMQServer.getClusterManager().getDefaultConnection(null).addClusterTopologyListener(backupListener); activeMQServer.getClusterManager().getDefaultConnection(null).addClusterTopologyListener(backupListener);
} }
private boolean restarting = false;
@Override @Override
public void run() { public void run() {
try { try {
if (!restarting && activeMQServer.getNodeManager().isAwaitingFailback()) { if (!restarting.get() && activeMQServer.getNodeManager().isAwaitingFailback() && backupListener.waitForBackup()) {
if (backupListener.waitForBackup()) { if (!restarting.compareAndSet(false, true)) {
return;
}
ActiveMQServerLogger.LOGGER.awaitFailBack(); ActiveMQServerLogger.LOGGER.awaitFailBack();
restarting = true;
Thread t = new Thread(new Runnable() { Thread t = new Thread(new Runnable() {
@Override @Override
public void run() { public void run() {
@ -228,6 +290,11 @@ public final class SharedStoreBackupActivation extends Activation {
activeMQServer.setHAPolicy(sharedStoreSlavePolicy); activeMQServer.setHAPolicy(sharedStoreSlavePolicy);
logger.debug(activeMQServer + "::Starting backup node now after failback"); logger.debug(activeMQServer + "::Starting backup node now after failback");
activeMQServer.start(); activeMQServer.start();
LockListener lockListener = activeLockListener;
if (lockListener != null) {
activeMQServer.getNodeManager().registerLockListener(lockListener);
}
} }
} catch (Exception e) { } catch (Exception e) {
ActiveMQServerLogger.LOGGER.serverRestartWarning(e); ActiveMQServerLogger.LOGGER.serverRestartWarning(e);
@ -236,7 +303,6 @@ public final class SharedStoreBackupActivation extends Activation {
}); });
t.start(); t.start();
} }
}
} catch (Exception e) { } catch (Exception e) {
ActiveMQServerLogger.LOGGER.serverRestartWarning(e); ActiveMQServerLogger.LOGGER.serverRestartWarning(e);
} }

View File

@ -16,11 +16,17 @@
*/ */
package org.apache.activemq.artemis.core.server.impl; package org.apache.activemq.artemis.core.server.impl;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.ActivateCallback; import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
import org.apache.activemq.artemis.core.server.NodeManager.NodeManagerException;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager.LockListener;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
public final class SharedStoreLiveActivation extends LiveActivation { public final class SharedStoreLiveActivation extends LiveActivation {
@ -28,17 +34,22 @@ public final class SharedStoreLiveActivation extends LiveActivation {
private static final Logger logger = Logger.getLogger(SharedStoreLiveActivation.class); private static final Logger logger = Logger.getLogger(SharedStoreLiveActivation.class);
// this is how we act when we initially start as live // this is how we act when we initially start as live
private SharedStoreMasterPolicy sharedStoreMasterPolicy; private final SharedStoreMasterPolicy sharedStoreMasterPolicy;
private ActiveMQServerImpl activeMQServer; private final ActiveMQServerImpl activeMQServer;
private volatile FileLockNodeManager.LockListener activeLockListener; private volatile LockListener activeLockListener;
private volatile ActivateCallback nodeManagerActivateCallback; private volatile ActivateCallback nodeManagerActivateCallback;
public SharedStoreLiveActivation(ActiveMQServerImpl server, SharedStoreMasterPolicy sharedStoreMasterPolicy) { private final IOCriticalErrorListener ioCriticalErrorListener;
public SharedStoreLiveActivation(ActiveMQServerImpl server,
SharedStoreMasterPolicy sharedStoreMasterPolicy,
IOCriticalErrorListener ioCriticalErrorListener) {
this.activeMQServer = server; this.activeMQServer = server;
this.sharedStoreMasterPolicy = sharedStoreMasterPolicy; this.sharedStoreMasterPolicy = sharedStoreMasterPolicy;
this.ioCriticalErrorListener = ioCriticalErrorListener;
} }
@Override @Override
@ -71,9 +82,9 @@ public final class SharedStoreLiveActivation extends LiveActivation {
activeMQServer.getBackupManager().announceBackup(); activeMQServer.getBackupManager().announceBackup();
} }
registerActiveLockListener(activeMQServer.getNodeManager());
nodeManagerActivateCallback = activeMQServer.getNodeManager().startLiveNode(); nodeManagerActivateCallback = activeMQServer.getNodeManager().startLiveNode();
activeMQServer.registerActivateCallback(nodeManagerActivateCallback); activeMQServer.registerActivateCallback(nodeManagerActivateCallback);
addLockListener(activeMQServer, activeMQServer.getNodeManager());
if (activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPED if (activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPED
|| activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPING) { || activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPING) {
@ -85,59 +96,40 @@ public final class SharedStoreLiveActivation extends LiveActivation {
activeMQServer.completeActivation(false); activeMQServer.completeActivation(false);
ActiveMQServerLogger.LOGGER.serverIsLive(); ActiveMQServerLogger.LOGGER.serverIsLive();
} catch (NodeManagerException nodeManagerException) {
if (nodeManagerException.getCause() instanceof ClosedChannelException) {
// this is ok, we are being stopped
return;
}
if (nodeManagerException.getCause() instanceof ActiveMQLockAcquisitionTimeoutException) {
onActivationFailure((ActiveMQLockAcquisitionTimeoutException) nodeManagerException.getCause());
return;
}
unregisterActiveLockListener(activeMQServer.getNodeManager());
ioCriticalErrorListener.onIOException(nodeManagerException, nodeManagerException.getMessage(), null);
} catch (Exception e) { } catch (Exception e) {
onActivationFailure(e);
}
}
private void onActivationFailure(Exception e) {
unregisterActiveLockListener(activeMQServer.getNodeManager());
ActiveMQServerLogger.LOGGER.initializationError(e); ActiveMQServerLogger.LOGGER.initializationError(e);
activeMQServer.callActivationFailureListeners(e); activeMQServer.callActivationFailureListeners(e);
} }
private void registerActiveLockListener(NodeManager nodeManager) {
LockListener lockListener = () ->
ioCriticalErrorListener.onIOException(new IOException("lost lock"), "Lost NodeManager lock", null);
activeLockListener = lockListener;
nodeManager.registerLockListener(lockListener);
} }
private void addLockListener(ActiveMQServerImpl activeMQServer, NodeManager nodeManager) { private void unregisterActiveLockListener(NodeManager nodeManager) {
if (nodeManager instanceof FileLockNodeManager) { LockListener activeLockListener = this.activeLockListener;
FileLockNodeManager fileNodeManager = (FileLockNodeManager) nodeManager; if (activeLockListener != null) {
nodeManager.unregisterLockListener(activeLockListener);
activeLockListener = fileNodeManager.new LockListener() { this.activeLockListener = null;
@Override
public void lostLock() {
stopStartServerInSeperateThread(activeMQServer);
}
};
fileNodeManager.registerLockListener(activeLockListener);
} // else no business registering a listener
}
/**
* We need to do this in a new thread because this takes to long to finish in
* the scheduled thread Also this is not the responsibility of the scheduled
* thread
* @param activeMQServer
*/
private void stopStartServerInSeperateThread(ActiveMQServerImpl activeMQServer) {
try {
Runnable startServerRunnable = new Runnable() {
@Override
public void run() {
try {
activeMQServer.stop(true, false);
} catch (Exception e) {
logger.warn("Failed to stop artemis server after loosing the lock", e);
}
try {
activeMQServer.start();
} catch (Exception e) {
logger.error("Failed to start artemis server after recovering from loosing the lock", e);
}
}
};
Thread startServer = new Thread(startServerRunnable);
startServer.start();
} catch (Exception e) {
logger.error(e.getMessage());
} }
} }
@ -147,16 +139,20 @@ public final class SharedStoreLiveActivation extends LiveActivation {
NodeManager nodeManagerInUse = activeMQServer.getNodeManager(); NodeManager nodeManagerInUse = activeMQServer.getNodeManager();
if (nodeManagerInUse != null) { if (nodeManagerInUse != null) {
LockListener closeLockListener = activeLockListener; unregisterActiveLockListener(nodeManagerInUse);
if (closeLockListener != null) {
closeLockListener.unregisterListener();
}
ActivateCallback activateCallback = nodeManagerActivateCallback; ActivateCallback activateCallback = nodeManagerActivateCallback;
if (activateCallback != null) { if (activateCallback != null) {
activeMQServer.unregisterActivateCallback(activateCallback); activeMQServer.unregisterActivateCallback(activateCallback);
} }
if (sharedStoreMasterPolicy.isFailoverOnServerShutdown() || permanently) { if (sharedStoreMasterPolicy.isFailoverOnServerShutdown() || permanently) {
try {
nodeManagerInUse.crashLiveServer(); nodeManagerInUse.crashLiveServer();
} catch (Throwable t) {
if (!permanently) {
throw t;
}
logger.warn("Errored while closing activation: can be ignored because of permanent close", t);
}
} else { } else {
nodeManagerInUse.pauseLiveServer(); nodeManagerInUse.pauseLiveServer();
} }

View File

@ -20,13 +20,13 @@ package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
/** /**
* Default implementation of a {@link ScheduledLeaseLock}: see {@link ScheduledLeaseLock#of(ScheduledExecutorService, ArtemisExecutor, String, LeaseLock, long, IOCriticalErrorListener)}. * Default implementation of a {@link ScheduledLeaseLock}: see {@link ScheduledLeaseLock#of(ScheduledExecutorService, ArtemisExecutor, String, LeaseLock, long, LockListener)}.
*/ */
final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implements ScheduledLeaseLock { final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implements ScheduledLeaseLock {
@ -36,14 +36,14 @@ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implem
private final LeaseLock lock; private final LeaseLock lock;
private long lastLockRenewStart; private long lastLockRenewStart;
private final long renewPeriodMillis; private final long renewPeriodMillis;
private final IOCriticalErrorListener ioCriticalErrorListener; private final LockListener lockListener;
ActiveMQScheduledLeaseLock(ScheduledExecutorService scheduledExecutorService, ActiveMQScheduledLeaseLock(ScheduledExecutorService scheduledExecutorService,
ArtemisExecutor executor, ArtemisExecutor executor,
String lockName, String lockName,
LeaseLock lock, LeaseLock lock,
long renewPeriodMillis, long renewPeriodMillis,
IOCriticalErrorListener ioCriticalErrorListener) { LockListener lockListener) {
super(scheduledExecutorService, executor, 0, renewPeriodMillis, TimeUnit.MILLISECONDS, false); super(scheduledExecutorService, executor, 0, renewPeriodMillis, TimeUnit.MILLISECONDS, false);
if (renewPeriodMillis >= lock.expirationMillis()) { if (renewPeriodMillis >= lock.expirationMillis()) {
throw new IllegalArgumentException("renewPeriodMillis must be < lock's expirationMillis"); throw new IllegalArgumentException("renewPeriodMillis must be < lock's expirationMillis");
@ -51,9 +51,14 @@ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implem
this.lockName = lockName; this.lockName = lockName;
this.lock = lock; this.lock = lock;
this.renewPeriodMillis = renewPeriodMillis; this.renewPeriodMillis = renewPeriodMillis;
//already expired start time // already expired start time
this.lastLockRenewStart = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(lock.expirationMillis()); this.lastLockRenewStart = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(lock.expirationMillis());
this.ioCriticalErrorListener = ioCriticalErrorListener; this.lockListener = lockListener;
}
@Override
public String lockName() {
return lockName;
} }
@Override @Override
@ -84,37 +89,55 @@ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implem
} }
@Override @Override
public void run() { public synchronized void run() {
if (!isStarted()) {
return;
}
final long lastRenewStart = this.lastLockRenewStart; final long lastRenewStart = this.lastLockRenewStart;
final long renewStart = System.nanoTime(); final long renewStart = System.nanoTime();
boolean lockLost = true;
try { try {
if (!this.lock.renew()) { lockLost = !this.lock.renew();
ioCriticalErrorListener.onIOException(new IllegalStateException(lockName + " lock can't be renewed"), "Critical error while on " + lockName + " renew", null);
}
} catch (Throwable t) { } catch (Throwable t) {
ioCriticalErrorListener.onIOException(t, "Critical error while on " + lockName + " renew", null); LOGGER.warnf(t, "%s lock renew has failed", lockName);
throw t; if (lock.localExpirationTime() > 0) {
final long millisBeforeExpiration = (lock.localExpirationTime() - System.currentTimeMillis());
// there is enough time to retry to renew it?
if (millisBeforeExpiration >= this.renewPeriodMillis) {
lockLost = false;
}
}
}
// a failed attempt to renew is treated as a lost lock
if (lockLost) {
try {
lockListener.lostLock();
} catch (Throwable t) {
LOGGER.warnf(t, "Errored while notifying %s lock listener", lockName);
}
} }
//logic to detect slowness of DB and/or the scheduled executor service //logic to detect slowness of DB and/or the scheduled executor service
detectAndReportRenewSlowness(lockName, lastRenewStart, renewStart, renewPeriodMillis, lock.expirationMillis()); detectAndReportRenewSlowness(lockName, lockLost, lastRenewStart,
renewStart, renewPeriodMillis, lock.expirationMillis());
this.lastLockRenewStart = renewStart; this.lastLockRenewStart = renewStart;
} }
private static void detectAndReportRenewSlowness(String lockName, private static void detectAndReportRenewSlowness(String lockName,
boolean lostLock,
long lastRenewStart, long lastRenewStart,
long renewStart, long renewStart,
long expectedRenewPeriodMillis, long expectedRenewPeriodMillis,
long expirationMillis) { long expirationMillis) {
final long elapsedMillisToRenew = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - renewStart); final long elapsedMillisToRenew = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - renewStart);
if (elapsedMillisToRenew > expectedRenewPeriodMillis) { if (elapsedMillisToRenew > expectedRenewPeriodMillis) {
LOGGER.error(lockName + " lock renew tooks " + elapsedMillisToRenew + " ms, while is supposed to take <" + expectedRenewPeriodMillis + " ms"); LOGGER.errorf("%s lock %s renew tooks %d ms, while is supposed to take <%d ms", lockName, lostLock ? "failed" : "successful", elapsedMillisToRenew, expectedRenewPeriodMillis);
} }
final long measuredRenewPeriodNanos = renewStart - lastRenewStart; final long measuredRenewPeriodNanos = renewStart - lastRenewStart;
final long measuredRenewPeriodMillis = TimeUnit.NANOSECONDS.toMillis(measuredRenewPeriodNanos); final long measuredRenewPeriodMillis = TimeUnit.NANOSECONDS.toMillis(measuredRenewPeriodNanos);
if (measuredRenewPeriodMillis - expirationMillis > 100) { if (measuredRenewPeriodMillis - expirationMillis > 100) {
LOGGER.error(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms"); LOGGER.errorf("%s lock %s renew period lasted %d ms instead of %d ms", lockName, lostLock ? "failed" : "successful", measuredRenewPeriodMillis, expectedRenewPeriodMillis);
} else if (measuredRenewPeriodMillis - expectedRenewPeriodMillis > 100) { } else if (measuredRenewPeriodMillis - expectedRenewPeriodMillis > 100) {
LOGGER.warn(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms"); LOGGER.warnf("%s lock %s renew period lasted %d ms instead of %d ms", lockName, lostLock ? "failed" : "successful", measuredRenewPeriodMillis, expectedRenewPeriodMillis);
} }
} }
} }

View File

@ -44,8 +44,10 @@ final class JdbcLeaseLock implements LeaseLock {
private final String isLocked; private final String isLocked;
private final String currentDateTime; private final String currentDateTime;
private final long expirationMillis; private final long expirationMillis;
private final int queryTimeout;
private boolean maybeAcquired; private boolean maybeAcquired;
private final String lockName; private final String lockName;
private long localExpirationTime;
/** /**
* The lock will be responsible (ie {@link #close()}) of all the {@link PreparedStatement}s used by it, but not of the {@link Connection}, * The lock will be responsible (ie {@link #close()}) of all the {@link PreparedStatement}s used by it, but not of the {@link Connection},
@ -59,6 +61,7 @@ final class JdbcLeaseLock implements LeaseLock {
String isLocked, String isLocked,
String currentDateTime, String currentDateTime,
long expirationMIllis, long expirationMIllis,
long queryTimeoutMillis,
String lockName) { String lockName) {
if (holderId.length() > MAX_HOLDER_ID_LENGTH) { if (holderId.length() > MAX_HOLDER_ID_LENGTH) {
throw new IllegalArgumentException("holderId length must be <=" + MAX_HOLDER_ID_LENGTH); throw new IllegalArgumentException("holderId length must be <=" + MAX_HOLDER_ID_LENGTH);
@ -73,12 +76,30 @@ final class JdbcLeaseLock implements LeaseLock {
this.maybeAcquired = false; this.maybeAcquired = false;
this.connectionProvider = connectionProvider; this.connectionProvider = connectionProvider;
this.lockName = lockName; this.lockName = lockName;
this.localExpirationTime = -1;
// The query timeout is kept in whole seconds; -1 means "disabled" (no timeout is set on statements).
// A negative queryTimeoutMillis disables it silently. A non-negative value that rounds down to
// 0 seconds is too small to be usable, so it is disabled too, but with a warning.
int expectedTimeout = -1;
if (queryTimeoutMillis >= 0) {
   expectedTimeout = (int) TimeUnit.MILLISECONDS.toSeconds(queryTimeoutMillis);
   if (expectedTimeout <= 0) {
      LOGGER.warn("queryTimeoutMillis is too low: it's suggested to configure a multi-seconds value. Disabling it because too low.");
      expectedTimeout = -1;
   }
}
this.queryTimeout = expectedTimeout;
} }
public String holderId() { public String holderId() {
return holderId; return holderId;
} }
/**
 * Returns the locally tracked expiration time of this lock, or a negative value if none is tracked.
 * <p>
 * Given that many DBMS won't support standard SQL queries to collect CURRENT_TIMESTAMP at milliseconds granularity,
 * this value is stripped of the milliseconds part, making it less optimistic than the reality, if >= 0.<p>
 * It's commonly used as a hard deadline for JDBC operations, hence it is fine not to have a high precision.
 * <p>
 * The value is -1 initially, after a renew that did not update the lock row, and after a release;
 * it is set to the second-truncated expiration time after a successful acquire or renew.
 *
 * @return the expiration time in milliseconds truncated to whole seconds, or -1
 */
@Override
public long localExpirationTime() {
return localExpirationTime;
}
@Override @Override
public long expirationMillis() { public long expirationMillis() {
return expirationMillis; return expirationMillis;
@ -115,17 +136,24 @@ final class JdbcLeaseLock implements LeaseLock {
} }
private long dbCurrentTimeMillis(Connection connection) throws SQLException { private long dbCurrentTimeMillis(Connection connection) throws SQLException {
final long start = System.nanoTime();
try (PreparedStatement currentDateTime = connection.prepareStatement(this.currentDateTime)) { try (PreparedStatement currentDateTime = connection.prepareStatement(this.currentDateTime)) {
if (queryTimeout >= 0) {
currentDateTime.setQueryTimeout(queryTimeout);
}
final long startTime = stripMilliseconds(System.currentTimeMillis());
try (ResultSet resultSet = currentDateTime.executeQuery()) { try (ResultSet resultSet = currentDateTime.executeQuery()) {
resultSet.next(); resultSet.next();
final long endTime = stripMilliseconds(System.currentTimeMillis());
final Timestamp currentTimestamp = resultSet.getTimestamp(1); final Timestamp currentTimestamp = resultSet.getTimestamp(1);
final long elapsedTime = System.nanoTime() - start; final long currentTime = currentTimestamp.getTime();
if (LOGGER.isDebugEnabled()) { final long currentTimeMillis = stripMilliseconds(currentTime);
LOGGER.debugf("[%s] %s query currentTimestamp = %s tooks %d ms", if (currentTimeMillis < startTime) {
lockName, holderId, currentTimestamp, TimeUnit.NANOSECONDS.toMillis(elapsedTime)); LOGGER.warnf("[%s] %s query currentTimestamp = %s on database should happen AFTER %s on broker", lockName, holderId, currentTimestamp, new Timestamp(startTime));
} }
return currentTimestamp.getTime(); if (currentTimeMillis > endTime) {
LOGGER.warnf("[%s] %s query currentTimestamp = %s on database should happen BEFORE %s on broker", lockName, holderId, currentTimestamp, new Timestamp(endTime));
}
return currentTime;
} }
} }
} }
@ -138,7 +166,8 @@ final class JdbcLeaseLock implements LeaseLock {
connection.setAutoCommit(false); connection.setAutoCommit(false);
try (PreparedStatement preparedStatement = connection.prepareStatement(this.renewLock)) { try (PreparedStatement preparedStatement = connection.prepareStatement(this.renewLock)) {
final long now = dbCurrentTimeMillis(connection); final long now = dbCurrentTimeMillis(connection);
final Timestamp expirationTime = new Timestamp(now + expirationMillis); final long localExpirationTime = now + expirationMillis;
final Timestamp expirationTime = new Timestamp(localExpirationTime);
if (LOGGER.isDebugEnabled()) { if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s is renewing lock with expirationTime = %s", LOGGER.debugf("[%s] %s is renewing lock with expirationTime = %s",
lockName, holderId, expirationTime); lockName, holderId, expirationTime);
@ -151,11 +180,13 @@ final class JdbcLeaseLock implements LeaseLock {
final boolean renewed = updatedRows == 1; final boolean renewed = updatedRows == 1;
connection.commit(); connection.commit();
if (!renewed) { if (!renewed) {
this.localExpirationTime = -1;
if (LOGGER.isDebugEnabled()) { if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s has failed to renew lock: lock status = { %s }", LOGGER.debugf("[%s] %s has failed to renew lock: lock status = { %s }",
lockName, holderId, readableLockStatus()); lockName, holderId, readableLockStatus());
} }
} else { } else {
this.localExpirationTime = stripMilliseconds(localExpirationTime);
LOGGER.debugf("[%s] %s has renewed lock", lockName, holderId); LOGGER.debugf("[%s] %s has renewed lock", lockName, holderId);
} }
return renewed; return renewed;
@ -170,6 +201,10 @@ final class JdbcLeaseLock implements LeaseLock {
} }
} }
/**
 * Drops the sub-second part of a time expressed in milliseconds.
 * The result is still in milliseconds and is truncated toward zero to a multiple of 1000.
 *
 * @param time a time value in milliseconds
 * @return {@code time} with its milliseconds-within-second part removed
 */
private static long stripMilliseconds(long time) {
   final long subSecondMillis = time % 1000;
   return time - subSecondMillis;
}
@Override @Override
public boolean tryAcquire() { public boolean tryAcquire() {
try (Connection connection = connectionProvider.getConnection()) { try (Connection connection = connectionProvider.getConnection()) {
@ -179,7 +214,8 @@ final class JdbcLeaseLock implements LeaseLock {
try (PreparedStatement preparedStatement = connection.prepareStatement(this.tryAcquireLock)) { try (PreparedStatement preparedStatement = connection.prepareStatement(this.tryAcquireLock)) {
final long now = dbCurrentTimeMillis(connection); final long now = dbCurrentTimeMillis(connection);
preparedStatement.setString(1, holderId); preparedStatement.setString(1, holderId);
final Timestamp expirationTime = new Timestamp(now + expirationMillis); final long localExpirationTime = now + expirationMillis;
final Timestamp expirationTime = new Timestamp(localExpirationTime);
preparedStatement.setTimestamp(2, expirationTime); preparedStatement.setTimestamp(2, expirationTime);
preparedStatement.setTimestamp(3, expirationTime); preparedStatement.setTimestamp(3, expirationTime);
LOGGER.debugf("[%s] %s is trying to acquire lock with expirationTime %s", LOGGER.debugf("[%s] %s is trying to acquire lock with expirationTime %s",
@ -188,6 +224,7 @@ final class JdbcLeaseLock implements LeaseLock {
connection.commit(); connection.commit();
if (acquired) { if (acquired) {
this.maybeAcquired = true; this.maybeAcquired = true;
this.localExpirationTime = stripMilliseconds(localExpirationTime);
LOGGER.debugf("[%s] %s has acquired lock", lockName, holderId); LOGGER.debugf("[%s] %s has acquired lock", lockName, holderId);
} else { } else {
if (LOGGER.isDebugEnabled()) { if (LOGGER.isDebugEnabled()) {
@ -272,6 +309,7 @@ final class JdbcLeaseLock implements LeaseLock {
preparedStatement.setString(1, holderId); preparedStatement.setString(1, holderId);
final boolean released = preparedStatement.executeUpdate() == 1; final boolean released = preparedStatement.executeUpdate() == 1;
//consider it as released to avoid on finalize to be reclaimed //consider it as released to avoid on finalize to be reclaimed
this.localExpirationTime = -1;
this.maybeAcquired = false; this.maybeAcquired = false;
connection.commit(); connection.commit();
if (!released) { if (!released) {

View File

@ -23,8 +23,8 @@ import java.util.function.Supplier;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.ActivateCallback; import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback; import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback;
@ -36,6 +36,8 @@ import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator; import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.jboss.logging.Logger; import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.server.impl.jdbc.LeaseLock.AcquireResult.Timeout;
/** /**
* JDBC implementation of {@link NodeManager}. * JDBC implementation of {@link NodeManager}.
*/ */
@ -53,12 +55,10 @@ public final class JdbcNodeManager extends NodeManager {
private final long lockAcquisitionTimeoutMillis; private final long lockAcquisitionTimeoutMillis;
private volatile boolean interrupted = false; private volatile boolean interrupted = false;
private final LeaseLock.Pauser pauser; private final LeaseLock.Pauser pauser;
private final IOCriticalErrorListener ioCriticalErrorListener;
public static JdbcNodeManager with(DatabaseStorageConfiguration configuration, public static JdbcNodeManager with(DatabaseStorageConfiguration configuration,
ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory, ExecutorFactory executorFactory) {
IOCriticalErrorListener ioCriticalErrorListener) {
validateTimeoutConfiguration(configuration); validateTimeoutConfiguration(configuration);
final SQLProvider.Factory sqlProviderFactory; final SQLProvider.Factory sqlProviderFactory;
if (configuration.getSqlProviderFactory() != null) { if (configuration.getSqlProviderFactory() != null) {
@ -74,8 +74,7 @@ public final class JdbcNodeManager extends NodeManager {
configuration.getConnectionProvider(), configuration.getConnectionProvider(),
sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER), sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER),
scheduledExecutorService, scheduledExecutorService,
executorFactory, executorFactory);
ioCriticalErrorListener);
} }
private static JdbcNodeManager usingConnectionProvider(String brokerId, private static JdbcNodeManager usingConnectionProvider(String brokerId,
@ -85,18 +84,16 @@ public final class JdbcNodeManager extends NodeManager {
JDBCConnectionProvider connectionProvider, JDBCConnectionProvider connectionProvider,
SQLProvider provider, SQLProvider provider,
ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory, ExecutorFactory executorFactory) {
IOCriticalErrorListener ioCriticalErrorListener) { return new JdbcNodeManager(() -> JdbcSharedStateManager.usingConnectionProvider(brokerId, lockExpirationMillis,
return new JdbcNodeManager( lockRenewPeriodMillis,
() -> JdbcSharedStateManager.usingConnectionProvider(brokerId,
lockExpirationMillis,
connectionProvider, connectionProvider,
provider), provider),
lockExpirationMillis,
lockRenewPeriodMillis, lockRenewPeriodMillis,
lockAcquisitionTimeoutMillis, lockAcquisitionTimeoutMillis,
scheduledExecutorService, scheduledExecutorService,
executorFactory, executorFactory);
ioCriticalErrorListener);
} }
private static void validateTimeoutConfiguration(DatabaseStorageConfiguration configuration) { private static void validateTimeoutConfiguration(DatabaseStorageConfiguration configuration) {
@ -122,12 +119,12 @@ public final class JdbcNodeManager extends NodeManager {
} }
private JdbcNodeManager(Supplier<? extends SharedStateManager> sharedStateManagerFactory, private JdbcNodeManager(Supplier<? extends SharedStateManager> sharedStateManagerFactory,
long lockExpirationMillis,
long lockRenewPeriodMillis, long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis, long lockAcquisitionTimeoutMillis,
ScheduledExecutorService scheduledExecutorService, ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory, ExecutorFactory executorFactory) {
IOCriticalErrorListener ioCriticalErrorListener) { super(false);
super(false, null);
this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis; this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis;
this.pauser = LeaseLock.Pauser.sleep(Math.min(lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS); this.pauser = LeaseLock.Pauser.sleep(Math.min(lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS);
this.sharedStateManagerFactory = sharedStateManagerFactory; this.sharedStateManagerFactory = sharedStateManagerFactory;
@ -137,7 +134,7 @@ public final class JdbcNodeManager extends NodeManager {
"live", "live",
this.sharedStateManager.liveLock(), this.sharedStateManager.liveLock(),
lockRenewPeriodMillis, lockRenewPeriodMillis,
ioCriticalErrorListener); this::notifyLostLock);
this.scheduledBackupLockFactory = () -> ScheduledLeaseLock.of( this.scheduledBackupLockFactory = () -> ScheduledLeaseLock.of(
scheduledExecutorService, scheduledExecutorService,
executorFactory != null ? executorFactory != null ?
@ -145,17 +142,33 @@ public final class JdbcNodeManager extends NodeManager {
"backup", "backup",
this.sharedStateManager.backupLock(), this.sharedStateManager.backupLock(),
lockRenewPeriodMillis, lockRenewPeriodMillis,
ioCriticalErrorListener); this::notifyLostLock);
this.ioCriticalErrorListener = ioCriticalErrorListener;
this.sharedStateManager = null; this.sharedStateManager = null;
this.scheduledLiveLock = null; this.scheduledLiveLock = null;
this.scheduledBackupLock = null; this.scheduledBackupLock = null;
} }
@Override @Override
public void start() throws Exception { protected synchronized void notifyLostLock() {
try {
super.notifyLostLock();
} finally {
// if any of the notified listeners has stopped the node manager or
// the node manager was already stopped
if (!isStarted()) {
return;
}
try {
stop();
} catch (Exception ex) {
LOGGER.warn("Stopping node manager has errored on lost lock notification", ex);
}
}
}
@Override
public synchronized void start() throws Exception {
try { try {
synchronized (this) {
if (isStarted()) { if (isStarted()) {
return; return;
} }
@ -166,14 +179,10 @@ public final class JdbcNodeManager extends NodeManager {
this.scheduledLiveLock = scheduledLiveLockFactory.get(); this.scheduledLiveLock = scheduledLiveLockFactory.get();
this.scheduledBackupLock = scheduledBackupLockFactory.get(); this.scheduledBackupLock = scheduledBackupLockFactory.get();
super.start(); super.start();
}
} catch (IllegalStateException e) { } catch (IllegalStateException e) {
this.sharedStateManager = null; this.sharedStateManager = null;
this.scheduledLiveLock = null; this.scheduledLiveLock = null;
this.scheduledBackupLock = null; this.scheduledBackupLock = null;
if (this.ioCriticalErrorListener != null) {
this.ioCriticalErrorListener.onIOException(e, "Failed to setup the JdbcNodeManager", null);
}
throw e; throw e;
} }
} }
@ -200,42 +209,33 @@ public final class JdbcNodeManager extends NodeManager {
} }
@Override @Override
public boolean isAwaitingFailback() throws Exception { public boolean isAwaitingFailback() throws NodeManagerException {
checkStarted();
LOGGER.debug("ENTER isAwaitingFailback"); LOGGER.debug("ENTER isAwaitingFailback");
try { try {
return readSharedState() == SharedStateManager.State.FAILING_BACK; return readSharedState() == SharedStateManager.State.FAILING_BACK;
} catch (IllegalStateException e) {
LOGGER.warn("cannot retrieve the live state: assume it's not yet failed back", e);
return false;
} finally { } finally {
LOGGER.debug("EXIT isAwaitingFailback"); LOGGER.debug("EXIT isAwaitingFailback");
} }
} }
@Override @Override
public boolean isBackupLive() throws Exception { public boolean isBackupLive() throws NodeManagerException {
checkStarted();
LOGGER.debug("ENTER isBackupLive"); LOGGER.debug("ENTER isBackupLive");
try { try {
//is anyone holding the live lock? //is anyone holding the live lock?
return this.scheduledLiveLock.lock().isHeld(); return this.scheduledLiveLock.lock().isHeld();
} catch (IllegalStateException e) {
throw new NodeManagerException(e);
} finally { } finally {
LOGGER.debug("EXIT isBackupLive"); LOGGER.debug("EXIT isBackupLive");
} }
} }
@Override
public void stopBackup() throws Exception {
LOGGER.debug("ENTER stopBackup");
try {
if (this.scheduledBackupLock.isStarted()) {
LOGGER.debug("scheduledBackupLock is running: stop it and release backup lock");
this.scheduledBackupLock.stop();
this.scheduledBackupLock.lock().release();
} else {
LOGGER.debug("scheduledBackupLock is not running");
}
} finally {
LOGGER.debug("EXIT stopBackup");
}
}
@Override @Override
public void interrupt() { public void interrupt() {
LOGGER.debug("ENTER interrupted"); LOGGER.debug("ENTER interrupted");
@ -245,7 +245,8 @@ public final class JdbcNodeManager extends NodeManager {
} }
@Override @Override
public void releaseBackup() throws Exception { public void releaseBackup() throws NodeManagerException {
checkStarted();
LOGGER.debug("ENTER releaseBackup"); LOGGER.debug("ENTER releaseBackup");
try { try {
if (this.scheduledBackupLock.isStarted()) { if (this.scheduledBackupLock.isStarted()) {
@ -255,19 +256,47 @@ public final class JdbcNodeManager extends NodeManager {
} else { } else {
LOGGER.debug("scheduledBackupLock is not running"); LOGGER.debug("scheduledBackupLock is not running");
} }
} catch (IllegalStateException e) {
throw new NodeManagerException(e);
} finally { } finally {
LOGGER.debug("EXIT releaseBackup"); LOGGER.debug("EXIT releaseBackup");
} }
} }
/** /**
* Try to acquire a lock, failing with an exception otherwise. * Try to acquire a lock
*/ */
private void lock(LeaseLock lock) throws Exception { private void lock(LeaseLock lock) throws ActiveMQLockAcquisitionTimeoutException, InterruptedException {
final LeaseLock.AcquireResult acquireResult = lock.tryAcquire(this.lockAcquisitionTimeoutMillis, this.pauser, () -> !this.interrupted); final long lockAcquisitionTimeoutNanos = lockAcquisitionTimeoutMillis >= 0 ?
TimeUnit.MILLISECONDS.toNanos(lockAcquisitionTimeoutMillis) : -1;
LeaseLock.AcquireResult acquireResult = null;
final long start = System.nanoTime();
while (acquireResult == null) {
checkStarted();
// measure distance from the timeout
final long remainingNanos = remainingNanos(start, lockAcquisitionTimeoutNanos);
if (remainingNanos == 0) {
acquireResult = Timeout;
continue;
}
final long remainingMillis = remainingNanos > 0 ? TimeUnit.NANOSECONDS.toMillis(remainingNanos) : -1;
try {
acquireResult = lock.tryAcquire(remainingMillis, this.pauser, () -> !this.interrupted);
} catch (IllegalStateException e) {
LOGGER.warn("Errored while trying to acquire lock", e);
if (remainingNanos(start, lockAcquisitionTimeoutNanos) == 0) {
acquireResult = Timeout;
continue;
}
// that's not precise, but it's ok: it can trigger the timeout right after the pause,
// depending on the pause length. The sole purpose of the pause is to avoid
// hammering the DBMS with requests if the connection is down
this.pauser.idle();
}
}
switch (acquireResult) { switch (acquireResult) {
case Timeout: case Timeout:
throw new Exception("timed out waiting for lock"); throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock");
case Exit: case Exit:
this.interrupted = false; this.interrupted = false;
throw new InterruptedException("LeaseLock was interrupted"); throw new InterruptedException("LeaseLock was interrupted");
@ -279,6 +308,20 @@ public final class JdbcNodeManager extends NodeManager {
} }
private static long remainingNanos(long start, long timeoutNanos) {
if (timeoutNanos > 0) {
final long elapsedNanos = (System.nanoTime() - start);
if (elapsedNanos < timeoutNanos) {
return timeoutNanos - elapsedNanos;
} else {
return 0;
}
} else {
assert timeoutNanos == -1;
return -1;
}
}
private void checkInterrupted(Supplier<String> message) throws InterruptedException { private void checkInterrupted(Supplier<String> message) throws InterruptedException {
if (this.interrupted) { if (this.interrupted) {
interrupted = false; interrupted = false;
@ -286,52 +329,77 @@ public final class JdbcNodeManager extends NodeManager {
} }
} }
private void renewLiveLockIfNeeded(final long acquiredOn) { private void renewLock(ScheduledLeaseLock lock) {
final long acquiredMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - acquiredOn); boolean lostLock = true;
if (acquiredMillis > this.scheduledLiveLock.renewPeriodMillis()) { IllegalStateException renewEx = null;
if (!this.scheduledLiveLock.lock().renew()) { try {
final IllegalStateException e = new IllegalStateException("live lock can't be renewed"); lostLock = !this.scheduledLiveLock.lock().renew();
ioCriticalErrorListener.onIOException(e, "live lock can't be renewed", null); } catch (IllegalStateException e) {
throw e; renewEx = e;
} }
if (lostLock) {
notifyLostLock();
if (renewEx == null) {
renewEx = new IllegalStateException(lock.lockName() + " lock isn't renewed");
}
throw renewEx;
} }
} }
/** /**
* Lock live node and check for a live state, taking care to renew it (if needed) or releasing it otherwise * Lock live node and check for a live state, taking care to renew it (if needed) or releasing it otherwise
*/ */
private boolean lockLiveAndCheckLiveState() throws Exception { private boolean lockLiveAndCheckLiveState() throws ActiveMQLockAcquisitionTimeoutException, InterruptedException {
lock(this.scheduledLiveLock.lock());
final long acquiredOn = System.nanoTime();
boolean liveWhileLocked = false;
//check if the state is live
final SharedStateManager.State stateWhileLocked;
try { try {
stateWhileLocked = readSharedState(); lock(this.scheduledLiveLock.lock());
} catch (Throwable t) { //check if the state is live
LOGGER.error("error while holding the live node lock and tried to read the shared state", t); while (true) {
this.scheduledLiveLock.lock().release(); try {
throw t; final SharedStateManager.State stateWhileLocked = readSharedState();
final long localExpirationTime = this.scheduledLiveLock.lock().localExpirationTime();
if (System.currentTimeMillis() > localExpirationTime) {
// the lock can be assumed to be expired,
// so the state isn't worth considering
return false;
} }
if (stateWhileLocked == SharedStateManager.State.LIVE) { if (stateWhileLocked == SharedStateManager.State.LIVE) {
renewLiveLockIfNeeded(acquiredOn); // TODO need some tolerance//renew here?
liveWhileLocked = true; return true;
} else { } else {
LOGGER.debugf("state is %s while holding the live lock: releasing live lock", stateWhileLocked); // state is not live: can (try to) release the lock
//state is not live: can (try to) release the lock
this.scheduledLiveLock.lock().release(); this.scheduledLiveLock.lock().release();
return false;
}
} catch (IllegalStateException e) {
LOGGER.error("error while holding the live node lock and tried to read the shared state or to release the lock", e);
checkStarted();
checkInterrupted(() -> "interrupt on error while checking live state");
pauser.idle();
final long localExpirationTime = this.scheduledLiveLock.lock().localExpirationTime();
if (System.currentTimeMillis() > localExpirationTime) {
return false;
}
}
}
} catch (InterruptedException e) {
throw e;
} }
return liveWhileLocked;
} }
@Override @Override
public void awaitLiveNode() throws Exception { public void awaitLiveNode() throws NodeManagerException, InterruptedException {
checkStarted();
LOGGER.debug("ENTER awaitLiveNode"); LOGGER.debug("ENTER awaitLiveNode");
try { try {
boolean liveWhileLocked = false; boolean liveWhileLocked = false;
while (!liveWhileLocked) { while (!liveWhileLocked) {
//check first without holding any lock //check first without holding any lock
final SharedStateManager.State state = readSharedState(); SharedStateManager.State state = null;
try {
state = readSharedState();
} catch (IllegalStateException e) {
LOGGER.warn("Errored while reading shared state", e);
}
if (state == SharedStateManager.State.LIVE) { if (state == SharedStateManager.State.LIVE) {
//verify if the state is live while holding the live node lock too //verify if the state is live while holding the live node lock too
liveWhileLocked = lockLiveAndCheckLiveState(); liveWhileLocked = lockLiveAndCheckLiveState();
@ -339,6 +407,7 @@ public final class JdbcNodeManager extends NodeManager {
LOGGER.debugf("state while awaiting live node: %s", state); LOGGER.debugf("state while awaiting live node: %s", state);
} }
if (!liveWhileLocked) { if (!liveWhileLocked) {
checkStarted();
checkInterrupted(() -> "awaitLiveNode got interrupted!"); checkInterrupted(() -> "awaitLiveNode got interrupted!");
pauser.idle(); pauser.idle();
} }
@ -346,32 +415,51 @@ public final class JdbcNodeManager extends NodeManager {
//state is LIVE and live lock is acquired and valid //state is LIVE and live lock is acquired and valid
LOGGER.debugf("acquired live node lock while state is %s: starting scheduledLiveLock", SharedStateManager.State.LIVE); LOGGER.debugf("acquired live node lock while state is %s: starting scheduledLiveLock", SharedStateManager.State.LIVE);
this.scheduledLiveLock.start(); this.scheduledLiveLock.start();
} catch (InterruptedException e) {
throw e;
} catch (ActiveMQLockAcquisitionTimeoutException | IllegalStateException e) {
throw new NodeManagerException(e);
} finally { } finally {
LOGGER.debug("EXIT awaitLiveNode"); LOGGER.debug("EXIT awaitLiveNode");
} }
} }
@Override @Override
public void startBackup() throws Exception { public void startBackup() throws NodeManagerException, InterruptedException {
checkStarted();
LOGGER.debug("ENTER startBackup"); LOGGER.debug("ENTER startBackup");
try { try {
ActiveMQServerLogger.LOGGER.waitingToBecomeBackup(); ActiveMQServerLogger.LOGGER.waitingToBecomeBackup();
lock(scheduledBackupLock.lock()); lock(scheduledBackupLock.lock());
scheduledBackupLock.start(); scheduledBackupLock.start();
ActiveMQServerLogger.LOGGER.gotBackupLock(); ActiveMQServerLogger.LOGGER.gotBackupLock();
if (getUUID() == null) if (getUUID() == null)
readNodeId(); readNodeId();
} catch (InterruptedException ie) {
throw ie;
} catch (ActiveMQLockAcquisitionTimeoutException | IllegalStateException e) {
throw new NodeManagerException(e);
} finally { } finally {
LOGGER.debug("EXIT startBackup"); LOGGER.debug("EXIT startBackup");
} }
} }
@Override @Override
public ActivateCallback startLiveNode() throws Exception { public ActivateCallback startLiveNode() throws NodeManagerException, InterruptedException {
checkStarted();
LOGGER.debug("ENTER startLiveNode"); LOGGER.debug("ENTER startLiveNode");
try {
boolean done = false;
while (!done) {
try { try {
setFailingBack(); setFailingBack();
done = true;
} catch (IllegalStateException e) {
LOGGER.warn("cannot set failing back state, retry", e);
pauser.idle();
checkInterrupted(() -> "interrupt while trying to set failing back state");
}
}
final String timeoutMessage = lockAcquisitionTimeoutMillis == -1 ? "indefinitely" : lockAcquisitionTimeoutMillis + " milliseconds"; final String timeoutMessage = lockAcquisitionTimeoutMillis == -1 ? "indefinitely" : lockAcquisitionTimeoutMillis + " milliseconds";
@ -389,21 +477,42 @@ public final class JdbcNodeManager extends NodeManager {
LOGGER.debug("ENTER activationComplete"); LOGGER.debug("ENTER activationComplete");
try { try {
//state can be written only if the live renew task is running //state can be written only if the live renew task is running
boolean done = false;
while (!done) {
try {
setLive(); setLive();
} catch (Exception e) { done = true;
} catch (IllegalStateException e) {
LOGGER.warn("Errored while trying to setLive", e);
checkStarted();
pauser.idle();
final long localExpirationTime = scheduledLiveLock.lock().localExpirationTime();
// optimistic: it is just to set a deadline while retrying
if (System.currentTimeMillis() > localExpirationTime) {
throw new IllegalStateException("live lock is probably expired: failed to setLive");
}
}
}
} catch (IllegalStateException e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
throw new NodeManagerException(e);
} finally { } finally {
LOGGER.debug("EXIT activationComplete"); LOGGER.debug("EXIT activationComplete");
} }
} }
}; };
} catch (InterruptedException ie) {
throw ie;
} catch (ActiveMQLockAcquisitionTimeoutException | IllegalStateException e) {
throw new NodeManagerException(e);
} finally { } finally {
LOGGER.debug("EXIT startLiveNode"); LOGGER.debug("EXIT startLiveNode");
} }
} }
@Override @Override
public void pauseLiveServer() throws Exception { public void pauseLiveServer() throws NodeManagerException {
checkStarted();
LOGGER.debug("ENTER pauseLiveServer"); LOGGER.debug("ENTER pauseLiveServer");
try { try {
if (scheduledLiveLock.isStarted()) { if (scheduledLiveLock.isStarted()) {
@ -413,23 +522,21 @@ public final class JdbcNodeManager extends NodeManager {
scheduledLiveLock.lock().release(); scheduledLiveLock.lock().release();
} else { } else {
LOGGER.debug("scheduledLiveLock is not running: try renew live lock"); LOGGER.debug("scheduledLiveLock is not running: try renew live lock");
if (scheduledLiveLock.lock().renew()) { renewLock(scheduledLiveLock);
LOGGER.debug("live lock renewed: set paused shared state and release live lock"); LOGGER.debug("live lock renewed: set paused shared state and release live lock");
setPaused(); setPaused();
scheduledLiveLock.lock().release(); scheduledLiveLock.lock().release();
} else {
final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
ioCriticalErrorListener.onIOException(e, "live lock can't be renewed on pauseLiveServer", null);
throw e;
}
} }
} catch (IllegalStateException e) {
throw new NodeManagerException(e);
} finally { } finally {
LOGGER.debug("EXIT pauseLiveServer"); LOGGER.debug("EXIT pauseLiveServer");
} }
} }
@Override @Override
public void crashLiveServer() throws Exception { public void crashLiveServer() throws NodeManagerException {
checkStarted();
LOGGER.debug("ENTER crashLiveServer"); LOGGER.debug("ENTER crashLiveServer");
try { try {
if (this.scheduledLiveLock.isStarted()) { if (this.scheduledLiveLock.isStarted()) {
@ -446,10 +553,18 @@ public final class JdbcNodeManager extends NodeManager {
@Override @Override
public void awaitLiveStatus() { public void awaitLiveStatus() {
checkStarted();
LOGGER.debug("ENTER awaitLiveStatus"); LOGGER.debug("ENTER awaitLiveStatus");
try { try {
while (readSharedState() != SharedStateManager.State.LIVE) { SharedStateManager.State state = null;
while (state != SharedStateManager.State.LIVE) {
try {
state = readSharedState();
} catch (IllegalStateException e) {
LOGGER.warn("Errored while trying to read shared state", e);
}
pauser.idle(); pauser.idle();
checkStarted();
} }
} finally { } finally {
LOGGER.debug("EXIT awaitLiveStatus"); LOGGER.debug("EXIT awaitLiveStatus");
@ -481,6 +596,7 @@ public final class JdbcNodeManager extends NodeManager {
@Override @Override
public SimpleString readNodeId() { public SimpleString readNodeId() {
checkStarted();
final UUID nodeId = this.sharedStateManager.readNodeId(); final UUID nodeId = this.sharedStateManager.readNodeId();
LOGGER.debugf("readNodeId nodeId = %s", nodeId); LOGGER.debugf("readNodeId nodeId = %s", nodeId);
setUUID(nodeId); setUUID(nodeId);

View File

@ -39,6 +39,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
private static final int MAX_SETUP_ATTEMPTS = 20; private static final int MAX_SETUP_ATTEMPTS = 20;
private final String holderId; private final String holderId;
private final long lockExpirationMillis; private final long lockExpirationMillis;
private final long queryTimeoutMillis;
private JdbcLeaseLock liveLock; private JdbcLeaseLock liveLock;
private JdbcLeaseLock backupLock; private JdbcLeaseLock backupLock;
private String readNodeId; private String readNodeId;
@ -51,7 +52,16 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
long locksExpirationMillis, long locksExpirationMillis,
JDBCConnectionProvider connectionProvider, JDBCConnectionProvider connectionProvider,
SQLProvider provider) { SQLProvider provider) {
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis); return usingConnectionProvider(holderId, locksExpirationMillis, -1, connectionProvider, provider);
}
public static JdbcSharedStateManager usingConnectionProvider(String holderId,
long locksExpirationMillis,
long queryTimeoutMillis,
JDBCConnectionProvider connectionProvider,
SQLProvider provider) {
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis,
queryTimeoutMillis);
sharedStateManager.setJdbcConnectionProvider(connectionProvider); sharedStateManager.setJdbcConnectionProvider(connectionProvider);
sharedStateManager.setSqlProvider(provider); sharedStateManager.setSqlProvider(provider);
try { try {
@ -76,20 +86,35 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
JDBCConnectionProvider connectionProvider, JDBCConnectionProvider connectionProvider,
SQLProvider sqlProvider, SQLProvider sqlProvider,
long expirationMillis) { long expirationMillis) {
return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireLiveLockSQL(), sqlProvider.tryReleaseLiveLockSQL(), sqlProvider.renewLiveLockSQL(), sqlProvider.isLiveLockedSQL(), sqlProvider.currentTimestampSQL(), expirationMillis, "LIVE"); return createLiveLock(holderId, connectionProvider, sqlProvider, expirationMillis, -1);
}
static JdbcLeaseLock createLiveLock(String holderId,
JDBCConnectionProvider connectionProvider,
SQLProvider sqlProvider,
long expirationMillis,
long queryTimeoutMillis) {
return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireLiveLockSQL(),
sqlProvider.tryReleaseLiveLockSQL(), sqlProvider.renewLiveLockSQL(),
sqlProvider.isLiveLockedSQL(), sqlProvider.currentTimestampSQL(),
expirationMillis, queryTimeoutMillis, "LIVE");
} }
static JdbcLeaseLock createBackupLock(String holderId, static JdbcLeaseLock createBackupLock(String holderId,
JDBCConnectionProvider connectionProvider, JDBCConnectionProvider connectionProvider,
SQLProvider sqlProvider, SQLProvider sqlProvider,
long expirationMillis) { long expirationMillis,
return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireBackupLockSQL(), sqlProvider.tryReleaseBackupLockSQL(), sqlProvider.renewBackupLockSQL(), sqlProvider.isBackupLockedSQL(), sqlProvider.currentTimestampSQL(), expirationMillis, "BACKUP"); long queryTimeoutMillis) {
return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireBackupLockSQL(),
sqlProvider.tryReleaseBackupLockSQL(), sqlProvider.renewBackupLockSQL(),
sqlProvider.isBackupLockedSQL(), sqlProvider.currentTimestampSQL(),
expirationMillis, queryTimeoutMillis, "BACKUP");
} }
@Override @Override
protected void prepareStatements() { protected void prepareStatements() {
this.liveLock = createLiveLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis); this.liveLock = createLiveLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis, queryTimeoutMillis);
this.backupLock = createBackupLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis); this.backupLock = createBackupLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis, queryTimeoutMillis);
this.readNodeId = sqlProvider.readNodeIdSQL(); this.readNodeId = sqlProvider.readNodeIdSQL();
this.writeNodeId = sqlProvider.writeNodeIdSQL(); this.writeNodeId = sqlProvider.writeNodeIdSQL();
this.initializeNodeId = sqlProvider.initializeNodeIdSQL(); this.initializeNodeId = sqlProvider.initializeNodeIdSQL();
@ -97,9 +122,10 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
this.readState = sqlProvider.readStateSQL(); this.readState = sqlProvider.readStateSQL();
} }
private JdbcSharedStateManager(String holderId, long lockExpirationMillis) { private JdbcSharedStateManager(String holderId, long lockExpirationMillis, long queryTimeoutMillis) {
this.holderId = holderId; this.holderId = holderId;
this.lockExpirationMillis = lockExpirationMillis; this.lockExpirationMillis = lockExpirationMillis;
this.queryTimeoutMillis = queryTimeoutMillis;
} }
@Override @Override

View File

@ -41,6 +41,8 @@ interface LeaseLock extends AutoCloseable {
boolean keepRunning(); boolean keepRunning();
} }
long localExpirationTime();
interface Pauser { interface Pauser {
void idle(); void idle();

View File

@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.ActiveMQComponent; import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
/** /**
@ -28,17 +28,25 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
*/ */
interface ScheduledLeaseLock extends ActiveMQComponent { interface ScheduledLeaseLock extends ActiveMQComponent {
@Override
void start();
@Override
void stop();
LeaseLock lock(); LeaseLock lock();
long renewPeriodMillis(); long renewPeriodMillis();
String lockName();
static ScheduledLeaseLock of(ScheduledExecutorService scheduledExecutorService, static ScheduledLeaseLock of(ScheduledExecutorService scheduledExecutorService,
ArtemisExecutor executor, ArtemisExecutor executor,
String lockName, String lockName,
LeaseLock lock, LeaseLock lock,
long renewPeriodMillis, long renewPeriodMillis,
IOCriticalErrorListener ioCriticalErrorListener) { LockListener lockListener) {
return new ActiveMQScheduledLeaseLock(scheduledExecutorService, executor, lockName, lock, renewPeriodMillis, ioCriticalErrorListener); return new ActiveMQScheduledLeaseLock(scheduledExecutorService, executor, lockName, lock, renewPeriodMillis, lockListener);
} }
} }

View File

@ -21,15 +21,23 @@ import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream; import java.util.stream.Stream;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
@ -38,6 +46,9 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized; import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter; import org.junit.runners.Parameterized.Parameter;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.core.Is.is;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class JdbcLeaseLockTest extends ActiveMQTestBase { public class JdbcLeaseLockTest extends ActiveMQTestBase {
@ -183,7 +194,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
@Test @Test
public void shouldAcquireExpiredLock() throws InterruptedException { public void shouldAcquireExpiredLock() throws InterruptedException {
final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1)); final LeaseLock lock = lock(10);
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire()); Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try { try {
Thread.sleep(lock.expirationMillis() * 2); Thread.sleep(lock.expirationMillis() * 2);
@ -197,13 +208,13 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
@Test @Test
public void shouldOtherAcquireExpiredLock() throws InterruptedException { public void shouldOtherAcquireExpiredLock() throws InterruptedException {
final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1)); final LeaseLock lock = lock(10);
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire()); Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try { try {
Thread.sleep(lock.expirationMillis() * 2); Thread.sleep(lock.expirationMillis() * 2);
Assert.assertFalse("lock is already expired", lock.isHeldByCaller()); Assert.assertFalse("lock is already expired", lock.isHeldByCaller());
Assert.assertFalse("lock is already expired", lock.isHeld()); Assert.assertFalse("lock is already expired", lock.isHeld());
final LeaseLock otherLock = lock(TimeUnit.SECONDS.toMillis(10)); final LeaseLock otherLock = lock(10);
try { try {
Assert.assertTrue("lock is already expired", otherLock.tryAcquire()); Assert.assertTrue("lock is already expired", otherLock.tryAcquire());
} finally { } finally {
@ -237,7 +248,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
@Test @Test
public void shouldRenewExpiredLockNotAcquiredByOthers() throws InterruptedException { public void shouldRenewExpiredLockNotAcquiredByOthers() throws InterruptedException {
final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1)); final LeaseLock lock = lock(10);
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire()); Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try { try {
Thread.sleep(lock.expirationMillis() * 2); Thread.sleep(lock.expirationMillis() * 2);
@ -251,7 +262,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
@Test @Test
public void shouldNotRenewLockAcquiredByOthers() throws InterruptedException { public void shouldNotRenewLockAcquiredByOthers() throws InterruptedException {
final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1)); final LeaseLock lock = lock(10);
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire()); Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try { try {
Thread.sleep(lock.expirationMillis() * 2); Thread.sleep(lock.expirationMillis() * 2);
@ -268,5 +279,97 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
lock.release(); lock.release();
} }
} }
@Test
public void shouldNotNotifyLostLock() throws Exception {
final ExecutorService executorService = Executors.newSingleThreadExecutor();
final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
final OrderedExecutorFactory factory = new OrderedExecutorFactory(executorService);
final ArtemisExecutor artemisExecutor = factory.getExecutor();
final AtomicLong lostLock = new AtomicLong();
final LockListener lockListener = () -> {
lostLock.incrementAndGet();
};
final ScheduledLeaseLock scheduledLeaseLock = ScheduledLeaseLock
.of(scheduledExecutorService, artemisExecutor,
"test", lock(), dbConf.getJdbcLockRenewPeriodMillis(), lockListener);
Assert.assertTrue(scheduledLeaseLock.lock().tryAcquire());
scheduledLeaseLock.start();
Assert.assertEquals(0, lostLock.get());
scheduledLeaseLock.stop();
Assert.assertEquals(0, lostLock.get());
executorService.shutdown();
scheduledExecutorService.shutdown();
scheduledLeaseLock.lock().release();
}
/**
 * Verifies that the listener keeps being notified while the scheduled lease lock is running
 * and the underlying lock is no longer held.
 * <p>
 * The lock is acquired and the scheduled lease lock started; after two renew periods the lock
 * must still be held with no notification. The lock is then released behind the scheduler's
 * back and, after two more renew periods, at least two notifications are expected.
 *
 * @throws Exception if the scheduled lease lock fails or the sleeping thread is interrupted
 */
@Test
public void shouldNotifyManyTimesLostLock() throws Exception {
   final ExecutorService executorService = Executors.newSingleThreadExecutor();
   final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
   // Both executors are shut down in a finally block so a failed assertion cannot leak their threads.
   try {
      final OrderedExecutorFactory factory = new OrderedExecutorFactory(executorService);
      final ArtemisExecutor artemisExecutor = factory.getExecutor();
      final AtomicLong lostLock = new AtomicLong();
      final LockListener lockListener = () -> {
         lostLock.incrementAndGet();
      };
      final ScheduledLeaseLock scheduledLeaseLock = ScheduledLeaseLock
         .of(scheduledExecutorService, artemisExecutor,
             "test", lock(TimeUnit.SECONDS.toMillis(1)), 100, lockListener);
      Assert.assertTrue(scheduledLeaseLock.lock().tryAcquire());
      try {
         scheduledLeaseLock.start();
         // should let the renew to happen at least 1 time, excluding the time to start a scheduled task
         TimeUnit.MILLISECONDS.sleep(2 * scheduledLeaseLock.renewPeriodMillis());
         Assert.assertTrue(scheduledLeaseLock.lock().isHeldByCaller());
         Assert.assertEquals(0, lostLock.get());
         // Drop the lock while the scheduled lease lock is still running.
         scheduledLeaseLock.lock().release();
         Assert.assertFalse(scheduledLeaseLock.lock().isHeldByCaller());
         TimeUnit.MILLISECONDS.sleep(2 * scheduledLeaseLock.renewPeriodMillis());
         Assert.assertThat(lostLock.get(), is(greaterThanOrEqualTo(2L)));
      } finally {
         // Always stop the scheduled lease lock, even when an assertion above failed.
         scheduledLeaseLock.stop();
      }
   } finally {
      executorService.shutdown();
      scheduledExecutorService.shutdown();
   }
}
/**
 * Verifies that a listener which stops the scheduled lease lock from inside its own callback
 * is notified exactly once.
 * <p>
 * The listener counts the notification and calls {@code stop()} on the scheduled lease lock,
 * recording any {@link Throwable} thrown by that call. After the lock is released the test
 * waits for the first notification, checks the scheduled lease lock is no longer started,
 * waits one more renew period and then expects a single notification and no recorded error.
 *
 * @throws Exception if the scheduled lease lock fails or the sleeping thread is interrupted
 */
@Test
public void shouldNotifyOnceLostLockIfStopped() throws Exception {
   final ExecutorService executorService = Executors.newSingleThreadExecutor();
   final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
   // Both executors are shut down in a finally block so a failed assertion cannot leak their threads.
   try {
      final OrderedExecutorFactory factory = new OrderedExecutorFactory(executorService);
      final ArtemisExecutor artemisExecutor = factory.getExecutor();
      final AtomicLong lostLock = new AtomicLong();
      // The listener needs a handle on the scheduled lease lock that is only created after it.
      final AtomicReference<ScheduledLeaseLock> lock = new AtomicReference<>();
      final AtomicReference<Throwable> stopErrors = new AtomicReference<>();
      final LockListener lockListener = () -> {
         lostLock.incrementAndGet();
         try {
            lock.get().stop();
         } catch (Throwable e) {
            // Recorded here and asserted on the test thread.
            stopErrors.set(e);
         }
      };
      final ScheduledLeaseLock scheduledLeaseLock = ScheduledLeaseLock
         .of(scheduledExecutorService, artemisExecutor, "test", lock(TimeUnit.SECONDS.toMillis(1)),
             100, lockListener);
      lock.set(scheduledLeaseLock);
      Assert.assertTrue(scheduledLeaseLock.lock().tryAcquire());
      lostLock.set(0);
      try {
         scheduledLeaseLock.start();
         Assert.assertTrue(scheduledLeaseLock.lock().isHeldByCaller());
         scheduledLeaseLock.lock().release();
         Assert.assertFalse(scheduledLeaseLock.lock().isHeldByCaller());
         Wait.assertTrue(() -> lostLock.get() > 0);
         Assert.assertFalse(scheduledLeaseLock.isStarted());
         // wait enough to see if it get triggered again
         TimeUnit.MILLISECONDS.sleep(scheduledLeaseLock.renewPeriodMillis());
         Assert.assertEquals(1, lostLock.getAndSet(0));
         Assert.assertNull(stopErrors.getAndSet(null));
      } finally {
         // Always stop the scheduled lease lock, even when an assertion above failed.
         scheduledLeaseLock.stop();
      }
   } finally {
      executorService.shutdown();
      scheduledExecutorService.shutdown();
   }
}
} }

View File

@ -23,7 +23,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -105,17 +104,9 @@ public class JdbcNodeManagerTest extends ActiveMQTestBase {
@Test @Test
public void shouldStartAndStopGracefullyTest() throws Exception { public void shouldStartAndStopGracefullyTest() throws Exception {
final AtomicReference<String> criticalError = new AtomicReference<>(); final JdbcNodeManager nodeManager = JdbcNodeManager.with(dbConf, leaseLockExecutor, null);
final JdbcNodeManager nodeManager = JdbcNodeManager.with(dbConf, leaseLockExecutor, null, (code, message, file) -> criticalError.lazySet(message));
try {
nodeManager.start(); nodeManager.start();
} finally {
nodeManager.stop(); nodeManager.stop();
final String error = criticalError.get();
if (error != null) {
Assert.fail(error);
}
}
} }
} }

View File

@ -522,6 +522,7 @@ public abstract class ActiveMQTestBase extends Assert {
dbStorageConfiguration.setJdbcLockAcquisitionTimeoutMillis(getJdbcLockAcquisitionTimeoutMillis()); dbStorageConfiguration.setJdbcLockAcquisitionTimeoutMillis(getJdbcLockAcquisitionTimeoutMillis());
dbStorageConfiguration.setJdbcLockExpirationMillis(getJdbcLockExpirationMillis()); dbStorageConfiguration.setJdbcLockExpirationMillis(getJdbcLockExpirationMillis());
dbStorageConfiguration.setJdbcLockRenewPeriodMillis(getJdbcLockRenewPeriodMillis()); dbStorageConfiguration.setJdbcLockRenewPeriodMillis(getJdbcLockRenewPeriodMillis());
dbStorageConfiguration.setJdbcNetworkTimeout(-1);
return dbStorageConfiguration; return dbStorageConfiguration;
} }
@ -530,11 +531,11 @@ public abstract class ActiveMQTestBase extends Assert {
} }
protected long getJdbcLockExpirationMillis() { protected long getJdbcLockExpirationMillis() {
return Long.getLong("jdbc.lock.expiration", ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis()); return Long.getLong("jdbc.lock.expiration", 4_000);
} }
protected long getJdbcLockRenewPeriodMillis() { protected long getJdbcLockRenewPeriodMillis() {
return Long.getLong("jdbc.lock.renew", ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis()); return Long.getLong("jdbc.lock.renew", 200);
} }
public void destroyTables(List<String> tableNames) throws Exception { public void destroyTables(List<String> tableNames) throws Exception {

View File

@ -18,12 +18,12 @@
package org.apache.activemq.artemis.tests.extras.byteman; package org.apache.activemq.artemis.tests.extras.byteman;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.activemq.artemis.core.server.ActivateCallback; import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager; import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager.LockListener;
import org.apache.activemq.artemis.utils.Wait; import org.apache.activemq.artemis.utils.Wait;
import org.jboss.byteman.contrib.bmunit.BMRule; import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules; import org.jboss.byteman.contrib.bmunit.BMRules;
@ -42,28 +42,33 @@ public class FileLockMonitorTest {
private ScheduledThreadPoolExecutor executor; private ScheduledThreadPoolExecutor executor;
@Before @Before
public void handleLockFile() throws IOException { public void handleLockFile() throws Exception {
sharedDir = File.createTempFile("shared-dir", ""); sharedDir = File.createTempFile("shared-dir", "");
sharedDir.delete(); sharedDir.delete();
Assert.assertTrue(sharedDir.mkdir()); Assert.assertTrue(sharedDir.mkdir());
lostLock = false;
} }
@Test @Test
@BMRules(rules = { @BMRules(rules = {
@BMRule(name = "lock is invalid", targetClass = "sun.nio.ch.FileLockImpl", targetMethod = "isValid", action = "return false;") }) @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "isLiveLockLost", action = "return true;") })
public void testLockMonitorInvalid() throws Exception { public void testLockMonitorInvalid() throws Exception {
lostLock = false; lostLock = false;
startServer(); startServer();
Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 5000, 100); Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 20_000, 100);
nodeManager.isStarted(); nodeManager.isStarted();
nodeManager.crashLiveServer(); nodeManager.crashLiveServer();
executor.shutdown(); executor.shutdown();
} }
public static void throwNodeManagerException(String msg) {
throw new NodeManager.NodeManagerException(msg);
}
@Test @Test
@BMRules(rules = { @BMRules(rules = {
@BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "getState", action = "throw new java.io.IOException(\"EFS is disconnected\");") }) @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager",
targetMethod = "getState",
action = "org.apache.activemq.artemis.tests.extras.byteman.FileLockMonitorTest.throwNodeManagerException(\"EFS is disconnected\");") })
public void testLockMonitorIOException() throws Exception { public void testLockMonitorIOException() throws Exception {
lostLock = false; lostLock = false;
startServer(); startServer();
@ -95,14 +100,13 @@ public class FileLockMonitorTest {
public LockListener startServer() throws Exception { public LockListener startServer() throws Exception {
executor = new ScheduledThreadPoolExecutor(2); executor = new ScheduledThreadPoolExecutor(2);
nodeManager = new FileLockNodeManager(sharedDir, false, executor); nodeManager = new FileLockNodeManager(sharedDir, false, executor);
LockListener listener = nodeManager.new LockListener() { LockListener listener = () -> {
@Override
protected void lostLock() throws Exception {
lostLock = true; lostLock = true;
try {
nodeManager.crashLiveServer(); nodeManager.crashLiveServer();
} catch (Throwable t) {
t.printStackTrace();
} }
}; };
nodeManager.registerLockListener(listener); nodeManager.registerLockListener(listener);

View File

@ -78,7 +78,7 @@ public class FileLockNodeManagerTest {
manager.awaitLiveNode(); manager.awaitLiveNode();
} catch (Exception e) { } catch (Exception e) {
long stop = System.currentTimeMillis(); long stop = System.currentTimeMillis();
if (!"timed out waiting for lock".equals(e.getMessage())) { if (!"timed out waiting for lock".equals(e.getCause().getMessage())) {
throw e; throw e;
} }
return stop - start; return stop - start;

View File

@ -1,151 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.byteman;
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
/**
 * Shared-store failover test that checks the backup server still activates after an
 * {@code IOException} has been injected into {@code FileLockNodeManager.tryLock()}.
 * <p>
 * The injection is done with a Byteman rule whose condition calls back into
 * {@link #isThrowException()}; the test thread and the rule are coordinated through the static
 * {@code throwException} flag and {@code exceptionThrownLatch}.
 */
@RunWith(BMUnitRunner.class)
public class SharedStoreBackupActivationTest extends FailoverTestBase {
private static Logger logger = Logger.getLogger(SharedStoreBackupActivationTest.class);
// Tells the Byteman rule whether the next tryLock() call should throw.
private static volatile boolean throwException = false;
// Counted down by isThrowException(), i.e. every time the rule condition is evaluated.
private static CountDownLatch exceptionThrownLatch;
/**
 * Condition of the Byteman rule declared on {@link #testFailOverAfterTryLockException()}.
 * Counts down the current latch, if any, and reports whether the rule action should fire.
 *
 * @return the current value of the {@code throwException} flag
 */
public static synchronized boolean isThrowException() {
logger.debugf("Throwing IOException during FileLockNodeManager.tryLock(): %s", throwException);
if (exceptionThrownLatch != null) {
exceptionThrownLatch.countDown();
}
return throwException;
}
/**
* Waits for the backup server to call FileLockNodeManager.tryLock().
*
* @param throwException value the rule condition should return for the awaited call
* @throws InterruptedException if interrupted while waiting on the latch
*/
public static void awaitTryLock(boolean throwException) throws InterruptedException {
// Publish the flag and a fresh latch under the same monitor isThrowException() uses.
synchronized (SharedStoreBackupActivationTest.class) {
SharedStoreBackupActivationTest.throwException = throwException;
exceptionThrownLatch = new CountDownLatch(1);
}
logger.debugf("Awaiting backup to perform FileLockNodeManager.tryLock()");
boolean ret = exceptionThrownLatch.await(10, TimeUnit.SECONDS);
// Reset the flag before asserting so it is cleared even when the wait timed out.
SharedStoreBackupActivationTest.throwException = false;
Assert.assertTrue("FileLockNodeManager.tryLock() was not called during specified timeout", ret);
logger.debugf("Awaiting FileLockNodeManager.tryLock() done");
}
/**
 * Lets the backup call tryLock() once normally and once with an injected IOException, then
 * stops the live server and expects the backup to become active within 10 seconds.
 */
@Test
@BMRules(
rules = {@BMRule(
name = "throw IOException during activation",
targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager",
targetMethod = "tryLock",
targetLocation = "AT ENTRY",
condition = "org.apache.activemq.artemis.tests.extras.byteman.SharedStoreBackupActivationTest.isThrowException()",
action = "THROW new IOException(\"IO Error\");")
})
public void testFailOverAfterTryLockException() throws Exception {
Assert.assertTrue(liveServer.isActive());
Assert.assertFalse(backupServer.isActive());
// wait for backup to try to acquire lock, once without exception (acquiring will not succeed because live is
// still active)
awaitTryLock(false);
// wait for backup to try to acquire lock, this time throw an IOException
logger.debug("Causing exception");
awaitTryLock(true);
// stop live server
logger.debugf("Stopping live server");
liveServer.stop();
waitForServerToStop(liveServer.getServer());
logger.debugf("Live server stopped, waiting for backup activation");
backupServer.getServer().waitForActivation(10, TimeUnit.SECONDS);
// backup should be activated by now
Assert.assertFalse(liveServer.isActive());
Assert.assertTrue("Backup server didn't activate", backupServer.isActive());
}
// Both servers use in-VM acceptors and connectors.
@Override
protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
return TransportConfigurationUtils.getInVMAcceptor(live);
}
@Override
protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
return TransportConfigurationUtils.getInVMConnector(live);
}
/**
 * Builds the live and backup configurations on top of a fresh temporary shared directory,
 * giving each server its own {@code FileLockNodeManager} over that directory.
 */
@Override
protected void createConfigs() throws Exception {
// createTempFile only reserves a unique name; the file is replaced by a directory of that name.
File sharedDir = File.createTempFile("shared-dir", "");
sharedDir.delete();
Assert.assertTrue(sharedDir.mkdir());
logger.debugf("Created shared store directory %s", sharedDir.getCanonicalPath());
TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
// nodes must use separate FileLockNodeManager instances!
NodeManager liveNodeManager = new FileLockNodeManager(sharedDir, false, new ScheduledThreadPoolExecutor(1));
NodeManager backupNodeManager = new FileLockNodeManager(sharedDir, false, new ScheduledThreadPoolExecutor(1));
backupConfig = super.createDefaultConfig(false)
.clearAcceptorConfigurations()
.addAcceptorConfiguration(getAcceptorTransportConfiguration(false))
.setHAPolicyConfiguration(
new SharedStoreSlavePolicyConfiguration()
.setScaleDownConfiguration(new ScaleDownConfiguration().setEnabled(false))
.setRestartBackup(false))
.addConnectorConfiguration(liveConnector.getName(), liveConnector)
.addConnectorConfiguration(backupConnector.getName(), backupConnector)
.addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
backupServer = createTestableServer(backupConfig, backupNodeManager);
liveConfig = super.createDefaultConfig(false)
.clearAcceptorConfigurations()
.addAcceptorConfiguration(getAcceptorTransportConfiguration(true))
.setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailoverOnServerShutdown(true))
.addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName()))
.addConnectorConfiguration(liveConnector.getName(), liveConnector);
liveServer = createTestableServer(liveConfig, liveNodeManager);
}
}

View File

@ -18,34 +18,23 @@ package org.apache.activemq.artemis.tests.integration.cluster;
import java.io.File; import java.io.File;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.List; import java.util.List;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager; import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.SpawnedVMSupport; import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.apache.activemq.artemis.utils.UUID;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test;
public class RealNodeManagerTest extends NodeManagerTest { public class FileLockNodeManagerTest extends NodeManagerTest {
@Test
public void testId() throws Exception {
NodeManager nodeManager = new FileLockNodeManager(new File(getTemporaryDir()), false);
nodeManager.start();
UUID id1 = nodeManager.getUUID();
nodeManager.stop();
nodeManager.start();
ActiveMQTestBase.assertEqualsByteArrays(id1.asBytes(), nodeManager.getUUID().asBytes());
nodeManager.stop();
}
@Override @Override
public void performWork(NodeManagerAction... actions) throws Exception { public void performWork(NodeManagerAction... actions) throws Exception {
List<Process> processes = new ArrayList<>(); List<Process> processes = new ArrayList<>();
for (NodeManagerAction action : actions) { for (NodeManagerAction action : actions) {
Process p = SpawnedVMSupport.spawnVM(NodeManagerAction.class.getName(), "-Xms512m", "-Xmx512m", new String[0], true, true, true, action.getWork()); final String[] args = new String[action.works() + 1];
args[0] = getTemporaryDir();
action.getWork(args, 1);
Process p = SpawnedVMSupport.spawnVM(this.getClass().getName(), "-Xms50m", "-Xmx512m", new String[0], true, true, true, args);
processes.add(p); processes.add(p);
} }
for (Process process : processes) { for (Process process : processes) {
@ -58,4 +47,8 @@ public class RealNodeManagerTest extends NodeManagerTest {
} }
} }
public static void main(String[] args) throws Exception {
NodeManagerAction.execute(Arrays.copyOfRange(args, 1, args.length), new FileLockNodeManager(new File(args[0]), false));
}
} }

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.junit.Assert;
/**
 * Runs the {@link NodeManagerTest} scenarios against {@link JdbcNodeManager}, using one node
 * manager and one thread per {@link NodeManagerAction} inside the current JVM.
 */
public class JdbcNodeManagerTest extends NodeManagerTest {

   /**
    * Starts one {@link JdbcNodeManager} per action, runs every action on its own thread and
    * fails if a thread does not finish in time or if any runner recorded an exception.
    * <p>
    * Node managers and executors are always stopped, including when starting a node manager
    * throws or when a runner thread is still alive after the join timeout.
    *
    * @param actions the action sequences to run concurrently, one node manager each
    * @throws Exception if a node manager cannot be created or started
    */
   @Override
   public void performWork(NodeManagerAction... actions) throws Exception {
      final ThreadFactory daemonThreadFactory = t -> {
         final Thread th = new Thread(t);
         th.setDaemon(true);
         return th;
      };
      final List<NodeRunner> nodeRunners = new ArrayList<>(actions.length);
      final Thread[] threads = new Thread[actions.length];
      // two executors and one node manager are registered per action
      final List<ExecutorService> executors = new ArrayList<>(actions.length * 2);
      final List<NodeManager> nodeManagers = new ArrayList<>(actions.length);
      // NOTE(review): nothing in this class ever sets this flag, so the final assertion on it
      // cannot fail; confirm whether a lost-lock listener was meant to set it.
      final AtomicBoolean failedRenew = new AtomicBoolean(false);
      try {
         for (NodeManagerAction action : actions) {
            final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory);
            final ExecutorService executor = Executors.newFixedThreadPool(2, daemonThreadFactory);
            // register for cleanup before anything that can throw
            executors.add(scheduledExecutorService);
            executors.add(executor);
            final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration();
            final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
            final JdbcNodeManager nodeManager = JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory);
            nodeManagers.add(nodeManager);
            nodeManager.start();
            nodeRunners.add(new NodeRunner(nodeManager, action));
         }
         for (int i = 0, nodeRunnersSize = nodeRunners.size(); i < nodeRunnersSize; i++) {
            threads[i] = new Thread(nodeRunners.get(i));
            threads[i].start();
         }
         // under a debugger wait forever, otherwise give each runner at most one minute
         final boolean isDebug = isDebug();
         for (Thread thread : threads) {
            try {
               if (isDebug) {
                  thread.join();
               } else {
                  thread.join(60_000);
               }
            } catch (InterruptedException ignored) {
               // deliberately ignored: the liveness check below fails the test if the runner
               // has not finished
            }
            if (thread.isAlive()) {
               thread.interrupt();
               fail("thread still running");
            }
         }
      } finally {
         // forcibly stop node managers
         nodeManagers.forEach(nodeManager -> {
            try {
               nodeManager.stop();
            } catch (Exception e) {
               // won't prevent the test to complete
               e.printStackTrace();
            }
         });
         // stop executors
         executors.forEach(ExecutorService::shutdownNow);
      }
      for (NodeRunner nodeRunner : nodeRunners) {
         if (nodeRunner.e != null) {
            nodeRunner.e.printStackTrace();
            fail(nodeRunner.e.getMessage());
         }
      }
      Assert.assertFalse("Some of the lease locks has failed to renew the locks", failedRenew.get());
   }
}

View File

@ -16,10 +16,10 @@
*/ */
package org.apache.activemq.artemis.tests.integration.cluster; package org.apache.activemq.artemis.tests.integration.cluster;
import java.io.File; import java.util.Arrays;
import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager; import org.apache.activemq.artemis.utils.UUID;
public class NodeManagerAction { public class NodeManagerAction {
@ -35,6 +35,7 @@ public class NodeManagerAction {
public static final int HAS_BACKUP = 11; public static final int HAS_BACKUP = 11;
public static final int DOESNT_HAVE_LIVE = 12; public static final int DOESNT_HAVE_LIVE = 12;
public static final int DOESNT_HAVE_BACKUP = 13; public static final int DOESNT_HAVE_BACKUP = 13;
public static final int CHECK_ID = 14;
private final int[] work; private final int[] work;
@ -82,7 +83,6 @@ public class NodeManagerAction {
} }
break; break;
case HAS_BACKUP: case HAS_BACKUP:
if (!hasBackupLock) { if (!hasBackupLock) {
throw new IllegalStateException("backup lock not held"); throw new IllegalStateException("backup lock not held");
} }
@ -93,37 +93,50 @@ public class NodeManagerAction {
} }
break; break;
case DOESNT_HAVE_BACKUP: case DOESNT_HAVE_BACKUP:
if (hasBackupLock) { if (hasBackupLock) {
throw new IllegalStateException("backup lock held"); throw new IllegalStateException("backup lock held");
} }
break; break;
case CHECK_ID:
nodeManager.start();
UUID id1 = nodeManager.getUUID();
nodeManager.stop();
nodeManager.start();
if (!Arrays.equals(id1.asBytes(), nodeManager.getUUID().asBytes())) {
throw new IllegalStateException("getUUID should be the same on restart");
}
break;
} }
} }
} }
public String[] getWork() { public int works() {
String[] strings = new String[work.length]; return work.length;
for (int i = 0, stringsLength = strings.length; i < stringsLength; i++) {
strings[i] = "" + work[i];
}
return strings;
} }
public static void main(String[] args) throws Exception { public int getWork(String[] works, int start) {
final int workLength = work.length;
for (int i = 0; i < workLength; i++) {
works[i + start] = Integer.toString(work[i]);
}
return workLength;
}
public static void execute(String[] args, NodeManager nodeManager) throws Exception {
int[] work1 = new int[args.length]; int[] work1 = new int[args.length];
for (int i = 0; i < args.length; i++) { for (int i = 0; i < args.length; i++) {
work1[i] = Integer.parseInt(args[i]); work1[i] = Integer.parseInt(args[i]);
} }
NodeManagerAction nodeManagerAction = new NodeManagerAction(work1); NodeManagerAction nodeManagerAction = new NodeManagerAction(work1);
FileLockNodeManager nodeManager = new FileLockNodeManager(new File("."), false);
nodeManager.start(); nodeManager.start();
try { try {
nodeManagerAction.performWork(nodeManager); nodeManagerAction.performWork(nodeManager);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
System.exit(9); System.exit(9);
} finally {
nodeManager.stop();
} }
} }

View File

@ -24,7 +24,9 @@ import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.tests.util.SpawnedTestBase; import org.apache.activemq.artemis.tests.util.SpawnedTestBase;
import org.junit.Test; import org.junit.Test;
import static java.lang.management.ManagementFactory.getRuntimeMXBean;
import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.AWAIT_LIVE; import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.AWAIT_LIVE;
import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.CHECK_ID;
import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.CRASH_LIVE; import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.CRASH_LIVE;
import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.DOESNT_HAVE_BACKUP; import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.DOESNT_HAVE_BACKUP;
import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.DOESNT_HAVE_LIVE; import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.DOESNT_HAVE_LIVE;
@ -38,6 +40,12 @@ import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerA
public class NodeManagerTest extends SpawnedTestBase { public class NodeManagerTest extends SpawnedTestBase {
@Test
public void testID() throws Exception {
NodeManagerAction live1 = new NodeManagerAction(CHECK_ID);
performWork(live1);
}
@Test @Test
public void testLive() throws Exception { public void testLive() throws Exception {
NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE); NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
@ -155,6 +163,10 @@ public class NodeManagerTest extends SpawnedTestBase {
} }
} }
protected static boolean isDebug() {
return getRuntimeMXBean().getInputArguments().toString().contains("jdwp");
}
static class NodeRunner implements Runnable { static class NodeRunner implements Runnable {
private NodeManagerAction action; private NodeManagerAction action;

View File

@ -147,10 +147,7 @@ public class FileLockNodeManagerTest extends FailoverTestBase {
executors.add(executor); executors.add(executor);
final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration(); final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration();
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor); final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory, (code, message, file) -> { return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory);
code.printStackTrace();
Assert.fail(message);
});
case File: case File:
final Configuration config = createDefaultInVMConfig(); final Configuration config = createDefaultInVMConfig();
if (useSeparateLockFolder) { if (useSeparateLockFolder) {

View File

@ -116,10 +116,7 @@ public class NettyFailoverTest extends FailoverTest {
executors.add(executor); executors.add(executor);
final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration(); final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration();
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor); final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory, (code, message, file) -> { return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory);
code.printStackTrace();
Assert.fail(message);
});
default: default:
throw new AssertionError("enum type not supported!"); throw new AssertionError("enum type not supported!");
} }

View File

@ -100,7 +100,7 @@ public class CriticalCrashTest extends SpawnedTestBase {
@Override @Override
protected StorageManager createStorageManager() { protected StorageManager createStorageManager() {
JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO) { JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) {
@Override @Override
public void readLock() { public void readLock() {
super.readLock(); super.readLock();

View File

@ -91,7 +91,7 @@ public class ShutdownOnCriticalIOErrorMoveNextTest extends ActiveMQTestBase {
@Override @Override
protected StorageManager createStorageManager() { protected StorageManager createStorageManager() {
JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO) { JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) {
@Override @Override
protected Journal createMessageJournal(Configuration config, protected Journal createMessageJournal(Configuration config,

View File

@ -175,38 +175,38 @@ public class DiscoveryBaseTest extends ActiveMQTestBase {
protected final class FakeNodeManager extends NodeManager { protected final class FakeNodeManager extends NodeManager {
public FakeNodeManager(String nodeID) { public FakeNodeManager(String nodeID) {
super(false, null); super(false);
this.setNodeID(nodeID); this.setNodeID(nodeID);
} }
@Override @Override
public void awaitLiveNode() throws Exception { public void awaitLiveNode() {
} }
@Override @Override
public void awaitLiveStatus() throws Exception { public void awaitLiveStatus() {
} }
@Override @Override
public void startBackup() throws Exception { public void startBackup() {
} }
@Override @Override
public ActivateCallback startLiveNode() throws Exception { public ActivateCallback startLiveNode() {
return new CleaningActivateCallback() { return new CleaningActivateCallback() {
}; };
} }
@Override @Override
public void pauseLiveServer() throws Exception { public void pauseLiveServer() {
} }
@Override @Override
public void crashLiveServer() throws Exception { public void crashLiveServer() {
} }
@Override @Override
public void releaseBackup() throws Exception { public void releaseBackup() {
} }
@Override @Override
@ -215,12 +215,12 @@ public class DiscoveryBaseTest extends ActiveMQTestBase {
} }
@Override @Override
public boolean isAwaitingFailback() throws Exception { public boolean isAwaitingFailback() {
return false; return false;
} }
@Override @Override
public boolean isBackupLive() throws Exception { public boolean isBackupLive() {
return false; return false;
} }