ARTEMIS-2941 Improve JDBC HA connection resiliency

This commit is contained in:
franz1981 2020-09-28 13:49:20 +02:00 committed by Clebert Suconic
parent 04bb3d66b0
commit 4545749969
37 changed files with 1387 additions and 928 deletions

View File

@ -16,52 +16,50 @@
*/
package org.apache.activemq.artemis.core.server;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.HashSet;
import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.jboss.logging.Logger;
public abstract class NodeManager implements ActiveMQComponent {
protected static final byte FIRST_TIME_START = '0';
public static final String SERVER_LOCK_NAME = "server.lock";
private static final String ACCESS_MODE = "rw";
/**
 * Callback registered on a {@code NodeManager} through {@code registerLockListener};
 * {@code notifyLostLock()} invokes {@link #lostLock()} on every registered listener.
 */
@FunctionalInterface
public interface LockListener {
/** Invoked by {@code notifyLostLock()} on each registered listener. */
void lostLock();
}
private static final Logger LOGGER = Logger.getLogger(NodeManager.class);
protected final boolean replicatedBackup;
private final File directory;
private final Object nodeIDGuard = new Object();
protected final Object nodeIDGuard = new Object();
private SimpleString nodeID;
private UUID uuid;
private boolean isStarted = false;
private final Set<FileLockNodeManager.LockListener> lockListeners;
protected FileChannel channel;
public NodeManager(final boolean replicatedBackup, final File directory) {
this.directory = directory;
public NodeManager(final boolean replicatedBackup) {
this.replicatedBackup = replicatedBackup;
this.lockListeners = new HashSet<>();
}
// --------------------------------------------------------------------
public abstract void awaitLiveNode() throws Exception;
public abstract void awaitLiveNode() throws NodeManagerException, InterruptedException;
public abstract void awaitLiveStatus() throws Exception;
public abstract void awaitLiveStatus() throws NodeManagerException, InterruptedException;
public abstract void startBackup() throws Exception;
public abstract void startBackup() throws NodeManagerException, InterruptedException;
public abstract ActivateCallback startLiveNode() throws Exception;
public abstract ActivateCallback startLiveNode() throws NodeManagerException, InterruptedException;
public abstract void pauseLiveServer() throws Exception;
public abstract void pauseLiveServer() throws NodeManagerException;
public abstract void crashLiveServer() throws Exception;
public abstract void crashLiveServer() throws NodeManagerException;
public abstract void releaseBackup() throws Exception;
public abstract void releaseBackup() throws NodeManagerException;
// --------------------------------------------------------------------
@ -81,7 +79,7 @@ public abstract class NodeManager implements ActiveMQComponent {
}
}
public abstract SimpleString readNodeId() throws ActiveMQIllegalStateException, IOException;
public abstract SimpleString readNodeId() throws NodeManagerException;
public UUID getUUID() {
synchronized (nodeIDGuard) {
@ -113,119 +111,63 @@ public abstract class NodeManager implements ActiveMQComponent {
}
}
public abstract boolean isAwaitingFailback() throws Exception;
public abstract boolean isAwaitingFailback() throws NodeManagerException;
public abstract boolean isBackupLive() throws Exception;
public abstract boolean isBackupLive() throws NodeManagerException;
public abstract void interrupt();
@Override
public synchronized void stop() throws Exception {
FileChannel channelCopy = channel;
if (channelCopy != null)
channelCopy.close();
// force any running threads on node manager to stop
isStarted = false;
lockListeners.clear();
}
public void stopBackup() throws Exception {
if (replicatedBackup && getNodeId() != null) {
setUpServerLockFile();
}
public void stopBackup() throws NodeManagerException {
releaseBackup();
}
/**
* Ensures existence of persistent information about the server's nodeID.
* <p>
* Roughly the different use cases are:
* <ol>
* <li>old live server restarts: a server.lock file already exists and contains a nodeID.
* <li>new live server starting for the first time: no file exists, and we just *create* a new
* UUID to use as nodeID
* <li>replicated backup received its nodeID from its live: no file exists, we need to persist
* the *current* nodeID
* </ol>
*/
protected synchronized void setUpServerLockFile() throws IOException {
File serverLockFile = newFile(SERVER_LOCK_NAME);
/**
 * Guard that fails fast when this node manager has not been started.
 *
 * @throws IllegalStateException if the {@code isStarted} flag is {@code false}
 */
protected synchronized void checkStarted() {
if (!isStarted) {
throw new IllegalStateException("the node manager is supposed to be started");
}
}
boolean fileCreated = false;
int count = 0;
while (!serverLockFile.exists()) {
protected synchronized void notifyLostLock() {
if (!isStarted) {
return;
}
lockListeners.forEach(lockListener -> {
try {
fileCreated = serverLockFile.createNewFile();
} catch (RuntimeException e) {
ActiveMQServerLogger.LOGGER.nodeManagerCantOpenFile(e, serverLockFile);
throw e;
} catch (IOException e) {
/*
* on some OS's this may fail weirdly even tho the parent dir exists, retrying will work, some weird timing issue i think
* */
if (count < 5) {
try {
Thread.sleep(100);
} catch (InterruptedException e1) {
}
count++;
continue;
}
ActiveMQServerLogger.LOGGER.nodeManagerCantOpenFile(e, serverLockFile);
throw e;
lockListener.lostLock();
} catch (Exception e) {
LOGGER.warn("On notify lost lock", e);
// Need to notify everyone so ignore any exception
}
}
@SuppressWarnings("resource")
RandomAccessFile raFile = new RandomAccessFile(serverLockFile, ACCESS_MODE);
channel = raFile.getChannel();
if (fileCreated) {
ByteBuffer id = ByteBuffer.allocateDirect(3);
byte[] bytes = new byte[3];
bytes[0] = FIRST_TIME_START;
bytes[1] = FIRST_TIME_START;
bytes[2] = FIRST_TIME_START;
id.put(bytes, 0, 3);
id.position(0);
channel.write(id, 0);
channel.force(true);
}
createNodeId();
});
}
/**
* @return
*/
protected final File newFile(final String fileName) {
File file = new File(directory, fileName);
return file;
/**
 * Adds {@code lockListener} to the set of listeners kept by this node manager.
 * The backing collection is a {@code Set}, so registering the same listener twice has no further effect.
 *
 * @param lockListener the listener to add
 */
public synchronized void registerLockListener(FileLockNodeManager.LockListener lockListener) {
lockListeners.add(lockListener);
}
protected final synchronized void createNodeId() throws IOException {
synchronized (nodeIDGuard) {
ByteBuffer id = ByteBuffer.allocateDirect(16);
int read = channel.read(id, 3);
if (replicatedBackup) {
id.position(0);
id.put(getUUID().asBytes(), 0, 16);
id.position(0);
channel.write(id, 3);
channel.force(true);
} else if (read != 16) {
setUUID(UUIDGenerator.getInstance().generateUUID());
id.put(getUUID().asBytes(), 0, 16);
id.position(0);
channel.write(id, 3);
channel.force(true);
} else {
byte[] bytes = new byte[16];
id.position(0);
id.get(bytes);
setUUID(new UUID(UUID.TYPE_TIME_BASED, bytes));
}
/**
 * Removes {@code lockListener} from the set of listeners kept by this node manager.
 *
 * @param lockListener the listener to remove
 */
public synchronized void unregisterLockListener(FileLockNodeManager.LockListener lockListener) {
lockListeners.remove(lockListener);
}
/**
 * Unchecked exception declared by the {@code NodeManager} operations
 * (e.g. {@code readNodeId()}, {@code releaseBackup()}, {@code isBackupLive()}).
 * Being a {@link RuntimeException}, callers are not forced to catch it.
 */
public static final class NodeManagerException extends RuntimeException {
/**
 * @param message description of the failure
 */
public NodeManagerException(String message) {
super(message);
}
/**
 * @param cause the underlying failure being wrapped
 */
public NodeManagerException(Throwable cause) {
super(cause);
}
/**
 * @param message description of the failure
 * @param cause the underlying failure being wrapped
 */
public NodeManagerException(String message, Throwable cause) {
super(message, cause);
}
}
}

View File

@ -21,6 +21,7 @@ import java.util.List;
import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.ColocatedActivation;
import org.apache.activemq.artemis.core.server.impl.LiveActivation;
@ -91,8 +92,8 @@ public class ColocatedPolicy implements HAPolicy<LiveActivation> {
public LiveActivation createActivation(ActiveMQServerImpl server,
boolean wasLive,
Map<String, Object> activationParams,
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception {
return new ColocatedActivation(server, this, livePolicy.createActivation(server, wasLive, activationParams, shutdownOnCriticalIO));
IOCriticalErrorListener ioCriticalErrorListener) throws Exception {
return new ColocatedActivation(server, this, livePolicy.createActivation(server, wasLive, activationParams, ioCriticalErrorListener));
}
@Override

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@ -34,7 +35,7 @@ public interface HAPolicy<T extends Activation> {
T createActivation(ActiveMQServerImpl server,
boolean wasLive,
Map<String, Object> activationParams,
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception;
IOCriticalErrorListener shutdownOnCriticalIO) throws Exception;
boolean isSharedStore();

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.LiveOnlyActivation;
@ -37,7 +38,7 @@ public class LiveOnlyPolicy implements HAPolicy<Activation> {
public Activation createActivation(ActiveMQServerImpl server,
boolean wasLive,
Map<String, Object> activationParams,
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) {
IOCriticalErrorListener ioCriticalErrorListener) {
return new LiveOnlyActivation(server, this);
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@ -211,8 +212,8 @@ public class ReplicaPolicy extends BackupPolicy {
public Activation createActivation(ActiveMQServerImpl server,
boolean wasLive,
Map<String, Object> activationParams,
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception {
SharedNothingBackupActivation backupActivation = new SharedNothingBackupActivation(server, wasLive, activationParams, shutdownOnCriticalIO, this, networkHealthCheck);
IOCriticalErrorListener ioCriticalErrorListener) throws Exception {
SharedNothingBackupActivation backupActivation = new SharedNothingBackupActivation(server, wasLive, activationParams, ioCriticalErrorListener, this, networkHealthCheck);
backupActivation.init();
return backupActivation;
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.NetworkHealthCheck;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.LiveActivation;
@ -231,7 +232,7 @@ public class ReplicatedPolicy implements HAPolicy<LiveActivation> {
public LiveActivation createActivation(ActiveMQServerImpl server,
boolean wasLive,
Map<String, Object> activationParams,
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) {
IOCriticalErrorListener ioCriticalErrorListener) {
return new SharedNothingLiveActivation(server, this);
}

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.LiveActivation;
import org.apache.activemq.artemis.core.server.impl.SharedStoreLiveActivation;
@ -91,8 +92,8 @@ public class SharedStoreMasterPolicy implements HAPolicy<LiveActivation> {
public LiveActivation createActivation(ActiveMQServerImpl server,
boolean wasLive,
Map<String, Object> activationParams,
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) {
return new SharedStoreLiveActivation(server, this);
IOCriticalErrorListener ioCriticalErrorListener) {
return new SharedStoreLiveActivation(server, this, ioCriticalErrorListener);
}
@Override

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha;
import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.impl.Activation;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.SharedStoreBackupActivation;
@ -101,8 +102,8 @@ public class SharedStoreSlavePolicy extends BackupPolicy {
public Activation createActivation(ActiveMQServerImpl server,
boolean wasLive,
Map<String, Object> activationParams,
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) {
return new SharedStoreBackupActivation(server, this);
IOCriticalErrorListener ioCriticalErrorListener) {
return new SharedStoreBackupActivation(server, this, ioCriticalErrorListener);
}
@Override

View File

@ -329,7 +329,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
private final Map<String, Object> activationParams = new HashMap<>();
protected final ShutdownOnCriticalErrorListener shutdownOnCriticalIO = new ShutdownOnCriticalErrorListener();
protected final IOCriticalErrorListener ioCriticalErrorListener = new DefaultCriticalErrorListener();
private final ActiveMQServer parentServer;
@ -522,7 +522,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
throw new IllegalArgumentException("replicatingBackup is not supported yet while using JDBC persistence");
}
final DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
manager = JdbcNodeManager.with(dbConf, scheduledPool, executorFactory, shutdownOnCriticalIO);
manager = JdbcNodeManager.with(dbConf, scheduledPool, executorFactory);
} else if (haType == null || haType == HAPolicyConfiguration.TYPE.LIVE_ONLY) {
if (logger.isDebugEnabled()) {
logger.debug("Detected no Shared Store HA options on JDBC store");
@ -610,7 +610,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean wasLive = !haPolicy.isBackup();
if (!haPolicy.isBackup()) {
activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO);
activation = haPolicy.createActivation(this, false, activationParams, ioCriticalErrorListener);
if (afterActivationCreated != null) {
try {
@ -636,9 +636,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// checking again here
if (haPolicy.isBackup()) {
if (haPolicy.isSharedStore()) {
activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO);
activation = haPolicy.createActivation(this, false, activationParams, ioCriticalErrorListener);
} else {
activation = haPolicy.createActivation(this, wasLive, activationParams, shutdownOnCriticalIO);
activation = haPolicy.createActivation(this, wasLive, activationParams, ioCriticalErrorListener);
}
if (afterActivationCreated != null) {
@ -1117,12 +1117,23 @@ public class ActiveMQServerImpl implements ActiveMQServer {
this.stop(failoverOnServerShutdown, criticalIOError, restarting, false);
}
private void stop(boolean failoverOnServerShutdown,
final boolean criticalIOError,
boolean restarting,
boolean isShutdown) {
stop(failoverOnServerShutdown, criticalIOError, isShutdown || criticalIOError, restarting, isShutdown);
}
/**
* Stops the server
*
* @param criticalIOError whether we have encountered an IO error with the journal etc
*/
void stop(boolean failoverOnServerShutdown, final boolean criticalIOError, boolean restarting, boolean isShutdown) {
private void stop(boolean failoverOnServerShutdown,
final boolean criticalIOError,
boolean shutdownExternalComponents,
boolean restarting,
boolean isShutdown) {
if (logger.isDebugEnabled()) {
logger.debug("Stopping server " + this);
@ -1344,7 +1355,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
connectedClientIds.clear();
stopExternalComponents(isShutdown || criticalIOError);
stopExternalComponents(shutdownExternalComponents);
try {
this.analyzer.clear();
@ -2794,9 +2805,9 @@ public class ActiveMQServerImpl implements ActiveMQServer {
protected PagingStoreFactory getPagingStoreFactory() throws Exception {
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration();
return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, false, shutdownOnCriticalIO, configuration.isReadWholePage());
return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, false, ioCriticalErrorListener, configuration.isReadWholePage());
}
return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO, configuration.isReadWholePage());
return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), ioCriticalErrorListener, configuration.isReadWholePage());
}
/**
@ -2805,12 +2816,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
protected StorageManager createStorageManager() {
if (configuration.isPersistenceEnabled()) {
if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) {
JDBCJournalStorageManager journal = new JDBCJournalStorageManager(configuration, getCriticalAnalyzer(), getScheduledPool(), executorFactory, ioExecutorFactory, shutdownOnCriticalIO);
JDBCJournalStorageManager journal = new JDBCJournalStorageManager(configuration, getCriticalAnalyzer(), getScheduledPool(), executorFactory, ioExecutorFactory, ioCriticalErrorListener);
this.getCriticalAnalyzer().add(journal);
return journal;
} else {
// Default to File Based Storage Manager, (Legacy default configuration).
JournalStorageManager journal = new JournalStorageManager(configuration, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO);
JournalStorageManager journal = new JournalStorageManager(configuration, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener);
this.getCriticalAnalyzer().add(journal);
return journal;
}
@ -3136,7 +3147,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
if (configuration.getMaxDiskUsage() != -1) {
try {
injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f, shutdownOnCriticalIO));
injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f, ioCriticalErrorListener));
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.unableToInjectMonitor(e);
}
@ -3929,23 +3940,23 @@ public class ActiveMQServerImpl implements ActiveMQServer {
// Inner classes
// --------------------------------------------------------------------------------
public final class ShutdownOnCriticalErrorListener implements IOCriticalErrorListener {
public final class DefaultCriticalErrorListener implements IOCriticalErrorListener {
boolean failedAlready = false;
private final AtomicBoolean failedAlready = new AtomicBoolean();
@Override
public synchronized void onIOException(Throwable cause, String message, SequentialFile file) {
if (!failedAlready) {
failedAlready = true;
if (file == null) {
ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, "NULL", cause);
} else {
ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause);
}
stopTheServer(true);
if (!failedAlready.compareAndSet(false, true)) {
return;
}
if (file == null) {
ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, "NULL", cause);
} else {
ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause);
}
stopTheServer(true);
}
}

View File

@ -0,0 +1,156 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.server.impl;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
/**
 * Base class for {@link NodeManager} implementations that persist the server's node ID in a
 * {@code server.lock} file located in a given directory.
 * <p>
 * File layout as written by this class: bytes {@code [0, 3)} hold a state header, initialised to three
 * {@link #FIRST_TIME_START} bytes when the file is first created; bytes {@code [3, 19)} hold the 16 raw
 * bytes of the node {@link UUID}.
 */
public abstract class FileBasedNodeManager extends NodeManager {

   /** Byte used to fill the 3-byte header of a freshly created lock file. */
   protected static final byte FIRST_TIME_START = '0';

   /** Name of the lock file created inside the directory given to the constructor. */
   public static final String SERVER_LOCK_NAME = "server.lock";

   private static final String ACCESS_MODE = "rw";

   /** Size in bytes of the header stored at the beginning of the lock file. */
   private static final int STATE_HEADER_SIZE = 3;

   /** Size in bytes of the node ID stored right after the header. */
   private static final int NODE_ID_SIZE = 16;

   /** How many times a failed file creation is retried before giving up. */
   private static final int MAX_CREATE_RETRIES = 5;

   /** Pause between two file creation attempts. */
   private static final long CREATE_RETRY_DELAY_MILLIS = 100;

   private final File directory;

   /** Channel on the lock file; assigned by {@link #setUpServerLockFile()} and closed by {@link #stop()}. */
   protected FileChannel channel;

   /**
    * @param replicatedBackup forwarded to {@link NodeManager}; when {@code true} the current UUID is
    *                         persisted by {@link #createNodeId()} instead of being read or generated
    * @param directory        directory that contains (or will contain) the lock file
    */
   public FileBasedNodeManager(boolean replicatedBackup, File directory) {
      super(replicatedBackup);
      this.directory = directory;
   }

   /**
    * Ensures existence of persistent information about the server's nodeID.
    * <p>
    * Roughly the different use cases are:
    * <ol>
    * <li>old live server restarts: a server.lock file already exists and contains a nodeID.
    * <li>new live server starting for the first time: no file exists, and we just *create* a new
    * UUID to use as nodeID
    * <li>replicated backup received its nodeID from its live: no file exists, we need to persist
    * the *current* nodeID
    * </ol>
    *
    * @throws IOException if the lock file cannot be created, opened or written; this includes the case
    *                     where the calling thread is interrupted while waiting to retry the creation
    */
   protected synchronized void setUpServerLockFile() throws IOException {
      final File serverLockFile = newFile(SERVER_LOCK_NAME);
      final boolean fileCreated = createServerLockFile(serverLockFile);

      // the file is intentionally left open: its channel is used afterwards and closed by stop()
      @SuppressWarnings("resource")
      RandomAccessFile raFile = new RandomAccessFile(serverLockFile, ACCESS_MODE);

      channel = raFile.getChannel();

      if (fileCreated) {
         // brand new file: initialise the header
         ByteBuffer header = ByteBuffer.allocateDirect(STATE_HEADER_SIZE);
         for (int i = 0; i < STATE_HEADER_SIZE; i++) {
            header.put(FIRST_TIME_START);
         }
         header.flip();
         channel.write(header, 0);
         channel.force(true);
      }

      createNodeId();
   }

   /**
    * Creates the lock file if it does not exist yet, retrying a bounded number of times on I/O failure.
    *
    * @return {@code true} only if this call created the file
    */
   private boolean createServerLockFile(final File serverLockFile) throws IOException {
      boolean fileCreated = false;
      int retries = 0;
      while (!serverLockFile.exists()) {
         try {
            fileCreated = serverLockFile.createNewFile();
         } catch (RuntimeException e) {
            ActiveMQServerLogger.LOGGER.nodeManagerCantOpenFile(e, serverLockFile);
            throw e;
         } catch (IOException e) {
            /*
             * On some operating systems the creation may fail even though the parent directory exists;
             * retrying after a short pause usually succeeds (likely a timing issue).
             */
            if (retries < MAX_CREATE_RETRIES && pauseBeforeRetry()) {
               retries++;
               continue;
            }
            ActiveMQServerLogger.LOGGER.nodeManagerCantOpenFile(e, serverLockFile);
            throw e;
         }
      }
      return fileCreated;
   }

   /**
    * Sleeps before the next creation attempt.
    *
    * @return {@code false} if the thread was interrupted: the interrupt status is restored and the caller
    * must give up, because any further I/O on an interruptible channel would fail on an interrupted thread
    */
   private static boolean pauseBeforeRetry() {
      try {
         Thread.sleep(CREATE_RETRY_DELAY_MILLIS);
         return true;
      } catch (InterruptedException interrupted) {
         Thread.currentThread().interrupt();
         return false;
      }
   }

   /**
    * Resolves a file name against the directory of this node manager.
    *
    * @param fileName name of the file, relative to the directory given to the constructor
    * @return a {@link File} pointing to {@code fileName} inside that directory
    */
   protected final File newFile(final String fileName) {
      return new File(directory, fileName);
   }

   /**
    * Synchronises the in-memory node UUID with the one stored in the lock file:
    * <ul>
    * <li>replicated backup: the current UUID is written to the file;
    * <li>no complete (16 bytes) ID stored in the file: a new UUID is generated, set and written;
    * <li>otherwise: the stored ID is read and set as the current UUID.
    * </ul>
    * Requires {@link #channel} to be already open.
    *
    * @throws IOException if the lock file cannot be read or written
    */
   protected final synchronized void createNodeId() throws IOException {
      synchronized (nodeIDGuard) {
         ByteBuffer id = ByteBuffer.allocateDirect(NODE_ID_SIZE);
         int read = channel.read(id, STATE_HEADER_SIZE);
         if (replicatedBackup) {
            writeNodeId(id);
         } else if (read != NODE_ID_SIZE) {
            setUUID(UUIDGenerator.getInstance().generateUUID());
            writeNodeId(id);
         } else {
            byte[] bytes = new byte[NODE_ID_SIZE];
            id.position(0);
            id.get(bytes);
            setUUID(new UUID(UUID.TYPE_TIME_BASED, bytes));
         }
      }
   }

   /**
    * Writes the current UUID right after the header of the lock file and forces it to storage.
    * The buffer is cleared first: a short read (1..15 bytes, e.g. truncated file) leaves its position
    * past zero and the 16-byte put would otherwise overflow it.
    */
   private void writeNodeId(final ByteBuffer id) throws IOException {
      id.clear();
      id.put(getUUID().asBytes(), 0, NODE_ID_SIZE);
      id.flip();
      channel.write(id, STATE_HEADER_SIZE);
      channel.force(true);
   }

   /**
    * Closes the lock file channel, if any, and then stops the base node manager.
    * The base class is stopped even when closing the channel fails, so the manager never stays
    * marked as started after a failed close; the failure is still propagated to the caller.
    */
   @Override
   public synchronized void stop() throws Exception {
      try {
         FileChannel channelCopy = channel;
         if (channelCopy != null) {
            channelCopy.close();
         }
      } finally {
         super.stop();
      }
   }

   /**
    * For a replicated backup that knows its node ID, persists that ID into the lock file before
    * delegating to the base implementation.
    *
    * @throws NodeManagerException wrapping any {@link IOException} raised while setting up the lock file
    */
   @Override
   public void stopBackup() throws NodeManagerException {
      if (replicatedBackup && getNodeId() != null) {
         try {
            setUpServerLockFile();
         } catch (IOException e) {
            throw new NodeManagerException(e);
         }
      }
      super.stopBackup();
   }
}

View File

@ -22,23 +22,18 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.utils.UUID;
import org.jboss.logging.Logger;
public class FileLockNodeManager extends NodeManager {
public class FileLockNodeManager extends FileBasedNodeManager {
private static final Logger logger = Logger.getLogger(FileLockNodeManager.class);
@ -58,9 +53,9 @@ public class FileLockNodeManager extends NodeManager {
private static final byte NOT_STARTED = 'N';
private static final long LOCK_ACCESS_FAILURE_WAIT_TIME = 2000;
private static final long LOCK_ACCESS_FAILURE_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(2);
private static final int LOCK_MONITOR_TIMEOUT_MILLIES = 2000;
private static final long LOCK_MONITOR_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(2);
private volatile FileLock liveLock;
@ -68,28 +63,31 @@ public class FileLockNodeManager extends NodeManager {
private final FileChannel[] lockChannels = new FileChannel[3];
protected long lockAcquisitionTimeout = -1;
private final long lockAcquisitionTimeoutNanos;
protected boolean interrupted = false;
private ScheduledExecutorService scheduledPool;
private final ScheduledExecutorService scheduledPool;
public FileLockNodeManager(final File directory, boolean replicatedBackup, ScheduledExecutorService scheduledPool) {
super(replicatedBackup, directory);
this.scheduledPool = scheduledPool;
this.lockAcquisitionTimeoutNanos = -1;
}
public FileLockNodeManager(final File directory, boolean replicatedBackup) {
super(replicatedBackup, directory);
this.scheduledPool = null;
this.lockAcquisitionTimeoutNanos = -1;
}
public FileLockNodeManager(final File directory, boolean replicatedBackup, long lockAcquisitionTimeout,
ScheduledExecutorService scheduledPool) {
public FileLockNodeManager(final File directory,
boolean replicatedBackup,
long lockAcquisitionTimeout,
ScheduledExecutorService scheduledPool) {
super(replicatedBackup, directory);
this.scheduledPool = scheduledPool;
this.lockAcquisitionTimeout = lockAcquisitionTimeout;
this.lockAcquisitionTimeoutNanos = lockAcquisitionTimeout == -1 ? -1 : TimeUnit.MILLISECONDS.toNanos(lockAcquisitionTimeout);
}
@Override
@ -141,19 +139,23 @@ public class FileLockNodeManager extends NodeManager {
}
@Override
public boolean isAwaitingFailback() throws Exception {
public boolean isAwaitingFailback() throws NodeManagerException {
return getState() == FileLockNodeManager.FAILINGBACK;
}
@Override
public boolean isBackupLive() throws Exception {
FileLock liveAttemptLock;
liveAttemptLock = tryLock(FileLockNodeManager.LIVE_LOCK_POS);
if (liveAttemptLock == null) {
return true;
} else {
liveAttemptLock.release();
return false;
public boolean isBackupLive() throws NodeManagerException {
try {
FileLock liveAttemptLock;
liveAttemptLock = tryLock(FileLockNodeManager.LIVE_LOCK_POS);
if (liveAttemptLock == null) {
return true;
} else {
liveAttemptLock.release();
return false;
}
} catch (IOException e) {
throw new NodeManagerException(e);
}
}
@ -167,182 +169,218 @@ public class FileLockNodeManager extends NodeManager {
}
@Override
public final void releaseBackup() throws Exception {
if (backupLock != null) {
backupLock.release();
backupLock = null;
public final void releaseBackup() throws NodeManagerException {
try {
if (backupLock != null) {
backupLock.release();
backupLock = null;
}
} catch (IOException e) {
throw new NodeManagerException(e);
}
}
@Override
public void awaitLiveNode() throws Exception {
logger.debug("awaiting live node...");
do {
byte state = getState();
while (state == FileLockNodeManager.NOT_STARTED || state == FIRST_TIME_START) {
logger.debug("awaiting live node startup state='" + state + "'");
Thread.sleep(2000);
public void awaitLiveNode() throws NodeManagerException, InterruptedException {
try {
logger.debug("awaiting live node...");
do {
byte state = getState();
while (state == FileLockNodeManager.NOT_STARTED || state == FIRST_TIME_START) {
logger.debug("awaiting live node startup state='" + state + "'");
Thread.sleep(2000);
state = getState();
}
liveLock = lock(FileLockNodeManager.LIVE_LOCK_POS);
if (interrupted) {
interrupted = false;
throw new InterruptedException("Lock was interrupted");
}
state = getState();
if (state == FileLockNodeManager.PAUSED) {
liveLock.release();
logger.debug("awaiting live node restarting");
Thread.sleep(2000);
} else if (state == FileLockNodeManager.FAILINGBACK) {
liveLock.release();
logger.debug("awaiting live node failing back");
Thread.sleep(2000);
} else if (state == FileLockNodeManager.LIVE) {
logger.debug("acquired live node lock state = " + (char) state);
break;
}
}
liveLock = lock(FileLockNodeManager.LIVE_LOCK_POS);
if (interrupted) {
interrupted = false;
throw new InterruptedException("Lock was interrupted");
}
state = getState();
if (state == FileLockNodeManager.PAUSED) {
liveLock.release();
logger.debug("awaiting live node restarting");
Thread.sleep(2000);
} else if (state == FileLockNodeManager.FAILINGBACK) {
liveLock.release();
logger.debug("awaiting live node failing back");
Thread.sleep(2000);
} else if (state == FileLockNodeManager.LIVE) {
logger.debug("acquired live node lock state = " + (char) state);
break;
}
while (true);
} catch (IOException | ActiveMQLockAcquisitionTimeoutException e) {
throw new NodeManagerException(e);
}
while (true);
}
@Override
public void startBackup() throws Exception {
public void startBackup() throws NodeManagerException {
assert !replicatedBackup; // should not be called if this is a replicating backup
ActiveMQServerLogger.LOGGER.waitingToBecomeBackup();
backupLock = lock(FileLockNodeManager.BACKUP_LOCK_POS);
try {
backupLock = lock(FileLockNodeManager.BACKUP_LOCK_POS);
} catch (ActiveMQLockAcquisitionTimeoutException e) {
throw new NodeManagerException(e);
}
ActiveMQServerLogger.LOGGER.gotBackupLock();
if (getUUID() == null)
readNodeId();
}
@Override
public ActivateCallback startLiveNode() throws Exception {
setFailingBack();
public ActivateCallback startLiveNode() throws NodeManagerException {
try {
setFailingBack();
String timeoutMessage = lockAcquisitionTimeout == -1 ? "indefinitely" : lockAcquisitionTimeout + " milliseconds";
String timeoutMessage = lockAcquisitionTimeoutNanos == -1 ? "indefinitely" : TimeUnit.NANOSECONDS.toMillis(lockAcquisitionTimeoutNanos) + " milliseconds";
ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage);
ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage);
liveLock = lock(FileLockNodeManager.LIVE_LOCK_POS);
liveLock = lock(FileLockNodeManager.LIVE_LOCK_POS);
ActiveMQServerLogger.LOGGER.obtainedLiveLock();
ActiveMQServerLogger.LOGGER.obtainedLiveLock();
return new CleaningActivateCallback() {
@Override
public void activationComplete() {
try {
setLive();
startLockMonitoring();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
return new CleaningActivateCallback() {
@Override
public void activationComplete() {
try {
setLive();
startLockMonitoring();
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
// rethrowing allows the broker to be restarted/stopped if needed
throw e;
}
}
}
};
};
} catch (ActiveMQLockAcquisitionTimeoutException e) {
throw new NodeManagerException(e);
}
}
@Override
public void pauseLiveServer() throws Exception {
public void pauseLiveServer() throws NodeManagerException {
stopLockMonitoring();
setPaused();
if (liveLock != null) {
liveLock.release();
try {
if (liveLock != null) {
liveLock.release();
}
} catch (IOException e) {
throw new NodeManagerException(e);
}
}
@Override
public void crashLiveServer() throws Exception {
public void crashLiveServer() throws NodeManagerException {
stopLockMonitoring();
if (liveLock != null) {
liveLock.release();
liveLock = null;
try {
liveLock.release();
} catch (IOException e) {
throw new NodeManagerException(e);
} finally {
liveLock = null;
}
}
}
/**
 * Blocks until the state returned by {@code getState()} is {@code LIVE},
 * re-reading it every 2 seconds.
 */
@Override
public void awaitLiveStatus() throws Exception {
public void awaitLiveStatus() throws NodeManagerException, InterruptedException {
while (getState() != LIVE) {
// not LIVE yet: wait before polling the state again
Thread.sleep(2000);
}
}
/**
 * Writes the {@code LIVE} status by delegating to {@code writeFileLockStatus}.
 */
private void setLive() throws Exception {
private void setLive() throws NodeManagerException {
writeFileLockStatus(FileLockNodeManager.LIVE);
}
/**
 * Writes the {@code FAILINGBACK} status by delegating to {@code writeFileLockStatus}.
 */
private void setFailingBack() throws Exception {
private void setFailingBack() throws NodeManagerException {
writeFileLockStatus(FAILINGBACK);
}
/**
 * Writes the {@code PAUSED} status by delegating to {@code writeFileLockStatus}.
 */
private void setPaused() throws Exception {
private void setPaused() throws NodeManagerException {
writeFileLockStatus(PAUSED);
}
/**
* @param status
* @throws IOException
* @throws ActiveMQLockAcquisitionTimeoutException,IOException
*/
private void writeFileLockStatus(byte status) throws Exception {
private void writeFileLockStatus(byte status) throws NodeManagerException {
if (replicatedBackup && channel == null)
return;
logger.debug("writing status: " + status);
ByteBuffer bb = ByteBuffer.allocateDirect(1);
bb.put(status);
bb.position(0);
if (!channel.isOpen()) {
setUpServerLockFile();
}
FileLock lock = null;
try {
lock = lock(STATE_LOCK_POS);
channel.write(bb, 0);
channel.force(true);
} finally {
if (lock != null) {
lock.release();
if (!channel.isOpen()) {
setUpServerLockFile();
}
FileLock lock = null;
try {
lock = lock(STATE_LOCK_POS);
channel.write(bb, 0);
channel.force(true);
} finally {
if (lock != null) {
lock.release();
}
}
} catch (IOException | ActiveMQLockAcquisitionTimeoutException e) {
throw new NodeManagerException(e);
}
}
private byte getState() throws Exception {
byte result;
logger.debug("getting state...");
ByteBuffer bb = ByteBuffer.allocateDirect(1);
int read;
FileLock lock = null;
private byte getState() throws NodeManagerException {
try {
lock = lock(STATE_LOCK_POS);
read = channel.read(bb, 0);
if (read <= 0) {
result = FileLockNodeManager.NOT_STARTED;
} else {
result = bb.get(0);
}
} finally {
if (lock != null) {
lock.release();
byte result;
logger.debug("getting state...");
ByteBuffer bb = ByteBuffer.allocateDirect(1);
int read;
FileLock lock = null;
try {
lock = lock(STATE_LOCK_POS);
read = channel.read(bb, 0);
if (read <= 0) {
result = FileLockNodeManager.NOT_STARTED;
} else {
result = bb.get(0);
}
} finally {
if (lock != null) {
lock.release();
}
}
logger.debug("state: " + result);
return result;
} catch (IOException | ActiveMQLockAcquisitionTimeoutException e) {
throw new NodeManagerException(e);
}
logger.debug("state: " + result);
return result;
}
@Override
public final SimpleString readNodeId() throws ActiveMQIllegalStateException, IOException {
ByteBuffer id = ByteBuffer.allocateDirect(16);
int read = channel.read(id, 3);
if (read != 16) {
throw new ActiveMQIllegalStateException("live server did not write id to file");
public final SimpleString readNodeId() throws NodeManagerException {
try {
ByteBuffer id = ByteBuffer.allocateDirect(16);
int read = channel.read(id, 3);
if (read != 16) {
throw new IOException("live server did not write id to file");
}
byte[] bytes = new byte[16];
id.position(0);
id.get(bytes);
setUUID(new UUID(UUID.TYPE_TIME_BASED, bytes));
return getNodeId();
} catch (IOException e) {
throw new NodeManagerException(e);
}
byte[] bytes = new byte[16];
id.position(0);
id.get(bytes);
setUUID(new UUID(UUID.TYPE_TIME_BASED, bytes));
return getNodeId();
}
protected FileLock tryLock(final int lockPos) throws IOException {
@ -361,8 +399,8 @@ public class FileLockNodeManager extends NodeManager {
}
}
protected FileLock lock(final int lockPosition) throws Exception {
long start = System.currentTimeMillis();
protected FileLock lock(final int lockPosition) throws ActiveMQLockAcquisitionTimeoutException {
long start = System.nanoTime();
boolean isRecurringFailure = false;
while (!interrupted) {
@ -377,7 +415,7 @@ public class FileLockNodeManager extends NodeManager {
return null;
}
if (lockAcquisitionTimeout != -1 && (System.currentTimeMillis() - start) > lockAcquisitionTimeout) {
if (this.lockAcquisitionTimeoutNanos != -1 && (System.nanoTime() - start) > this.lockAcquisitionTimeoutNanos) {
throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock");
}
} else {
@ -390,9 +428,9 @@ public class FileLockNodeManager extends NodeManager {
"Failure when accessing a lock file", e);
isRecurringFailure = true;
long waitTime = LOCK_ACCESS_FAILURE_WAIT_TIME;
if (lockAcquisitionTimeout != -1) {
final long remainingTime = lockAcquisitionTimeout - (System.currentTimeMillis() - start);
long waitTime = LOCK_ACCESS_FAILURE_WAIT_TIME_NANOS;
if (this.lockAcquisitionTimeoutNanos != -1) {
final long remainingTime = this.lockAcquisitionTimeoutNanos - (System.nanoTime() - start);
if (remainingTime <= 0) {
throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock");
}
@ -400,7 +438,7 @@ public class FileLockNodeManager extends NodeManager {
}
try {
Thread.sleep(waitTime);
TimeUnit.NANOSECONDS.sleep(waitTime);
} catch (InterruptedException interrupt) {
return null;
}
@ -414,7 +452,7 @@ public class FileLockNodeManager extends NodeManager {
private synchronized void startLockMonitoring() {
logger.debug("Starting the lock monitor");
if (monitorLock == null) {
monitorLock = new MonitorLock(scheduledPool, LOCK_MONITOR_TIMEOUT_MILLIES, LOCK_MONITOR_TIMEOUT_MILLIES, TimeUnit.MILLISECONDS, false);
monitorLock = new MonitorLock(scheduledPool, LOCK_MONITOR_TIMEOUT_NANOS, LOCK_MONITOR_TIMEOUT_NANOS, TimeUnit.NANOSECONDS, false);
monitorLock.start();
} else {
logger.debug("Lock monitor was already started");
@ -431,49 +469,22 @@ public class FileLockNodeManager extends NodeManager {
}
}
private void notifyLostLock() {
// Additional check we are not initializing or have no locking object anymore
// because of a shutdown
if (lockListeners != null && liveLock != null) {
Set<LockListener> lockListenersSnapshot = null;
// Snapshot of the set because I'm not sure if we can trigger concurrent
// modification exception here if we don't
synchronized (lockListeners) {
lockListenersSnapshot = new HashSet<>(lockListeners);
}
lockListenersSnapshot.forEach(lockListener -> {
try {
lockListener.lostLock();
} catch (Exception e) {
// Need to notify everyone so ignore any exception
}
});
@Override
protected synchronized void notifyLostLock() {
if (liveLock != null) {
super.notifyLostLock();
}
}
public void registerLockListener(LockListener lockListener) {
lockListeners.add(lockListener);
/**
 * Tells whether the live lock is missing or no longer valid.
 * <p>
 * This has been introduced to help the ByteMan test testLockMonitorInvalid on JDK 11:
 * sun.nio.ch.FileLockImpl::isValid can affect setLive, causing an infinite loop due to
 * java.nio.channels.OverlappingFileLockException on tryLock.
 *
 * @return {@code true} when there is no live lock or the held one reports itself as not valid
 */
private boolean isLiveLockLost() {
   final FileLock current = this.liveLock;
   if (current == null) {
      return true;
   }
   return !current.isValid();
}
public void unregisterLockListener(LockListener lockListener) {
lockListeners.remove(lockListener);
}
protected final Set<LockListener> lockListeners = Collections.synchronizedSet(new HashSet<LockListener>());
private MonitorLock monitorLock;
public abstract class LockListener {
protected abstract void lostLock() throws Exception;
protected void unregisterListener() {
lockListeners.remove(this);
}
}
public class MonitorLock extends ActiveMQScheduledComponent {
public MonitorLock(ScheduledExecutorService scheduledExecutorService,
long initialDelay,
@ -492,7 +503,7 @@ public class FileLockNodeManager extends NodeManager {
if (liveLock == null) {
logger.debug("Livelock is null");
}
lostLock = (liveLock != null && !liveLock.isValid()) || liveLock == null;
lostLock = isLiveLockLost();
if (!lostLock) {
logger.debug("Server still has the lock, double check status is live");
// Java always thinks the lock is still valid even when there is no filesystem

View File

@ -17,14 +17,10 @@
package org.apache.activemq.artemis.core.server.impl;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.Semaphore;
import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import static org.apache.activemq.artemis.core.server.impl.InVMNodeManager.State.FAILING_BACK;
@ -39,7 +35,7 @@ import static org.apache.activemq.artemis.core.server.impl.InVMNodeManager.State
* multiple servers are run inside the same VM and File Locks can not be shared in the
* same VM (it would cause a shared lock violation).
*/
public final class InVMNodeManager extends NodeManager {
public final class InVMNodeManager extends FileBasedNodeManager {
private final Semaphore liveLock;
@ -67,7 +63,7 @@ public final class InVMNodeManager extends NodeManager {
}
@Override
public void awaitLiveNode() throws Exception {
public void awaitLiveNode() throws InterruptedException {
do {
while (state == NOT_STARTED) {
Thread.sleep(10);
@ -92,51 +88,47 @@ public final class InVMNodeManager extends NodeManager {
}
/**
 * Blocks until {@code state} is {@code LIVE}, re-checking it every 10 milliseconds.
 */
@Override
public void awaitLiveStatus() throws Exception {
public void awaitLiveStatus() throws InterruptedException {
while (state != LIVE) {
// not LIVE yet: wait before checking the state again
Thread.sleep(10);
}
}
/**
 * Starts acting as backup by acquiring {@code backupLock}.
 */
@Override
public void startBackup() throws Exception {
public void startBackup() throws InterruptedException {
backupLock.acquire();
}
@Override
public ActivateCallback startLiveNode() throws Exception {
public ActivateCallback startLiveNode() throws InterruptedException {
state = FAILING_BACK;
liveLock.acquire();
return new CleaningActivateCallback() {
@Override
public void activationComplete() {
try {
state = LIVE;
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
}
state = LIVE;
}
};
}
/**
 * Marks the state as {@code PAUSED} and then releases {@code liveLock}.
 */
@Override
public void pauseLiveServer() throws Exception {
public void pauseLiveServer() {
state = PAUSED;
liveLock.release();
}
/**
 * Releases {@code liveLock}; unlike {@code pauseLiveServer()} the state is left unchanged.
 */
@Override
public void crashLiveServer() throws Exception {
public void crashLiveServer() {
liveLock.release();
}
/**
 * @return {@code true} when the current state is {@code FAILING_BACK}
 */
@Override
public boolean isAwaitingFailback() throws Exception {
public boolean isAwaitingFailback() {
return state == FAILING_BACK;
}
/**
 * @return {@code true} when the {@code liveLock} semaphore has no available permits
 */
@Override
public boolean isBackupLive() throws Exception {
public boolean isBackupLive() {
return liveLock.availablePermits() == 0;
}
@ -151,7 +143,7 @@ public final class InVMNodeManager extends NodeManager {
}
/**
 * Returns the node id held in memory by delegating to {@code getNodeId()}.
 *
 * @return the value of {@code getNodeId()}
 */
@Override
public SimpleString readNodeId() throws ActiveMQIllegalStateException, IOException {
public SimpleString readNodeId() {
return getNodeId();
}
}

View File

@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.TopologyMember;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
@ -67,7 +68,7 @@ public final class SharedNothingBackupActivation extends Activation {
private SharedNothingBackupQuorum backupQuorum;
private final boolean attemptFailBack;
private final Map<String, Object> activationParams;
private final ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO;
private final IOCriticalErrorListener ioCriticalErrorListener;
private String nodeID;
ClusterControl clusterControl;
private boolean closed;
@ -79,13 +80,13 @@ public final class SharedNothingBackupActivation extends Activation {
public SharedNothingBackupActivation(ActiveMQServerImpl activeMQServer,
boolean attemptFailBack,
Map<String, Object> activationParams,
ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO,
IOCriticalErrorListener ioCriticalErrorListener,
ReplicaPolicy replicaPolicy,
NetworkHealthCheck networkHealthCheck) {
this.activeMQServer = activeMQServer;
this.attemptFailBack = attemptFailBack;
this.activationParams = activationParams;
this.shutdownOnCriticalIO = shutdownOnCriticalIO;
this.ioCriticalErrorListener = ioCriticalErrorListener;
this.replicaPolicy = replicaPolicy;
backupSyncLatch.setCount(1);
this.networkHealthCheck = networkHealthCheck;
@ -95,7 +96,7 @@ public final class SharedNothingBackupActivation extends Activation {
assert replicationEndpoint == null;
activeMQServer.resetNodeManager();
backupUpToDate = false;
replicationEndpoint = new ReplicationEndpoint(activeMQServer, shutdownOnCriticalIO, attemptFailBack, this);
replicationEndpoint = new ReplicationEndpoint(activeMQServer, ioCriticalErrorListener, attemptFailBack, this);
}
@Override

View File

@ -317,7 +317,7 @@ public class SharedNothingLiveActivation extends LiveActivation {
SimpleString nodeId0;
try {
nodeId0 = activeMQServer.getNodeManager().readNodeId();
} catch (ActiveMQIllegalStateException e) {
} catch (NodeManager.NodeManagerException e) {
nodeId0 = null;
}

View File

@ -16,18 +16,24 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
import org.apache.activemq.artemis.core.server.NodeManager.NodeManagerException;
import org.apache.activemq.artemis.core.server.QueueFactory;
import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy;
@ -40,17 +46,24 @@ public final class SharedStoreBackupActivation extends Activation {
private static final Logger logger = Logger.getLogger(SharedStoreBackupActivation.class);
//this is how we act as a backup
private SharedStoreSlavePolicy sharedStoreSlavePolicy;
private final SharedStoreSlavePolicy sharedStoreSlavePolicy;
private ActiveMQServerImpl activeMQServer;
private final ActiveMQServerImpl activeMQServer;
private final Object failbackCheckerGuard = new Object();
private boolean cancelFailBackChecker;
public SharedStoreBackupActivation(ActiveMQServerImpl server, SharedStoreSlavePolicy sharedStoreSlavePolicy) {
private LockListener activeLockListener;
private final IOCriticalErrorListener ioCriticalErrorListener;
public SharedStoreBackupActivation(ActiveMQServerImpl server,
SharedStoreSlavePolicy sharedStoreSlavePolicy,
IOCriticalErrorListener ioCriticalErrorListener) {
this.activeMQServer = server;
this.sharedStoreSlavePolicy = sharedStoreSlavePolicy;
this.ioCriticalErrorListener = ioCriticalErrorListener;
synchronized (failbackCheckerGuard) {
cancelFailBackChecker = false;
}
@ -59,6 +72,8 @@ public final class SharedStoreBackupActivation extends Activation {
@Override
public void run() {
try {
registerActiveLockListener(activeMQServer.getNodeManager());
activeMQServer.getNodeManager().startBackup();
ScaleDownPolicy scaleDownPolicy = sharedStoreSlavePolicy.getScaleDownPolicy();
@ -92,6 +107,11 @@ public final class SharedStoreBackupActivation extends Activation {
activeMQServer.completeActivation(false);
if (scalingDown) {
if (!restarting.compareAndSet(false, true)) {
return;
}
unregisterActiveLockListener(activeMQServer.getNodeManager());
ActiveMQServerLogger.LOGGER.backupServerScaledDown();
Thread t = new Thread(new Runnable() {
@Override
@ -117,6 +137,17 @@ public final class SharedStoreBackupActivation extends Activation {
if (sharedStoreSlavePolicy.isAllowAutoFailBack() && ActiveMQServerImpl.SERVER_STATE.STOPPING != activeMQServer.getState() && ActiveMQServerImpl.SERVER_STATE.STOPPED != activeMQServer.getState()) {
startFailbackChecker();
}
} catch (NodeManagerException nodeManagerException) {
if (nodeManagerException.getCause() instanceof ClosedChannelException) {
// this is ok, we are being stopped
return;
}
if (nodeManagerException.getCause() instanceof ActiveMQLockAcquisitionTimeoutException) {
ActiveMQServerLogger.LOGGER.initializationError(nodeManagerException.getCause());
return;
}
unregisterActiveLockListener(activeMQServer.getNodeManager());
ioCriticalErrorListener.onIOException(nodeManagerException, nodeManagerException.getMessage(), null);
} catch (ClosedChannelException | InterruptedException e) {
// these are ok, we are being stopped
} catch (Exception e) {
@ -144,16 +175,25 @@ public final class SharedStoreBackupActivation extends Activation {
activeMQServer.interruptActivationThread(nodeManagerInUse);
if (nodeManagerInUse != null) {
unregisterActiveLockListener(nodeManagerInUse);
nodeManagerInUse.stopBackup();
}
} else {
if (nodeManagerInUse != null) {
unregisterActiveLockListener(nodeManagerInUse);
// if we are now live, behave as live
// We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is
// started before the live
if (sharedStoreSlavePolicy.isFailoverOnServerShutdown() || permanently) {
nodeManagerInUse.crashLiveServer();
try {
nodeManagerInUse.crashLiveServer();
} catch (Throwable t) {
if (!permanently) {
throw t;
}
logger.warn("Errored while closing activation: can be ignored because of permanent close", t);
}
} else {
nodeManagerInUse.pauseLiveServer();
}
@ -161,6 +201,27 @@ public final class SharedStoreBackupActivation extends Activation {
}
}
/**
 * Creates the lost-lock listener of this activation, stores it in {@code activeLockListener}
 * and registers it on the given node manager.
 * <p>
 * On notification the listener tries to flip {@code restarting} from {@code false} to
 * {@code true}: if it succeeds it unregisters itself and reports an {@link IOException}
 * to {@code ioCriticalErrorListener}; otherwise it only logs a warning.
 *
 * @param nodeManager the node manager the listener is registered on
 */
private void registerActiveLockListener(NodeManager nodeManager) {
   final LockListener onLostLock = () -> {
      if (restarting.compareAndSet(false, true)) {
         unregisterActiveLockListener(nodeManager);
         ioCriticalErrorListener.onIOException(new IOException("lost lock"), "Lost NodeManager lock", null);
      } else {
         logger.warn("Restarting already happening on lost lock");
      }
   };
   this.activeLockListener = onLostLock;
   nodeManager.registerLockListener(onLostLock);
}
/**
 * Removes the listener stored in {@code activeLockListener}, if any, from the given node
 * manager and clears the field. Does nothing when no listener is stored.
 *
 * @param nodeManager the node manager the listener is unregistered from
 */
private void unregisterActiveLockListener(NodeManager nodeManager) {
   final LockListener registered = this.activeLockListener;
   if (registered == null) {
      return;
   }
   nodeManager.unregisterLockListener(registered);
   this.activeLockListener = null;
}
@Override
public JournalLoader createJournalLoader(PostOffice postOffice,
PagingManager pagingManager,
@ -178,6 +239,8 @@ public final class SharedStoreBackupActivation extends Activation {
}
}
private final AtomicBoolean restarting = new AtomicBoolean(false);
/**
* To be called by backup trying to fail back the server
*/
@ -195,47 +258,50 @@ public final class SharedStoreBackupActivation extends Activation {
activeMQServer.getClusterManager().getDefaultConnection(null).addClusterTopologyListener(backupListener);
}
private boolean restarting = false;
@Override
public void run() {
try {
if (!restarting && activeMQServer.getNodeManager().isAwaitingFailback()) {
if (backupListener.waitForBackup()) {
ActiveMQServerLogger.LOGGER.awaitFailBack();
restarting = true;
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
logger.debug(activeMQServer + "::Stopping live node in favor of failback");
NodeManager nodeManager = activeMQServer.getNodeManager();
activeMQServer.stop(true, false, true);
// ensure that the server to which we are failing back actually starts fully before we restart
nodeManager.start();
try {
nodeManager.awaitLiveStatus();
} finally {
nodeManager.stop();
}
synchronized (failbackCheckerGuard) {
if (cancelFailBackChecker || !sharedStoreSlavePolicy.isRestartBackup())
return;
activeMQServer.setHAPolicy(sharedStoreSlavePolicy);
logger.debug(activeMQServer + "::Starting backup node now after failback");
activeMQServer.start();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.serverRestartWarning(e);
}
}
});
t.start();
if (!restarting.get() && activeMQServer.getNodeManager().isAwaitingFailback() && backupListener.waitForBackup()) {
if (!restarting.compareAndSet(false, true)) {
return;
}
ActiveMQServerLogger.LOGGER.awaitFailBack();
Thread t = new Thread(new Runnable() {
@Override
public void run() {
try {
logger.debug(activeMQServer + "::Stopping live node in favor of failback");
NodeManager nodeManager = activeMQServer.getNodeManager();
activeMQServer.stop(true, false, true);
// ensure that the server to which we are failing back actually starts fully before we restart
nodeManager.start();
try {
nodeManager.awaitLiveStatus();
} finally {
nodeManager.stop();
}
synchronized (failbackCheckerGuard) {
if (cancelFailBackChecker || !sharedStoreSlavePolicy.isRestartBackup())
return;
activeMQServer.setHAPolicy(sharedStoreSlavePolicy);
logger.debug(activeMQServer + "::Starting backup node now after failback");
activeMQServer.start();
LockListener lockListener = activeLockListener;
if (lockListener != null) {
activeMQServer.getNodeManager().registerLockListener(lockListener);
}
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.serverRestartWarning(e);
}
}
});
t.start();
}
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.serverRestartWarning(e);

View File

@ -16,11 +16,17 @@
*/
package org.apache.activemq.artemis.core.server.impl;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
import org.apache.activemq.artemis.core.server.NodeManager.NodeManagerException;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager.LockListener;
import org.jboss.logging.Logger;
public final class SharedStoreLiveActivation extends LiveActivation {
@ -28,17 +34,22 @@ public final class SharedStoreLiveActivation extends LiveActivation {
private static final Logger logger = Logger.getLogger(SharedStoreLiveActivation.class);
// this is how we act when we initially start as live
private SharedStoreMasterPolicy sharedStoreMasterPolicy;
private final SharedStoreMasterPolicy sharedStoreMasterPolicy;
private ActiveMQServerImpl activeMQServer;
private final ActiveMQServerImpl activeMQServer;
private volatile FileLockNodeManager.LockListener activeLockListener;
private volatile LockListener activeLockListener;
private volatile ActivateCallback nodeManagerActivateCallback;
public SharedStoreLiveActivation(ActiveMQServerImpl server, SharedStoreMasterPolicy sharedStoreMasterPolicy) {
private final IOCriticalErrorListener ioCriticalErrorListener;
public SharedStoreLiveActivation(ActiveMQServerImpl server,
SharedStoreMasterPolicy sharedStoreMasterPolicy,
IOCriticalErrorListener ioCriticalErrorListener) {
this.activeMQServer = server;
this.sharedStoreMasterPolicy = sharedStoreMasterPolicy;
this.ioCriticalErrorListener = ioCriticalErrorListener;
}
@Override
@ -71,9 +82,9 @@ public final class SharedStoreLiveActivation extends LiveActivation {
activeMQServer.getBackupManager().announceBackup();
}
registerActiveLockListener(activeMQServer.getNodeManager());
nodeManagerActivateCallback = activeMQServer.getNodeManager().startLiveNode();
activeMQServer.registerActivateCallback(nodeManagerActivateCallback);
addLockListener(activeMQServer, activeMQServer.getNodeManager());
if (activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPED
|| activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPING) {
@ -85,59 +96,40 @@ public final class SharedStoreLiveActivation extends LiveActivation {
activeMQServer.completeActivation(false);
ActiveMQServerLogger.LOGGER.serverIsLive();
} catch (NodeManagerException nodeManagerException) {
if (nodeManagerException.getCause() instanceof ClosedChannelException) {
// this is ok, we are being stopped
return;
}
if (nodeManagerException.getCause() instanceof ActiveMQLockAcquisitionTimeoutException) {
onActivationFailure((ActiveMQLockAcquisitionTimeoutException) nodeManagerException.getCause());
return;
}
unregisterActiveLockListener(activeMQServer.getNodeManager());
ioCriticalErrorListener.onIOException(nodeManagerException, nodeManagerException.getMessage(), null);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.initializationError(e);
activeMQServer.callActivationFailureListeners(e);
onActivationFailure(e);
}
}
private void addLockListener(ActiveMQServerImpl activeMQServer, NodeManager nodeManager) {
if (nodeManager instanceof FileLockNodeManager) {
FileLockNodeManager fileNodeManager = (FileLockNodeManager) nodeManager;
activeLockListener = fileNodeManager.new LockListener() {
@Override
public void lostLock() {
stopStartServerInSeperateThread(activeMQServer);
}
};
fileNodeManager.registerLockListener(activeLockListener);
} // else no business registering a listener
/**
 * Handles a failed activation: unregisters the active lock listener from the server's node
 * manager, logs the failure as an initialization error and then calls the server's
 * activation failure listeners.
 *
 * @param e the failure that interrupted the activation
 */
private void onActivationFailure(Exception e) {
   final NodeManager nodeManager = activeMQServer.getNodeManager();
   unregisterActiveLockListener(nodeManager);
   ActiveMQServerLogger.LOGGER.initializationError(e);
   activeMQServer.callActivationFailureListeners(e);
}
/**
* We need to do this in a new thread because it takes too long to finish in
* the scheduled thread. Also, this is not the responsibility of the scheduled
* thread.
* @param activeMQServer
*/
private void stopStartServerInSeperateThread(ActiveMQServerImpl activeMQServer) {
try {
/**
 * Creates a lost-lock listener that reports an {@link IOException} to
 * {@code ioCriticalErrorListener}, stores it in {@code activeLockListener} and registers
 * it on the given node manager.
 *
 * @param nodeManager the node manager the listener is registered on
 */
private void registerActiveLockListener(NodeManager nodeManager) {
   final LockListener onLostLock = () -> {
      ioCriticalErrorListener.onIOException(new IOException("lost lock"), "Lost NodeManager lock", null);
   };
   this.activeLockListener = onLostLock;
   nodeManager.registerLockListener(onLostLock);
}
Runnable startServerRunnable = new Runnable() {
@Override
public void run() {
try {
activeMQServer.stop(true, false);
} catch (Exception e) {
logger.warn("Failed to stop artemis server after loosing the lock", e);
}
try {
activeMQServer.start();
} catch (Exception e) {
logger.error("Failed to start artemis server after recovering from loosing the lock", e);
}
}
};
Thread startServer = new Thread(startServerRunnable);
startServer.start();
} catch (Exception e) {
logger.error(e.getMessage());
/**
 * Removes the listener stored in {@code activeLockListener}, if any, from the given node
 * manager and clears the field. Does nothing when no listener is stored.
 *
 * @param nodeManager the node manager the listener is unregistered from
 */
private void unregisterActiveLockListener(NodeManager nodeManager) {
   final LockListener registered = this.activeLockListener;
   if (registered == null) {
      return;
   }
   nodeManager.unregisterLockListener(registered);
   this.activeLockListener = null;
}
@ -147,16 +139,20 @@ public final class SharedStoreLiveActivation extends LiveActivation {
NodeManager nodeManagerInUse = activeMQServer.getNodeManager();
if (nodeManagerInUse != null) {
LockListener closeLockListener = activeLockListener;
if (closeLockListener != null) {
closeLockListener.unregisterListener();
}
unregisterActiveLockListener(nodeManagerInUse);
ActivateCallback activateCallback = nodeManagerActivateCallback;
if (activateCallback != null) {
activeMQServer.unregisterActivateCallback(activateCallback);
}
if (sharedStoreMasterPolicy.isFailoverOnServerShutdown() || permanently) {
nodeManagerInUse.crashLiveServer();
try {
nodeManagerInUse.crashLiveServer();
} catch (Throwable t) {
if (!permanently) {
throw t;
}
logger.warn("Errored while closing activation: can be ignored because of permanent close", t);
}
} else {
nodeManagerInUse.pauseLiveServer();
}

View File

@ -20,13 +20,13 @@ package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent;
import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.jboss.logging.Logger;
/**
* Default implementation of a {@link ScheduledLeaseLock}: see {@link ScheduledLeaseLock#of(ScheduledExecutorService, ArtemisExecutor, String, LeaseLock, long, IOCriticalErrorListener)}.
* Default implementation of a {@link ScheduledLeaseLock}: see {@link ScheduledLeaseLock#of(ScheduledExecutorService, ArtemisExecutor, String, LeaseLock, long, LockListener)}.
*/
final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implements ScheduledLeaseLock {
@ -36,14 +36,14 @@ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implem
private final LeaseLock lock;
private long lastLockRenewStart;
private final long renewPeriodMillis;
private final IOCriticalErrorListener ioCriticalErrorListener;
private final LockListener lockListener;
ActiveMQScheduledLeaseLock(ScheduledExecutorService scheduledExecutorService,
ArtemisExecutor executor,
String lockName,
LeaseLock lock,
long renewPeriodMillis,
IOCriticalErrorListener ioCriticalErrorListener) {
LockListener lockListener) {
super(scheduledExecutorService, executor, 0, renewPeriodMillis, TimeUnit.MILLISECONDS, false);
if (renewPeriodMillis >= lock.expirationMillis()) {
throw new IllegalArgumentException("renewPeriodMillis must be < lock's expirationMillis");
@ -51,9 +51,14 @@ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implem
this.lockName = lockName;
this.lock = lock;
this.renewPeriodMillis = renewPeriodMillis;
//already expired start time
// already expired start time
this.lastLockRenewStart = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(lock.expirationMillis());
this.ioCriticalErrorListener = ioCriticalErrorListener;
this.lockListener = lockListener;
}
/**
 * Returns the name assigned to this scheduled lock at construction time.
 * The same name is used as a prefix in this component's renew log messages.
 *
 * @return the lock name
 */
@Override
public String lockName() {
return lockName;
}
@Override
@ -84,37 +89,55 @@ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implem
}
@Override
public void run() {
public synchronized void run() {
if (!isStarted()) {
return;
}
final long lastRenewStart = this.lastLockRenewStart;
final long renewStart = System.nanoTime();
boolean lockLost = true;
try {
if (!this.lock.renew()) {
ioCriticalErrorListener.onIOException(new IllegalStateException(lockName + " lock can't be renewed"), "Critical error while on " + lockName + " renew", null);
}
lockLost = !this.lock.renew();
} catch (Throwable t) {
ioCriticalErrorListener.onIOException(t, "Critical error while on " + lockName + " renew", null);
throw t;
LOGGER.warnf(t, "%s lock renew has failed", lockName);
if (lock.localExpirationTime() > 0) {
final long millisBeforeExpiration = (lock.localExpirationTime() - System.currentTimeMillis());
// there is enough time to retry to renew it?
if (millisBeforeExpiration >= this.renewPeriodMillis) {
lockLost = false;
}
}
}
// a failed attempt to renew is treated as a lost lock
if (lockLost) {
try {
lockListener.lostLock();
} catch (Throwable t) {
LOGGER.warnf(t, "Errored while notifying %s lock listener", lockName);
}
}
//logic to detect slowness of DB and/or the scheduled executor service
detectAndReportRenewSlowness(lockName, lastRenewStart, renewStart, renewPeriodMillis, lock.expirationMillis());
detectAndReportRenewSlowness(lockName, lockLost, lastRenewStart,
renewStart, renewPeriodMillis, lock.expirationMillis());
this.lastLockRenewStart = renewStart;
}
private static void detectAndReportRenewSlowness(String lockName,
boolean lostLock,
long lastRenewStart,
long renewStart,
long expectedRenewPeriodMillis,
long expirationMillis) {
final long elapsedMillisToRenew = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - renewStart);
if (elapsedMillisToRenew > expectedRenewPeriodMillis) {
LOGGER.error(lockName + " lock renew tooks " + elapsedMillisToRenew + " ms, while is supposed to take <" + expectedRenewPeriodMillis + " ms");
LOGGER.errorf("%s lock %s renew tooks %d ms, while is supposed to take <%d ms", lockName, lostLock ? "failed" : "successful", elapsedMillisToRenew, expectedRenewPeriodMillis);
}
final long measuredRenewPeriodNanos = renewStart - lastRenewStart;
final long measuredRenewPeriodMillis = TimeUnit.NANOSECONDS.toMillis(measuredRenewPeriodNanos);
if (measuredRenewPeriodMillis - expirationMillis > 100) {
LOGGER.error(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms");
LOGGER.errorf("%s lock %s renew period lasted %d ms instead of %d ms", lockName, lostLock ? "failed" : "successful", measuredRenewPeriodMillis, expectedRenewPeriodMillis);
} else if (measuredRenewPeriodMillis - expectedRenewPeriodMillis > 100) {
LOGGER.warn(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms");
LOGGER.warnf("%s lock %s renew period lasted %d ms instead of %d ms", lockName, lostLock ? "failed" : "successful", measuredRenewPeriodMillis, expectedRenewPeriodMillis);
}
}
}

View File

@ -44,8 +44,10 @@ final class JdbcLeaseLock implements LeaseLock {
private final String isLocked;
private final String currentDateTime;
private final long expirationMillis;
private final int queryTimeout;
private boolean maybeAcquired;
private final String lockName;
private long localExpirationTime;
/**
* The lock will be responsible (ie {@link #close()}) of all the {@link PreparedStatement}s used by it, but not of the {@link Connection},
@ -59,6 +61,7 @@ final class JdbcLeaseLock implements LeaseLock {
String isLocked,
String currentDateTime,
long expirationMIllis,
long queryTimeoutMillis,
String lockName) {
if (holderId.length() > MAX_HOLDER_ID_LENGTH) {
throw new IllegalArgumentException("holderId length must be <=" + MAX_HOLDER_ID_LENGTH);
@ -73,12 +76,30 @@ final class JdbcLeaseLock implements LeaseLock {
this.maybeAcquired = false;
this.connectionProvider = connectionProvider;
this.lockName = lockName;
this.localExpirationTime = -1;
// JDBC query timeouts are expressed in whole seconds: convert the configured milliseconds,
// truncating any sub-second remainder. A non-positive configuration maps to -1 (disabled).
int expectedTimeout = (int) (queryTimeoutMillis > 0 ? TimeUnit.MILLISECONDS.toSeconds(queryTimeoutMillis) : -1);
// Only a requested (>= 0) timeout that doesn't amount to at least one whole second is "too low":
// it is disabled instead of being applied as 0. Valid multi-second values must be left untouched.
if (queryTimeoutMillis >= 0 && expectedTimeout <= 0) {
   LOGGER.warn("queryTimeoutMillis is too low: it's suggested to configure a multi-seconds value. Disabling it because too low.");
   expectedTimeout = -1;
}
this.queryTimeout = expectedTimeout;
}
/**
 * Returns the identifier of the holder on whose behalf this lock operates.
 *
 * @return the holder id
 */
public String holderId() {
return holderId;
}
/**
 * Given that many DBMS won't support standard SQL queries to collect CURRENT_TIMESTAMP at millisecond granularity,
 * this value is stripped of its milliseconds part, making it less optimistic than reality, if >= 0.<p>
 * It's commonly used as a hard deadline for JDBC operations, hence it is fine for it not to have a high precision.
 *
 * @return the locally recorded expiration time in epoch milliseconds (truncated to whole seconds),
 *         or -1 when no valid expiration is recorded (initial state, failed renew or released lock)
 */
@Override
public long localExpirationTime() {
return localExpirationTime;
}
@Override
public long expirationMillis() {
return expirationMillis;
@ -115,17 +136,24 @@ final class JdbcLeaseLock implements LeaseLock {
}
private long dbCurrentTimeMillis(Connection connection) throws SQLException {
final long start = System.nanoTime();
try (PreparedStatement currentDateTime = connection.prepareStatement(this.currentDateTime)) {
if (queryTimeout >= 0) {
currentDateTime.setQueryTimeout(queryTimeout);
}
final long startTime = stripMilliseconds(System.currentTimeMillis());
try (ResultSet resultSet = currentDateTime.executeQuery()) {
resultSet.next();
final long endTime = stripMilliseconds(System.currentTimeMillis());
final Timestamp currentTimestamp = resultSet.getTimestamp(1);
final long elapsedTime = System.nanoTime() - start;
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s query currentTimestamp = %s tooks %d ms",
lockName, holderId, currentTimestamp, TimeUnit.NANOSECONDS.toMillis(elapsedTime));
final long currentTime = currentTimestamp.getTime();
final long currentTimeMillis = stripMilliseconds(currentTime);
if (currentTimeMillis < startTime) {
LOGGER.warnf("[%s] %s query currentTimestamp = %s on database should happen AFTER %s on broker", lockName, holderId, currentTimestamp, new Timestamp(startTime));
}
return currentTimestamp.getTime();
if (currentTimeMillis > endTime) {
LOGGER.warnf("[%s] %s query currentTimestamp = %s on database should happen BEFORE %s on broker", lockName, holderId, currentTimestamp, new Timestamp(endTime));
}
return currentTime;
}
}
}
@ -138,7 +166,8 @@ final class JdbcLeaseLock implements LeaseLock {
connection.setAutoCommit(false);
try (PreparedStatement preparedStatement = connection.prepareStatement(this.renewLock)) {
final long now = dbCurrentTimeMillis(connection);
final Timestamp expirationTime = new Timestamp(now + expirationMillis);
final long localExpirationTime = now + expirationMillis;
final Timestamp expirationTime = new Timestamp(localExpirationTime);
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s is renewing lock with expirationTime = %s",
lockName, holderId, expirationTime);
@ -151,11 +180,13 @@ final class JdbcLeaseLock implements LeaseLock {
final boolean renewed = updatedRows == 1;
connection.commit();
if (!renewed) {
this.localExpirationTime = -1;
if (LOGGER.isDebugEnabled()) {
LOGGER.debugf("[%s] %s has failed to renew lock: lock status = { %s }",
lockName, holderId, readableLockStatus());
}
} else {
this.localExpirationTime = stripMilliseconds(localExpirationTime);
LOGGER.debugf("[%s] %s has renewed lock", lockName, holderId);
}
return renewed;
@ -170,6 +201,10 @@ final class JdbcLeaseLock implements LeaseLock {
}
}
/**
 * Truncates an epoch-milliseconds value to whole-second precision by dropping its milliseconds part.
 *
 * @param time a time expressed in milliseconds
 * @return {@code time} with the sub-second remainder removed (truncation is toward zero)
 */
private static long stripMilliseconds(long time) {
   final long subSecondRemainder = time % 1000;
   return time - subSecondRemainder;
}
@Override
public boolean tryAcquire() {
try (Connection connection = connectionProvider.getConnection()) {
@ -179,7 +214,8 @@ final class JdbcLeaseLock implements LeaseLock {
try (PreparedStatement preparedStatement = connection.prepareStatement(this.tryAcquireLock)) {
final long now = dbCurrentTimeMillis(connection);
preparedStatement.setString(1, holderId);
final Timestamp expirationTime = new Timestamp(now + expirationMillis);
final long localExpirationTime = now + expirationMillis;
final Timestamp expirationTime = new Timestamp(localExpirationTime);
preparedStatement.setTimestamp(2, expirationTime);
preparedStatement.setTimestamp(3, expirationTime);
LOGGER.debugf("[%s] %s is trying to acquire lock with expirationTime %s",
@ -188,6 +224,7 @@ final class JdbcLeaseLock implements LeaseLock {
connection.commit();
if (acquired) {
this.maybeAcquired = true;
this.localExpirationTime = stripMilliseconds(localExpirationTime);
LOGGER.debugf("[%s] %s has acquired lock", lockName, holderId);
} else {
if (LOGGER.isDebugEnabled()) {
@ -272,6 +309,7 @@ final class JdbcLeaseLock implements LeaseLock {
preparedStatement.setString(1, holderId);
final boolean released = preparedStatement.executeUpdate() == 1;
//consider it as released to avoid on finalize to be reclaimed
this.localExpirationTime = -1;
this.maybeAcquired = false;
connection.commit();
if (!released) {

View File

@ -23,8 +23,8 @@ import java.util.function.Supplier;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback;
@ -36,6 +36,8 @@ import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.server.impl.jdbc.LeaseLock.AcquireResult.Timeout;
/**
* JDBC implementation of {@link NodeManager}.
*/
@ -53,12 +55,10 @@ public final class JdbcNodeManager extends NodeManager {
private final long lockAcquisitionTimeoutMillis;
private volatile boolean interrupted = false;
private final LeaseLock.Pauser pauser;
private final IOCriticalErrorListener ioCriticalErrorListener;
public static JdbcNodeManager with(DatabaseStorageConfiguration configuration,
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
ExecutorFactory executorFactory) {
validateTimeoutConfiguration(configuration);
final SQLProvider.Factory sqlProviderFactory;
if (configuration.getSqlProviderFactory() != null) {
@ -74,8 +74,7 @@ public final class JdbcNodeManager extends NodeManager {
configuration.getConnectionProvider(),
sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER),
scheduledExecutorService,
executorFactory,
ioCriticalErrorListener);
executorFactory);
}
private static JdbcNodeManager usingConnectionProvider(String brokerId,
@ -85,18 +84,16 @@ public final class JdbcNodeManager extends NodeManager {
JDBCConnectionProvider connectionProvider,
SQLProvider provider,
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
return new JdbcNodeManager(
() -> JdbcSharedStateManager.usingConnectionProvider(brokerId,
lockExpirationMillis,
connectionProvider,
provider),
ExecutorFactory executorFactory) {
return new JdbcNodeManager(() -> JdbcSharedStateManager.usingConnectionProvider(brokerId, lockExpirationMillis,
lockRenewPeriodMillis,
connectionProvider,
provider),
lockExpirationMillis,
lockRenewPeriodMillis,
lockAcquisitionTimeoutMillis,
scheduledExecutorService,
executorFactory,
ioCriticalErrorListener);
executorFactory);
}
private static void validateTimeoutConfiguration(DatabaseStorageConfiguration configuration) {
@ -122,12 +119,12 @@ public final class JdbcNodeManager extends NodeManager {
}
private JdbcNodeManager(Supplier<? extends SharedStateManager> sharedStateManagerFactory,
long lockExpirationMillis,
long lockRenewPeriodMillis,
long lockAcquisitionTimeoutMillis,
ScheduledExecutorService scheduledExecutorService,
ExecutorFactory executorFactory,
IOCriticalErrorListener ioCriticalErrorListener) {
super(false, null);
ExecutorFactory executorFactory) {
super(false);
this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis;
this.pauser = LeaseLock.Pauser.sleep(Math.min(lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS);
this.sharedStateManagerFactory = sharedStateManagerFactory;
@ -137,7 +134,7 @@ public final class JdbcNodeManager extends NodeManager {
"live",
this.sharedStateManager.liveLock(),
lockRenewPeriodMillis,
ioCriticalErrorListener);
this::notifyLostLock);
this.scheduledBackupLockFactory = () -> ScheduledLeaseLock.of(
scheduledExecutorService,
executorFactory != null ?
@ -145,35 +142,47 @@ public final class JdbcNodeManager extends NodeManager {
"backup",
this.sharedStateManager.backupLock(),
lockRenewPeriodMillis,
ioCriticalErrorListener);
this.ioCriticalErrorListener = ioCriticalErrorListener;
this::notifyLostLock);
this.sharedStateManager = null;
this.scheduledLiveLock = null;
this.scheduledBackupLock = null;
}
@Override
public void start() throws Exception {
protected synchronized void notifyLostLock() {
try {
synchronized (this) {
if (isStarted()) {
return;
}
this.sharedStateManager = sharedStateManagerFactory.get();
LOGGER.debug("setup sharedStateManager on start");
final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
setUUID(nodeId);
this.scheduledLiveLock = scheduledLiveLockFactory.get();
this.scheduledBackupLock = scheduledBackupLockFactory.get();
super.start();
super.notifyLostLock();
} finally {
// if any of the notified listener has stopped the node manager or
// the node manager was already stopped
if (!isStarted()) {
return;
}
try {
stop();
} catch (Exception ex) {
LOGGER.warn("Stopping node manager has errored on lost lock notification", ex);
}
}
}
@Override
public synchronized void start() throws Exception {
try {
if (isStarted()) {
return;
}
this.sharedStateManager = sharedStateManagerFactory.get();
LOGGER.debug("setup sharedStateManager on start");
final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID);
setUUID(nodeId);
this.scheduledLiveLock = scheduledLiveLockFactory.get();
this.scheduledBackupLock = scheduledBackupLockFactory.get();
super.start();
} catch (IllegalStateException e) {
this.sharedStateManager = null;
this.scheduledLiveLock = null;
this.scheduledBackupLock = null;
if (this.ioCriticalErrorListener != null) {
this.ioCriticalErrorListener.onIOException(e, "Failed to setup the JdbcNodeManager", null);
}
throw e;
}
}
@ -200,42 +209,33 @@ public final class JdbcNodeManager extends NodeManager {
}
@Override
public boolean isAwaitingFailback() throws Exception {
public boolean isAwaitingFailback() throws NodeManagerException {
checkStarted();
LOGGER.debug("ENTER isAwaitingFailback");
try {
return readSharedState() == SharedStateManager.State.FAILING_BACK;
} catch (IllegalStateException e) {
LOGGER.warn("cannot retrieve the live state: assume it's not yet failed back", e);
return false;
} finally {
LOGGER.debug("EXIT isAwaitingFailback");
}
}
@Override
public boolean isBackupLive() throws Exception {
public boolean isBackupLive() throws NodeManagerException {
checkStarted();
LOGGER.debug("ENTER isBackupLive");
try {
//is anyone holding the live lock?
return this.scheduledLiveLock.lock().isHeld();
} catch (IllegalStateException e) {
throw new NodeManagerException(e);
} finally {
LOGGER.debug("EXIT isBackupLive");
}
}
@Override
public void stopBackup() throws Exception {
LOGGER.debug("ENTER stopBackup");
try {
if (this.scheduledBackupLock.isStarted()) {
LOGGER.debug("scheduledBackupLock is running: stop it and release backup lock");
this.scheduledBackupLock.stop();
this.scheduledBackupLock.lock().release();
} else {
LOGGER.debug("scheduledBackupLock is not running");
}
} finally {
LOGGER.debug("EXIT stopBackup");
}
}
@Override
public void interrupt() {
LOGGER.debug("ENTER interrupted");
@ -245,7 +245,8 @@ public final class JdbcNodeManager extends NodeManager {
}
@Override
public void releaseBackup() throws Exception {
public void releaseBackup() throws NodeManagerException {
checkStarted();
LOGGER.debug("ENTER releaseBackup");
try {
if (this.scheduledBackupLock.isStarted()) {
@ -255,19 +256,47 @@ public final class JdbcNodeManager extends NodeManager {
} else {
LOGGER.debug("scheduledBackupLock is not running");
}
} catch (IllegalStateException e) {
throw new NodeManagerException(e);
} finally {
LOGGER.debug("EXIT releaseBackup");
}
}
/**
* Try to acquire a lock, failing with an exception otherwise.
* Try to acquire a lock
*/
private void lock(LeaseLock lock) throws Exception {
final LeaseLock.AcquireResult acquireResult = lock.tryAcquire(this.lockAcquisitionTimeoutMillis, this.pauser, () -> !this.interrupted);
private void lock(LeaseLock lock) throws ActiveMQLockAcquisitionTimeoutException, InterruptedException {
final long lockAcquisitionTimeoutNanos = lockAcquisitionTimeoutMillis >= 0 ?
TimeUnit.MILLISECONDS.toNanos(lockAcquisitionTimeoutMillis) : -1;
LeaseLock.AcquireResult acquireResult = null;
final long start = System.nanoTime();
while (acquireResult == null) {
checkStarted();
// measure distance from the timeout
final long remainingNanos = remainingNanos(start, lockAcquisitionTimeoutNanos);
if (remainingNanos == 0) {
acquireResult = Timeout;
continue;
}
final long remainingMillis = remainingNanos > 0 ? TimeUnit.NANOSECONDS.toMillis(remainingNanos) : -1;
try {
acquireResult = lock.tryAcquire(remainingMillis, this.pauser, () -> !this.interrupted);
} catch (IllegalStateException e) {
LOGGER.warn("Errored while trying to acquire lock", e);
if (remainingNanos(start, lockAcquisitionTimeoutNanos) == 0) {
acquireResult = Timeout;
continue;
}
// that's not precise, but it's ok: it can trigger the timeout right after the pause,
// depending by the pause length. The sole purpose of the pause is to save
// hammering with requests the DBMS if the connection is down
this.pauser.idle();
}
}
switch (acquireResult) {
case Timeout:
throw new Exception("timed out waiting for lock");
throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock");
case Exit:
this.interrupted = false;
throw new InterruptedException("LeaseLock was interrupted");
@ -279,6 +308,20 @@ public final class JdbcNodeManager extends NodeManager {
}
/**
 * Computes how much of a timeout is still left, measured from {@code start}.
 *
 * @param start        the {@link System#nanoTime()} reading taken when waiting began
 * @param timeoutNanos the overall timeout in nanoseconds; positive for a bounded wait, {@code -1} otherwise
 *                     (any other non-positive value trips the assertion below when assertions are enabled)
 * @return the nanoseconds still available, {@code 0} if the timeout has already elapsed,
 *         or {@code -1} when {@code timeoutNanos} isn't positive
 */
private static long remainingNanos(long start, long timeoutNanos) {
   if (timeoutNanos <= 0) {
      // no bounded timeout configured: -1 is the only expected non-positive value
      assert timeoutNanos == -1;
      return -1;
   }
   final long spentNanos = System.nanoTime() - start;
   return spentNanos < timeoutNanos ? timeoutNanos - spentNanos : 0;
}
private void checkInterrupted(Supplier<String> message) throws InterruptedException {
if (this.interrupted) {
interrupted = false;
@ -286,52 +329,77 @@ public final class JdbcNodeManager extends NodeManager {
}
}
private void renewLiveLockIfNeeded(final long acquiredOn) {
final long acquiredMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - acquiredOn);
if (acquiredMillis > this.scheduledLiveLock.renewPeriodMillis()) {
if (!this.scheduledLiveLock.lock().renew()) {
final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
ioCriticalErrorListener.onIOException(e, "live lock can't be renewed", null);
throw e;
/**
 * Renews the lease of the given scheduled lock, treating any failed attempt as a lost lock.
 * <p>
 * When the renew is refused or fails with an {@link IllegalStateException}, the lost lock is notified
 * through {@link #notifyLostLock()} before the failure is propagated to the caller.
 *
 * @param lock the scheduled lock whose underlying lease has to be renewed
 * @throws IllegalStateException if the lock hasn't been renewed: either the exception raised by the renew
 *                               attempt or a new one naming the lock
 */
private void renewLock(ScheduledLeaseLock lock) {
   boolean lostLock = true;
   IllegalStateException renewEx = null;
   try {
      // renew the lock received as parameter, so that the renewed lock always matches
      // the one named in the failure message below
      lostLock = !lock.lock().renew();
   } catch (IllegalStateException e) {
      renewEx = e;
   }
   if (lostLock) {
      notifyLostLock();
      if (renewEx == null) {
         renewEx = new IllegalStateException(lock.lockName() + " lock isn't renewed");
      }
      throw renewEx;
   }
}
/**
* Lock live node and check for a live state, taking care to renew it (if needed) or releasing it otherwise
*/
private boolean lockLiveAndCheckLiveState() throws Exception {
lock(this.scheduledLiveLock.lock());
final long acquiredOn = System.nanoTime();
boolean liveWhileLocked = false;
//check if the state is live
final SharedStateManager.State stateWhileLocked;
private boolean lockLiveAndCheckLiveState() throws ActiveMQLockAcquisitionTimeoutException, InterruptedException {
try {
stateWhileLocked = readSharedState();
} catch (Throwable t) {
LOGGER.error("error while holding the live node lock and tried to read the shared state", t);
this.scheduledLiveLock.lock().release();
throw t;
lock(this.scheduledLiveLock.lock());
//check if the state is live
while (true) {
try {
final SharedStateManager.State stateWhileLocked = readSharedState();
final long localExpirationTime = this.scheduledLiveLock.lock().localExpirationTime();
if (System.currentTimeMillis() > localExpirationTime) {
// the lock can be assumed to be expired,
// so the state isn't worthy to be considered
return false;
}
if (stateWhileLocked == SharedStateManager.State.LIVE) {
// TODO need some tolerance//renew here?
return true;
} else {
// state is not live: can (try to) release the lock
this.scheduledLiveLock.lock().release();
return false;
}
} catch (IllegalStateException e) {
LOGGER.error("error while holding the live node lock and tried to read the shared state or to release the lock", e);
checkStarted();
checkInterrupted(() -> "interrupt on error while checking live state");
pauser.idle();
final long localExpirationTime = this.scheduledLiveLock.lock().localExpirationTime();
if (System.currentTimeMillis() > localExpirationTime) {
return false;
}
}
}
} catch (InterruptedException e) {
throw e;
}
if (stateWhileLocked == SharedStateManager.State.LIVE) {
renewLiveLockIfNeeded(acquiredOn);
liveWhileLocked = true;
} else {
LOGGER.debugf("state is %s while holding the live lock: releasing live lock", stateWhileLocked);
//state is not live: can (try to) release the lock
this.scheduledLiveLock.lock().release();
}
return liveWhileLocked;
}
@Override
public void awaitLiveNode() throws Exception {
public void awaitLiveNode() throws NodeManagerException, InterruptedException {
checkStarted();
LOGGER.debug("ENTER awaitLiveNode");
try {
boolean liveWhileLocked = false;
while (!liveWhileLocked) {
//check first without holding any lock
final SharedStateManager.State state = readSharedState();
SharedStateManager.State state = null;
try {
state = readSharedState();
} catch (IllegalStateException e) {
LOGGER.warn("Errored while reading shared state", e);
}
if (state == SharedStateManager.State.LIVE) {
//verify if the state is live while holding the live node lock too
liveWhileLocked = lockLiveAndCheckLiveState();
@ -339,6 +407,7 @@ public final class JdbcNodeManager extends NodeManager {
LOGGER.debugf("state while awaiting live node: %s", state);
}
if (!liveWhileLocked) {
checkStarted();
checkInterrupted(() -> "awaitLiveNode got interrupted!");
pauser.idle();
}
@ -346,32 +415,51 @@ public final class JdbcNodeManager extends NodeManager {
//state is LIVE and live lock is acquired and valid
LOGGER.debugf("acquired live node lock while state is %s: starting scheduledLiveLock", SharedStateManager.State.LIVE);
this.scheduledLiveLock.start();
} catch (InterruptedException e) {
throw e;
} catch (ActiveMQLockAcquisitionTimeoutException | IllegalStateException e) {
throw new NodeManagerException(e);
} finally {
LOGGER.debug("EXIT awaitLiveNode");
}
}
@Override
public void startBackup() throws Exception {
public void startBackup() throws NodeManagerException, InterruptedException {
checkStarted();
LOGGER.debug("ENTER startBackup");
try {
ActiveMQServerLogger.LOGGER.waitingToBecomeBackup();
lock(scheduledBackupLock.lock());
scheduledBackupLock.start();
ActiveMQServerLogger.LOGGER.gotBackupLock();
if (getUUID() == null)
readNodeId();
} catch (InterruptedException ie) {
throw ie;
} catch (ActiveMQLockAcquisitionTimeoutException | IllegalStateException e) {
throw new NodeManagerException(e);
} finally {
LOGGER.debug("EXIT startBackup");
}
}
@Override
public ActivateCallback startLiveNode() throws Exception {
public ActivateCallback startLiveNode() throws NodeManagerException, InterruptedException {
checkStarted();
LOGGER.debug("ENTER startLiveNode");
try {
setFailingBack();
boolean done = false;
while (!done) {
try {
setFailingBack();
done = true;
} catch (IllegalStateException e) {
LOGGER.warn("cannot set failing back state, retry", e);
pauser.idle();
checkInterrupted(() -> "interrupt while trying to set failing back state");
}
}
final String timeoutMessage = lockAcquisitionTimeoutMillis == -1 ? "indefinitely" : lockAcquisitionTimeoutMillis + " milliseconds";
@ -389,21 +477,42 @@ public final class JdbcNodeManager extends NodeManager {
LOGGER.debug("ENTER activationComplete");
try {
//state can be written only if the live renew task is running
setLive();
} catch (Exception e) {
boolean done = false;
while (!done) {
try {
setLive();
done = true;
} catch (IllegalStateException e) {
LOGGER.warn("Errored while trying to setLive", e);
checkStarted();
pauser.idle();
final long localExpirationTime = scheduledLiveLock.lock().localExpirationTime();
// optimistic: is just to set a deadline while retrying
if (System.currentTimeMillis() > localExpirationTime) {
throw new IllegalStateException("live lock is probably expired: failed to setLive");
}
}
}
} catch (IllegalStateException e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
throw new NodeManagerException(e);
} finally {
LOGGER.debug("EXIT activationComplete");
}
}
};
} catch (InterruptedException ie) {
throw ie;
} catch (ActiveMQLockAcquisitionTimeoutException | IllegalStateException e) {
throw new NodeManagerException(e);
} finally {
LOGGER.debug("EXIT startLiveNode");
}
}
@Override
public void pauseLiveServer() throws Exception {
public void pauseLiveServer() throws NodeManagerException {
checkStarted();
LOGGER.debug("ENTER pauseLiveServer");
try {
if (scheduledLiveLock.isStarted()) {
@ -413,23 +522,21 @@ public final class JdbcNodeManager extends NodeManager {
scheduledLiveLock.lock().release();
} else {
LOGGER.debug("scheduledLiveLock is not running: try renew live lock");
if (scheduledLiveLock.lock().renew()) {
LOGGER.debug("live lock renewed: set paused shared state and release live lock");
setPaused();
scheduledLiveLock.lock().release();
} else {
final IllegalStateException e = new IllegalStateException("live lock can't be renewed");
ioCriticalErrorListener.onIOException(e, "live lock can't be renewed on pauseLiveServer", null);
throw e;
}
renewLock(scheduledLiveLock);
LOGGER.debug("live lock renewed: set paused shared state and release live lock");
setPaused();
scheduledLiveLock.lock().release();
}
} catch (IllegalStateException e) {
throw new NodeManagerException(e);
} finally {
LOGGER.debug("EXIT pauseLiveServer");
}
}
@Override
public void crashLiveServer() throws Exception {
public void crashLiveServer() throws NodeManagerException {
checkStarted();
LOGGER.debug("ENTER crashLiveServer");
try {
if (this.scheduledLiveLock.isStarted()) {
@ -446,10 +553,18 @@ public final class JdbcNodeManager extends NodeManager {
@Override
public void awaitLiveStatus() {
checkStarted();
LOGGER.debug("ENTER awaitLiveStatus");
try {
while (readSharedState() != SharedStateManager.State.LIVE) {
SharedStateManager.State state = null;
while (state != SharedStateManager.State.LIVE) {
try {
state = readSharedState();
} catch (IllegalStateException e) {
LOGGER.warn("Errored while trying to read shared state", e);
}
pauser.idle();
checkStarted();
}
} finally {
LOGGER.debug("EXIT awaitLiveStatus");
@ -481,6 +596,7 @@ public final class JdbcNodeManager extends NodeManager {
@Override
public SimpleString readNodeId() {
checkStarted();
final UUID nodeId = this.sharedStateManager.readNodeId();
LOGGER.debugf("readNodeId nodeId = %s", nodeId);
setUUID(nodeId);

View File

@ -39,6 +39,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
private static final int MAX_SETUP_ATTEMPTS = 20;
private final String holderId;
private final long lockExpirationMillis;
private final long queryTimeoutMillis;
private JdbcLeaseLock liveLock;
private JdbcLeaseLock backupLock;
private String readNodeId;
@ -48,10 +49,19 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
private String writeState;
public static JdbcSharedStateManager usingConnectionProvider(String holderId,
long locksExpirationMillis,
JDBCConnectionProvider connectionProvider,
SQLProvider provider) {
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis);
long locksExpirationMillis,
JDBCConnectionProvider connectionProvider,
SQLProvider provider) {
return usingConnectionProvider(holderId, locksExpirationMillis, -1, connectionProvider, provider);
}
public static JdbcSharedStateManager usingConnectionProvider(String holderId,
long locksExpirationMillis,
long queryTimeoutMillis,
JDBCConnectionProvider connectionProvider,
SQLProvider provider) {
final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis,
queryTimeoutMillis);
sharedStateManager.setJdbcConnectionProvider(connectionProvider);
sharedStateManager.setSqlProvider(provider);
try {
@ -76,20 +86,35 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
JDBCConnectionProvider connectionProvider,
SQLProvider sqlProvider,
long expirationMillis) {
return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireLiveLockSQL(), sqlProvider.tryReleaseLiveLockSQL(), sqlProvider.renewLiveLockSQL(), sqlProvider.isLiveLockedSQL(), sqlProvider.currentTimestampSQL(), expirationMillis, "LIVE");
return createLiveLock(holderId, connectionProvider, sqlProvider, expirationMillis, -1);
}
/**
 * Builds the lease lock named "LIVE" from the live-lock SQL statements supplied by the SQL provider.
 *
 * @param holderId           the id of the lock holder
 * @param connectionProvider the provider of the JDBC connections used by the lock
 * @param sqlProvider        the source of the live lock SQL statements
 * @param expirationMillis   the lock expiration, in milliseconds
 * @param queryTimeoutMillis the query timeout, in milliseconds, handed over to the lock
 * @return a new live {@link JdbcLeaseLock}
 */
static JdbcLeaseLock createLiveLock(String holderId,
                                    JDBCConnectionProvider connectionProvider,
                                    SQLProvider sqlProvider,
                                    long expirationMillis,
                                    long queryTimeoutMillis) {
   final String acquireSql = sqlProvider.tryAcquireLiveLockSQL();
   final String releaseSql = sqlProvider.tryReleaseLiveLockSQL();
   final String renewSql = sqlProvider.renewLiveLockSQL();
   final String isLockedSql = sqlProvider.isLiveLockedSQL();
   final String currentTimestampSql = sqlProvider.currentTimestampSQL();
   return new JdbcLeaseLock(holderId, connectionProvider, acquireSql, releaseSql, renewSql, isLockedSql,
                            currentTimestampSql, expirationMillis, queryTimeoutMillis, "LIVE");
}
static JdbcLeaseLock createBackupLock(String holderId,
JDBCConnectionProvider connectionProvider,
SQLProvider sqlProvider,
long expirationMillis) {
return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireBackupLockSQL(), sqlProvider.tryReleaseBackupLockSQL(), sqlProvider.renewBackupLockSQL(), sqlProvider.isBackupLockedSQL(), sqlProvider.currentTimestampSQL(), expirationMillis, "BACKUP");
long expirationMillis,
long queryTimeoutMillis) {
return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireBackupLockSQL(),
sqlProvider.tryReleaseBackupLockSQL(), sqlProvider.renewBackupLockSQL(),
sqlProvider.isBackupLockedSQL(), sqlProvider.currentTimestampSQL(),
expirationMillis, queryTimeoutMillis, "BACKUP");
}
@Override
protected void prepareStatements() {
this.liveLock = createLiveLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis);
this.backupLock = createBackupLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis);
this.liveLock = createLiveLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis, queryTimeoutMillis);
this.backupLock = createBackupLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis, queryTimeoutMillis);
this.readNodeId = sqlProvider.readNodeIdSQL();
this.writeNodeId = sqlProvider.writeNodeIdSQL();
this.initializeNodeId = sqlProvider.initializeNodeIdSQL();
@ -97,9 +122,10 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS
this.readState = sqlProvider.readStateSQL();
}
/**
 * Stores the holder id, the lock expiration and the query timeout; these values are later used to
 * create the live and backup lease locks.
 */
private JdbcSharedStateManager(String holderId, long lockExpirationMillis) {
private JdbcSharedStateManager(String holderId, long lockExpirationMillis, long queryTimeoutMillis) {
this.holderId = holderId;
this.lockExpirationMillis = lockExpirationMillis;
this.queryTimeoutMillis = queryTimeoutMillis;
}
@Override

View File

@ -41,6 +41,8 @@ interface LeaseLock extends AutoCloseable {
boolean keepRunning();
}
long localExpirationTime();
interface Pauser {
void idle();

View File

@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.server.impl.jdbc;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
/**
@ -28,17 +28,25 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
*/
interface ScheduledLeaseLock extends ActiveMQComponent {
// start/stop are re-declared without a throws clause, so callers do not have to handle checked exceptions
@Override
void start();
@Override
void stop();
// the lease lock this component works with (presumably the one passed to of(...) - confirm in the implementation)
LeaseLock lock();
// renew period in milliseconds (presumably the value passed to of(...) - confirm in the implementation)
long renewPeriodMillis();
// name of the lock (presumably the value passed to of(...) - confirm in the implementation)
String lockName();
/**
 * Factory method: creates an {@code ActiveMQScheduledLeaseLock} from the given executors, lock name,
 * lease lock, renew period and listener.
 */
static ScheduledLeaseLock of(ScheduledExecutorService scheduledExecutorService,
ArtemisExecutor executor,
String lockName,
LeaseLock lock,
long renewPeriodMillis,
IOCriticalErrorListener ioCriticalErrorListener) {
return new ActiveMQScheduledLeaseLock(scheduledExecutorService, executor, lockName, lock, renewPeriodMillis, ioCriticalErrorListener);
LockListener lockListener) {
return new ActiveMQScheduledLeaseLock(scheduledExecutorService, executor, lockName, lock, renewPeriodMillis, lockListener);
}
}

View File

@ -21,15 +21,23 @@ import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils;
import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.actors.ArtemisExecutor;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -38,6 +46,9 @@ import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.core.Is.is;
@RunWith(Parameterized.class)
public class JdbcLeaseLockTest extends ActiveMQTestBase {
@ -183,7 +194,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
@Test
public void shouldAcquireExpiredLock() throws InterruptedException {
final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
final LeaseLock lock = lock(10);
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try {
Thread.sleep(lock.expirationMillis() * 2);
@ -197,13 +208,13 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
@Test
public void shouldOtherAcquireExpiredLock() throws InterruptedException {
final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
final LeaseLock lock = lock(10);
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try {
Thread.sleep(lock.expirationMillis() * 2);
Assert.assertFalse("lock is already expired", lock.isHeldByCaller());
Assert.assertFalse("lock is already expired", lock.isHeld());
final LeaseLock otherLock = lock(TimeUnit.SECONDS.toMillis(10));
final LeaseLock otherLock = lock(10);
try {
Assert.assertTrue("lock is already expired", otherLock.tryAcquire());
} finally {
@ -237,7 +248,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
@Test
public void shouldRenewExpiredLockNotAcquiredByOthers() throws InterruptedException {
final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
final LeaseLock lock = lock(10);
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try {
Thread.sleep(lock.expirationMillis() * 2);
@ -251,7 +262,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
@Test
public void shouldNotRenewLockAcquiredByOthers() throws InterruptedException {
final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1));
final LeaseLock lock = lock(10);
Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire());
try {
Thread.sleep(lock.expirationMillis() * 2);
@ -268,5 +279,97 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase {
lock.release();
}
}
/**
 * A lock that has been acquired must not trigger the lost-lock listener, neither right after the
 * scheduled lease lock is started nor after it has been stopped.
 * <p>
 * Every resource (scheduled lease lock, acquired lock, executors) is released in a {@code finally}
 * block so that a failing assertion does not leak threads or leave the lock acquired.
 */
@Test
public void shouldNotNotifyLostLock() throws Exception {
   final ExecutorService executorService = Executors.newSingleThreadExecutor();
   final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
   try {
      final OrderedExecutorFactory factory = new OrderedExecutorFactory(executorService);
      final ArtemisExecutor artemisExecutor = factory.getExecutor();
      final AtomicLong lostLock = new AtomicLong();
      final LockListener lockListener = () -> {
         lostLock.incrementAndGet();
      };
      final ScheduledLeaseLock scheduledLeaseLock = ScheduledLeaseLock
         .of(scheduledExecutorService, artemisExecutor,
             "test", lock(), dbConf.getJdbcLockRenewPeriodMillis(), lockListener);

      Assert.assertTrue(scheduledLeaseLock.lock().tryAcquire());
      try {
         scheduledLeaseLock.start();
         try {
            Assert.assertEquals(0, lostLock.get());
         } finally {
            scheduledLeaseLock.stop();
         }
         Assert.assertEquals(0, lostLock.get());
      } finally {
         // only reached once tryAcquire() has succeeded, so there is always something to release
         scheduledLeaseLock.lock().release();
      }
   } finally {
      executorService.shutdown();
      scheduledExecutorService.shutdown();
   }
}
/**
 * Once the lock has been released behind the back of a running scheduled lease lock, the lost-lock
 * listener is expected to be notified repeatedly (at least twice within two renew periods).
 * <p>
 * The scheduled lease lock and the executors are stopped in {@code finally} blocks so that a failing
 * assertion does not leave the renew task and the pool threads running.
 */
@Test
public void shouldNotifyManyTimesLostLock() throws Exception {
   final ExecutorService executorService = Executors.newSingleThreadExecutor();
   final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
   try {
      final OrderedExecutorFactory factory = new OrderedExecutorFactory(executorService);
      final ArtemisExecutor artemisExecutor = factory.getExecutor();
      final AtomicLong lostLock = new AtomicLong();
      final LockListener lockListener = () -> {
         lostLock.incrementAndGet();
      };
      final ScheduledLeaseLock scheduledLeaseLock = ScheduledLeaseLock
         .of(scheduledExecutorService, artemisExecutor,
             "test", lock(TimeUnit.SECONDS.toMillis(1)), 100, lockListener);

      Assert.assertTrue(scheduledLeaseLock.lock().tryAcquire());
      scheduledLeaseLock.start();
      try {
         // should let the renew to happen at least 1 time, excluding the time to start a scheduled task
         TimeUnit.MILLISECONDS.sleep(2 * scheduledLeaseLock.renewPeriodMillis());
         Assert.assertTrue(scheduledLeaseLock.lock().isHeldByCaller());
         Assert.assertEquals(0, lostLock.get());

         // drop the lock while the renew task is still scheduled
         scheduledLeaseLock.lock().release();
         Assert.assertFalse(scheduledLeaseLock.lock().isHeldByCaller());
         TimeUnit.MILLISECONDS.sleep(2 * scheduledLeaseLock.renewPeriodMillis());
         Assert.assertThat(lostLock.get(), is(greaterThanOrEqualTo(2L)));
      } finally {
         scheduledLeaseLock.stop();
      }
   } finally {
      executorService.shutdown();
      scheduledExecutorService.shutdown();
   }
}
/**
 * When the lost-lock listener stops the scheduled lease lock from within the notification, the
 * listener must be invoked exactly once and stopping must not raise any error.
 * <p>
 * The scheduled lease lock and the executors are stopped in {@code finally} blocks so that a failing
 * assertion does not leave the renew task and the pool threads running.
 */
@Test
public void shouldNotifyOnceLostLockIfStopped() throws Exception {
   final ExecutorService executorService = Executors.newSingleThreadExecutor();
   final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
   try {
      final OrderedExecutorFactory factory = new OrderedExecutorFactory(executorService);
      final ArtemisExecutor artemisExecutor = factory.getExecutor();
      final AtomicLong lostLock = new AtomicLong();
      final AtomicReference<ScheduledLeaseLock> lock = new AtomicReference<>();
      final AtomicReference<Throwable> stopErrors = new AtomicReference<>();
      final LockListener lockListener = () -> {
         lostLock.incrementAndGet();
         try {
            lock.get().stop();
         } catch (Throwable e) {
            // recorded here and asserted on the test thread below
            stopErrors.set(e);
         }
      };
      final ScheduledLeaseLock scheduledLeaseLock = ScheduledLeaseLock
         .of(scheduledExecutorService, artemisExecutor, "test", lock(TimeUnit.SECONDS.toMillis(1)),
             100, lockListener);
      lock.set(scheduledLeaseLock);

      Assert.assertTrue(scheduledLeaseLock.lock().tryAcquire());
      lostLock.set(0);
      scheduledLeaseLock.start();
      try {
         Assert.assertTrue(scheduledLeaseLock.lock().isHeldByCaller());
         scheduledLeaseLock.lock().release();
         Assert.assertFalse(scheduledLeaseLock.lock().isHeldByCaller());
         Wait.assertTrue(() -> lostLock.get() > 0);
         Assert.assertFalse(scheduledLeaseLock.isStarted());
         // wait enough to see if it get triggered again
         TimeUnit.MILLISECONDS.sleep(scheduledLeaseLock.renewPeriodMillis());
         Assert.assertEquals(1, lostLock.getAndSet(0));
         Assert.assertNull(stopErrors.getAndSet(null));
      } finally {
         scheduledLeaseLock.stop();
      }
   } finally {
      executorService.shutdown();
      scheduledExecutorService.shutdown();
   }
}
}

View File

@ -23,7 +23,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@ -105,17 +104,9 @@ public class JdbcNodeManagerTest extends ActiveMQTestBase {
// Smoke test: a JdbcNodeManager built from the test database configuration is started and then stopped.
@Test
public void shouldStartAndStopGracefullyTest() throws Exception {
final AtomicReference<String> criticalError = new AtomicReference<>();
final JdbcNodeManager nodeManager = JdbcNodeManager.with(dbConf, leaseLockExecutor, null, (code, message, file) -> criticalError.lazySet(message));
try {
nodeManager.start();
} finally {
nodeManager.stop();
final String error = criticalError.get();
if (error != null) {
Assert.fail(error);
}
}
final JdbcNodeManager nodeManager = JdbcNodeManager.with(dbConf, leaseLockExecutor, null);
nodeManager.start();
nodeManager.stop();
}
}

View File

@ -522,6 +522,7 @@ public abstract class ActiveMQTestBase extends Assert {
dbStorageConfiguration.setJdbcLockAcquisitionTimeoutMillis(getJdbcLockAcquisitionTimeoutMillis());
dbStorageConfiguration.setJdbcLockExpirationMillis(getJdbcLockExpirationMillis());
dbStorageConfiguration.setJdbcLockRenewPeriodMillis(getJdbcLockRenewPeriodMillis());
dbStorageConfiguration.setJdbcNetworkTimeout(-1);
return dbStorageConfiguration;
}
@ -530,11 +531,11 @@ public abstract class ActiveMQTestBase extends Assert {
}
/**
 * Returns the value of the {@code jdbc.lock.expiration} system property, or the fallback passed to
 * {@link Long#getLong(String, long)} when the property is missing or not a valid number.
 */
protected long getJdbcLockExpirationMillis() {
return Long.getLong("jdbc.lock.expiration", ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis());
return Long.getLong("jdbc.lock.expiration", 4_000);
}
/**
 * Returns the value of the {@code jdbc.lock.renew} system property, or the fallback passed to
 * {@link Long#getLong(String, long)} when the property is missing or not a valid number.
 */
protected long getJdbcLockRenewPeriodMillis() {
return Long.getLong("jdbc.lock.renew", ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis());
return Long.getLong("jdbc.lock.renew", 200);
}
public void destroyTables(List<String> tableNames) throws Exception {

View File

@ -1,120 +1,124 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.byteman;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager.LockListener;
import org.apache.activemq.artemis.utils.Wait;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
/**
 * Byteman-driven tests checking whether the lock listener registered on a {@link FileLockNodeManager}
 * gets notified, depending on what the injected rules make the lock and state checks return.
 */
@RunWith(BMUnitRunner.class)
public class FileLockMonitorTest {

   private File sharedDir;

   /** Flipped to {@code true} by the listener registered in {@link #startServer()}. */
   private volatile boolean lostLock = false;

   private volatile FileLockNodeManager nodeManager;

   private ScheduledThreadPoolExecutor executor;

   /** Creates a fresh shared directory (temp file replaced by a directory) and resets the flag. */
   @Before
   public void handleLockFile() throws IOException {
      sharedDir = File.createTempFile("shared-dir", "");
      sharedDir.delete();
      Assert.assertTrue(sharedDir.mkdir());
      lostLock = false;
   }

   /** The injected rule makes every file lock report itself as invalid: the listener must fire. */
   @Test
   @BMRules(rules = {
      @BMRule(name = "lock is invalid", targetClass = "sun.nio.ch.FileLockImpl", targetMethod = "isValid", action = "return false;") })
   public void testLockMonitorInvalid() throws Exception {
      lostLock = false;
      startServer();
      Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 5000, 100);
      nodeManager.isStarted();
      stopServer();
   }

   /** The injected rule makes {@code getState} throw an {@code IOException}: the listener must fire. */
   @Test
   @BMRules(rules = {
      @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "getState", action = "throw new java.io.IOException(\"EFS is disconnected\");") })
   public void testLockMonitorIOException() throws Exception {
      lostLock = false;
      startServer();
      Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 5000, 100);
      stopServer();
   }

   /** No rule injected: the listener must stay silent for the whole observation window. */
   @Test
   public void testLockMonitorHasCorrectLockAndState() throws Exception {
      lostLock = false;
      startServer();
      final boolean notified = Wait.waitFor(() -> lostLock, 5000, 100);
      Assert.assertFalse("The FileLockNodeManager should not have lost the lock", notified);
      stopServer();
   }

   /** The injected rule makes {@code getState} return 70: the listener must stay silent. */
   @Test
   @BMRules(rules = {
      @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "getState", action = "return 70;") })
   public void testLockMonitorHasLockWrongState() throws Exception {
      lostLock = false;
      startServer();
      final boolean notified = Wait.waitFor(() -> lostLock, 5000, 100);
      Assert.assertFalse("The FileLockNodeManager should not have lost the lock", notified);
      stopServer();
   }

   /**
    * Creates the executor and the node manager, registers a listener that records the notification
    * and crashes the live server, then starts the manager as live node.
    * Failures while starting are printed and otherwise ignored.
    *
    * @return the registered listener
    */
   public LockListener startServer() throws Exception {
      executor = new ScheduledThreadPoolExecutor(2);
      nodeManager = new FileLockNodeManager(sharedDir, false, executor);

      final LockListener lockListener = nodeManager.new LockListener() {
         @Override
         protected void lostLock() throws Exception {
            lostLock = true;
            nodeManager.crashLiveServer();
         }
      };
      nodeManager.registerLockListener(lockListener);

      try {
         nodeManager.start();
         final ActivateCallback activation = nodeManager.startLiveNode();
         activation.activationComplete();
      } catch (Exception startFailure) {
         startFailure.printStackTrace();
      }
      return lockListener;
   }

   /** Crashes the live server and shuts the executor down, in that order. */
   private void stopServer() throws Exception {
      nodeManager.crashLiveServer();
      executor.shutdown();
   }
}
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.byteman;
import java.io.File;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.activemq.artemis.core.server.ActivateCallback;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.NodeManager.LockListener;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.utils.Wait;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
/**
 * Byteman-driven tests checking whether the {@link LockListener} registered on a
 * {@link FileLockNodeManager} gets notified, depending on what the injected rules make the lock and
 * state checks do.
 * <p>
 * Every test stops the node manager and the executor in a {@code finally} block, so a failing
 * assertion does not skip the shutdown and leave the live lock and the pool threads behind.
 */
@RunWith(BMUnitRunner.class)
public class FileLockMonitorTest {

   private File sharedDir;

   /** Flipped to {@code true} by the listener registered in {@link #startServer()}. */
   private volatile boolean lostLock = false;

   private volatile FileLockNodeManager nodeManager;

   private ScheduledThreadPoolExecutor executor;

   /** Creates a fresh shared directory by replacing a newly created temp file with a directory. */
   @Before
   public void handleLockFile() throws Exception {
      sharedDir = File.createTempFile("shared-dir", "");
      sharedDir.delete();
      Assert.assertTrue(sharedDir.mkdir());
   }

   /** The injected rule makes {@code isLiveLockLost} return {@code true}: the listener must fire. */
   @Test
   @BMRules(rules = {
      @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "isLiveLockLost", action = "return true;") })
   public void testLockMonitorInvalid() throws Exception {
      lostLock = false;
      startServer();
      try {
         Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 20_000, 100);
         // NOTE(review): the return value is ignored - confirm whether an assertion was intended here
         nodeManager.isStarted();
      } finally {
         stopServer();
      }
   }

   /**
    * Helper invoked by the Byteman rule of {@link #testLockMonitorIOException()}.
    *
    * @param msg message of the raised exception
    * @throws NodeManager.NodeManagerException always
    */
   public static void throwNodeManagerException(String msg) {
      throw new NodeManager.NodeManagerException(msg);
   }

   /** The injected rule makes {@code getState} raise a {@code NodeManagerException}: the listener must fire. */
   @Test
   @BMRules(rules = {
      @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager",
         targetMethod = "getState",
         action = "org.apache.activemq.artemis.tests.extras.byteman.FileLockMonitorTest.throwNodeManagerException(\"EFS is disconnected\");") })
   public void testLockMonitorIOException() throws Exception {
      lostLock = false;
      startServer();
      try {
         Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 5000, 100);
      } finally {
         stopServer();
      }
   }

   /** No rule injected: the listener must stay silent for the whole observation window. */
   @Test
   public void testLockMonitorHasCorrectLockAndState() throws Exception {
      lostLock = false;
      startServer();
      try {
         Assert.assertFalse("The FileLockNodeManager should not have lost the lock", Wait.waitFor(() -> lostLock, 5000, 100));
      } finally {
         stopServer();
      }
   }

   /** The injected rule makes {@code getState} return 70: the listener must stay silent. */
   @Test
   @BMRules(rules = {
      @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "getState", action = "return 70;") })
   public void testLockMonitorHasLockWrongState() throws Exception {
      lostLock = false;
      startServer();
      try {
         Assert.assertFalse("The FileLockNodeManager should not have lost the lock", Wait.waitFor(() -> lostLock, 5000, 100));
      } finally {
         stopServer();
      }
   }

   /**
    * Creates the executor and the node manager, registers a listener that records the notification
    * and crashes the live server, then starts the manager as live node.
    *
    * @return the registered listener
    */
   public LockListener startServer() throws Exception {
      executor = new ScheduledThreadPoolExecutor(2);
      nodeManager = new FileLockNodeManager(sharedDir, false, executor);
      LockListener listener = () -> {
         lostLock = true;
         try {
            nodeManager.crashLiveServer();
         } catch (Throwable t) {
            // the listener cannot propagate the failure; make it at least visible in the test output
            t.printStackTrace();
         }
      };
      nodeManager.registerLockListener(listener);

      try {
         nodeManager.start();
         ActivateCallback startLiveNode = nodeManager.startLiveNode();
         startLiveNode.activationComplete();
      } catch (Exception exception) {
         // NOTE(review): start-up failures are only printed, not rethrown - presumably because some
         // tests inject failures through Byteman; confirm before tightening this
         exception.printStackTrace();
      }
      return listener;
   }

   /** Crashes the live server and always shuts the executor down, even if the crash call fails. */
   private void stopServer() throws Exception {
      try {
         nodeManager.crashLiveServer();
      } finally {
         executor.shutdown();
      }
   }
}

View File

@ -78,7 +78,7 @@ public class FileLockNodeManagerTest {
manager.awaitLiveNode();
} catch (Exception e) {
long stop = System.currentTimeMillis();
if (!"timed out waiting for lock".equals(e.getMessage())) {
if (!"timed out waiting for lock".equals(e.getCause().getMessage())) {
throw e;
}
return stop - start;

View File

@ -1,151 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.extras.byteman;
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.ScaleDownConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase;
import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
import org.jboss.byteman.contrib.bmunit.BMRule;
import org.jboss.byteman.contrib.bmunit.BMRules;
import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
/**
 * Fail-over test for a shared-store live/backup pair in which a Byteman rule throws an
 * {@code IOException} at the entry of {@code FileLockNodeManager.tryLock()} whenever
 * {@link #isThrowException()} answers {@code true}.
 */
@RunWith(BMUnitRunner.class)
public class SharedStoreBackupActivationTest extends FailoverTestBase {

   private static Logger logger = Logger.getLogger(SharedStoreBackupActivationTest.class);

   /** Answer given to the Byteman rule condition. */
   private static volatile boolean throwException = false;

   /** Counted down each time the Byteman rule condition is evaluated. */
   private static CountDownLatch exceptionThrownLatch;

   /**
    * Condition of the Byteman rule: signals the pending latch, if any, and reports whether the rule
    * has to throw.
    */
   public static synchronized boolean isThrowException() {
      logger.debugf("Throwing IOException during FileLockNodeManager.tryLock(): %s", throwException);
      final CountDownLatch pending = exceptionThrownLatch;
      if (pending != null) {
         pending.countDown();
      }
      return throwException;
   }

   /**
    * Waits for the backup server to call FileLockNodeManager.tryLock().
    *
    * @param throwException whether the rule has to throw on that call
    */
   public static void awaitTryLock(boolean throwException) throws InterruptedException {
      synchronized (SharedStoreBackupActivationTest.class) {
         SharedStoreBackupActivationTest.throwException = throwException;
         exceptionThrownLatch = new CountDownLatch(1);
      }

      logger.debugf("Awaiting backup to perform FileLockNodeManager.tryLock()");
      final boolean tryLockCalled = exceptionThrownLatch.await(10, TimeUnit.SECONDS);
      SharedStoreBackupActivationTest.throwException = false;

      Assert.assertTrue("FileLockNodeManager.tryLock() was not called during specified timeout", tryLockCalled);
      logger.debugf("Awaiting FileLockNodeManager.tryLock() done");
   }

   @Test
   @BMRules(
      rules = {@BMRule(
         name = "throw IOException during activation",
         targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager",
         targetMethod = "tryLock",
         targetLocation = "AT ENTRY",
         condition = "org.apache.activemq.artemis.tests.extras.byteman.SharedStoreBackupActivationTest.isThrowException()",
         action = "THROW new IOException(\"IO Error\");")
      })
   public void testFailOverAfterTryLockException() throws Exception {
      Assert.assertTrue(liveServer.isActive());
      Assert.assertFalse(backupServer.isActive());

      // first attempt of the backup: no exception, the lock cannot be taken because live is still active
      awaitTryLock(false);

      // second attempt of the backup: this one gets an IOException
      logger.debug("Causing exception");
      awaitTryLock(true);

      // take the live server down
      logger.debugf("Stopping live server");
      liveServer.stop();
      waitForServerToStop(liveServer.getServer());
      logger.debugf("Live server stopped, waiting for backup activation");
      backupServer.getServer().waitForActivation(10, TimeUnit.SECONDS);

      // by now the backup is expected to have taken over
      Assert.assertFalse(liveServer.isActive());
      Assert.assertTrue("Backup server didn't activate", backupServer.isActive());
   }

   @Override
   protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) {
      return TransportConfigurationUtils.getInVMAcceptor(live);
   }

   @Override
   protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) {
      return TransportConfigurationUtils.getInVMConnector(live);
   }

   /**
    * Configures a live and a backup server that share one store directory, each of them with its own
    * {@link FileLockNodeManager} instance.
    */
   @Override
   protected void createConfigs() throws Exception {
      final File sharedDir = File.createTempFile("shared-dir", "");
      sharedDir.delete();
      Assert.assertTrue(sharedDir.mkdir());
      logger.debugf("Created shared store directory %s", sharedDir.getCanonicalPath());

      final TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
      final TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);

      // nodes must use separate FileLockNodeManager instances!
      final NodeManager liveNodeManager = newNodeManager(sharedDir);
      final NodeManager backupNodeManager = newNodeManager(sharedDir);

      backupConfig = super.createDefaultConfig(false)
         .clearAcceptorConfigurations()
         .addAcceptorConfiguration(getAcceptorTransportConfiguration(false))
         .setHAPolicyConfiguration(
            new SharedStoreSlavePolicyConfiguration()
               .setScaleDownConfiguration(new ScaleDownConfiguration().setEnabled(false))
               .setRestartBackup(false))
         .addConnectorConfiguration(liveConnector.getName(), liveConnector)
         .addConnectorConfiguration(backupConnector.getName(), backupConnector)
         .addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName()));
      backupServer = createTestableServer(backupConfig, backupNodeManager);

      liveConfig = super.createDefaultConfig(false)
         .clearAcceptorConfigurations()
         .addAcceptorConfiguration(getAcceptorTransportConfiguration(true))
         .setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailoverOnServerShutdown(true))
         .addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName()))
         .addConnectorConfiguration(liveConnector.getName(), liveConnector);
      liveServer = createTestableServer(liveConfig, liveNodeManager);
   }

   /** Creates a node manager on the given directory backed by its own single-threaded scheduler. */
   private static NodeManager newNodeManager(final File sharedDir) {
      return new FileLockNodeManager(sharedDir, false, new ScheduledThreadPoolExecutor(1));
   }
}

View File

@ -18,34 +18,23 @@ package org.apache.activemq.artemis.tests.integration.cluster;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.SpawnedVMSupport;
import org.apache.activemq.artemis.utils.UUID;
import org.junit.Assert;
import org.junit.Test;
public class RealNodeManagerTest extends NodeManagerTest {
/**
 * The UUID exposed by a {@link FileLockNodeManager} must be the same before and after a restart on
 * the same directory.
 */
@Test
public void testId() throws Exception {
   final NodeManager manager = new FileLockNodeManager(new File(getTemporaryDir()), false);

   manager.start();
   final UUID initialId = manager.getUUID();
   manager.stop();

   manager.start();
   final UUID restartedId = manager.getUUID();
   ActiveMQTestBase.assertEqualsByteArrays(initialId.asBytes(), restartedId.asBytes());
   manager.stop();
}
public class FileLockNodeManagerTest extends NodeManagerTest {
@Override
public void performWork(NodeManagerAction... actions) throws Exception {
List<Process> processes = new ArrayList<>();
for (NodeManagerAction action : actions) {
Process p = SpawnedVMSupport.spawnVM(NodeManagerAction.class.getName(), "-Xms512m", "-Xmx512m", new String[0], true, true, true, action.getWork());
final String[] args = new String[action.works() + 1];
args[0] = getTemporaryDir();
action.getWork(args, 1);
Process p = SpawnedVMSupport.spawnVM(this.getClass().getName(), "-Xms50m", "-Xmx512m", new String[0], true, true, true, args);
processes.add(p);
}
for (Process process : processes) {
@ -58,4 +47,8 @@ public class RealNodeManagerTest extends NodeManagerTest {
}
}
/**
 * Entry point of the spawned VM: {@code args[0]} is the directory of the {@link FileLockNodeManager},
 * every following argument is handed over to {@link NodeManagerAction#execute}.
 */
public static void main(String[] args) throws Exception {
   // the work arguments are extracted first, exactly as in the single-expression form
   final String[] works = Arrays.copyOfRange(args, 1, args.length);
   final File lockDirectory = new File(args[0]);
   NodeManagerAction.execute(works, new FileLockNodeManager(lockDirectory, false));
}
}

View File

@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cluster;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager;
import org.apache.activemq.artemis.utils.ExecutorFactory;
import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory;
import org.junit.Assert;
/**
 * {@link NodeManagerTest} variant that executes every {@link NodeManagerAction} against its own
 * {@link JdbcNodeManager}, each one on a dedicated thread of the current JVM.
 */
public class JdbcNodeManagerTest extends NodeManagerTest {

   /**
    * Starts one {@link JdbcNodeManager} per action, runs the actions concurrently and waits for them
    * to finish (at most 60 seconds each, unless {@code isDebug()} answers {@code true}).
    * <p>
    * Node managers, runner threads and executors are always cleaned up, including when starting a
    * node manager fails or a runner does not finish in time. An interruption received while waiting
    * is remembered and the interrupt flag is restored once the clean-up is over.
    *
    * @param actions the actions to run, one node manager and one thread each
    * @throws Exception if a node manager cannot be created or started
    */
   @Override
   public void performWork(NodeManagerAction... actions) throws Exception {
      final ThreadFactory daemonThreadFactory = runnable -> {
         final Thread th = new Thread(runnable);
         th.setDaemon(true);
         return th;
      };
      final List<NodeRunner> nodeRunners = new ArrayList<>(actions.length);
      final Thread[] threads = new Thread[actions.length];
      // two executors are created for every action
      final List<ExecutorService> executors = new ArrayList<>(actions.length * 2);
      final List<NodeManager> nodeManagers = new ArrayList<>(actions.length);
      // NOTE(review): nothing in this method ever sets this flag, so the final assertion cannot fail
      final AtomicBoolean failedRenew = new AtomicBoolean(false);
      boolean interrupted = false;
      try {
         for (NodeManagerAction action : actions) {
            final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory);
            executors.add(scheduledExecutorService);
            final ExecutorService executor = Executors.newFixedThreadPool(2, daemonThreadFactory);
            executors.add(executor);
            final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration();
            final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
            final JdbcNodeManager nodeManager = JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory);
            // registered before start() so that a node manager failing half-way is still stopped
            nodeManagers.add(nodeManager);
            nodeManager.start();
            nodeRunners.add(new NodeRunner(nodeManager, action));
         }
         for (int i = 0, nodeRunnersSize = nodeRunners.size(); i < nodeRunnersSize; i++) {
            threads[i] = new Thread(nodeRunners.get(i));
            threads[i].start();
         }
         final boolean isDebug = isDebug();
         for (Thread thread : threads) {
            try {
               if (isDebug) {
                  thread.join();
               } else {
                  thread.join(60_000);
               }
            } catch (InterruptedException e) {
               // restored in the finally block, after the clean-up has been performed
               interrupted = true;
            }
            if (thread.isAlive()) {
               thread.interrupt();
               fail("thread still running");
            }
         }
      } finally {
         // do not leave any runner behind when bailing out early
         for (Thread thread : threads) {
            if (thread != null && thread.isAlive()) {
               thread.interrupt();
            }
         }
         // forcibly stop node managers
         nodeManagers.forEach(nodeManager -> {
            try {
               nodeManager.stop();
            } catch (Exception e) {
               // won't prevent the test to complete
               e.printStackTrace();
            }
         });
         // stop executors
         executors.forEach(ExecutorService::shutdownNow);
         if (interrupted) {
            Thread.currentThread().interrupt();
         }
      }

      for (NodeRunner nodeRunner : nodeRunners) {
         if (nodeRunner.e != null) {
            // keep the original failure attached as cause instead of reporting only its message
            throw new AssertionError(nodeRunner.e.getMessage(), nodeRunner.e);
         }
      }
      Assert.assertFalse("Some of the lease locks has failed to renew the locks", failedRenew.get());
   }
}

View File

@ -16,10 +16,10 @@
*/
package org.apache.activemq.artemis.tests.integration.cluster;
import java.io.File;
import java.util.Arrays;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.utils.UUID;
public class NodeManagerAction {
@ -35,6 +35,7 @@ public class NodeManagerAction {
public static final int HAS_BACKUP = 11;
public static final int DOESNT_HAVE_LIVE = 12;
public static final int DOESNT_HAVE_BACKUP = 13;
public static final int CHECK_ID = 14;
private final int[] work;
@ -82,7 +83,6 @@ public class NodeManagerAction {
}
break;
case HAS_BACKUP:
if (!hasBackupLock) {
throw new IllegalStateException("backup lock not held");
}
@ -93,37 +93,50 @@ public class NodeManagerAction {
}
break;
case DOESNT_HAVE_BACKUP:
if (hasBackupLock) {
throw new IllegalStateException("backup lock held");
}
break;
case CHECK_ID:
nodeManager.start();
UUID id1 = nodeManager.getUUID();
nodeManager.stop();
nodeManager.start();
if (!Arrays.equals(id1.asBytes(), nodeManager.getUUID().asBytes())) {
throw new IllegalStateException("getUUID should be the same on restart");
}
break;
}
}
}
public String[] getWork() {
String[] strings = new String[work.length];
for (int i = 0, stringsLength = strings.length; i < stringsLength; i++) {
strings[i] = "" + work[i];
}
return strings;
/**
 * Returns the number of work codes held by this action.
 *
 * @return the length of the {@code work} array
 */
public int works() {
   return this.work.length;
}
public static void main(String[] args) throws Exception {
/**
 * Copies this action's work codes, rendered as decimal strings, into {@code works}.
 *
 * @param works destination array; entries from {@code start} onwards are overwritten
 * @param start index in {@code works} that receives the first work code
 * @return the number of entries written, i.e. the length of the {@code work} array
 */
public int getWork(String[] works, int start) {
   int offset = start;
   for (int code : work) {
      works[offset] = Integer.toString(code);
      offset++;
   }
   return work.length;
}
public static void execute(String[] args, NodeManager nodeManager) throws Exception {
int[] work1 = new int[args.length];
for (int i = 0; i < args.length; i++) {
work1[i] = Integer.parseInt(args[i]);
}
NodeManagerAction nodeManagerAction = new NodeManagerAction(work1);
FileLockNodeManager nodeManager = new FileLockNodeManager(new File("."), false);
nodeManager.start();
try {
nodeManagerAction.performWork(nodeManager);
} catch (Exception e) {
e.printStackTrace();
System.exit(9);
} finally {
nodeManager.stop();
}
}

View File

@ -24,7 +24,9 @@ import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.tests.util.SpawnedTestBase;
import org.junit.Test;
import static java.lang.management.ManagementFactory.getRuntimeMXBean;
import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.AWAIT_LIVE;
import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.CHECK_ID;
import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.CRASH_LIVE;
import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.DOESNT_HAVE_BACKUP;
import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.DOESNT_HAVE_LIVE;
@ -38,6 +40,12 @@ import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerA
public class NodeManagerTest extends SpawnedTestBase {
/**
 * Runs a single action made only of the {@code CHECK_ID} step through {@code performWork}.
 */
@Test
public void testID() throws Exception {
   performWork(new NodeManagerAction(CHECK_ID));
}
@Test
public void testLive() throws Exception {
NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE);
@ -155,6 +163,10 @@ public class NodeManagerTest extends SpawnedTestBase {
}
}
/**
 * Tells whether the JVM input arguments mention {@code jdwp}.
 *
 * @return {@code true} if the string form of the JVM input arguments contains {@code "jdwp"}
 *         (presumably meaning a debugger agent was requested at startup), {@code false} otherwise
 */
protected static boolean isDebug() {
   final String jvmArguments = getRuntimeMXBean().getInputArguments().toString();
   return jvmArguments.contains("jdwp");
}
static class NodeRunner implements Runnable {
private NodeManagerAction action;

View File

@ -147,10 +147,7 @@ public class FileLockNodeManagerTest extends FailoverTestBase {
executors.add(executor);
final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration();
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory, (code, message, file) -> {
code.printStackTrace();
Assert.fail(message);
});
return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory);
case File:
final Configuration config = createDefaultInVMConfig();
if (useSeparateLockFolder) {

View File

@ -116,10 +116,7 @@ public class NettyFailoverTest extends FailoverTest {
executors.add(executor);
final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration();
final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor);
return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory, (code, message, file) -> {
code.printStackTrace();
Assert.fail(message);
});
return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory);
default:
throw new AssertionError("enum type not supported!");
}

View File

@ -100,7 +100,7 @@ public class CriticalCrashTest extends SpawnedTestBase {
@Override
protected StorageManager createStorageManager() {
JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO) {
JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) {
@Override
public void readLock() {
super.readLock();

View File

@ -91,7 +91,7 @@ public class ShutdownOnCriticalIOErrorMoveNextTest extends ActiveMQTestBase {
@Override
protected StorageManager createStorageManager() {
JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO) {
JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) {
@Override
protected Journal createMessageJournal(Configuration config,

View File

@ -175,38 +175,38 @@ public class DiscoveryBaseTest extends ActiveMQTestBase {
protected final class FakeNodeManager extends NodeManager {
public FakeNodeManager(String nodeID) {
super(false, null);
super(false);
this.setNodeID(nodeID);
}
@Override
public void awaitLiveNode() throws Exception {
public void awaitLiveNode() {
}
@Override
public void awaitLiveStatus() throws Exception {
public void awaitLiveStatus() {
}
@Override
public void startBackup() throws Exception {
public void startBackup() {
}
@Override
public ActivateCallback startLiveNode() throws Exception {
public ActivateCallback startLiveNode() {
return new CleaningActivateCallback() {
};
}
@Override
public void pauseLiveServer() throws Exception {
public void pauseLiveServer() {
}
@Override
public void crashLiveServer() throws Exception {
public void crashLiveServer() {
}
@Override
public void releaseBackup() throws Exception {
public void releaseBackup() {
}
@Override
@ -215,12 +215,12 @@ public class DiscoveryBaseTest extends ActiveMQTestBase {
}
@Override
public boolean isAwaitingFailback() throws Exception {
public boolean isAwaitingFailback() {
return false;
}
@Override
public boolean isBackupLive() throws Exception {
public boolean isBackupLive() {
return false;
}