diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java index 4b5c3cbf5d..a6b05e1787 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/NodeManager.java @@ -16,52 +16,50 @@ */ package org.apache.activemq.artemis.core.server; -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; +import java.util.HashSet; +import java.util.Set; -import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager; import org.apache.activemq.artemis.utils.UUID; -import org.apache.activemq.artemis.utils.UUIDGenerator; +import org.jboss.logging.Logger; public abstract class NodeManager implements ActiveMQComponent { - protected static final byte FIRST_TIME_START = '0'; - public static final String SERVER_LOCK_NAME = "server.lock"; - private static final String ACCESS_MODE = "rw"; + @FunctionalInterface + public interface LockListener { + void lostLock(); + } + + private static final Logger LOGGER = Logger.getLogger(NodeManager.class); protected final boolean replicatedBackup; - private final File directory; - private final Object nodeIDGuard = new Object(); + protected final Object nodeIDGuard = new Object(); private SimpleString nodeID; private UUID uuid; private boolean isStarted = false; + private final Set lockListeners; - protected FileChannel channel; - - public NodeManager(final boolean replicatedBackup, final File directory) { - this.directory = directory; + public NodeManager(final boolean replicatedBackup) { this.replicatedBackup = replicatedBackup; + this.lockListeners = new HashSet<>(); } // -------------------------------------------------------------------- - public abstract void awaitLiveNode() throws Exception; + public abstract void awaitLiveNode() throws NodeManagerException, InterruptedException; - public abstract void awaitLiveStatus() throws Exception; + public abstract void awaitLiveStatus() throws NodeManagerException, InterruptedException; - public abstract void startBackup() throws Exception; + public abstract void startBackup() throws NodeManagerException, InterruptedException; - public abstract ActivateCallback startLiveNode() throws Exception; + public abstract ActivateCallback startLiveNode() throws NodeManagerException, InterruptedException; - public abstract void pauseLiveServer() throws Exception; + public abstract void pauseLiveServer() throws NodeManagerException; - public abstract void crashLiveServer() throws Exception; + public abstract void crashLiveServer() throws NodeManagerException; - public abstract void releaseBackup() throws Exception; + public abstract void releaseBackup() throws NodeManagerException; // -------------------------------------------------------------------- @@ -81,7 +79,7 @@ public abstract class NodeManager implements ActiveMQComponent { } } - public abstract SimpleString readNodeId() throws ActiveMQIllegalStateException, IOException; + public abstract SimpleString readNodeId() throws NodeManagerException; public UUID getUUID() { synchronized (nodeIDGuard) { @@ -113,119 +111,63 @@ public abstract class NodeManager implements ActiveMQComponent { } } - public abstract boolean isAwaitingFailback() throws Exception; + public abstract boolean isAwaitingFailback() throws NodeManagerException; - public abstract boolean isBackupLive() throws Exception; + public abstract boolean isBackupLive() throws NodeManagerException; public abstract void interrupt(); @Override public synchronized void stop() throws Exception { - FileChannel channelCopy = channel; - if (channelCopy != null) - channelCopy.close(); + // force any running threads on node manager to stop isStarted = false; + lockListeners.clear(); } - public void stopBackup() throws Exception { - if (replicatedBackup && getNodeId() != null) { - setUpServerLockFile(); - } + public void stopBackup() throws NodeManagerException { releaseBackup(); } - /** - * Ensures existence of persistent information about the server's nodeID. - *

- * Roughly the different use cases are: - *

    - *
  1. old live server restarts: a server.lock file already exists and contains a nodeID. - *
  2. new live server starting for the first time: no file exists, and we just *create* a new - * UUID to use as nodeID - *
  3. replicated backup received its nodeID from its live: no file exists, we need to persist - * the *current* nodeID - *
- */ - protected synchronized void setUpServerLockFile() throws IOException { - File serverLockFile = newFile(SERVER_LOCK_NAME); + protected synchronized void checkStarted() { + if (!isStarted) { + throw new IllegalStateException("the node manager is supposed to be started"); + } + } - boolean fileCreated = false; - - int count = 0; - while (!serverLockFile.exists()) { + protected synchronized void notifyLostLock() { + if (!isStarted) { + return; + } + lockListeners.forEach(lockListener -> { try { - fileCreated = serverLockFile.createNewFile(); - } catch (RuntimeException e) { - ActiveMQServerLogger.LOGGER.nodeManagerCantOpenFile(e, serverLockFile); - throw e; - } catch (IOException e) { - /* - * on some OS's this may fail weirdly even tho the parent dir exists, retrying will work, some weird timing issue i think - * */ - if (count < 5) { - try { - Thread.sleep(100); - } catch (InterruptedException e1) { - } - count++; - continue; - } - ActiveMQServerLogger.LOGGER.nodeManagerCantOpenFile(e, serverLockFile); - throw e; + lockListener.lostLock(); + } catch (Exception e) { + LOGGER.warn("On notify lost lock", e); + // Need to notify everyone so ignore any exception } - } - - @SuppressWarnings("resource") - RandomAccessFile raFile = new RandomAccessFile(serverLockFile, ACCESS_MODE); - - channel = raFile.getChannel(); - - if (fileCreated) { - ByteBuffer id = ByteBuffer.allocateDirect(3); - byte[] bytes = new byte[3]; - bytes[0] = FIRST_TIME_START; - bytes[1] = FIRST_TIME_START; - bytes[2] = FIRST_TIME_START; - id.put(bytes, 0, 3); - id.position(0); - channel.write(id, 0); - channel.force(true); - } - - createNodeId(); + }); } - /** - * @return - */ - protected final File newFile(final String fileName) { - File file = new File(directory, fileName); - return file; + public synchronized void registerLockListener(FileLockNodeManager.LockListener lockListener) { + lockListeners.add(lockListener); } - protected final synchronized void createNodeId() throws IOException { - synchronized (nodeIDGuard) { - ByteBuffer id = ByteBuffer.allocateDirect(16); - int read = channel.read(id, 3); - if (replicatedBackup) { - id.position(0); - id.put(getUUID().asBytes(), 0, 16); - id.position(0); - channel.write(id, 3); - channel.force(true); - } else if (read != 16) { - setUUID(UUIDGenerator.getInstance().generateUUID()); - id.put(getUUID().asBytes(), 0, 16); - id.position(0); - channel.write(id, 3); - channel.force(true); - } else { - byte[] bytes = new byte[16]; - id.position(0); - id.get(bytes); - setUUID(new UUID(UUID.TYPE_TIME_BASED, bytes)); - } + public synchronized void unregisterLockListener(FileLockNodeManager.LockListener lockListener) { + lockListeners.remove(lockListener); + } + + public static final class NodeManagerException extends RuntimeException { + + public NodeManagerException(String message) { + super(message); + } + + public NodeManagerException(Throwable cause) { + super(cause); + } + + public NodeManagerException(String message, Throwable cause) { + super(message, cause); } } - } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java index b72c755ecf..572f54ce26 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ColocatedPolicy.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.Map; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ColocatedActivation; import org.apache.activemq.artemis.core.server.impl.LiveActivation; @@ -91,8 +92,8 @@ public class ColocatedPolicy implements HAPolicy { public LiveActivation createActivation(ActiveMQServerImpl server, boolean wasLive, Map activationParams, - ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception { - return new ColocatedActivation(server, this, livePolicy.createActivation(server, wasLive, activationParams, shutdownOnCriticalIO)); + IOCriticalErrorListener ioCriticalErrorListener) throws Exception { + return new ColocatedActivation(server, this, livePolicy.createActivation(server, wasLive, activationParams, ioCriticalErrorListener)); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java index bb93014de4..c5d62ac378 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/HAPolicy.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha; import java.util.Map; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; @@ -34,7 +35,7 @@ public interface HAPolicy { T createActivation(ActiveMQServerImpl server, boolean wasLive, Map activationParams, - ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception; + IOCriticalErrorListener shutdownOnCriticalIO) throws Exception; boolean isSharedStore(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/LiveOnlyPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/LiveOnlyPolicy.java index 442c22dd25..9aa6a90632 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/LiveOnlyPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/LiveOnlyPolicy.java @@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha; import java.util.Map; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.LiveOnlyActivation; @@ -37,7 +38,7 @@ public class LiveOnlyPolicy implements HAPolicy { public Activation createActivation(ActiveMQServerImpl server, boolean wasLive, Map activationParams, - ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) { + IOCriticalErrorListener ioCriticalErrorListener) { return new LiveOnlyActivation(server, this); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java index 36e65f02e9..2fbdfaa6e4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicaPolicy.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha; import java.util.Map; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.server.NetworkHealthCheck; import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; @@ -211,8 +212,8 @@ public class ReplicaPolicy extends BackupPolicy { public Activation createActivation(ActiveMQServerImpl server, boolean wasLive, Map activationParams, - ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) throws Exception { - SharedNothingBackupActivation backupActivation = new SharedNothingBackupActivation(server, wasLive, activationParams, shutdownOnCriticalIO, this, networkHealthCheck); + IOCriticalErrorListener ioCriticalErrorListener) throws Exception { + SharedNothingBackupActivation backupActivation = new SharedNothingBackupActivation(server, wasLive, activationParams, ioCriticalErrorListener, this, networkHealthCheck); backupActivation.init(); return backupActivation; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java index 99b98fec52..d4851eda9e 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/ReplicatedPolicy.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha; import java.util.Map; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.server.NetworkHealthCheck; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.LiveActivation; @@ -231,7 +232,7 @@ public class ReplicatedPolicy implements HAPolicy { public LiveActivation createActivation(ActiveMQServerImpl server, boolean wasLive, Map activationParams, - ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) { + IOCriticalErrorListener ioCriticalErrorListener) { return new SharedNothingLiveActivation(server, this); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreMasterPolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreMasterPolicy.java index 82bbaf7f54..3e3913e8cb 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreMasterPolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreMasterPolicy.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha; import java.util.Map; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.LiveActivation; import org.apache.activemq.artemis.core.server.impl.SharedStoreLiveActivation; @@ -91,8 +92,8 @@ public class SharedStoreMasterPolicy implements HAPolicy { public LiveActivation createActivation(ActiveMQServerImpl server, boolean wasLive, Map activationParams, - ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) { - return new SharedStoreLiveActivation(server, this); + IOCriticalErrorListener ioCriticalErrorListener) { + return new SharedStoreLiveActivation(server, this, ioCriticalErrorListener); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreSlavePolicy.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreSlavePolicy.java index a4a0ed1613..4a00ccff1b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreSlavePolicy.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/cluster/ha/SharedStoreSlavePolicy.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.core.server.cluster.ha; import java.util.Map; import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.server.impl.Activation; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.SharedStoreBackupActivation; @@ -101,8 +102,8 @@ public class SharedStoreSlavePolicy extends BackupPolicy { public Activation createActivation(ActiveMQServerImpl server, boolean wasLive, Map activationParams, - ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO) { - return new SharedStoreBackupActivation(server, this); + IOCriticalErrorListener ioCriticalErrorListener) { + return new SharedStoreBackupActivation(server, this, ioCriticalErrorListener); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java index c538569021..0616362494 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java @@ -329,7 +329,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { private final Map activationParams = new HashMap<>(); - protected final ShutdownOnCriticalErrorListener shutdownOnCriticalIO = new ShutdownOnCriticalErrorListener(); + protected final IOCriticalErrorListener ioCriticalErrorListener = new DefaultCriticalErrorListener(); private final ActiveMQServer parentServer; @@ -522,7 +522,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { throw new IllegalArgumentException("replicatingBackup is not supported yet while using JDBC persistence"); } final DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration(); - manager = JdbcNodeManager.with(dbConf, scheduledPool, executorFactory, shutdownOnCriticalIO); + manager = JdbcNodeManager.with(dbConf, scheduledPool, executorFactory); } else if (haType == null || haType == HAPolicyConfiguration.TYPE.LIVE_ONLY) { if (logger.isDebugEnabled()) { logger.debug("Detected no Shared Store HA options on JDBC store"); @@ -610,7 +610,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { final boolean wasLive = !haPolicy.isBackup(); if (!haPolicy.isBackup()) { - activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO); + activation = haPolicy.createActivation(this, false, activationParams, ioCriticalErrorListener); if (afterActivationCreated != null) { try { @@ -636,9 +636,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { // checking again here if (haPolicy.isBackup()) { if (haPolicy.isSharedStore()) { - activation = haPolicy.createActivation(this, false, activationParams, shutdownOnCriticalIO); + activation = haPolicy.createActivation(this, false, activationParams, ioCriticalErrorListener); } else { - activation = haPolicy.createActivation(this, wasLive, activationParams, shutdownOnCriticalIO); + activation = haPolicy.createActivation(this, wasLive, activationParams, ioCriticalErrorListener); } if (afterActivationCreated != null) { @@ -1117,12 +1117,23 @@ public class ActiveMQServerImpl implements ActiveMQServer { this.stop(failoverOnServerShutdown, criticalIOError, restarting, false); } + private void stop(boolean failoverOnServerShutdown, + final boolean criticalIOError, + boolean restarting, + boolean isShutdown) { + stop(failoverOnServerShutdown, criticalIOError, isShutdown || criticalIOError, restarting, isShutdown); + } + /** * Stops the server * * @param criticalIOError whether we have encountered an IO error with the journal etc */ - void stop(boolean failoverOnServerShutdown, final boolean criticalIOError, boolean restarting, boolean isShutdown) { + private void stop(boolean failoverOnServerShutdown, + final boolean criticalIOError, + boolean shutdownExternalComponents, + boolean restarting, + boolean isShutdown) { if (logger.isDebugEnabled()) { logger.debug("Stopping server " + this); @@ -1344,7 +1355,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { connectedClientIds.clear(); - stopExternalComponents(isShutdown || criticalIOError); + stopExternalComponents(shutdownExternalComponents); try { this.analyzer.clear(); @@ -2794,9 +2805,9 @@ public class ActiveMQServerImpl implements ActiveMQServer { protected PagingStoreFactory getPagingStoreFactory() throws Exception { if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) { DatabaseStorageConfiguration dbConf = (DatabaseStorageConfiguration) configuration.getStoreConfiguration(); - return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, false, shutdownOnCriticalIO, configuration.isReadWholePage()); + return new PagingStoreFactoryDatabase(dbConf, storageManager, configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, false, ioCriticalErrorListener, configuration.isReadWholePage()); } - return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), shutdownOnCriticalIO, configuration.isReadWholePage()); + return new PagingStoreFactoryNIO(storageManager, configuration.getPagingLocation(), configuration.getPageSyncTimeout(), scheduledPool, ioExecutorFactory, configuration.isJournalSyncNonTransactional(), ioCriticalErrorListener, configuration.isReadWholePage()); } /** @@ -2805,12 +2816,12 @@ public class ActiveMQServerImpl implements ActiveMQServer { protected StorageManager createStorageManager() { if (configuration.isPersistenceEnabled()) { if (configuration.getStoreConfiguration() != null && configuration.getStoreConfiguration().getStoreType() == StoreConfiguration.StoreType.DATABASE) { - JDBCJournalStorageManager journal = new JDBCJournalStorageManager(configuration, getCriticalAnalyzer(), getScheduledPool(), executorFactory, ioExecutorFactory, shutdownOnCriticalIO); + JDBCJournalStorageManager journal = new JDBCJournalStorageManager(configuration, getCriticalAnalyzer(), getScheduledPool(), executorFactory, ioExecutorFactory, ioCriticalErrorListener); this.getCriticalAnalyzer().add(journal); return journal; } else { // Default to File Based Storage Manager, (Legacy default configuration). - JournalStorageManager journal = new JournalStorageManager(configuration, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO); + JournalStorageManager journal = new JournalStorageManager(configuration, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener); this.getCriticalAnalyzer().add(journal); return journal; } @@ -3136,7 +3147,7 @@ public class ActiveMQServerImpl implements ActiveMQServer { if (configuration.getMaxDiskUsage() != -1) { try { - injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f, shutdownOnCriticalIO)); + injectMonitor(new FileStoreMonitor(getScheduledPool(), executorFactory.getExecutor(), configuration.getDiskScanPeriod(), TimeUnit.MILLISECONDS, configuration.getMaxDiskUsage() / 100f, ioCriticalErrorListener)); } catch (Exception e) { ActiveMQServerLogger.LOGGER.unableToInjectMonitor(e); } @@ -3929,23 +3940,23 @@ public class ActiveMQServerImpl implements ActiveMQServer { // Inner classes // -------------------------------------------------------------------------------- - public final class ShutdownOnCriticalErrorListener implements IOCriticalErrorListener { + public final class DefaultCriticalErrorListener implements IOCriticalErrorListener { - boolean failedAlready = false; + private final AtomicBoolean failedAlready = new AtomicBoolean(); @Override public synchronized void onIOException(Throwable cause, String message, SequentialFile file) { - if (!failedAlready) { - failedAlready = true; - - if (file == null) { - ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, "NULL", cause); - } else { - ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause); - } - - stopTheServer(true); + if (!failedAlready.compareAndSet(false, true)) { + return; } + + if (file == null) { + ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, "NULL", cause); + } else { + ActiveMQServerLogger.LOGGER.ioCriticalIOError(message, file.toString(), cause); + } + + stopTheServer(true); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java new file mode 100644 index 0000000000..cfbcb47d02 --- /dev/null +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileBasedNodeManager.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.server.impl; + +import java.io.File; +import java.io.IOException; +import java.io.RandomAccessFile; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; + +import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; +import org.apache.activemq.artemis.core.server.NodeManager; +import org.apache.activemq.artemis.utils.UUID; +import org.apache.activemq.artemis.utils.UUIDGenerator; + +public abstract class FileBasedNodeManager extends NodeManager { + + protected static final byte FIRST_TIME_START = '0'; + public static final String SERVER_LOCK_NAME = "server.lock"; + private static final String ACCESS_MODE = "rw"; + private final File directory; + protected FileChannel channel; + + public FileBasedNodeManager(boolean replicatedBackup, File directory) { + super(replicatedBackup); + this.directory = directory; + } + + /** + * Ensures existence of persistent information about the server's nodeID. + *

+ * Roughly the different use cases are: + *

    + *
  1. old live server restarts: a server.lock file already exists and contains a nodeID. + *
  2. new live server starting for the first time: no file exists, and we just *create* a new + * UUID to use as nodeID + *
  3. replicated backup received its nodeID from its live: no file exists, we need to persist + * the *current* nodeID + *
+ */ + protected synchronized void setUpServerLockFile() throws IOException { + File serverLockFile = newFile(SERVER_LOCK_NAME); + + boolean fileCreated = false; + + int count = 0; + while (!serverLockFile.exists()) { + try { + fileCreated = serverLockFile.createNewFile(); + } catch (RuntimeException e) { + ActiveMQServerLogger.LOGGER.nodeManagerCantOpenFile(e, serverLockFile); + throw e; + } catch (IOException e) { + /* + * on some OS's this may fail weirdly even tho the parent dir exists, retrying will work, some weird timing issue i think + * */ + if (count < 5) { + try { + Thread.sleep(100); + } catch (InterruptedException e1) { + } + count++; + continue; + } + ActiveMQServerLogger.LOGGER.nodeManagerCantOpenFile(e, serverLockFile); + throw e; + } + } + + @SuppressWarnings("resource") + RandomAccessFile raFile = new RandomAccessFile(serverLockFile, ACCESS_MODE); + + channel = raFile.getChannel(); + + if (fileCreated) { + ByteBuffer id = ByteBuffer.allocateDirect(3); + byte[] bytes = new byte[3]; + bytes[0] = FIRST_TIME_START; + bytes[1] = FIRST_TIME_START; + bytes[2] = FIRST_TIME_START; + id.put(bytes, 0, 3); + id.position(0); + channel.write(id, 0); + channel.force(true); + } + + createNodeId(); + } + + /** + * @return + */ + protected final File newFile(final String fileName) { + File file = new File(directory, fileName); + return file; + } + + protected final synchronized void createNodeId() throws IOException { + synchronized (nodeIDGuard) { + ByteBuffer id = ByteBuffer.allocateDirect(16); + int read = channel.read(id, 3); + if (replicatedBackup) { + id.position(0); + id.put(getUUID().asBytes(), 0, 16); + id.position(0); + channel.write(id, 3); + channel.force(true); + } else if (read != 16) { + setUUID(UUIDGenerator.getInstance().generateUUID()); + id.put(getUUID().asBytes(), 0, 16); + id.position(0); + channel.write(id, 3); + channel.force(true); + } else { + byte[] bytes = new byte[16]; + id.position(0); + id.get(bytes); + setUUID(new UUID(UUID.TYPE_TIME_BASED, bytes)); + } + } + } + + @Override + public synchronized void stop() throws Exception { + FileChannel channelCopy = channel; + if (channelCopy != null) + channelCopy.close(); + super.stop(); + } + + @Override + public void stopBackup() throws NodeManagerException { + if (replicatedBackup && getNodeId() != null) { + try { + setUpServerLockFile(); + } catch (IOException e) { + throw new NodeManagerException(e); + } + } + super.stopBackup(); + } +} diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java index ddd59fff16..3b3da1c8c4 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/FileLockNodeManager.java @@ -22,23 +22,18 @@ import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.channels.FileLock; -import java.util.Collections; -import java.util.HashSet; -import java.util.Set; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActivateCallback; import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException; import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.utils.UUID; import org.jboss.logging.Logger; -public class FileLockNodeManager extends NodeManager { +public class FileLockNodeManager extends FileBasedNodeManager { private static final Logger logger = Logger.getLogger(FileLockNodeManager.class); @@ -58,9 +53,9 @@ public class FileLockNodeManager extends NodeManager { private static final byte NOT_STARTED = 'N'; - private static final long LOCK_ACCESS_FAILURE_WAIT_TIME = 2000; + private static final long LOCK_ACCESS_FAILURE_WAIT_TIME_NANOS = TimeUnit.SECONDS.toNanos(2); - private static final int LOCK_MONITOR_TIMEOUT_MILLIES = 2000; + private static final long LOCK_MONITOR_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(2); private volatile FileLock liveLock; @@ -68,28 +63,31 @@ public class FileLockNodeManager extends NodeManager { private final FileChannel[] lockChannels = new FileChannel[3]; - protected long lockAcquisitionTimeout = -1; + private final long lockAcquisitionTimeoutNanos; protected boolean interrupted = false; - private ScheduledExecutorService scheduledPool; + private final ScheduledExecutorService scheduledPool; public FileLockNodeManager(final File directory, boolean replicatedBackup, ScheduledExecutorService scheduledPool) { super(replicatedBackup, directory); this.scheduledPool = scheduledPool; + this.lockAcquisitionTimeoutNanos = -1; } public FileLockNodeManager(final File directory, boolean replicatedBackup) { super(replicatedBackup, directory); this.scheduledPool = null; + this.lockAcquisitionTimeoutNanos = -1; } - public FileLockNodeManager(final File directory, boolean replicatedBackup, long lockAcquisitionTimeout, - ScheduledExecutorService scheduledPool) { + public FileLockNodeManager(final File directory, + boolean replicatedBackup, + long lockAcquisitionTimeout, + ScheduledExecutorService scheduledPool) { super(replicatedBackup, directory); - this.scheduledPool = scheduledPool; - this.lockAcquisitionTimeout = lockAcquisitionTimeout; + this.lockAcquisitionTimeoutNanos = lockAcquisitionTimeout == -1 ? -1 : TimeUnit.MILLISECONDS.toNanos(lockAcquisitionTimeout); } @Override @@ -141,19 +139,23 @@ public class FileLockNodeManager extends NodeManager { } @Override - public boolean isAwaitingFailback() throws Exception { + public boolean isAwaitingFailback() throws NodeManagerException { return getState() == FileLockNodeManager.FAILINGBACK; } @Override - public boolean isBackupLive() throws Exception { - FileLock liveAttemptLock; - liveAttemptLock = tryLock(FileLockNodeManager.LIVE_LOCK_POS); - if (liveAttemptLock == null) { - return true; - } else { - liveAttemptLock.release(); - return false; + public boolean isBackupLive() throws NodeManagerException { + try { + FileLock liveAttemptLock; + liveAttemptLock = tryLock(FileLockNodeManager.LIVE_LOCK_POS); + if (liveAttemptLock == null) { + return true; + } else { + liveAttemptLock.release(); + return false; + } + } catch (IOException e) { + throw new NodeManagerException(e); } } @@ -167,182 +169,218 @@ public class FileLockNodeManager extends NodeManager { } @Override - public final void releaseBackup() throws Exception { - if (backupLock != null) { - backupLock.release(); - backupLock = null; + public final void releaseBackup() throws NodeManagerException { + try { + if (backupLock != null) { + backupLock.release(); + backupLock = null; + } + } catch (IOException e) { + throw new NodeManagerException(e); } } @Override - public void awaitLiveNode() throws Exception { - logger.debug("awaiting live node..."); - do { - byte state = getState(); - while (state == FileLockNodeManager.NOT_STARTED || state == FIRST_TIME_START) { - logger.debug("awaiting live node startup state='" + state + "'"); - Thread.sleep(2000); + public void awaitLiveNode() throws NodeManagerException, InterruptedException { + try { + logger.debug("awaiting live node..."); + do { + byte state = getState(); + while (state == FileLockNodeManager.NOT_STARTED || state == FIRST_TIME_START) { + logger.debug("awaiting live node startup state='" + state + "'"); + Thread.sleep(2000); + state = getState(); + } + + liveLock = lock(FileLockNodeManager.LIVE_LOCK_POS); + if (interrupted) { + interrupted = false; + throw new InterruptedException("Lock was interrupted"); + } state = getState(); + if (state == FileLockNodeManager.PAUSED) { + liveLock.release(); + logger.debug("awaiting live node restarting"); + Thread.sleep(2000); + } else if (state == FileLockNodeManager.FAILINGBACK) { + liveLock.release(); + logger.debug("awaiting live node failing back"); + Thread.sleep(2000); + } else if (state == FileLockNodeManager.LIVE) { + logger.debug("acquired live node lock state = " + (char) state); + break; + } } - - liveLock = lock(FileLockNodeManager.LIVE_LOCK_POS); - if (interrupted) { - interrupted = false; - throw new InterruptedException("Lock was interrupted"); - } - state = getState(); - if (state == FileLockNodeManager.PAUSED) { - liveLock.release(); - logger.debug("awaiting live node restarting"); - Thread.sleep(2000); - } else if (state == FileLockNodeManager.FAILINGBACK) { - liveLock.release(); - logger.debug("awaiting live node failing back"); - Thread.sleep(2000); - } else if (state == FileLockNodeManager.LIVE) { - logger.debug("acquired live node lock state = " + (char) state); - break; - } + while (true); + } catch (IOException | ActiveMQLockAcquisitionTimeoutException e) { + throw new NodeManagerException(e); } - while (true); } @Override - public void startBackup() throws Exception { + public void startBackup() throws NodeManagerException { assert !replicatedBackup; // should not be called if this is a replicating backup ActiveMQServerLogger.LOGGER.waitingToBecomeBackup(); - - backupLock = lock(FileLockNodeManager.BACKUP_LOCK_POS); + try { + backupLock = lock(FileLockNodeManager.BACKUP_LOCK_POS); + } catch (ActiveMQLockAcquisitionTimeoutException e) { + throw new NodeManagerException(e); + } ActiveMQServerLogger.LOGGER.gotBackupLock(); if (getUUID() == null) readNodeId(); } @Override - public ActivateCallback startLiveNode() throws Exception { - setFailingBack(); + public ActivateCallback startLiveNode() throws NodeManagerException { + try { + setFailingBack(); - String timeoutMessage = lockAcquisitionTimeout == -1 ? "indefinitely" : lockAcquisitionTimeout + " milliseconds"; + String timeoutMessage = lockAcquisitionTimeoutNanos == -1 ? "indefinitely" : TimeUnit.NANOSECONDS.toMillis(lockAcquisitionTimeoutNanos) + " milliseconds"; - ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage); + ActiveMQServerLogger.LOGGER.waitingToObtainLiveLock(timeoutMessage); - liveLock = lock(FileLockNodeManager.LIVE_LOCK_POS); + liveLock = lock(FileLockNodeManager.LIVE_LOCK_POS); - ActiveMQServerLogger.LOGGER.obtainedLiveLock(); + ActiveMQServerLogger.LOGGER.obtainedLiveLock(); - return new CleaningActivateCallback() { - @Override - public void activationComplete() { - try { - setLive(); - startLockMonitoring(); - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + return new CleaningActivateCallback() { + @Override + public void activationComplete() { + try { + setLive(); + startLockMonitoring(); + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + // that allows to restart/stop the broker if needed + throw e; + } } - } - }; + }; + } catch (ActiveMQLockAcquisitionTimeoutException e) { + throw new NodeManagerException(e); + } } @Override - public void pauseLiveServer() throws Exception { + public void pauseLiveServer() throws NodeManagerException { stopLockMonitoring(); setPaused(); - if (liveLock != null) { - liveLock.release(); + try { + if (liveLock != null) { + liveLock.release(); + } + } catch (IOException e) { + throw new NodeManagerException(e); } } @Override - public void crashLiveServer() throws Exception { + public void crashLiveServer() throws NodeManagerException { stopLockMonitoring(); if (liveLock != null) { - liveLock.release(); - liveLock = null; + try { + liveLock.release(); + } catch (IOException e) { + throw new NodeManagerException(e); + } finally { + liveLock = null; + } } } @Override - public void awaitLiveStatus() throws Exception { + public void awaitLiveStatus() throws NodeManagerException, InterruptedException { while (getState() != LIVE) { Thread.sleep(2000); } } - private void setLive() throws Exception { + private void setLive() throws NodeManagerException { writeFileLockStatus(FileLockNodeManager.LIVE); } - private void setFailingBack() throws Exception { + private void setFailingBack() throws NodeManagerException { writeFileLockStatus(FAILINGBACK); } - private void setPaused() throws Exception { + private void setPaused() throws NodeManagerException { writeFileLockStatus(PAUSED); } /** * @param status - * @throws IOException + * @throws ActiveMQLockAcquisitionTimeoutException,IOException */ - private void writeFileLockStatus(byte status) throws Exception { + private void writeFileLockStatus(byte status) throws NodeManagerException { if (replicatedBackup && channel == null) return; logger.debug("writing status: " + status); ByteBuffer bb = ByteBuffer.allocateDirect(1); bb.put(status); bb.position(0); - if (!channel.isOpen()) { - setUpServerLockFile(); - } - FileLock lock = null; try { - lock = lock(STATE_LOCK_POS); - channel.write(bb, 0); - channel.force(true); - } finally { - if (lock != null) { - lock.release(); + if (!channel.isOpen()) { + setUpServerLockFile(); } + FileLock lock = null; + try { + lock = lock(STATE_LOCK_POS); + channel.write(bb, 0); + channel.force(true); + } finally { + if (lock != null) { + lock.release(); + } + } + } catch (IOException | ActiveMQLockAcquisitionTimeoutException e) { + throw new NodeManagerException(e); } } - private byte getState() throws Exception { - byte result; - logger.debug("getting state..."); - ByteBuffer bb = ByteBuffer.allocateDirect(1); - int read; - FileLock lock = null; + private byte getState() throws NodeManagerException { try { - lock = lock(STATE_LOCK_POS); - read = channel.read(bb, 0); - if (read <= 0) { - result = FileLockNodeManager.NOT_STARTED; - } else { - result = bb.get(0); - } - } finally { - if (lock != null) { - lock.release(); + byte result; + logger.debug("getting state..."); + ByteBuffer bb = ByteBuffer.allocateDirect(1); + int read; + FileLock lock = null; + try { + lock = lock(STATE_LOCK_POS); + read = channel.read(bb, 0); + if (read <= 0) { + result = FileLockNodeManager.NOT_STARTED; + } else { + result = bb.get(0); + } + } finally { + if (lock != null) { + lock.release(); + } } + logger.debug("state: " + result); + return result; + } catch (IOException | ActiveMQLockAcquisitionTimeoutException e) { + throw new NodeManagerException(e); } - - logger.debug("state: " + result); - - return result; } @Override - public final SimpleString readNodeId() throws ActiveMQIllegalStateException, IOException { - ByteBuffer id = ByteBuffer.allocateDirect(16); - int read = channel.read(id, 3); - if (read != 16) { - throw new ActiveMQIllegalStateException("live server did not write id to file"); + public final SimpleString readNodeId() throws NodeManagerException { + try { + ByteBuffer id = ByteBuffer.allocateDirect(16); + int read = channel.read(id, 3); + if (read != 16) { + throw new IOException("live server did not write id to file"); + } + byte[] bytes = new byte[16]; + id.position(0); + id.get(bytes); + setUUID(new UUID(UUID.TYPE_TIME_BASED, bytes)); + return getNodeId(); + } catch (IOException e) { + throw new NodeManagerException(e); } - byte[] bytes = new byte[16]; - id.position(0); - id.get(bytes); - setUUID(new UUID(UUID.TYPE_TIME_BASED, bytes)); - return getNodeId(); } protected FileLock tryLock(final int lockPos) throws IOException { @@ -361,8 +399,8 @@ public class FileLockNodeManager extends NodeManager { } } - protected FileLock lock(final int lockPosition) throws Exception { - long start = System.currentTimeMillis(); + protected FileLock lock(final int lockPosition) throws ActiveMQLockAcquisitionTimeoutException { + long start = System.nanoTime(); boolean isRecurringFailure = false; while (!interrupted) { @@ -377,7 +415,7 @@ public class FileLockNodeManager extends NodeManager { return null; } - if (lockAcquisitionTimeout != -1 && (System.currentTimeMillis() - start) > lockAcquisitionTimeout) { + if (this.lockAcquisitionTimeoutNanos != -1 && (System.nanoTime() - start) > this.lockAcquisitionTimeoutNanos) { throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock"); } } else { @@ -390,9 +428,9 @@ public class FileLockNodeManager extends NodeManager { "Failure when accessing a lock file", e); isRecurringFailure = true; - long waitTime = LOCK_ACCESS_FAILURE_WAIT_TIME; - if (lockAcquisitionTimeout != -1) { - final long remainingTime = lockAcquisitionTimeout - (System.currentTimeMillis() - start); + long waitTime = LOCK_ACCESS_FAILURE_WAIT_TIME_NANOS; + if (this.lockAcquisitionTimeoutNanos != -1) { + final long remainingTime = this.lockAcquisitionTimeoutNanos - (System.nanoTime() - start); if (remainingTime <= 0) { throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock"); } @@ -400,7 +438,7 @@ public class FileLockNodeManager extends NodeManager { } try { - Thread.sleep(waitTime); + TimeUnit.NANOSECONDS.sleep(waitTime); } catch (InterruptedException interrupt) { return null; } @@ -414,7 +452,7 @@ public class FileLockNodeManager extends NodeManager { private synchronized void startLockMonitoring() { logger.debug("Starting the lock monitor"); if (monitorLock == null) { - monitorLock = new MonitorLock(scheduledPool, LOCK_MONITOR_TIMEOUT_MILLIES, LOCK_MONITOR_TIMEOUT_MILLIES, TimeUnit.MILLISECONDS, false); + monitorLock = new MonitorLock(scheduledPool, LOCK_MONITOR_TIMEOUT_NANOS, LOCK_MONITOR_TIMEOUT_NANOS, TimeUnit.NANOSECONDS, false); monitorLock.start(); } else { logger.debug("Lock monitor was already started"); @@ -431,49 +469,22 @@ public class FileLockNodeManager extends NodeManager { } } - private void notifyLostLock() { - // Additional check we are not initializing or have no locking object anymore - // because of a shutdown - if (lockListeners != null && liveLock != null) { - Set lockListenersSnapshot = null; - - // Snapshot of the set because I'm not sure if we can trigger concurrent - // modification exception here if we don't - synchronized (lockListeners) { - lockListenersSnapshot = new HashSet<>(lockListeners); - } - - lockListenersSnapshot.forEach(lockListener -> { - try { - lockListener.lostLock(); - } catch (Exception e) { - // Need to notify everyone so ignore any exception - } - }); + @Override + protected synchronized void notifyLostLock() { + if (liveLock != null) { + super.notifyLostLock(); } } - public void registerLockListener(LockListener lockListener) { - lockListeners.add(lockListener); + // This has been introduced to help ByteMan test testLockMonitorInvalid on JDK 11: sun.nio.ch.FileLockImpl::isValid + // can affecting setLive, causing an infinite loop due to java.nio.channels.OverlappingFileLockException on tryLock + private boolean isLiveLockLost() { + final FileLock lock = this.liveLock; + return (lock != null && !lock.isValid()) || lock == null; } - public void unregisterLockListener(LockListener lockListener) { - lockListeners.remove(lockListener); - } - - protected final Set lockListeners = Collections.synchronizedSet(new HashSet()); - private MonitorLock monitorLock; - public abstract class LockListener { - protected abstract void lostLock() throws Exception; - - protected void unregisterListener() { - lockListeners.remove(this); - } - } - - public class MonitorLock extends ActiveMQScheduledComponent { public MonitorLock(ScheduledExecutorService scheduledExecutorService, long initialDelay, @@ -492,7 +503,7 @@ public class FileLockNodeManager extends NodeManager { if (liveLock == null) { logger.debug("Livelock is null"); } - lostLock = (liveLock != null && !liveLock.isValid()) || liveLock == null; + lostLock = isLiveLockLost(); if (!lostLock) { logger.debug("Server still has the lock, double check status is live"); // Java always thinks the lock is still valid even when there is no filesystem diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java index 7af25defde..bbe4191a09 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/InVMNodeManager.java @@ -17,14 +17,10 @@ package org.apache.activemq.artemis.core.server.impl; import java.io.File; -import java.io.IOException; import java.util.concurrent.Semaphore; -import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.server.ActivateCallback; -import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; -import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.utils.UUIDGenerator; import static org.apache.activemq.artemis.core.server.impl.InVMNodeManager.State.FAILING_BACK; @@ -39,7 +35,7 @@ import static org.apache.activemq.artemis.core.server.impl.InVMNodeManager.State * multiple servers are run inside the same VM and File Locks can not be shared in the * same VM (it would cause a shared lock violation). */ -public final class InVMNodeManager extends NodeManager { +public final class InVMNodeManager extends FileBasedNodeManager { private final Semaphore liveLock; @@ -67,7 +63,7 @@ public final class InVMNodeManager extends NodeManager { } @Override - public void awaitLiveNode() throws Exception { + public void awaitLiveNode() throws InterruptedException { do { while (state == NOT_STARTED) { Thread.sleep(10); @@ -92,51 +88,47 @@ public final class InVMNodeManager extends NodeManager { } @Override - public void awaitLiveStatus() throws Exception { + public void awaitLiveStatus() throws InterruptedException { while (state != LIVE) { Thread.sleep(10); } } @Override - public void startBackup() throws Exception { + public void startBackup() throws InterruptedException { backupLock.acquire(); } @Override - public ActivateCallback startLiveNode() throws Exception { + public ActivateCallback startLiveNode() throws InterruptedException { state = FAILING_BACK; liveLock.acquire(); return new CleaningActivateCallback() { @Override public void activationComplete() { - try { - state = LIVE; - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); - } + state = LIVE; } }; } @Override - public void pauseLiveServer() throws Exception { + public void pauseLiveServer() { state = PAUSED; liveLock.release(); } @Override - public void crashLiveServer() throws Exception { + public void crashLiveServer() { liveLock.release(); } @Override - public boolean isAwaitingFailback() throws Exception { + public boolean isAwaitingFailback() { return state == FAILING_BACK; } @Override - public boolean isBackupLive() throws Exception { + public boolean isBackupLive() { return liveLock.availablePermits() == 0; } @@ -151,7 +143,7 @@ public final class InVMNodeManager extends NodeManager { } @Override - public SimpleString readNodeId() throws ActiveMQIllegalStateException, IOException { + public SimpleString readNodeId() { return getNodeId(); } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java index 38483d0ddd..002414a771 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingBackupActivation.java @@ -25,6 +25,7 @@ import org.apache.activemq.artemis.api.core.Pair; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.api.core.client.TopologyMember; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; @@ -67,7 +68,7 @@ public final class SharedNothingBackupActivation extends Activation { private SharedNothingBackupQuorum backupQuorum; private final boolean attemptFailBack; private final Map activationParams; - private final ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO; + private final IOCriticalErrorListener ioCriticalErrorListener; private String nodeID; ClusterControl clusterControl; private boolean closed; @@ -79,13 +80,13 @@ public final class SharedNothingBackupActivation extends Activation { public SharedNothingBackupActivation(ActiveMQServerImpl activeMQServer, boolean attemptFailBack, Map activationParams, - ActiveMQServerImpl.ShutdownOnCriticalErrorListener shutdownOnCriticalIO, + IOCriticalErrorListener ioCriticalErrorListener, ReplicaPolicy replicaPolicy, NetworkHealthCheck networkHealthCheck) { this.activeMQServer = activeMQServer; this.attemptFailBack = attemptFailBack; this.activationParams = activationParams; - this.shutdownOnCriticalIO = shutdownOnCriticalIO; + this.ioCriticalErrorListener = ioCriticalErrorListener; this.replicaPolicy = replicaPolicy; backupSyncLatch.setCount(1); this.networkHealthCheck = networkHealthCheck; @@ -95,7 +96,7 @@ public final class SharedNothingBackupActivation extends Activation { assert replicationEndpoint == null; activeMQServer.resetNodeManager(); backupUpToDate = false; - replicationEndpoint = new ReplicationEndpoint(activeMQServer, shutdownOnCriticalIO, attemptFailBack, this); + replicationEndpoint = new ReplicationEndpoint(activeMQServer, ioCriticalErrorListener, attemptFailBack, this); } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java index 7178cd822e..9de4be057b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedNothingLiveActivation.java @@ -317,7 +317,7 @@ public class SharedNothingLiveActivation extends LiveActivation { SimpleString nodeId0; try { nodeId0 = activeMQServer.getNodeManager().readNodeId(); - } catch (ActiveMQIllegalStateException e) { + } catch (NodeManager.NodeManagerException e) { nodeId0 = null; } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java index 463d97c0df..0b21445102 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreBackupActivation.java @@ -16,18 +16,24 @@ */ package org.apache.activemq.artemis.core.server.impl; +import java.io.IOException; import java.nio.channels.ClosedChannelException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.postoffice.PostOffice; +import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException; import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.NodeManager; +import org.apache.activemq.artemis.core.server.NodeManager.LockListener; +import org.apache.activemq.artemis.core.server.NodeManager.NodeManagerException; import org.apache.activemq.artemis.core.server.QueueFactory; import org.apache.activemq.artemis.core.server.cluster.ha.ScaleDownPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy; @@ -40,17 +46,24 @@ public final class SharedStoreBackupActivation extends Activation { private static final Logger logger = Logger.getLogger(SharedStoreBackupActivation.class); //this is how we act as a backup - private SharedStoreSlavePolicy sharedStoreSlavePolicy; + private final SharedStoreSlavePolicy sharedStoreSlavePolicy; - private ActiveMQServerImpl activeMQServer; + private final ActiveMQServerImpl activeMQServer; private final Object failbackCheckerGuard = new Object(); private boolean cancelFailBackChecker; - public SharedStoreBackupActivation(ActiveMQServerImpl server, SharedStoreSlavePolicy sharedStoreSlavePolicy) { + private LockListener activeLockListener; + + private final IOCriticalErrorListener ioCriticalErrorListener; + + public SharedStoreBackupActivation(ActiveMQServerImpl server, + SharedStoreSlavePolicy sharedStoreSlavePolicy, + IOCriticalErrorListener ioCriticalErrorListener) { this.activeMQServer = server; this.sharedStoreSlavePolicy = sharedStoreSlavePolicy; + this.ioCriticalErrorListener = ioCriticalErrorListener; synchronized (failbackCheckerGuard) { cancelFailBackChecker = false; } @@ -59,6 +72,8 @@ public final class SharedStoreBackupActivation extends Activation { @Override public void run() { try { + registerActiveLockListener(activeMQServer.getNodeManager()); + activeMQServer.getNodeManager().startBackup(); ScaleDownPolicy scaleDownPolicy = sharedStoreSlavePolicy.getScaleDownPolicy(); @@ -92,6 +107,11 @@ public final class SharedStoreBackupActivation extends Activation { activeMQServer.completeActivation(false); if (scalingDown) { + if (!restarting.compareAndSet(false, true)) { + return; + } + unregisterActiveLockListener(activeMQServer.getNodeManager()); + ActiveMQServerLogger.LOGGER.backupServerScaledDown(); Thread t = new Thread(new Runnable() { @Override @@ -117,6 +137,17 @@ public final class SharedStoreBackupActivation extends Activation { if (sharedStoreSlavePolicy.isAllowAutoFailBack() && ActiveMQServerImpl.SERVER_STATE.STOPPING != activeMQServer.getState() && ActiveMQServerImpl.SERVER_STATE.STOPPED != activeMQServer.getState()) { startFailbackChecker(); } + } catch (NodeManagerException nodeManagerException) { + if (nodeManagerException.getCause() instanceof ClosedChannelException) { + // this is ok, we are being stopped + return; + } + if (nodeManagerException.getCause() instanceof ActiveMQLockAcquisitionTimeoutException) { + ActiveMQServerLogger.LOGGER.initializationError(nodeManagerException.getCause()); + return; + } + unregisterActiveLockListener(activeMQServer.getNodeManager()); + ioCriticalErrorListener.onIOException(nodeManagerException, nodeManagerException.getMessage(), null); } catch (ClosedChannelException | InterruptedException e) { // these are ok, we are being stopped } catch (Exception e) { @@ -144,16 +175,25 @@ public final class SharedStoreBackupActivation extends Activation { activeMQServer.interruptActivationThread(nodeManagerInUse); if (nodeManagerInUse != null) { + unregisterActiveLockListener(nodeManagerInUse); nodeManagerInUse.stopBackup(); } } else { if (nodeManagerInUse != null) { + unregisterActiveLockListener(nodeManagerInUse); // if we are now live, behave as live // We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is // started before the live if (sharedStoreSlavePolicy.isFailoverOnServerShutdown() || permanently) { - nodeManagerInUse.crashLiveServer(); + try { + nodeManagerInUse.crashLiveServer(); + } catch (Throwable t) { + if (!permanently) { + throw t; + } + logger.warn("Errored while closing activation: can be ignored because of permanent close", t); + } } else { nodeManagerInUse.pauseLiveServer(); } @@ -161,6 +201,27 @@ public final class SharedStoreBackupActivation extends Activation { } } + private void registerActiveLockListener(NodeManager nodeManager) { + LockListener lockListener = () -> { + if (!restarting.compareAndSet(false, true)) { + logger.warn("Restarting already happening on lost lock"); + return; + } + unregisterActiveLockListener(nodeManager); + ioCriticalErrorListener.onIOException(new IOException("lost lock"), "Lost NodeManager lock", null); + }; + activeLockListener = lockListener; + nodeManager.registerLockListener(lockListener); + } + + private void unregisterActiveLockListener(NodeManager nodeManager) { + LockListener activeLockListener = this.activeLockListener; + if (activeLockListener != null) { + nodeManager.unregisterLockListener(activeLockListener); + this.activeLockListener = null; + } + } + @Override public JournalLoader createJournalLoader(PostOffice postOffice, PagingManager pagingManager, @@ -178,6 +239,8 @@ public final class SharedStoreBackupActivation extends Activation { } } + private final AtomicBoolean restarting = new AtomicBoolean(false); + /** * To be called by backup trying to fail back the server */ @@ -195,47 +258,50 @@ public final class SharedStoreBackupActivation extends Activation { activeMQServer.getClusterManager().getDefaultConnection(null).addClusterTopologyListener(backupListener); } - private boolean restarting = false; - @Override public void run() { try { - if (!restarting && activeMQServer.getNodeManager().isAwaitingFailback()) { - if (backupListener.waitForBackup()) { - ActiveMQServerLogger.LOGGER.awaitFailBack(); - restarting = true; - Thread t = new Thread(new Runnable() { - @Override - public void run() { - try { - logger.debug(activeMQServer + "::Stopping live node in favor of failback"); - - NodeManager nodeManager = activeMQServer.getNodeManager(); - activeMQServer.stop(true, false, true); - - // ensure that the server to which we are failing back actually starts fully before we restart - nodeManager.start(); - try { - nodeManager.awaitLiveStatus(); - } finally { - nodeManager.stop(); - } - - synchronized (failbackCheckerGuard) { - if (cancelFailBackChecker || !sharedStoreSlavePolicy.isRestartBackup()) - return; - - activeMQServer.setHAPolicy(sharedStoreSlavePolicy); - logger.debug(activeMQServer + "::Starting backup node now after failback"); - activeMQServer.start(); - } - } catch (Exception e) { - ActiveMQServerLogger.LOGGER.serverRestartWarning(e); - } - } - }); - t.start(); + if (!restarting.get() && activeMQServer.getNodeManager().isAwaitingFailback() && backupListener.waitForBackup()) { + if (!restarting.compareAndSet(false, true)) { + return; } + ActiveMQServerLogger.LOGGER.awaitFailBack(); + Thread t = new Thread(new Runnable() { + @Override + public void run() { + try { + logger.debug(activeMQServer + "::Stopping live node in favor of failback"); + + NodeManager nodeManager = activeMQServer.getNodeManager(); + activeMQServer.stop(true, false, true); + + // ensure that the server to which we are failing back actually starts fully before we restart + nodeManager.start(); + try { + nodeManager.awaitLiveStatus(); + } finally { + nodeManager.stop(); + } + + synchronized (failbackCheckerGuard) { + if (cancelFailBackChecker || !sharedStoreSlavePolicy.isRestartBackup()) + return; + + activeMQServer.setHAPolicy(sharedStoreSlavePolicy); + logger.debug(activeMQServer + "::Starting backup node now after failback"); + activeMQServer.start(); + + LockListener lockListener = activeLockListener; + if (lockListener != null) { + activeMQServer.getNodeManager().registerLockListener(lockListener); + } + } + } catch (Exception e) { + ActiveMQServerLogger.LOGGER.serverRestartWarning(e); + } + } + }); + t.start(); } } catch (Exception e) { ActiveMQServerLogger.LOGGER.serverRestartWarning(e); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreLiveActivation.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreLiveActivation.java index 6745ed987e..1733f34f4b 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreLiveActivation.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/SharedStoreLiveActivation.java @@ -16,11 +16,17 @@ */ package org.apache.activemq.artemis.core.server.impl; +import java.io.IOException; +import java.nio.channels.ClosedChannelException; + +import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.server.ActivateCallback; +import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.NodeManager; +import org.apache.activemq.artemis.core.server.NodeManager.LockListener; +import org.apache.activemq.artemis.core.server.NodeManager.NodeManagerException; import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy; -import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager.LockListener; import org.jboss.logging.Logger; public final class SharedStoreLiveActivation extends LiveActivation { @@ -28,17 +34,22 @@ public final class SharedStoreLiveActivation extends LiveActivation { private static final Logger logger = Logger.getLogger(SharedStoreLiveActivation.class); // this is how we act when we initially start as live - private SharedStoreMasterPolicy sharedStoreMasterPolicy; + private final SharedStoreMasterPolicy sharedStoreMasterPolicy; - private ActiveMQServerImpl activeMQServer; + private final ActiveMQServerImpl activeMQServer; - private volatile FileLockNodeManager.LockListener activeLockListener; + private volatile LockListener activeLockListener; private volatile ActivateCallback nodeManagerActivateCallback; - public SharedStoreLiveActivation(ActiveMQServerImpl server, SharedStoreMasterPolicy sharedStoreMasterPolicy) { + private final IOCriticalErrorListener ioCriticalErrorListener; + + public SharedStoreLiveActivation(ActiveMQServerImpl server, + SharedStoreMasterPolicy sharedStoreMasterPolicy, + IOCriticalErrorListener ioCriticalErrorListener) { this.activeMQServer = server; this.sharedStoreMasterPolicy = sharedStoreMasterPolicy; + this.ioCriticalErrorListener = ioCriticalErrorListener; } @Override @@ -71,9 +82,9 @@ public final class SharedStoreLiveActivation extends LiveActivation { activeMQServer.getBackupManager().announceBackup(); } + registerActiveLockListener(activeMQServer.getNodeManager()); nodeManagerActivateCallback = activeMQServer.getNodeManager().startLiveNode(); activeMQServer.registerActivateCallback(nodeManagerActivateCallback); - addLockListener(activeMQServer, activeMQServer.getNodeManager()); if (activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPED || activeMQServer.getState() == ActiveMQServerImpl.SERVER_STATE.STOPPING) { @@ -85,59 +96,40 @@ public final class SharedStoreLiveActivation extends LiveActivation { activeMQServer.completeActivation(false); ActiveMQServerLogger.LOGGER.serverIsLive(); + } catch (NodeManagerException nodeManagerException) { + if (nodeManagerException.getCause() instanceof ClosedChannelException) { + // this is ok, we are being stopped + return; + } + if (nodeManagerException.getCause() instanceof ActiveMQLockAcquisitionTimeoutException) { + onActivationFailure((ActiveMQLockAcquisitionTimeoutException) nodeManagerException.getCause()); + return; + } + unregisterActiveLockListener(activeMQServer.getNodeManager()); + ioCriticalErrorListener.onIOException(nodeManagerException, nodeManagerException.getMessage(), null); } catch (Exception e) { - ActiveMQServerLogger.LOGGER.initializationError(e); - activeMQServer.callActivationFailureListeners(e); + onActivationFailure(e); } } - private void addLockListener(ActiveMQServerImpl activeMQServer, NodeManager nodeManager) { - if (nodeManager instanceof FileLockNodeManager) { - FileLockNodeManager fileNodeManager = (FileLockNodeManager) nodeManager; - - activeLockListener = fileNodeManager.new LockListener() { - - @Override - public void lostLock() { - stopStartServerInSeperateThread(activeMQServer); - } - - }; - fileNodeManager.registerLockListener(activeLockListener); - } // else no business registering a listener + private void onActivationFailure(Exception e) { + unregisterActiveLockListener(activeMQServer.getNodeManager()); + ActiveMQServerLogger.LOGGER.initializationError(e); + activeMQServer.callActivationFailureListeners(e); } - /** - * We need to do this in a new thread because this takes to long to finish in - * the scheduled thread Also this is not the responsibility of the scheduled - * thread - * @param activeMQServer - */ - private void stopStartServerInSeperateThread(ActiveMQServerImpl activeMQServer) { - try { + private void registerActiveLockListener(NodeManager nodeManager) { + LockListener lockListener = () -> + ioCriticalErrorListener.onIOException(new IOException("lost lock"), "Lost NodeManager lock", null); + activeLockListener = lockListener; + nodeManager.registerLockListener(lockListener); + } - Runnable startServerRunnable = new Runnable() { - - @Override - public void run() { - try { - activeMQServer.stop(true, false); - } catch (Exception e) { - logger.warn("Failed to stop artemis server after loosing the lock", e); - } - - try { - activeMQServer.start(); - } catch (Exception e) { - logger.error("Failed to start artemis server after recovering from loosing the lock", e); - } - } - - }; - Thread startServer = new Thread(startServerRunnable); - startServer.start(); - } catch (Exception e) { - logger.error(e.getMessage()); + private void unregisterActiveLockListener(NodeManager nodeManager) { + LockListener activeLockListener = this.activeLockListener; + if (activeLockListener != null) { + nodeManager.unregisterLockListener(activeLockListener); + this.activeLockListener = null; } } @@ -147,16 +139,20 @@ public final class SharedStoreLiveActivation extends LiveActivation { NodeManager nodeManagerInUse = activeMQServer.getNodeManager(); if (nodeManagerInUse != null) { - LockListener closeLockListener = activeLockListener; - if (closeLockListener != null) { - closeLockListener.unregisterListener(); - } + unregisterActiveLockListener(nodeManagerInUse); ActivateCallback activateCallback = nodeManagerActivateCallback; if (activateCallback != null) { activeMQServer.unregisterActivateCallback(activateCallback); } if (sharedStoreMasterPolicy.isFailoverOnServerShutdown() || permanently) { - nodeManagerInUse.crashLiveServer(); + try { + nodeManagerInUse.crashLiveServer(); + } catch (Throwable t) { + if (!permanently) { + throw t; + } + logger.warn("Errored while closing activation: can be ignored because of permanent close", t); + } } else { nodeManagerInUse.pauseLiveServer(); } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java index ce91d70d5f..d1382cfc82 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ActiveMQScheduledLeaseLock.java @@ -20,13 +20,13 @@ package org.apache.activemq.artemis.core.server.impl.jdbc; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.server.ActiveMQScheduledComponent; +import org.apache.activemq.artemis.core.server.NodeManager.LockListener; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; import org.jboss.logging.Logger; /** - * Default implementation of a {@link ScheduledLeaseLock}: see {@link ScheduledLeaseLock#of(ScheduledExecutorService, ArtemisExecutor, String, LeaseLock, long, IOCriticalErrorListener)}. + * Default implementation of a {@link ScheduledLeaseLock}: see {@link ScheduledLeaseLock#of(ScheduledExecutorService, ArtemisExecutor, String, LeaseLock, long, LockListener)}. */ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implements ScheduledLeaseLock { @@ -36,14 +36,14 @@ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implem private final LeaseLock lock; private long lastLockRenewStart; private final long renewPeriodMillis; - private final IOCriticalErrorListener ioCriticalErrorListener; + private final LockListener lockListener; ActiveMQScheduledLeaseLock(ScheduledExecutorService scheduledExecutorService, ArtemisExecutor executor, String lockName, LeaseLock lock, long renewPeriodMillis, - IOCriticalErrorListener ioCriticalErrorListener) { + LockListener lockListener) { super(scheduledExecutorService, executor, 0, renewPeriodMillis, TimeUnit.MILLISECONDS, false); if (renewPeriodMillis >= lock.expirationMillis()) { throw new IllegalArgumentException("renewPeriodMillis must be < lock's expirationMillis"); @@ -51,9 +51,14 @@ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implem this.lockName = lockName; this.lock = lock; this.renewPeriodMillis = renewPeriodMillis; - //already expired start time + // already expired start time this.lastLockRenewStart = System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(lock.expirationMillis()); - this.ioCriticalErrorListener = ioCriticalErrorListener; + this.lockListener = lockListener; + } + + @Override + public String lockName() { + return lockName; } @Override @@ -84,37 +89,55 @@ final class ActiveMQScheduledLeaseLock extends ActiveMQScheduledComponent implem } @Override - public void run() { + public synchronized void run() { + if (!isStarted()) { + return; + } final long lastRenewStart = this.lastLockRenewStart; final long renewStart = System.nanoTime(); + boolean lockLost = true; try { - if (!this.lock.renew()) { - ioCriticalErrorListener.onIOException(new IllegalStateException(lockName + " lock can't be renewed"), "Critical error while on " + lockName + " renew", null); - } + lockLost = !this.lock.renew(); } catch (Throwable t) { - ioCriticalErrorListener.onIOException(t, "Critical error while on " + lockName + " renew", null); - throw t; + LOGGER.warnf(t, "%s lock renew has failed", lockName); + if (lock.localExpirationTime() > 0) { + final long millisBeforeExpiration = (lock.localExpirationTime() - System.currentTimeMillis()); + // there is enough time to retry to renew it? + if (millisBeforeExpiration >= this.renewPeriodMillis) { + lockLost = false; + } + } + } + // a failed attempt to renew is treated as a lost lock + if (lockLost) { + try { + lockListener.lostLock(); + } catch (Throwable t) { + LOGGER.warnf(t, "Errored while notifying %s lock listener", lockName); + } } //logic to detect slowness of DB and/or the scheduled executor service - detectAndReportRenewSlowness(lockName, lastRenewStart, renewStart, renewPeriodMillis, lock.expirationMillis()); + detectAndReportRenewSlowness(lockName, lockLost, lastRenewStart, + renewStart, renewPeriodMillis, lock.expirationMillis()); this.lastLockRenewStart = renewStart; } private static void detectAndReportRenewSlowness(String lockName, + boolean lostLock, long lastRenewStart, long renewStart, long expectedRenewPeriodMillis, long expirationMillis) { final long elapsedMillisToRenew = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - renewStart); if (elapsedMillisToRenew > expectedRenewPeriodMillis) { - LOGGER.error(lockName + " lock renew tooks " + elapsedMillisToRenew + " ms, while is supposed to take <" + expectedRenewPeriodMillis + " ms"); + LOGGER.errorf("%s lock %s renew tooks %d ms, while is supposed to take <%d ms", lockName, lostLock ? "failed" : "successful", elapsedMillisToRenew, expectedRenewPeriodMillis); } final long measuredRenewPeriodNanos = renewStart - lastRenewStart; final long measuredRenewPeriodMillis = TimeUnit.NANOSECONDS.toMillis(measuredRenewPeriodNanos); if (measuredRenewPeriodMillis - expirationMillis > 100) { - LOGGER.error(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms"); + LOGGER.errorf("%s lock %s renew period lasted %d ms instead of %d ms", lockName, lostLock ? "failed" : "successful", measuredRenewPeriodMillis, expectedRenewPeriodMillis); } else if (measuredRenewPeriodMillis - expectedRenewPeriodMillis > 100) { - LOGGER.warn(lockName + " lock renew period lasted " + measuredRenewPeriodMillis + " ms instead of " + expectedRenewPeriodMillis + " ms"); + LOGGER.warnf("%s lock %s renew period lasted %d ms instead of %d ms", lockName, lostLock ? "failed" : "successful", measuredRenewPeriodMillis, expectedRenewPeriodMillis); } } } diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java index 3b7124eeaa..f1c789c06f 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLock.java @@ -44,8 +44,10 @@ final class JdbcLeaseLock implements LeaseLock { private final String isLocked; private final String currentDateTime; private final long expirationMillis; + private final int queryTimeout; private boolean maybeAcquired; private final String lockName; + private long localExpirationTime; /** * The lock will be responsible (ie {@link #close()}) of all the {@link PreparedStatement}s used by it, but not of the {@link Connection}, @@ -59,6 +61,7 @@ final class JdbcLeaseLock implements LeaseLock { String isLocked, String currentDateTime, long expirationMIllis, + long queryTimeoutMillis, String lockName) { if (holderId.length() > MAX_HOLDER_ID_LENGTH) { throw new IllegalArgumentException("holderId length must be <=" + MAX_HOLDER_ID_LENGTH); @@ -73,12 +76,30 @@ final class JdbcLeaseLock implements LeaseLock { this.maybeAcquired = false; this.connectionProvider = connectionProvider; this.lockName = lockName; + this.localExpirationTime = -1; + int expectedTimeout = (int) (queryTimeoutMillis > 0 ? TimeUnit.MILLISECONDS.toSeconds(queryTimeoutMillis) : -1); + if (queryTimeoutMillis >= 0) { + LOGGER.warn("queryTimeoutMillis is too low: it's suggested to configure a multi-seconds value. Disabling it because too low."); + expectedTimeout = -1; + } + this.queryTimeout = expectedTimeout; + } public String holderId() { return holderId; } + /** + * Given that many DBMS won't support standard SQL queries to collect CURRENT_TIMESTAMP at milliseconds granularity, + * this value is stripped of the milliseconds part, making it less optimistic then the reality, if >= 0.

+ * It's commonly used as an hard deadline for JDBC operations, hence is fine to not have a high precision. + */ + @Override + public long localExpirationTime() { + return localExpirationTime; + } + @Override public long expirationMillis() { return expirationMillis; @@ -115,17 +136,24 @@ final class JdbcLeaseLock implements LeaseLock { } private long dbCurrentTimeMillis(Connection connection) throws SQLException { - final long start = System.nanoTime(); try (PreparedStatement currentDateTime = connection.prepareStatement(this.currentDateTime)) { + if (queryTimeout >= 0) { + currentDateTime.setQueryTimeout(queryTimeout); + } + final long startTime = stripMilliseconds(System.currentTimeMillis()); try (ResultSet resultSet = currentDateTime.executeQuery()) { resultSet.next(); + final long endTime = stripMilliseconds(System.currentTimeMillis()); final Timestamp currentTimestamp = resultSet.getTimestamp(1); - final long elapsedTime = System.nanoTime() - start; - if (LOGGER.isDebugEnabled()) { - LOGGER.debugf("[%s] %s query currentTimestamp = %s tooks %d ms", - lockName, holderId, currentTimestamp, TimeUnit.NANOSECONDS.toMillis(elapsedTime)); + final long currentTime = currentTimestamp.getTime(); + final long currentTimeMillis = stripMilliseconds(currentTime); + if (currentTimeMillis < startTime) { + LOGGER.warnf("[%s] %s query currentTimestamp = %s on database should happen AFTER %s on broker", lockName, holderId, currentTimestamp, new Timestamp(startTime)); } - return currentTimestamp.getTime(); + if (currentTimeMillis > endTime) { + LOGGER.warnf("[%s] %s query currentTimestamp = %s on database should happen BEFORE %s on broker", lockName, holderId, currentTimestamp, new Timestamp(endTime)); + } + return currentTime; } } } @@ -138,7 +166,8 @@ final class JdbcLeaseLock implements LeaseLock { connection.setAutoCommit(false); try (PreparedStatement preparedStatement = connection.prepareStatement(this.renewLock)) { final long now = dbCurrentTimeMillis(connection); - final Timestamp expirationTime = new Timestamp(now + expirationMillis); + final long localExpirationTime = now + expirationMillis; + final Timestamp expirationTime = new Timestamp(localExpirationTime); if (LOGGER.isDebugEnabled()) { LOGGER.debugf("[%s] %s is renewing lock with expirationTime = %s", lockName, holderId, expirationTime); @@ -151,11 +180,13 @@ final class JdbcLeaseLock implements LeaseLock { final boolean renewed = updatedRows == 1; connection.commit(); if (!renewed) { + this.localExpirationTime = -1; if (LOGGER.isDebugEnabled()) { LOGGER.debugf("[%s] %s has failed to renew lock: lock status = { %s }", lockName, holderId, readableLockStatus()); } } else { + this.localExpirationTime = stripMilliseconds(localExpirationTime); LOGGER.debugf("[%s] %s has renewed lock", lockName, holderId); } return renewed; @@ -170,6 +201,10 @@ final class JdbcLeaseLock implements LeaseLock { } } + private static long stripMilliseconds(long time) { + return (time / 1000) * 1000; + } + @Override public boolean tryAcquire() { try (Connection connection = connectionProvider.getConnection()) { @@ -179,7 +214,8 @@ final class JdbcLeaseLock implements LeaseLock { try (PreparedStatement preparedStatement = connection.prepareStatement(this.tryAcquireLock)) { final long now = dbCurrentTimeMillis(connection); preparedStatement.setString(1, holderId); - final Timestamp expirationTime = new Timestamp(now + expirationMillis); + final long localExpirationTime = now + expirationMillis; + final Timestamp expirationTime = new Timestamp(localExpirationTime); preparedStatement.setTimestamp(2, expirationTime); preparedStatement.setTimestamp(3, expirationTime); LOGGER.debugf("[%s] %s is trying to acquire lock with expirationTime %s", @@ -188,6 +224,7 @@ final class JdbcLeaseLock implements LeaseLock { connection.commit(); if (acquired) { this.maybeAcquired = true; + this.localExpirationTime = stripMilliseconds(localExpirationTime); LOGGER.debugf("[%s] %s has acquired lock", lockName, holderId); } else { if (LOGGER.isDebugEnabled()) { @@ -272,6 +309,7 @@ final class JdbcLeaseLock implements LeaseLock { preparedStatement.setString(1, holderId); final boolean released = preparedStatement.executeUpdate() == 1; //consider it as released to avoid on finalize to be reclaimed + this.localExpirationTime = -1; this.maybeAcquired = false; connection.commit(); if (!released) { diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java index 212e6e12e1..f357925d73 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManager.java @@ -23,8 +23,8 @@ import java.util.function.Supplier; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; -import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.server.ActivateCallback; +import org.apache.activemq.artemis.core.server.ActiveMQLockAcquisitionTimeoutException; import org.apache.activemq.artemis.core.server.ActiveMQServerLogger; import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.impl.CleaningActivateCallback; @@ -36,6 +36,8 @@ import org.apache.activemq.artemis.utils.UUID; import org.apache.activemq.artemis.utils.UUIDGenerator; import org.jboss.logging.Logger; +import static org.apache.activemq.artemis.core.server.impl.jdbc.LeaseLock.AcquireResult.Timeout; + /** * JDBC implementation of {@link NodeManager}. */ @@ -53,12 +55,10 @@ public final class JdbcNodeManager extends NodeManager { private final long lockAcquisitionTimeoutMillis; private volatile boolean interrupted = false; private final LeaseLock.Pauser pauser; - private final IOCriticalErrorListener ioCriticalErrorListener; public static JdbcNodeManager with(DatabaseStorageConfiguration configuration, ScheduledExecutorService scheduledExecutorService, - ExecutorFactory executorFactory, - IOCriticalErrorListener ioCriticalErrorListener) { + ExecutorFactory executorFactory) { validateTimeoutConfiguration(configuration); final SQLProvider.Factory sqlProviderFactory; if (configuration.getSqlProviderFactory() != null) { @@ -74,8 +74,7 @@ public final class JdbcNodeManager extends NodeManager { configuration.getConnectionProvider(), sqlProviderFactory.create(configuration.getNodeManagerStoreTableName(), SQLProvider.DatabaseStoreType.NODE_MANAGER), scheduledExecutorService, - executorFactory, - ioCriticalErrorListener); + executorFactory); } private static JdbcNodeManager usingConnectionProvider(String brokerId, @@ -85,18 +84,16 @@ public final class JdbcNodeManager extends NodeManager { JDBCConnectionProvider connectionProvider, SQLProvider provider, ScheduledExecutorService scheduledExecutorService, - ExecutorFactory executorFactory, - IOCriticalErrorListener ioCriticalErrorListener) { - return new JdbcNodeManager( - () -> JdbcSharedStateManager.usingConnectionProvider(brokerId, - lockExpirationMillis, - connectionProvider, - provider), + ExecutorFactory executorFactory) { + return new JdbcNodeManager(() -> JdbcSharedStateManager.usingConnectionProvider(brokerId, lockExpirationMillis, + lockRenewPeriodMillis, + connectionProvider, + provider), + lockExpirationMillis, lockRenewPeriodMillis, lockAcquisitionTimeoutMillis, scheduledExecutorService, - executorFactory, - ioCriticalErrorListener); + executorFactory); } private static void validateTimeoutConfiguration(DatabaseStorageConfiguration configuration) { @@ -122,12 +119,12 @@ public final class JdbcNodeManager extends NodeManager { } private JdbcNodeManager(Supplier sharedStateManagerFactory, + long lockExpirationMillis, long lockRenewPeriodMillis, long lockAcquisitionTimeoutMillis, ScheduledExecutorService scheduledExecutorService, - ExecutorFactory executorFactory, - IOCriticalErrorListener ioCriticalErrorListener) { - super(false, null); + ExecutorFactory executorFactory) { + super(false); this.lockAcquisitionTimeoutMillis = lockAcquisitionTimeoutMillis; this.pauser = LeaseLock.Pauser.sleep(Math.min(lockRenewPeriodMillis, MAX_PAUSE_MILLIS), TimeUnit.MILLISECONDS); this.sharedStateManagerFactory = sharedStateManagerFactory; @@ -137,7 +134,7 @@ public final class JdbcNodeManager extends NodeManager { "live", this.sharedStateManager.liveLock(), lockRenewPeriodMillis, - ioCriticalErrorListener); + this::notifyLostLock); this.scheduledBackupLockFactory = () -> ScheduledLeaseLock.of( scheduledExecutorService, executorFactory != null ? @@ -145,35 +142,47 @@ public final class JdbcNodeManager extends NodeManager { "backup", this.sharedStateManager.backupLock(), lockRenewPeriodMillis, - ioCriticalErrorListener); - this.ioCriticalErrorListener = ioCriticalErrorListener; + this::notifyLostLock); this.sharedStateManager = null; this.scheduledLiveLock = null; this.scheduledBackupLock = null; } @Override - public void start() throws Exception { + protected synchronized void notifyLostLock() { try { - synchronized (this) { - if (isStarted()) { - return; - } - this.sharedStateManager = sharedStateManagerFactory.get(); - LOGGER.debug("setup sharedStateManager on start"); - final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID); - setUUID(nodeId); - this.scheduledLiveLock = scheduledLiveLockFactory.get(); - this.scheduledBackupLock = scheduledBackupLockFactory.get(); - super.start(); + super.notifyLostLock(); + } finally { + // if any of the notified listener has stopped the node manager or + // the node manager was already stopped + if (!isStarted()) { + return; } + try { + stop(); + } catch (Exception ex) { + LOGGER.warn("Stopping node manager has errored on lost lock notification", ex); + } + } + } + + @Override + public synchronized void start() throws Exception { + try { + if (isStarted()) { + return; + } + this.sharedStateManager = sharedStateManagerFactory.get(); + LOGGER.debug("setup sharedStateManager on start"); + final UUID nodeId = sharedStateManager.setup(UUIDGenerator.getInstance()::generateUUID); + setUUID(nodeId); + this.scheduledLiveLock = scheduledLiveLockFactory.get(); + this.scheduledBackupLock = scheduledBackupLockFactory.get(); + super.start(); } catch (IllegalStateException e) { this.sharedStateManager = null; this.scheduledLiveLock = null; this.scheduledBackupLock = null; - if (this.ioCriticalErrorListener != null) { - this.ioCriticalErrorListener.onIOException(e, "Failed to setup the JdbcNodeManager", null); - } throw e; } } @@ -200,42 +209,33 @@ public final class JdbcNodeManager extends NodeManager { } @Override - public boolean isAwaitingFailback() throws Exception { + public boolean isAwaitingFailback() throws NodeManagerException { + checkStarted(); LOGGER.debug("ENTER isAwaitingFailback"); try { return readSharedState() == SharedStateManager.State.FAILING_BACK; + } catch (IllegalStateException e) { + LOGGER.warn("cannot retrieve the live state: assume it's not yet failed back", e); + return false; } finally { LOGGER.debug("EXIT isAwaitingFailback"); } } @Override - public boolean isBackupLive() throws Exception { + public boolean isBackupLive() throws NodeManagerException { + checkStarted(); LOGGER.debug("ENTER isBackupLive"); try { //is anyone holding the live lock? return this.scheduledLiveLock.lock().isHeld(); + } catch (IllegalStateException e) { + throw new NodeManagerException(e); } finally { LOGGER.debug("EXIT isBackupLive"); } } - @Override - public void stopBackup() throws Exception { - LOGGER.debug("ENTER stopBackup"); - try { - if (this.scheduledBackupLock.isStarted()) { - LOGGER.debug("scheduledBackupLock is running: stop it and release backup lock"); - this.scheduledBackupLock.stop(); - this.scheduledBackupLock.lock().release(); - } else { - LOGGER.debug("scheduledBackupLock is not running"); - } - } finally { - LOGGER.debug("EXIT stopBackup"); - } - } - @Override public void interrupt() { LOGGER.debug("ENTER interrupted"); @@ -245,7 +245,8 @@ public final class JdbcNodeManager extends NodeManager { } @Override - public void releaseBackup() throws Exception { + public void releaseBackup() throws NodeManagerException { + checkStarted(); LOGGER.debug("ENTER releaseBackup"); try { if (this.scheduledBackupLock.isStarted()) { @@ -255,19 +256,47 @@ public final class JdbcNodeManager extends NodeManager { } else { LOGGER.debug("scheduledBackupLock is not running"); } + } catch (IllegalStateException e) { + throw new NodeManagerException(e); } finally { LOGGER.debug("EXIT releaseBackup"); } } /** - * Try to acquire a lock, failing with an exception otherwise. + * Try to acquire a lock */ - private void lock(LeaseLock lock) throws Exception { - final LeaseLock.AcquireResult acquireResult = lock.tryAcquire(this.lockAcquisitionTimeoutMillis, this.pauser, () -> !this.interrupted); + private void lock(LeaseLock lock) throws ActiveMQLockAcquisitionTimeoutException, InterruptedException { + final long lockAcquisitionTimeoutNanos = lockAcquisitionTimeoutMillis >= 0 ? + TimeUnit.MILLISECONDS.toNanos(lockAcquisitionTimeoutMillis) : -1; + LeaseLock.AcquireResult acquireResult = null; + final long start = System.nanoTime(); + while (acquireResult == null) { + checkStarted(); + // measure distance from the timeout + final long remainingNanos = remainingNanos(start, lockAcquisitionTimeoutNanos); + if (remainingNanos == 0) { + acquireResult = Timeout; + continue; + } + final long remainingMillis = remainingNanos > 0 ? TimeUnit.NANOSECONDS.toMillis(remainingNanos) : -1; + try { + acquireResult = lock.tryAcquire(remainingMillis, this.pauser, () -> !this.interrupted); + } catch (IllegalStateException e) { + LOGGER.warn("Errored while trying to acquire lock", e); + if (remainingNanos(start, lockAcquisitionTimeoutNanos) == 0) { + acquireResult = Timeout; + continue; + } + // that's not precise, but it's ok: it can trigger the timeout right after the pause, + // depending by the pause length. The sole purpose of the pause is to save + // hammering with requests the DBMS if the connection is down + this.pauser.idle(); + } + } switch (acquireResult) { case Timeout: - throw new Exception("timed out waiting for lock"); + throw new ActiveMQLockAcquisitionTimeoutException("timed out waiting for lock"); case Exit: this.interrupted = false; throw new InterruptedException("LeaseLock was interrupted"); @@ -279,6 +308,20 @@ public final class JdbcNodeManager extends NodeManager { } + private static long remainingNanos(long start, long timeoutNanos) { + if (timeoutNanos > 0) { + final long elapsedNanos = (System.nanoTime() - start); + if (elapsedNanos < timeoutNanos) { + return timeoutNanos - elapsedNanos; + } else { + return 0; + } + } else { + assert timeoutNanos == -1; + return -1; + } + } + private void checkInterrupted(Supplier message) throws InterruptedException { if (this.interrupted) { interrupted = false; @@ -286,52 +329,77 @@ public final class JdbcNodeManager extends NodeManager { } } - private void renewLiveLockIfNeeded(final long acquiredOn) { - final long acquiredMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - acquiredOn); - if (acquiredMillis > this.scheduledLiveLock.renewPeriodMillis()) { - if (!this.scheduledLiveLock.lock().renew()) { - final IllegalStateException e = new IllegalStateException("live lock can't be renewed"); - ioCriticalErrorListener.onIOException(e, "live lock can't be renewed", null); - throw e; + private void renewLock(ScheduledLeaseLock lock) { + boolean lostLock = true; + IllegalStateException renewEx = null; + try { + lostLock = !this.scheduledLiveLock.lock().renew(); + } catch (IllegalStateException e) { + renewEx = e; + } + if (lostLock) { + notifyLostLock(); + if (renewEx == null) { + renewEx = new IllegalStateException(lock.lockName() + " lock isn't renewed"); } + throw renewEx; } } /** * Lock live node and check for a live state, taking care to renew it (if needed) or releasing it otherwise */ - private boolean lockLiveAndCheckLiveState() throws Exception { - lock(this.scheduledLiveLock.lock()); - final long acquiredOn = System.nanoTime(); - boolean liveWhileLocked = false; - //check if the state is live - final SharedStateManager.State stateWhileLocked; + private boolean lockLiveAndCheckLiveState() throws ActiveMQLockAcquisitionTimeoutException, InterruptedException { try { - stateWhileLocked = readSharedState(); - } catch (Throwable t) { - LOGGER.error("error while holding the live node lock and tried to read the shared state", t); - this.scheduledLiveLock.lock().release(); - throw t; + lock(this.scheduledLiveLock.lock()); + //check if the state is live + while (true) { + try { + final SharedStateManager.State stateWhileLocked = readSharedState(); + final long localExpirationTime = this.scheduledLiveLock.lock().localExpirationTime(); + if (System.currentTimeMillis() > localExpirationTime) { + // the lock can be assumed to be expired, + // so the state isn't worthy to be considered + return false; + } + if (stateWhileLocked == SharedStateManager.State.LIVE) { + // TODO need some tolerance//renew here? + return true; + } else { + // state is not live: can (try to) release the lock + this.scheduledLiveLock.lock().release(); + return false; + } + } catch (IllegalStateException e) { + LOGGER.error("error while holding the live node lock and tried to read the shared state or to release the lock", e); + checkStarted(); + checkInterrupted(() -> "interrupt on error while checking live state"); + pauser.idle(); + final long localExpirationTime = this.scheduledLiveLock.lock().localExpirationTime(); + if (System.currentTimeMillis() > localExpirationTime) { + return false; + } + } + } + } catch (InterruptedException e) { + throw e; } - if (stateWhileLocked == SharedStateManager.State.LIVE) { - renewLiveLockIfNeeded(acquiredOn); - liveWhileLocked = true; - } else { - LOGGER.debugf("state is %s while holding the live lock: releasing live lock", stateWhileLocked); - //state is not live: can (try to) release the lock - this.scheduledLiveLock.lock().release(); - } - return liveWhileLocked; } @Override - public void awaitLiveNode() throws Exception { + public void awaitLiveNode() throws NodeManagerException, InterruptedException { + checkStarted(); LOGGER.debug("ENTER awaitLiveNode"); try { boolean liveWhileLocked = false; while (!liveWhileLocked) { //check first without holding any lock - final SharedStateManager.State state = readSharedState(); + SharedStateManager.State state = null; + try { + state = readSharedState(); + } catch (IllegalStateException e) { + LOGGER.warn("Errored while reading shared state", e); + } if (state == SharedStateManager.State.LIVE) { //verify if the state is live while holding the live node lock too liveWhileLocked = lockLiveAndCheckLiveState(); @@ -339,6 +407,7 @@ public final class JdbcNodeManager extends NodeManager { LOGGER.debugf("state while awaiting live node: %s", state); } if (!liveWhileLocked) { + checkStarted(); checkInterrupted(() -> "awaitLiveNode got interrupted!"); pauser.idle(); } @@ -346,32 +415,51 @@ public final class JdbcNodeManager extends NodeManager { //state is LIVE and live lock is acquired and valid LOGGER.debugf("acquired live node lock while state is %s: starting scheduledLiveLock", SharedStateManager.State.LIVE); this.scheduledLiveLock.start(); + } catch (InterruptedException e) { + throw e; + } catch (ActiveMQLockAcquisitionTimeoutException | IllegalStateException e) { + throw new NodeManagerException(e); } finally { LOGGER.debug("EXIT awaitLiveNode"); } } @Override - public void startBackup() throws Exception { + public void startBackup() throws NodeManagerException, InterruptedException { + checkStarted(); LOGGER.debug("ENTER startBackup"); try { ActiveMQServerLogger.LOGGER.waitingToBecomeBackup(); - lock(scheduledBackupLock.lock()); scheduledBackupLock.start(); ActiveMQServerLogger.LOGGER.gotBackupLock(); if (getUUID() == null) readNodeId(); + } catch (InterruptedException ie) { + throw ie; + } catch (ActiveMQLockAcquisitionTimeoutException | IllegalStateException e) { + throw new NodeManagerException(e); } finally { LOGGER.debug("EXIT startBackup"); } } @Override - public ActivateCallback startLiveNode() throws Exception { + public ActivateCallback startLiveNode() throws NodeManagerException, InterruptedException { + checkStarted(); LOGGER.debug("ENTER startLiveNode"); try { - setFailingBack(); + boolean done = false; + while (!done) { + try { + setFailingBack(); + done = true; + } catch (IllegalStateException e) { + LOGGER.warn("cannot set failing back state, retry", e); + pauser.idle(); + checkInterrupted(() -> "interrupt while trying to set failing back state"); + } + } final String timeoutMessage = lockAcquisitionTimeoutMillis == -1 ? "indefinitely" : lockAcquisitionTimeoutMillis + " milliseconds"; @@ -389,21 +477,42 @@ public final class JdbcNodeManager extends NodeManager { LOGGER.debug("ENTER activationComplete"); try { //state can be written only if the live renew task is running - setLive(); - } catch (Exception e) { + boolean done = false; + while (!done) { + try { + setLive(); + done = true; + } catch (IllegalStateException e) { + LOGGER.warn("Errored while trying to setLive", e); + checkStarted(); + pauser.idle(); + final long localExpirationTime = scheduledLiveLock.lock().localExpirationTime(); + // optimistic: is just to set a deadline while retrying + if (System.currentTimeMillis() > localExpirationTime) { + throw new IllegalStateException("live lock is probably expired: failed to setLive"); + } + } + } + } catch (IllegalStateException e) { ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e); + throw new NodeManagerException(e); } finally { LOGGER.debug("EXIT activationComplete"); } } }; + } catch (InterruptedException ie) { + throw ie; + } catch (ActiveMQLockAcquisitionTimeoutException | IllegalStateException e) { + throw new NodeManagerException(e); } finally { LOGGER.debug("EXIT startLiveNode"); } } @Override - public void pauseLiveServer() throws Exception { + public void pauseLiveServer() throws NodeManagerException { + checkStarted(); LOGGER.debug("ENTER pauseLiveServer"); try { if (scheduledLiveLock.isStarted()) { @@ -413,23 +522,21 @@ public final class JdbcNodeManager extends NodeManager { scheduledLiveLock.lock().release(); } else { LOGGER.debug("scheduledLiveLock is not running: try renew live lock"); - if (scheduledLiveLock.lock().renew()) { - LOGGER.debug("live lock renewed: set paused shared state and release live lock"); - setPaused(); - scheduledLiveLock.lock().release(); - } else { - final IllegalStateException e = new IllegalStateException("live lock can't be renewed"); - ioCriticalErrorListener.onIOException(e, "live lock can't be renewed on pauseLiveServer", null); - throw e; - } + renewLock(scheduledLiveLock); + LOGGER.debug("live lock renewed: set paused shared state and release live lock"); + setPaused(); + scheduledLiveLock.lock().release(); } + } catch (IllegalStateException e) { + throw new NodeManagerException(e); } finally { LOGGER.debug("EXIT pauseLiveServer"); } } @Override - public void crashLiveServer() throws Exception { + public void crashLiveServer() throws NodeManagerException { + checkStarted(); LOGGER.debug("ENTER crashLiveServer"); try { if (this.scheduledLiveLock.isStarted()) { @@ -446,10 +553,18 @@ public final class JdbcNodeManager extends NodeManager { @Override public void awaitLiveStatus() { + checkStarted(); LOGGER.debug("ENTER awaitLiveStatus"); try { - while (readSharedState() != SharedStateManager.State.LIVE) { + SharedStateManager.State state = null; + while (state != SharedStateManager.State.LIVE) { + try { + state = readSharedState(); + } catch (IllegalStateException e) { + LOGGER.warn("Errored while trying to read shared state", e); + } pauser.idle(); + checkStarted(); } } finally { LOGGER.debug("EXIT awaitLiveStatus"); @@ -481,6 +596,7 @@ public final class JdbcNodeManager extends NodeManager { @Override public SimpleString readNodeId() { + checkStarted(); final UUID nodeId = this.sharedStateManager.readNodeId(); LOGGER.debugf("readNodeId nodeId = %s", nodeId); setUUID(nodeId); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java index 06f7e2a9b7..df43c6b3ea 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcSharedStateManager.java @@ -39,6 +39,7 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS private static final int MAX_SETUP_ATTEMPTS = 20; private final String holderId; private final long lockExpirationMillis; + private final long queryTimeoutMillis; private JdbcLeaseLock liveLock; private JdbcLeaseLock backupLock; private String readNodeId; @@ -48,10 +49,19 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS private String writeState; public static JdbcSharedStateManager usingConnectionProvider(String holderId, - long locksExpirationMillis, - JDBCConnectionProvider connectionProvider, - SQLProvider provider) { - final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis); + long locksExpirationMillis, + JDBCConnectionProvider connectionProvider, + SQLProvider provider) { + return usingConnectionProvider(holderId, locksExpirationMillis, -1, connectionProvider, provider); + } + + public static JdbcSharedStateManager usingConnectionProvider(String holderId, + long locksExpirationMillis, + long queryTimeoutMillis, + JDBCConnectionProvider connectionProvider, + SQLProvider provider) { + final JdbcSharedStateManager sharedStateManager = new JdbcSharedStateManager(holderId, locksExpirationMillis, + queryTimeoutMillis); sharedStateManager.setJdbcConnectionProvider(connectionProvider); sharedStateManager.setSqlProvider(provider); try { @@ -76,20 +86,35 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS JDBCConnectionProvider connectionProvider, SQLProvider sqlProvider, long expirationMillis) { - return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireLiveLockSQL(), sqlProvider.tryReleaseLiveLockSQL(), sqlProvider.renewLiveLockSQL(), sqlProvider.isLiveLockedSQL(), sqlProvider.currentTimestampSQL(), expirationMillis, "LIVE"); + return createLiveLock(holderId, connectionProvider, sqlProvider, expirationMillis, -1); + } + + static JdbcLeaseLock createLiveLock(String holderId, + JDBCConnectionProvider connectionProvider, + SQLProvider sqlProvider, + long expirationMillis, + long queryTimeoutMillis) { + return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireLiveLockSQL(), + sqlProvider.tryReleaseLiveLockSQL(), sqlProvider.renewLiveLockSQL(), + sqlProvider.isLiveLockedSQL(), sqlProvider.currentTimestampSQL(), + expirationMillis, queryTimeoutMillis, "LIVE"); } static JdbcLeaseLock createBackupLock(String holderId, JDBCConnectionProvider connectionProvider, SQLProvider sqlProvider, - long expirationMillis) { - return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireBackupLockSQL(), sqlProvider.tryReleaseBackupLockSQL(), sqlProvider.renewBackupLockSQL(), sqlProvider.isBackupLockedSQL(), sqlProvider.currentTimestampSQL(), expirationMillis, "BACKUP"); + long expirationMillis, + long queryTimeoutMillis) { + return new JdbcLeaseLock(holderId, connectionProvider, sqlProvider.tryAcquireBackupLockSQL(), + sqlProvider.tryReleaseBackupLockSQL(), sqlProvider.renewBackupLockSQL(), + sqlProvider.isBackupLockedSQL(), sqlProvider.currentTimestampSQL(), + expirationMillis, queryTimeoutMillis, "BACKUP"); } @Override protected void prepareStatements() { - this.liveLock = createLiveLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis); - this.backupLock = createBackupLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis); + this.liveLock = createLiveLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis, queryTimeoutMillis); + this.backupLock = createBackupLock(this.holderId, this.connectionProvider, sqlProvider, lockExpirationMillis, queryTimeoutMillis); this.readNodeId = sqlProvider.readNodeIdSQL(); this.writeNodeId = sqlProvider.writeNodeIdSQL(); this.initializeNodeId = sqlProvider.initializeNodeIdSQL(); @@ -97,9 +122,10 @@ final class JdbcSharedStateManager extends AbstractJDBCDriver implements SharedS this.readState = sqlProvider.readStateSQL(); } - private JdbcSharedStateManager(String holderId, long lockExpirationMillis) { + private JdbcSharedStateManager(String holderId, long lockExpirationMillis, long queryTimeoutMillis) { this.holderId = holderId; this.lockExpirationMillis = lockExpirationMillis; + this.queryTimeoutMillis = queryTimeoutMillis; } @Override diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java index 8deda12898..8f2de112fd 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/LeaseLock.java @@ -41,6 +41,8 @@ interface LeaseLock extends AutoCloseable { boolean keepRunning(); } + long localExpirationTime(); + interface Pauser { void idle(); diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java index 43751f8671..8590fc6108 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/jdbc/ScheduledLeaseLock.java @@ -19,8 +19,8 @@ package org.apache.activemq.artemis.core.server.impl.jdbc; import java.util.concurrent.ScheduledExecutorService; -import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.server.ActiveMQComponent; +import org.apache.activemq.artemis.core.server.NodeManager.LockListener; import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; /** @@ -28,17 +28,25 @@ import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; */ interface ScheduledLeaseLock extends ActiveMQComponent { + @Override + void start(); + + @Override + void stop(); + LeaseLock lock(); long renewPeriodMillis(); + String lockName(); + static ScheduledLeaseLock of(ScheduledExecutorService scheduledExecutorService, ArtemisExecutor executor, String lockName, LeaseLock lock, long renewPeriodMillis, - IOCriticalErrorListener ioCriticalErrorListener) { - return new ActiveMQScheduledLeaseLock(scheduledExecutorService, executor, lockName, lock, renewPeriodMillis, ioCriticalErrorListener); + LockListener lockListener) { + return new ActiveMQScheduledLeaseLock(scheduledExecutorService, executor, lockName, lock, renewPeriodMillis, lockListener); } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java index 77f488e551..03febc2b20 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcLeaseLockTest.java @@ -21,15 +21,23 @@ import java.util.Arrays; import java.util.List; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Stream; import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; +import org.apache.activemq.artemis.core.server.NodeManager.LockListener; import org.apache.activemq.artemis.jdbc.store.drivers.JDBCUtils; import org.apache.activemq.artemis.jdbc.store.sql.SQLProvider; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; +import org.apache.activemq.artemis.utils.Wait; +import org.apache.activemq.artemis.utils.actors.ArtemisExecutor; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -38,6 +46,9 @@ import org.junit.runner.RunWith; import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameter; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.core.Is.is; + @RunWith(Parameterized.class) public class JdbcLeaseLockTest extends ActiveMQTestBase { @@ -183,7 +194,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase { @Test public void shouldAcquireExpiredLock() throws InterruptedException { - final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1)); + final LeaseLock lock = lock(10); Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire()); try { Thread.sleep(lock.expirationMillis() * 2); @@ -197,13 +208,13 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase { @Test public void shouldOtherAcquireExpiredLock() throws InterruptedException { - final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1)); + final LeaseLock lock = lock(10); Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire()); try { Thread.sleep(lock.expirationMillis() * 2); Assert.assertFalse("lock is already expired", lock.isHeldByCaller()); Assert.assertFalse("lock is already expired", lock.isHeld()); - final LeaseLock otherLock = lock(TimeUnit.SECONDS.toMillis(10)); + final LeaseLock otherLock = lock(10); try { Assert.assertTrue("lock is already expired", otherLock.tryAcquire()); } finally { @@ -237,7 +248,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase { @Test public void shouldRenewExpiredLockNotAcquiredByOthers() throws InterruptedException { - final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1)); + final LeaseLock lock = lock(10); Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire()); try { Thread.sleep(lock.expirationMillis() * 2); @@ -251,7 +262,7 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase { @Test public void shouldNotRenewLockAcquiredByOthers() throws InterruptedException { - final LeaseLock lock = lock(TimeUnit.SECONDS.toMillis(1)); + final LeaseLock lock = lock(10); Assert.assertTrue("lock is not owned by anyone", lock.tryAcquire()); try { Thread.sleep(lock.expirationMillis() * 2); @@ -268,5 +279,97 @@ public class JdbcLeaseLockTest extends ActiveMQTestBase { lock.release(); } } + + @Test + public void shouldNotNotifyLostLock() throws Exception { + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + final OrderedExecutorFactory factory = new OrderedExecutorFactory(executorService); + final ArtemisExecutor artemisExecutor = factory.getExecutor(); + final AtomicLong lostLock = new AtomicLong(); + final LockListener lockListener = () -> { + lostLock.incrementAndGet(); + }; + final ScheduledLeaseLock scheduledLeaseLock = ScheduledLeaseLock + .of(scheduledExecutorService, artemisExecutor, + "test", lock(), dbConf.getJdbcLockRenewPeriodMillis(), lockListener); + + Assert.assertTrue(scheduledLeaseLock.lock().tryAcquire()); + scheduledLeaseLock.start(); + Assert.assertEquals(0, lostLock.get()); + scheduledLeaseLock.stop(); + Assert.assertEquals(0, lostLock.get()); + executorService.shutdown(); + scheduledExecutorService.shutdown(); + scheduledLeaseLock.lock().release(); + } + + + @Test + public void shouldNotifyManyTimesLostLock() throws Exception { + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + final OrderedExecutorFactory factory = new OrderedExecutorFactory(executorService); + final ArtemisExecutor artemisExecutor = factory.getExecutor(); + final AtomicLong lostLock = new AtomicLong(); + final LockListener lockListener = () -> { + lostLock.incrementAndGet(); + }; + final ScheduledLeaseLock scheduledLeaseLock = ScheduledLeaseLock + .of(scheduledExecutorService, artemisExecutor, + "test", lock(TimeUnit.SECONDS.toMillis(1)), 100, lockListener); + + Assert.assertTrue(scheduledLeaseLock.lock().tryAcquire()); + scheduledLeaseLock.start(); + // should let the renew to happen at least 1 time, excluding the time to start a scheduled task + TimeUnit.MILLISECONDS.sleep(2 * scheduledLeaseLock.renewPeriodMillis()); + Assert.assertTrue(scheduledLeaseLock.lock().isHeldByCaller()); + Assert.assertEquals(0, lostLock.get()); + scheduledLeaseLock.lock().release(); + Assert.assertFalse(scheduledLeaseLock.lock().isHeldByCaller()); + TimeUnit.MILLISECONDS.sleep(2 * scheduledLeaseLock.renewPeriodMillis()); + Assert.assertThat(lostLock.get(), is(greaterThanOrEqualTo(2L))); + scheduledLeaseLock.stop(); + executorService.shutdown(); + scheduledExecutorService.shutdown(); + } + + @Test + public void shouldNotifyOnceLostLockIfStopped() throws Exception { + final ExecutorService executorService = Executors.newSingleThreadExecutor(); + final ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + final OrderedExecutorFactory factory = new OrderedExecutorFactory(executorService); + final ArtemisExecutor artemisExecutor = factory.getExecutor(); + final AtomicLong lostLock = new AtomicLong(); + final AtomicReference lock = new AtomicReference<>(); + final AtomicReference stopErrors = new AtomicReference<>(); + final LockListener lockListener = () -> { + lostLock.incrementAndGet(); + try { + lock.get().stop(); + } catch (Throwable e) { + stopErrors.set(e); + } + }; + final ScheduledLeaseLock scheduledLeaseLock = ScheduledLeaseLock + .of(scheduledExecutorService, artemisExecutor, "test", lock(TimeUnit.SECONDS.toMillis(1)), + 100, lockListener); + lock.set(scheduledLeaseLock); + Assert.assertTrue(scheduledLeaseLock.lock().tryAcquire()); + lostLock.set(0); + scheduledLeaseLock.start(); + Assert.assertTrue(scheduledLeaseLock.lock().isHeldByCaller()); + scheduledLeaseLock.lock().release(); + Assert.assertFalse(scheduledLeaseLock.lock().isHeldByCaller()); + Wait.assertTrue(() -> lostLock.get() > 0); + Assert.assertFalse(scheduledLeaseLock.isStarted()); + // wait enough to see if it get triggered again + TimeUnit.MILLISECONDS.sleep(scheduledLeaseLock.renewPeriodMillis()); + Assert.assertEquals(1, lostLock.getAndSet(0)); + Assert.assertNull(stopErrors.getAndSet(null)); + scheduledLeaseLock.stop(); + executorService.shutdown(); + scheduledExecutorService.shutdown(); + } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManagerTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManagerTest.java index 762e051c03..47e76609de 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManagerTest.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/server/impl/jdbc/JdbcNodeManagerTest.java @@ -23,7 +23,6 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; @@ -105,17 +104,9 @@ public class JdbcNodeManagerTest extends ActiveMQTestBase { @Test public void shouldStartAndStopGracefullyTest() throws Exception { - final AtomicReference criticalError = new AtomicReference<>(); - final JdbcNodeManager nodeManager = JdbcNodeManager.with(dbConf, leaseLockExecutor, null, (code, message, file) -> criticalError.lazySet(message)); - try { - nodeManager.start(); - } finally { - nodeManager.stop(); - final String error = criticalError.get(); - if (error != null) { - Assert.fail(error); - } - } + final JdbcNodeManager nodeManager = JdbcNodeManager.with(dbConf, leaseLockExecutor, null); + nodeManager.start(); + nodeManager.stop(); } } diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index be6d2429bc..3f5436a81c 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -522,6 +522,7 @@ public abstract class ActiveMQTestBase extends Assert { dbStorageConfiguration.setJdbcLockAcquisitionTimeoutMillis(getJdbcLockAcquisitionTimeoutMillis()); dbStorageConfiguration.setJdbcLockExpirationMillis(getJdbcLockExpirationMillis()); dbStorageConfiguration.setJdbcLockRenewPeriodMillis(getJdbcLockRenewPeriodMillis()); + dbStorageConfiguration.setJdbcNetworkTimeout(-1); return dbStorageConfiguration; } @@ -530,11 +531,11 @@ public abstract class ActiveMQTestBase extends Assert { } protected long getJdbcLockExpirationMillis() { - return Long.getLong("jdbc.lock.expiration", ActiveMQDefaultConfiguration.getDefaultJdbcLockExpirationMillis()); + return Long.getLong("jdbc.lock.expiration", 4_000); } protected long getJdbcLockRenewPeriodMillis() { - return Long.getLong("jdbc.lock.renew", ActiveMQDefaultConfiguration.getDefaultJdbcLockRenewPeriodMillis()); + return Long.getLong("jdbc.lock.renew", 200); } public void destroyTables(List tableNames) throws Exception { diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockMonitorTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockMonitorTest.java index 548ec1da98..2aad98c1e7 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockMonitorTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockMonitorTest.java @@ -1,120 +1,124 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.activemq.artemis.tests.extras.byteman; - -import java.io.File; -import java.io.IOException; -import java.util.concurrent.ScheduledThreadPoolExecutor; - -import org.apache.activemq.artemis.core.server.ActivateCallback; -import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager; -import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager.LockListener; -import org.apache.activemq.artemis.utils.Wait; -import org.jboss.byteman.contrib.bmunit.BMRule; -import org.jboss.byteman.contrib.bmunit.BMRules; -import org.jboss.byteman.contrib.bmunit.BMUnitRunner; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; - -@RunWith(BMUnitRunner.class) -public class FileLockMonitorTest { - - private File sharedDir; - private volatile boolean lostLock = false; - private volatile FileLockNodeManager nodeManager; - private ScheduledThreadPoolExecutor executor; - - @Before - public void handleLockFile() throws IOException { - sharedDir = File.createTempFile("shared-dir", ""); - sharedDir.delete(); - Assert.assertTrue(sharedDir.mkdir()); - lostLock = false; - } - - @Test - @BMRules(rules = { - @BMRule(name = "lock is invalid", targetClass = "sun.nio.ch.FileLockImpl", targetMethod = "isValid", action = "return false;") }) - public void testLockMonitorInvalid() throws Exception { - lostLock = false; - startServer(); - Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 5000, 100); - nodeManager.isStarted(); - nodeManager.crashLiveServer(); - executor.shutdown(); - } - - @Test - @BMRules(rules = { - @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "getState", action = "throw new java.io.IOException(\"EFS is disconnected\");") }) - public void testLockMonitorIOException() throws Exception { - lostLock = false; - startServer(); - Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 5000, 100); - nodeManager.crashLiveServer(); - executor.shutdown(); - } - - @Test - public void testLockMonitorHasCorrectLockAndState() throws Exception { - lostLock = false; - startServer(); - Assert.assertFalse("The FileLockNodeManager should not have lost the lock", Wait.waitFor(() -> lostLock, 5000, 100)); - nodeManager.crashLiveServer(); - executor.shutdown(); - } - - @Test - @BMRules(rules = { - @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "getState", action = "return 70;") }) - public void testLockMonitorHasLockWrongState() throws Exception { - lostLock = false; - startServer(); - Assert.assertFalse("The FileLockNodeManager should not have lost the lock", Wait.waitFor(() -> lostLock, 5000, 100)); - nodeManager.crashLiveServer(); - executor.shutdown(); - } - - public LockListener startServer() throws Exception { - executor = new ScheduledThreadPoolExecutor(2); - nodeManager = new FileLockNodeManager(sharedDir, false, executor); - LockListener listener = nodeManager.new LockListener() { - - @Override - protected void lostLock() throws Exception { - lostLock = true; - nodeManager.crashLiveServer(); - } - - }; - nodeManager.registerLockListener(listener); - - try { - nodeManager.start(); - ActivateCallback startLiveNode = nodeManager.startLiveNode(); - startLiveNode.activationComplete(); - - } catch (Exception exception) { - exception.printStackTrace(); - } - - return listener; - } -} +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.extras.byteman; + +import java.io.File; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +import org.apache.activemq.artemis.core.server.ActivateCallback; +import org.apache.activemq.artemis.core.server.NodeManager; +import org.apache.activemq.artemis.core.server.NodeManager.LockListener; +import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager; +import org.apache.activemq.artemis.utils.Wait; +import org.jboss.byteman.contrib.bmunit.BMRule; +import org.jboss.byteman.contrib.bmunit.BMRules; +import org.jboss.byteman.contrib.bmunit.BMUnitRunner; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; + +@RunWith(BMUnitRunner.class) +public class FileLockMonitorTest { + + private File sharedDir; + private volatile boolean lostLock = false; + private volatile FileLockNodeManager nodeManager; + private ScheduledThreadPoolExecutor executor; + + @Before + public void handleLockFile() throws Exception { + sharedDir = File.createTempFile("shared-dir", ""); + sharedDir.delete(); + Assert.assertTrue(sharedDir.mkdir()); + } + + @Test + @BMRules(rules = { + @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "isLiveLockLost", action = "return true;") }) + public void testLockMonitorInvalid() throws Exception { + lostLock = false; + startServer(); + Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 20_000, 100); + nodeManager.isStarted(); + nodeManager.crashLiveServer(); + executor.shutdown(); + } + + public static void throwNodeManagerException(String msg) { + throw new NodeManager.NodeManagerException(msg); + } + + @Test + @BMRules(rules = { + @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", + targetMethod = "getState", + action = "org.apache.activemq.artemis.tests.extras.byteman.FileLockMonitorTest.throwNodeManagerException(\"EFS is disconnected\");") }) + public void testLockMonitorIOException() throws Exception { + lostLock = false; + startServer(); + Wait.assertTrue("The FileLockNodeManager should have lost the lock", () -> lostLock, 5000, 100); + nodeManager.crashLiveServer(); + executor.shutdown(); + } + + @Test + public void testLockMonitorHasCorrectLockAndState() throws Exception { + lostLock = false; + startServer(); + Assert.assertFalse("The FileLockNodeManager should not have lost the lock", Wait.waitFor(() -> lostLock, 5000, 100)); + nodeManager.crashLiveServer(); + executor.shutdown(); + } + + @Test + @BMRules(rules = { + @BMRule(name = "lock is invalid", targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", targetMethod = "getState", action = "return 70;") }) + public void testLockMonitorHasLockWrongState() throws Exception { + lostLock = false; + startServer(); + Assert.assertFalse("The FileLockNodeManager should not have lost the lock", Wait.waitFor(() -> lostLock, 5000, 100)); + nodeManager.crashLiveServer(); + executor.shutdown(); + } + + public LockListener startServer() throws Exception { + executor = new ScheduledThreadPoolExecutor(2); + nodeManager = new FileLockNodeManager(sharedDir, false, executor); + LockListener listener = () -> { + lostLock = true; + try { + nodeManager.crashLiveServer(); + } catch (Throwable t) { + t.printStackTrace(); + } + }; + nodeManager.registerLockListener(listener); + + try { + nodeManager.start(); + ActivateCallback startLiveNode = nodeManager.startLiveNode(); + startLiveNode.activationComplete(); + + } catch (Exception exception) { + exception.printStackTrace(); + } + + return listener; + } +} diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockNodeManagerTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockNodeManagerTest.java index 519cb63da7..148755f69a 100644 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockNodeManagerTest.java +++ b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/FileLockNodeManagerTest.java @@ -78,7 +78,7 @@ public class FileLockNodeManagerTest { manager.awaitLiveNode(); } catch (Exception e) { long stop = System.currentTimeMillis(); - if (!"timed out waiting for lock".equals(e.getMessage())) { + if (!"timed out waiting for lock".equals(e.getCause().getMessage())) { throw e; } return stop - start; diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/SharedStoreBackupActivationTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/SharedStoreBackupActivationTest.java deleted file mode 100644 index ae763c10bf..0000000000 --- a/tests/extra-tests/src/test/java/org/apache/activemq/artemis/tests/extras/byteman/SharedStoreBackupActivationTest.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.activemq.artemis.tests.extras.byteman; - -import java.io.File; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.apache.activemq.artemis.api.core.TransportConfiguration; -import org.apache.activemq.artemis.core.config.ScaleDownConfiguration; -import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration; -import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration; -import org.apache.activemq.artemis.core.server.NodeManager; -import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager; -import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase; -import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; -import org.jboss.byteman.contrib.bmunit.BMRule; -import org.jboss.byteman.contrib.bmunit.BMRules; -import org.jboss.byteman.contrib.bmunit.BMUnitRunner; -import org.jboss.logging.Logger; -import org.junit.Assert; -import org.junit.Test; -import org.junit.runner.RunWith; - -@RunWith(BMUnitRunner.class) -public class SharedStoreBackupActivationTest extends FailoverTestBase { - - private static Logger logger = Logger.getLogger(SharedStoreBackupActivationTest.class); - - private static volatile boolean throwException = false; - private static CountDownLatch exceptionThrownLatch; - - public static synchronized boolean isThrowException() { - logger.debugf("Throwing IOException during FileLockNodeManager.tryLock(): %s", throwException); - if (exceptionThrownLatch != null) { - exceptionThrownLatch.countDown(); - } - return throwException; - } - - /** - * Waits for the backup server to call FileLockNodeManager.tryLock(). - */ - public static void awaitTryLock(boolean throwException) throws InterruptedException { - synchronized (SharedStoreBackupActivationTest.class) { - SharedStoreBackupActivationTest.throwException = throwException; - exceptionThrownLatch = new CountDownLatch(1); - } - logger.debugf("Awaiting backup to perform FileLockNodeManager.tryLock()"); - boolean ret = exceptionThrownLatch.await(10, TimeUnit.SECONDS); - SharedStoreBackupActivationTest.throwException = false; - - Assert.assertTrue("FileLockNodeManager.tryLock() was not called during specified timeout", ret); - logger.debugf("Awaiting FileLockNodeManager.tryLock() done"); - } - - @Test - @BMRules( - rules = {@BMRule( - name = "throw IOException during activation", - targetClass = "org.apache.activemq.artemis.core.server.impl.FileLockNodeManager", - targetMethod = "tryLock", - targetLocation = "AT ENTRY", - condition = "org.apache.activemq.artemis.tests.extras.byteman.SharedStoreBackupActivationTest.isThrowException()", - action = "THROW new IOException(\"IO Error\");") - }) - public void testFailOverAfterTryLockException() throws Exception { - Assert.assertTrue(liveServer.isActive()); - Assert.assertFalse(backupServer.isActive()); - - // wait for backup to try to acquire lock, once without exception (acquiring will not succeed because live is - // still active) - awaitTryLock(false); - - // wait for backup to try to acquire lock, this time throw an IOException - logger.debug("Causing exception"); - awaitTryLock(true); - - // stop live server - logger.debugf("Stopping live server"); - liveServer.stop(); - waitForServerToStop(liveServer.getServer()); - logger.debugf("Live server stopped, waiting for backup activation"); - backupServer.getServer().waitForActivation(10, TimeUnit.SECONDS); - - // backup should be activated by now - Assert.assertFalse(liveServer.isActive()); - Assert.assertTrue("Backup server didn't activate", backupServer.isActive()); - } - - @Override - protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { - return TransportConfigurationUtils.getInVMAcceptor(live); - } - - @Override - protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) { - return TransportConfigurationUtils.getInVMConnector(live); - } - - @Override - protected void createConfigs() throws Exception { - File sharedDir = File.createTempFile("shared-dir", ""); - sharedDir.delete(); - Assert.assertTrue(sharedDir.mkdir()); - logger.debugf("Created shared store directory %s", sharedDir.getCanonicalPath()); - - TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); - TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); - - // nodes must use separate FileLockNodeManager instances! - NodeManager liveNodeManager = new FileLockNodeManager(sharedDir, false, new ScheduledThreadPoolExecutor(1)); - NodeManager backupNodeManager = new FileLockNodeManager(sharedDir, false, new ScheduledThreadPoolExecutor(1)); - - backupConfig = super.createDefaultConfig(false) - .clearAcceptorConfigurations() - .addAcceptorConfiguration(getAcceptorTransportConfiguration(false)) - .setHAPolicyConfiguration( - new SharedStoreSlavePolicyConfiguration() - .setScaleDownConfiguration(new ScaleDownConfiguration().setEnabled(false)) - .setRestartBackup(false)) - .addConnectorConfiguration(liveConnector.getName(), liveConnector) - .addConnectorConfiguration(backupConnector.getName(), backupConnector) - .addClusterConfiguration(basicClusterConnectionConfig(backupConnector.getName(), liveConnector.getName())); - backupServer = createTestableServer(backupConfig, backupNodeManager); - - liveConfig = super.createDefaultConfig(false) - .clearAcceptorConfigurations() - .addAcceptorConfiguration(getAcceptorTransportConfiguration(true)) - .setHAPolicyConfiguration(new SharedStoreMasterPolicyConfiguration().setFailoverOnServerShutdown(true)) - .addClusterConfiguration(basicClusterConnectionConfig(liveConnector.getName())) - .addConnectorConfiguration(liveConnector.getName(), liveConnector); - liveServer = createTestableServer(liveConfig, liveNodeManager); - } - -} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/RealNodeManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/FileLockNodeManagerTest.java similarity index 66% rename from tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/RealNodeManagerTest.java rename to tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/FileLockNodeManagerTest.java index 1dfe48bb71..be2dc7dbd4 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/RealNodeManagerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/FileLockNodeManagerTest.java @@ -18,34 +18,23 @@ package org.apache.activemq.artemis.tests.integration.cluster; import java.io.File; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; -import org.apache.activemq.artemis.core.server.NodeManager; import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager; -import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.utils.SpawnedVMSupport; -import org.apache.activemq.artemis.utils.UUID; import org.junit.Assert; -import org.junit.Test; -public class RealNodeManagerTest extends NodeManagerTest { - - @Test - public void testId() throws Exception { - NodeManager nodeManager = new FileLockNodeManager(new File(getTemporaryDir()), false); - nodeManager.start(); - UUID id1 = nodeManager.getUUID(); - nodeManager.stop(); - nodeManager.start(); - ActiveMQTestBase.assertEqualsByteArrays(id1.asBytes(), nodeManager.getUUID().asBytes()); - nodeManager.stop(); - } +public class FileLockNodeManagerTest extends NodeManagerTest { @Override public void performWork(NodeManagerAction... actions) throws Exception { List processes = new ArrayList<>(); for (NodeManagerAction action : actions) { - Process p = SpawnedVMSupport.spawnVM(NodeManagerAction.class.getName(), "-Xms512m", "-Xmx512m", new String[0], true, true, true, action.getWork()); + final String[] args = new String[action.works() + 1]; + args[0] = getTemporaryDir(); + action.getWork(args, 1); + Process p = SpawnedVMSupport.spawnVM(this.getClass().getName(), "-Xms50m", "-Xmx512m", new String[0], true, true, true, args); processes.add(p); } for (Process process : processes) { @@ -58,4 +47,8 @@ public class RealNodeManagerTest extends NodeManagerTest { } } + + public static void main(String[] args) throws Exception { + NodeManagerAction.execute(Arrays.copyOfRange(args, 1, args.length), new FileLockNodeManager(new File(args[0]), false)); + } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/JdbcNodeManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/JdbcNodeManagerTest.java new file mode 100644 index 0000000000..8c0ea8e6c0 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/JdbcNodeManagerTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.artemis.core.config.storage.DatabaseStorageConfiguration; +import org.apache.activemq.artemis.core.server.NodeManager; +import org.apache.activemq.artemis.core.server.impl.jdbc.JdbcNodeManager; +import org.apache.activemq.artemis.utils.ExecutorFactory; +import org.apache.activemq.artemis.utils.actors.OrderedExecutorFactory; +import org.junit.Assert; + +public class JdbcNodeManagerTest extends NodeManagerTest { + + @Override + public void performWork(NodeManagerAction... actions) throws Exception { + List nodeRunners = new ArrayList<>(); + final ThreadFactory daemonThreadFactory = t -> { + final Thread th = new Thread(t); + th.setDaemon(true); + return th; + }; + Thread[] threads = new Thread[actions.length]; + List executors = new ArrayList<>(actions.length); + List nodeManagers = new ArrayList<>(actions.length * 2); + AtomicBoolean failedRenew = new AtomicBoolean(false); + for (NodeManagerAction action : actions) { + final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(daemonThreadFactory); + final ExecutorService executor = Executors.newFixedThreadPool(2, daemonThreadFactory); + final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration(); + final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor); + JdbcNodeManager nodeManager = JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory); + nodeManager.start(); + NodeRunner nodeRunner = new NodeRunner(nodeManager, action); + nodeRunners.add(nodeRunner); + nodeManagers.add(nodeManager); + executors.add(scheduledExecutorService); + executors.add(executor); + } + for (int i = 0, nodeRunnersSize = nodeRunners.size(); i < nodeRunnersSize; i++) { + NodeRunner nodeRunner = nodeRunners.get(i); + threads[i] = new Thread(nodeRunner); + threads[i].start(); + } + boolean isDebug = isDebug(); + for (Thread thread : threads) { + try { + if (isDebug) { + thread.join(); + } else { + thread.join(60_000); + } + } catch (InterruptedException e) { + // + } + if (thread.isAlive()) { + thread.interrupt(); + fail("thread still running"); + } + } + // forcibly stop node managers + nodeManagers.forEach(nodeManager -> { + try { + nodeManager.stop(); + } catch (Exception e) { + // won't prevent the test to complete + e.printStackTrace(); + } + }); + + // stop executors + executors.forEach(ExecutorService::shutdownNow); + + for (NodeRunner nodeRunner : nodeRunners) { + if (nodeRunner.e != null) { + nodeRunner.e.printStackTrace(); + fail(nodeRunner.e.getMessage()); + } + } + Assert.assertFalse("Some of the lease locks has failed to renew the locks", failedRenew.get()); + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerAction.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerAction.java index 7d5632194b..6685aa559e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerAction.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerAction.java @@ -16,10 +16,10 @@ */ package org.apache.activemq.artemis.tests.integration.cluster; -import java.io.File; +import java.util.Arrays; import org.apache.activemq.artemis.core.server.NodeManager; -import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager; +import org.apache.activemq.artemis.utils.UUID; public class NodeManagerAction { @@ -35,6 +35,7 @@ public class NodeManagerAction { public static final int HAS_BACKUP = 11; public static final int DOESNT_HAVE_LIVE = 12; public static final int DOESNT_HAVE_BACKUP = 13; + public static final int CHECK_ID = 14; private final int[] work; @@ -82,7 +83,6 @@ public class NodeManagerAction { } break; case HAS_BACKUP: - if (!hasBackupLock) { throw new IllegalStateException("backup lock not held"); } @@ -93,37 +93,50 @@ public class NodeManagerAction { } break; case DOESNT_HAVE_BACKUP: - if (hasBackupLock) { throw new IllegalStateException("backup lock held"); } break; + case CHECK_ID: + nodeManager.start(); + UUID id1 = nodeManager.getUUID(); + nodeManager.stop(); + nodeManager.start(); + if (!Arrays.equals(id1.asBytes(), nodeManager.getUUID().asBytes())) { + throw new IllegalStateException("getUUID should be the same on restart"); + } + break; } } } - public String[] getWork() { - String[] strings = new String[work.length]; - for (int i = 0, stringsLength = strings.length; i < stringsLength; i++) { - strings[i] = "" + work[i]; - } - return strings; + public int works() { + return work.length; } - public static void main(String[] args) throws Exception { + public int getWork(String[] works, int start) { + final int workLength = work.length; + for (int i = 0; i < workLength; i++) { + works[i + start] = Integer.toString(work[i]); + } + return workLength; + } + + public static void execute(String[] args, NodeManager nodeManager) throws Exception { int[] work1 = new int[args.length]; for (int i = 0; i < args.length; i++) { work1[i] = Integer.parseInt(args[i]); } NodeManagerAction nodeManagerAction = new NodeManagerAction(work1); - FileLockNodeManager nodeManager = new FileLockNodeManager(new File("."), false); nodeManager.start(); try { nodeManagerAction.performWork(nodeManager); } catch (Exception e) { e.printStackTrace(); System.exit(9); + } finally { + nodeManager.stop(); } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerTest.java index 4c6f6caeea..c484bf19ee 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/NodeManagerTest.java @@ -24,7 +24,9 @@ import org.apache.activemq.artemis.core.server.impl.InVMNodeManager; import org.apache.activemq.artemis.tests.util.SpawnedTestBase; import org.junit.Test; +import static java.lang.management.ManagementFactory.getRuntimeMXBean; import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.AWAIT_LIVE; +import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.CHECK_ID; import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.CRASH_LIVE; import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.DOESNT_HAVE_BACKUP; import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerAction.DOESNT_HAVE_LIVE; @@ -38,6 +40,12 @@ import static org.apache.activemq.artemis.tests.integration.cluster.NodeManagerA public class NodeManagerTest extends SpawnedTestBase { + @Test + public void testID() throws Exception { + NodeManagerAction live1 = new NodeManagerAction(CHECK_ID); + performWork(live1); + } + @Test public void testLive() throws Exception { NodeManagerAction live1 = new NodeManagerAction(START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE, START_LIVE, HAS_LIVE, DOESNT_HAVE_BACKUP, CRASH_LIVE, DOESNT_HAVE_BACKUP, DOESNT_HAVE_LIVE); @@ -155,6 +163,10 @@ public class NodeManagerTest extends SpawnedTestBase { } } + protected static boolean isDebug() { + return getRuntimeMXBean().getInputArguments().toString().contains("jdwp"); + } + static class NodeRunner implements Runnable { private NodeManagerAction action; diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FileLockNodeManagerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FileLockNodeManagerTest.java index 301938100a..fc1b3b9bcb 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FileLockNodeManagerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FileLockNodeManagerTest.java @@ -147,10 +147,7 @@ public class FileLockNodeManagerTest extends FailoverTestBase { executors.add(executor); final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration(); final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor); - return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory, (code, message, file) -> { - code.printStackTrace(); - Assert.fail(message); - }); + return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory); case File: final Configuration config = createDefaultInVMConfig(); if (useSeparateLockFolder) { diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java index d32e46a674..1438d463ba 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/NettyFailoverTest.java @@ -116,10 +116,7 @@ public class NettyFailoverTest extends FailoverTest { executors.add(executor); final DatabaseStorageConfiguration dbConf = createDefaultDatabaseStorageConfiguration(); final ExecutorFactory executorFactory = new OrderedExecutorFactory(executor); - return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory, (code, message, file) -> { - code.printStackTrace(); - Assert.fail(message); - }); + return JdbcNodeManager.with(dbConf, scheduledExecutorService, executorFactory); default: throw new AssertionError("enum type not supported!"); } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalCrashTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalCrashTest.java index a76922eabd..979954c53d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalCrashTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/CriticalCrashTest.java @@ -100,7 +100,7 @@ public class CriticalCrashTest extends SpawnedTestBase { @Override protected StorageManager createStorageManager() { - JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO) { + JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) { @Override public void readLock() { super.readLock(); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/ShutdownOnCriticalIOErrorMoveNextTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/ShutdownOnCriticalIOErrorMoveNextTest.java index 28d8db6c96..2c0a5177ea 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/ShutdownOnCriticalIOErrorMoveNextTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/critical/ShutdownOnCriticalIOErrorMoveNextTest.java @@ -91,7 +91,7 @@ public class ShutdownOnCriticalIOErrorMoveNextTest extends ActiveMQTestBase { @Override protected StorageManager createStorageManager() { - JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, shutdownOnCriticalIO) { + JournalStorageManager storageManager = new JournalStorageManager(conf, getCriticalAnalyzer(), executorFactory, scheduledPool, ioExecutorFactory, ioCriticalErrorListener) { @Override protected Journal createMessageJournal(Configuration config, diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java index 3182bdbe2c..850430929f 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/discovery/DiscoveryBaseTest.java @@ -175,38 +175,38 @@ public class DiscoveryBaseTest extends ActiveMQTestBase { protected final class FakeNodeManager extends NodeManager { public FakeNodeManager(String nodeID) { - super(false, null); + super(false); this.setNodeID(nodeID); } @Override - public void awaitLiveNode() throws Exception { + public void awaitLiveNode() { } @Override - public void awaitLiveStatus() throws Exception { + public void awaitLiveStatus() { } @Override - public void startBackup() throws Exception { + public void startBackup() { } @Override - public ActivateCallback startLiveNode() throws Exception { + public ActivateCallback startLiveNode() { return new CleaningActivateCallback() { }; } @Override - public void pauseLiveServer() throws Exception { + public void pauseLiveServer() { } @Override - public void crashLiveServer() throws Exception { + public void crashLiveServer() { } @Override - public void releaseBackup() throws Exception { + public void releaseBackup() { } @Override @@ -215,12 +215,12 @@ public class DiscoveryBaseTest extends ActiveMQTestBase { } @Override - public boolean isAwaitingFailback() throws Exception { + public boolean isAwaitingFailback() { return false; } @Override - public boolean isBackupLive() throws Exception { + public boolean isBackupLive() { return false; }