This commit is contained in:
franz1981 2021-09-06 18:14:29 +02:00
commit 4944016cb3
29 changed files with 1178 additions and 202 deletions

View File

@ -32,6 +32,7 @@ import org.apache.activemq.artemis.cli.commands.InvalidOptionsError;
import org.apache.activemq.artemis.cli.commands.Kill;
import org.apache.activemq.artemis.cli.commands.Mask;
import org.apache.activemq.artemis.cli.commands.PrintVersion;
import org.apache.activemq.artemis.cli.commands.activation.ActivationSequenceSet;
import org.apache.activemq.artemis.cli.commands.check.HelpCheck;
import org.apache.activemq.artemis.cli.commands.check.NodeCheck;
import org.apache.activemq.artemis.cli.commands.check.QueueCheck;
@ -52,6 +53,7 @@ import org.apache.activemq.artemis.cli.commands.queue.DeleteQueue;
import org.apache.activemq.artemis.cli.commands.queue.HelpQueue;
import org.apache.activemq.artemis.cli.commands.queue.PurgeQueue;
import org.apache.activemq.artemis.cli.commands.queue.UpdateQueue;
import org.apache.activemq.artemis.cli.commands.activation.ActivationSequenceList;
import org.apache.activemq.artemis.cli.commands.tools.HelpData;
import org.apache.activemq.artemis.cli.commands.tools.PrintData;
import org.apache.activemq.artemis.cli.commands.tools.RecoverMessages;
@ -171,6 +173,10 @@ public class Artemis {
withDefaultCommand(HelpAddress.class).withCommands(CreateAddress.class, DeleteAddress.class, UpdateAddress.class, ShowAddress.class);
if (instance != null) {
builder.withGroup("activation")
.withDescription("activation tools group (sync) (example ./artemis activation list)")
.withDefaultCommand(ActivationSequenceList.class)
.withCommands(ActivationSequenceList.class, ActivationSequenceSet.class);
builder.withGroup("data").withDescription("data tools group (print|imp|exp|encode|decode|compact) (example ./artemis data print)").
withDefaultCommand(HelpData.class).withCommands(RecoverMessages.class, PrintData.class, XmlDataExporter.class, XmlDataImporter.class, DecodeJournal.class, EncodeJournal.class, CompactJournal.class);
builder.withGroup("user").withDescription("default file-based user management (add|rm|list|reset) (example ./artemis user list)").

View File

@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.activation;
import java.io.PrintStream;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.tools.LockAbstract;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.DistributedPrimitiveManagerConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.quorum.DistributedLock;
import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
import org.apache.activemq.artemis.quorum.MutableLong;
import static org.apache.activemq.artemis.cli.commands.activation.ActivationSequenceUtils.applyCoordinationId;
/**
 * CLI command {@code activation list}: prints the local (file-based) and/or the
 * coordinated (distributed primitive manager) activation sequence of a broker.
 * <p>
 * Only replication HA policies ({@code <primary>}/{@code <backup>}) are supported,
 * because only those carry a {@code DistributedPrimitiveManagerConfiguration}.
 */
@Command(name = "list", description = "list local and/or coordinated activation sequences")
public class ActivationSequenceList extends LockAbstract {
// seconds to wait for DistributedPrimitiveManager#start before failing
private static final int MANAGER_START_TIMEOUT_SECONDS = 60;
// target NodeID: only legal together with --remote (validated in execute below)
@Option(name = "--node-id", description = "This can be used just with --remote option. If not set, broker NodeID is used instead")
public String nodeId = null;
// when set, list only the coordinated (remote) activation sequence
@Option(name = "--remote", description = "List just coordinated activation sequence")
public boolean remote = false;
// when set, list only the local (on-disk) activation sequence
@Option(name = "--local", description = "List just local activation sequence")
public boolean local = false;
@Override
public Object execute(ActionContext context) throws Exception {
// super.execute performs the common LockAbstract setup (locking, config loading)
final Object output = super.execute(context);
execute(this, getFileConfiguration(), context.out);
return output;
}
/**
 * Immutable result of a {@code list} run; a {@code null} sequence means
 * the sequence was not queried or is not present.
 */
public static final class ListResult {
public final String nodeId;
public final Long coordinatedActivationSequence;
public final Long localActivationSequence;
private ListResult(String nodeId, Long coordinatedActivationSequence, Long localActivationSequence) {
this.nodeId = nodeId;
this.coordinatedActivationSequence = coordinatedActivationSequence;
this.localActivationSequence = localActivationSequence;
}
}
/**
 * Exposed to ease testing in integration tests: no broker instance is required.
 *
 * @param command carries the parsed CLI options (nodeId/remote/local)
 * @param config  broker configuration; must define a replication HA policy
 * @param out     stream for human-readable output; may be {@code null} to stay silent
 * @return the NodeID plus the queried sequences ({@code null} when skipped or absent)
 * @throws IllegalArgumentException      when both --local and --remote are set, or
 *                                       --node-id is used without --remote
 * @throws UnsupportedOperationException when the HA policy is not primary/backup replication
 */
public static ListResult execute(final ActivationSequenceList command,
final Configuration config,
final PrintStream out) throws Exception {
String nodeId = command.nodeId;
final boolean remote = command.remote;
final boolean local = command.local;
if (remote && local) {
throw new IllegalArgumentException("--local and --remote cannot be both present: to list both sequences just drop both options");
}
if (nodeId != null && !command.remote) {
throw new IllegalArgumentException("--node-id must be used just with --remote");
}
final HAPolicyConfiguration policyConfig = config.getHAPolicyConfiguration();
final DistributedPrimitiveManagerConfiguration managerConfiguration;
String coordinationId = null;
if (policyConfig instanceof ReplicationBackupPolicyConfiguration) {
ReplicationBackupPolicyConfiguration backupPolicyConfig = (ReplicationBackupPolicyConfiguration) policyConfig;
managerConfiguration = backupPolicyConfig.getDistributedManagerConfiguration();
} else if (policyConfig instanceof ReplicationPrimaryPolicyConfiguration) {
ReplicationPrimaryPolicyConfiguration primaryPolicyConfig = (ReplicationPrimaryPolicyConfiguration) policyConfig;
managerConfiguration = primaryPolicyConfig.getDistributedManagerConfiguration();
// a configured coordination-id overrides the NodeID stored on disk
if (primaryPolicyConfig.getCoordinationId() != null) {
coordinationId = primaryPolicyConfig.getCoordinationId();
}
} else {
throw new UnsupportedOperationException("This command support just <primary> or <backup> replication configuration");
}
Objects.requireNonNull(managerConfiguration);
NodeManager nodeManager = null;
if (nodeId == null) {
// check local activation sequence and Node ID
nodeManager = new FileLockNodeManager(config.getNodeManagerLockLocation(), false);
nodeManager.start();
}
try {
if (nodeManager != null) {
if (coordinationId != null) {
if (!coordinationId.equals(nodeManager.getNodeId())) {
// rewrite the stored NodeID to the configured coordination-id;
// applyCoordinationId stops the old manager and returns a freshly started one
nodeManager = applyCoordinationId(coordinationId, nodeManager, config.getNodeManagerLockLocation());
}
}
}
Long localSequence = null;
if (nodeManager != null) {
assert nodeId == null;
nodeId = nodeManager.getNodeId().toString();
} else {
// --node-id was supplied (remote-only query): no local NodeManager was created
assert nodeId != null;
}
if (!remote) {
final long localActivationSequence = nodeManager.getNodeActivationSequence();
if (localActivationSequence == NodeManager.NULL_NODE_ACTIVATION_SEQUENCE) {
if (out != null) {
out.println("No local activation sequence for NodeID=" + nodeId);
}
} else {
localSequence = localActivationSequence;
if (out != null) {
out.println("Local activation sequence for NodeID=" + nodeId + ": " + localActivationSequence);
}
}
}
Long coordinatedSequence = null;
if (!local) {
try (DistributedPrimitiveManager manager = DistributedPrimitiveManager.newInstanceOf(
managerConfiguration.getClassName(), managerConfiguration.getProperties())) {
if (!manager.start(MANAGER_START_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
throw new IllegalStateException("distributed manager isn't started in " + MANAGER_START_TIMEOUT_SECONDS + " seconds");
}
// the live lock must be free: otherwise a live broker may still be mutating the sequence
try (MutableLong coordinatedActivationSequence = manager.getMutableLong(nodeId);
DistributedLock liveLock = manager.getDistributedLock(nodeId)) {
if (!liveLock.tryLock()) {
throw new IllegalStateException("Cannot safely get the coordinated activation sequence for NodeID=" + nodeId + ": maybe the live lock is still held.");
}
coordinatedSequence = coordinatedActivationSequence.get();
if (out != null) {
out.println("Coordinated activation sequence for NodeID=" + nodeId + ": " + coordinatedSequence);
}
}
}
}
return new ListResult(nodeId, coordinatedSequence, localSequence);
} finally {
if (nodeManager != null) {
nodeManager.stop();
}
}
}
}

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.activation;
import java.io.PrintStream;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.tools.LockAbstract;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.DistributedPrimitiveManagerConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.quorum.DistributedLock;
import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
import org.apache.activemq.artemis.quorum.MutableLong;
import static org.apache.activemq.artemis.cli.commands.activation.ActivationSequenceUtils.applyCoordinationId;
/**
 * CLI command {@code activation set}: forces the local (file-based) and/or the
 * coordinated (distributed primitive manager) activation sequence to a given value.
 * <p>
 * Only replication HA policies ({@code <primary>}/{@code <backup>}) are supported.
 */
@Command(name = "set", description = "Set local and/or remote activation sequence")
public class ActivationSequenceSet extends LockAbstract {
// seconds to wait for DistributedPrimitiveManager#start before failing
private static final int MANAGER_START_TIMEOUT_SECONDS = 60;
// optional forced NodeID; not allowed when the policy already pins a coordination-id
@Option(name = "--node-id", description = "Target sequence for this UUID overwriting the NodeID of this broker too. If not set, broker NodeID is used instead.")
public String nodeId = null;
// when set, update only the coordinated (remote) activation sequence
@Option(name = "--remote", description = "Set just coordinated activation sequence")
public boolean remote = false;
// when set, update only the local (on-disk) activation sequence
@Option(name = "--local", description = "Set just local activation sequence")
public boolean local = false;
// mandatory new sequence value; must be >= 0 (validated in execute below)
@Option(name = "--to", description = "new activation sequence.", required = true)
public long value;
@Override
public Object execute(ActionContext context) throws Exception {
// super.execute performs the common LockAbstract setup (locking, config loading)
final Object output = super.execute(context);
execute(this, getFileConfiguration(), context.out);
return output;
}
/**
 * Exposed to ease testing in integration tests: no broker instance is required.
 *
 * @param command carries the parsed CLI options (nodeId/remote/local/value)
 * @param config  broker configuration; must define a replication HA policy
 * @param out     stream for human-readable output; may be {@code null} to stay silent
 * @throws IllegalArgumentException      when both --local and --remote are set, when --to is
 *                                       negative, or when --node-id is combined with a
 *                                       configured coordination-id (multi-primary)
 * @throws UnsupportedOperationException when the HA policy is not primary/backup replication
 */
public static void execute(final ActivationSequenceSet command,
final Configuration config,
final PrintStream out) throws Exception {
final String nodeId = command.nodeId;
final boolean remote = command.remote;
final boolean local = command.local;
final long value = command.value;
if (remote && local) {
throw new IllegalArgumentException("--local and --remote cannot be both present: to set both sequences just drop both options");
}
if (value < 0) {
throw new IllegalArgumentException("--to must be >= 0");
}
final HAPolicyConfiguration policyConfig = config.getHAPolicyConfiguration();
final DistributedPrimitiveManagerConfiguration managerConfiguration;
// a CLI-forced NodeID is treated like a coordination-id: it gets persisted over the stored one
String coordinationId = nodeId;
if (policyConfig instanceof ReplicationBackupPolicyConfiguration) {
ReplicationBackupPolicyConfiguration backupPolicyConfig = (ReplicationBackupPolicyConfiguration) policyConfig;
managerConfiguration = backupPolicyConfig.getDistributedManagerConfiguration();
} else if (policyConfig instanceof ReplicationPrimaryPolicyConfiguration) {
ReplicationPrimaryPolicyConfiguration primaryPolicyConfig = (ReplicationPrimaryPolicyConfiguration) policyConfig;
managerConfiguration = primaryPolicyConfig.getDistributedManagerConfiguration();
if (primaryPolicyConfig.getCoordinationId() != null) {
if (nodeId != null) {
throw new IllegalArgumentException("forcing NodeID with multi-primary is not supported! Try again without --node-id");
}
coordinationId = primaryPolicyConfig.getCoordinationId();
}
} else {
throw new UnsupportedOperationException("This command support just <primary> or <backup> replication configuration");
}
Objects.requireNonNull(managerConfiguration);
NodeManager nodeManager = new FileLockNodeManager(config.getNodeManagerLockLocation(), false);
nodeManager.start();
try {
if (coordinationId != null) {
// force using coordinationId whatever it is - either for multi-primary or just forced through CLI
if (!coordinationId.equals(nodeManager.getNodeId())) {
// applyCoordinationId stops the old manager and returns a freshly started one
nodeManager = applyCoordinationId(coordinationId, nodeManager, config.getNodeManagerLockLocation());
}
}
final String localNodeId = nodeManager.getNodeId().toString();
if (!remote) {
final long localActivationSequence = nodeManager.getNodeActivationSequence();
nodeManager.writeNodeActivationSequence(value);
if (out != null) {
if (localActivationSequence == NodeManager.NULL_NODE_ACTIVATION_SEQUENCE) {
out.println("Forced local activation sequence for NodeID=" + localNodeId + " to " + value);
} else {
out.println("Forced local activation sequence for NodeID=" + localNodeId + " from " + localActivationSequence + " to " + value);
}
}
}
if (!local) {
try (DistributedPrimitiveManager manager = DistributedPrimitiveManager.newInstanceOf(
managerConfiguration.getClassName(), managerConfiguration.getProperties())) {
if (!manager.start(MANAGER_START_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
throw new IllegalStateException("distributed manager isn't started in " + MANAGER_START_TIMEOUT_SECONDS + " seconds");
}
// the live lock must be free: otherwise a live broker may still be mutating the sequence
try (MutableLong coordinatedActivationSequence = manager.getMutableLong(localNodeId);
DistributedLock liveLock = manager.getDistributedLock(localNodeId)) {
if (!liveLock.tryLock()) {
throw new IllegalStateException("Cannot safely set coordinated activation sequence for NodeID=" + localNodeId + ": live lock is still held.");
}
final long remoteActivationSequence = coordinatedActivationSequence.get();
coordinatedActivationSequence.set(value);
if (out != null) {
out.println("Forced coordinated activation sequence for NodeID=" + localNodeId + " from " + remoteActivationSequence + " to " + value);
}
}
}
}
} finally {
nodeManager.stop();
}
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.cli.commands.activation;
import java.io.File;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
/**
 * Shared helpers for the {@code activation} CLI commands.
 */
final class ActivationSequenceUtils {
private ActivationSequenceUtils() {
}
/**
 * Replaces the NodeID persisted on disk with {@code nodeId}, preserving the current
 * activation sequence.
 * <p>
 * The given {@code nodeManager} is stopped and a new, started replicated
 * {@link FileLockNodeManager} (which persists the forced NodeID) is returned:
 * callers must use the returned instance from then on.
 *
 * @param nodeId                  the NodeID to persist
 * @param nodeManager             the running manager to replace; it is stopped by this method
 * @param nodeManagerLockLocation directory holding the server lock/activation sequence files
 * @return a started NodeManager owning the new NodeID
 * @throws Exception if stopping/starting the managers or persisting the files fails
 */
public static NodeManager applyCoordinationId(final String nodeId,
final NodeManager nodeManager,
final File nodeManagerLockLocation) throws Exception {
// capture the sequence before stopping, so it can be sanity-checked afterwards
final long activationSequence = nodeManager.getNodeActivationSequence();
nodeManager.stop();
// REVISIT: this is quite clunky, also in backup activation, we just need new nodeID persisted!
FileLockNodeManager replicatedNodeManager = new FileLockNodeManager(nodeManagerLockLocation, true);
replicatedNodeManager.start();
replicatedNodeManager.setNodeID(nodeId);
// create and persist server.lock file
replicatedNodeManager.stopBackup();
// although the NM is restarted as "replicatedBackup", the last written activation sequence value is still needed in-memory
final long freshActivationSequence = replicatedNodeManager.readNodeActivationSequence();
assert freshActivationSequence == activationSequence;
return replicatedNodeManager;
}
}

View File

@ -17,7 +17,7 @@
# Additional logger names to configure (root logger is always configured)
# Root logger option
loggers=org.eclipse.jetty,org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.utils.critical,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms.server,org.apache.activemq.artemis.integration.bootstrap,org.apache.activemq.audit.base,org.apache.activemq.audit.message,org.apache.activemq.audit.resource
loggers=org.eclipse.jetty,org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.utils.critical,org.apache.activemq.artemis.journal,org.apache.activemq.artemis.jms.server,org.apache.activemq.artemis.integration.bootstrap,org.apache.activemq.audit.base,org.apache.activemq.audit.message,org.apache.activemq.audit.resource,org.apache.curator,org.apache.zookeeper
# Root logger level
logger.level=INFO
@ -36,6 +36,10 @@ logger.org.eclipse.jetty.level=WARN
# Root logger handlers
logger.handlers=FILE,CONSOLE
# quorum logger levels
logger.org.apache.curator.level=WARN
logger.org.apache.zookeeper.level=ERROR
# to enable audit change the level to INFO
logger.org.apache.activemq.audit.base.level=ERROR
logger.org.apache.activemq.audit.base.handlers=AUDIT_FILE

View File

@ -122,6 +122,15 @@
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<systemProperties>
<fast-tests>${fast-tests}</fast-tests>
</systemProperties>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -38,6 +38,7 @@ import org.apache.curator.framework.state.ConnectionState;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.retry.RetryForever;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.DebugUtils;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.joining;
@ -146,6 +147,13 @@ public class CuratorDistributedPrimitiveManager implements DistributedPrimitiveM
private boolean handlingEvents;
private final CuratorFrameworkFactory.Builder curatorBuilder;
static {
// this is going to set curator/zookeeper log level as per https://cwiki.apache.org/confluence/display/CURATOR/TN8;
// only applied when the user has not already set the property explicitly
if (System.getProperty(DebugUtils.PROPERTY_LOG_EVENTS) == null) {
System.setProperty(DebugUtils.PROPERTY_LOG_EVENTS, "false");
}
}
/**
 * Creates a manager from the raw string configuration: delegates to another
 * constructor after validating the parameters.
 */
public CuratorDistributedPrimitiveManager(Map<String, String> config) {
this(validateParameters(config), true);
}

View File

@ -61,8 +61,10 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
private static final int RETRIES_MS = 100;
private static final int RETRIES = 1;
// fast-tests runs don't need to run 3 ZK nodes
private static final int ZK_NODES = Boolean.getBoolean("fast-tests") ? 1 : 3;
@Parameterized.Parameter
public int nodes;
public int zkNodes;
@Rule
public TemporaryFolder tmpFolder = new TemporaryFolder();
private TestingCluster testingServer;
@ -71,13 +73,13 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
@Parameterized.Parameters(name = "nodes={0}")
public static Iterable<Object[]> getTestParameters() {
return Arrays.asList(new Object[][]{{3}, {5}});
return Arrays.asList(new Object[][]{{ZK_NODES}});
}
@Override
public void setupEnv() throws Throwable {
clusterSpecs = new InstanceSpec[nodes];
for (int i = 0; i < nodes; i++) {
clusterSpecs = new InstanceSpec[zkNodes];
for (int i = 0; i < zkNodes; i++) {
clusterSpecs[i] = new InstanceSpec(tmpFolder.newFolder(), BASE_SERVER_PORT + i, -1, -1, true, -1, SERVER_TICK_MS, -1);
}
testingServer = new TestingCluster(clusterSpecs);
@ -181,6 +183,7 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
@Test
public void canAcquireLockOnMajorityRestart() throws Exception {
Assume.assumeThat(zkNodes, greaterThan(1));
final DistributedPrimitiveManager manager = createManagedDistributeManager();
manager.start();
final DistributedLock lock = manager.getDistributedLock("a");
@ -203,7 +206,7 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
@Test
public void cannotStartManagerWithoutQuorum() throws Exception {
Assume.assumeThat(nodes, greaterThan(1));
Assume.assumeThat(zkNodes, greaterThan(1));
DistributedPrimitiveManager manager = createManagedDistributeManager();
stopMajorityNotLeaderNodes(true);
Assert.assertFalse(manager.start(2, TimeUnit.SECONDS));
@ -212,7 +215,7 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
@Test(expected = UnavailableStateException.class)
public void cannotAcquireLockWithoutQuorum() throws Exception {
Assume.assumeThat(nodes, greaterThan(1));
Assume.assumeThat(zkNodes, greaterThan(1));
DistributedPrimitiveManager manager = createManagedDistributeManager();
manager.start();
stopMajorityNotLeaderNodes(true);
@ -222,7 +225,7 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
@Test
public void cannotCheckLockWithoutQuorum() throws Exception {
Assume.assumeThat(nodes, greaterThan(1));
Assume.assumeThat(zkNodes, greaterThan(1));
DistributedPrimitiveManager manager = createManagedDistributeManager();
manager.start();
stopMajorityNotLeaderNodes(true);
@ -238,7 +241,7 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
@Test
public void canGetLockWithoutQuorum() throws Exception {
Assume.assumeThat(nodes, greaterThan(1));
Assume.assumeThat(zkNodes, greaterThan(1));
DistributedPrimitiveManager manager = createManagedDistributeManager();
manager.start();
stopMajorityNotLeaderNodes(true);
@ -248,7 +251,7 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
@Test
public void notifiedAsUnavailableWhileLoosingQuorum() throws Exception {
Assume.assumeThat(nodes, greaterThan(1));
Assume.assumeThat(zkNodes, greaterThan(1));
DistributedPrimitiveManager manager = createManagedDistributeManager();
manager.start();
DistributedLock lock = manager.getDistributedLock("a");
@ -260,7 +263,7 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
@Test
public void beNotifiedOnce() throws Exception {
Assume.assumeThat(nodes, greaterThan(1));
Assume.assumeThat(zkNodes, greaterThan(1));
DistributedPrimitiveManager manager = createManagedDistributeManager();
manager.start();
DistributedLock lock = manager.getDistributedLock("a");
@ -276,7 +279,7 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
@Test
public void beNotifiedOfUnavailabilityWhileBlockedOnTimedLock() throws Exception {
Assume.assumeThat(nodes, greaterThan(1));
Assume.assumeThat(zkNodes, greaterThan(1));
DistributedPrimitiveManager manager = createManagedDistributeManager();
manager.start();
DistributedLock lock = manager.getDistributedLock("a");
@ -312,6 +315,7 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
@Test
public void beNotifiedOfAlreadyUnavailableManagerAfterAddingListener() throws Exception {
Assume.assumeThat(zkNodes, greaterThan(1));
DistributedPrimitiveManager manager = createManagedDistributeManager();
manager.start();
final AtomicBoolean unavailable = new AtomicBoolean(false);
@ -338,6 +342,9 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
}
private static boolean isLeader(TestingZooKeeperServer server) {
if (server.getInstanceSpecs().size() == 1) {
return true;
}
long leaderId = server.getQuorumPeer().getLeaderId();
long id = server.getQuorumPeer().getId();
return id == leaderId;
@ -345,7 +352,7 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
private void stopMajorityNotLeaderNodes(boolean fromLast) throws Exception {
List<TestingZooKeeperServer> followers = testingServer.getServers().stream().filter(Predicates.not(CuratorDistributedLockTest::isLeader)).collect(Collectors.toList());
final int quorum = (nodes / 2) + 1;
final int quorum = (zkNodes / 2) + 1;
for (int i = 0; i < quorum; i++) {
final int nodeIndex = fromLast ? (followers.size() - 1) - i : i;
followers.get(nodeIndex).stop();
@ -353,9 +360,9 @@ public class CuratorDistributedLockTest extends DistributedLockTest {
}
private void restartMajorityNodes(boolean startFromLast) throws Exception {
final int quorum = (nodes / 2) + 1;
final int quorum = (zkNodes / 2) + 1;
for (int i = 0; i < quorum; i++) {
final int nodeIndex = startFromLast ? (nodes - 1) - i : i;
final int nodeIndex = startFromLast ? (zkNodes - 1) - i : i;
if (!testingServer.restartServer(clusterSpecs[nodeIndex])) {
throw new IllegalStateException("errored while restarting " + clusterSpecs[nodeIndex]);
}

View File

@ -94,7 +94,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
public interface ReplicationEndpointEventListener {
void onRemoteBackupUpToDate();
void onRemoteBackupUpToDate(String nodeId, long activationSequence);
void onLiveStopping(ReplicationLiveIsStoppingMessage.LiveStopping message) throws ActiveMQException;
@ -419,9 +419,9 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
}
}
private synchronized void finishSynchronization(String liveID) throws Exception {
private synchronized void finishSynchronization(String liveID, long activationSequence) throws Exception {
if (logger.isTraceEnabled()) {
logger.trace("BACKUP-SYNC-START: finishSynchronization::" + liveID);
logger.trace("BACKUP-SYNC-START: finishSynchronization::" + liveID + " activationSequence = " + activationSequence);
}
for (JournalContent jc : EnumSet.allOf(JournalContent.class)) {
Journal journal = journalsHolder.remove(jc);
@ -476,8 +476,7 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
}
journalsHolder = null;
eventListener.onLiveNodeId(liveID);
eventListener.onRemoteBackupUpToDate();
eventListener.onRemoteBackupUpToDate(liveID, activationSequence);
if (logger.isTraceEnabled()) {
logger.trace("Backup is synchronized / BACKUP-SYNC-DONE");
@ -560,12 +559,13 @@ public final class ReplicationEndpoint implements ChannelHandler, ActiveMQCompon
return replicationResponseMessage;
if (packet.isSynchronizationFinished()) {
long activationSequence = 0;
if (packet.getFileIds() != null && packet.getFileIds().length == 1) {
// this is the version sequence of the data we are replicating
// verified if we activate with this data
server.getNodeManager().writeNodeActivationSequence(packet.getFileIds()[0]);
activationSequence = packet.getFileIds()[0];
}
finishSynchronization(packet.getNodeID());
finishSynchronization(packet.getNodeID(), activationSequence);
replicationResponseMessage.setSynchronizationIsFinishedAcknowledgement(true);
return replicationResponseMessage;
}

View File

@ -39,7 +39,8 @@ public abstract class NodeManager implements ActiveMQComponent {
private UUID uuid;
private boolean isStarted = false;
private final Set<FileLockNodeManager.LockListener> lockListeners;
protected long nodeActivationSequence; // local version of a coordinated sequence, tracking state transitions of ownership
public static final long NULL_NODE_ACTIVATION_SEQUENCE = -1;
protected long nodeActivationSequence = NULL_NODE_ACTIVATION_SEQUENCE; // local version of a coordinated sequence, tracking state transitions of ownership
public NodeManager(final boolean replicatedBackup) {
this.replicatedBackup = replicatedBackup;
@ -98,9 +99,12 @@ public abstract class NodeManager implements ActiveMQComponent {
}
}
public void setNodeActivationSequence(long activationSequence) {
public void setNodeActivationSequence(long sequence) {
if (sequence != NULL_NODE_ACTIVATION_SEQUENCE && sequence < 0) {
throw new IllegalArgumentException("activation sequence must be >=0 or NULL_NODE_ACTIVATION_SEQUENCE");
}
synchronized (nodeIDGuard) {
nodeActivationSequence = activationSequence;
nodeActivationSequence = sequence;
}
}

View File

@ -22,11 +22,15 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Path;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.jboss.logging.Logger;
import static java.nio.file.StandardOpenOption.CREATE;
import static java.nio.file.StandardOpenOption.READ;
@ -34,11 +38,13 @@ import static java.nio.file.StandardOpenOption.WRITE;
public abstract class FileBasedNodeManager extends NodeManager {
private static final Logger LOGGER = Logger.getLogger(FileBasedNodeManager.class);
protected static final byte FIRST_TIME_START = '0';
public static final String SERVER_LOCK_NAME = "server.lock";
public static final String SERVER_ACTIVATION_SEQUENCE_NAME = "server.activation.sequence";
private static final String ACCESS_MODE = "rw";
private final File directory;
private final Path activationSequencePath;
protected FileChannel channel;
protected FileChannel activationSequenceChannel;
@ -48,13 +54,31 @@ public abstract class FileBasedNodeManager extends NodeManager {
if (directory != null) {
directory.mkdirs();
}
activationSequencePath = new File(directory, SERVER_ACTIVATION_SEQUENCE_NAME).toPath();
}
protected void useActivationSequenceChannel() throws IOException {
if (activationSequenceChannel != null) {
return;
/**
* If {@code createIfNotExists} is {@code false} and the activation sequence file doesn't exist yet, it returns {@code null},
* otherwise it opens the existing file.<br>
* If {@code createIfNotExists} is {@code true} it opens the file, creating it if it doesn't exist.
*/
private FileChannel useActivationSequenceChannel(final boolean createIfNotExists) throws IOException {
FileChannel channel = this.activationSequenceChannel;
if (channel != null) {
return channel;
}
activationSequenceChannel = FileChannel.open(newFile(SERVER_ACTIVATION_SEQUENCE_NAME).toPath(), READ, WRITE, CREATE);
final OpenOption[] openOptions;
if (!createIfNotExists) {
if (!Files.exists(activationSequencePath)) {
return null;
}
openOptions = new OpenOption[]{READ, WRITE};
} else {
openOptions = new OpenOption[]{READ, WRITE, CREATE};
}
channel = FileChannel.open(activationSequencePath, openOptions);
activationSequenceChannel = channel;
return channel;
}
@Override
@ -63,30 +87,37 @@ public abstract class FileBasedNodeManager extends NodeManager {
throw new NodeManagerException(new IllegalStateException("node manager must be started first"));
}
try {
useActivationSequenceChannel();
ByteBuffer tmpBuffer = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN);
if (activationSequenceChannel.read(tmpBuffer, 0) != Long.BYTES) {
return 0;
final FileChannel channel = useActivationSequenceChannel(false);
if (channel == null) {
setNodeActivationSequence(NULL_NODE_ACTIVATION_SEQUENCE);
return NULL_NODE_ACTIVATION_SEQUENCE;
}
final ByteBuffer tmpBuffer = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN);
if (channel.read(tmpBuffer, 0) != Long.BYTES) {
setNodeActivationSequence(NULL_NODE_ACTIVATION_SEQUENCE);
return NULL_NODE_ACTIVATION_SEQUENCE;
}
tmpBuffer.flip();
return tmpBuffer.getLong(0);
final long activationSequence = tmpBuffer.getLong(0);
setNodeActivationSequence(activationSequence);
return activationSequence;
} catch (IOException ie) {
throw new NodeManagerException(ie);
}
}
@Override
public void writeNodeActivationSequence(long version) throws NodeManagerException {
public void writeNodeActivationSequence(long sequence) throws NodeManagerException {
if (!isStarted()) {
throw new NodeManagerException(new IllegalStateException("node manager must be started first"));
}
try {
useActivationSequenceChannel();
ByteBuffer tmpBuffer = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN);
tmpBuffer.putLong(0, version);
activationSequenceChannel.write(tmpBuffer, 0);
activationSequenceChannel.force(false);
setNodeActivationSequence(version);
final FileChannel channel = useActivationSequenceChannel(true);
final ByteBuffer tmpBuffer = ByteBuffer.allocate(Long.BYTES).order(ByteOrder.BIG_ENDIAN);
tmpBuffer.putLong(0, sequence);
channel.write(tmpBuffer, 0);
channel.force(false);
setNodeActivationSequence(sequence);
} catch (IOException ie) {
throw new NodeManagerException(ie);
}
@ -133,6 +164,16 @@ public abstract class FileBasedNodeManager extends NodeManager {
}
}
if (channel != null) {
try {
channel.close();
} catch (IOException ignored) {
// can ignore it: going to open a new file and that's the I/O to care about
} finally {
channel = null;
}
}
@SuppressWarnings("resource")
RandomAccessFile raFile = new RandomAccessFile(serverLockFile, ACCESS_MODE);
@ -190,6 +231,7 @@ public abstract class FileBasedNodeManager extends NodeManager {
channelCopy.close();
} finally {
try {
setNodeActivationSequence(NULL_NODE_ACTIVATION_SEQUENCE);
FileChannel dataVersionChannel = this.activationSequenceChannel;
this.activationSequenceChannel = null;
if (dataVersionChannel != null) {
@ -203,13 +245,19 @@ public abstract class FileBasedNodeManager extends NodeManager {
@Override
public void stopBackup() throws NodeManagerException {
if (replicatedBackup && getNodeId() != null) {
try {
setUpServerLockFile();
} catch (IOException e) {
throw new NodeManagerException(e);
synchronized (nodeIDGuard) {
if (replicatedBackup && getNodeId() != null) {
try {
setUpServerLockFile();
final long nodeActivationSequence = this.nodeActivationSequence;
if (nodeActivationSequence != NULL_NODE_ACTIVATION_SEQUENCE) {
writeNodeActivationSequence(nodeActivationSequence);
}
} catch (IOException e) {
throw new NodeManagerException(e);
}
}
super.stopBackup();
}
super.stopBackup();
}
}

View File

@ -100,6 +100,7 @@ public class FileLockNodeManager extends FileBasedNodeManager {
}
super.start();
readNodeActivationSequence();
}
@Override

View File

@ -62,6 +62,15 @@ public final class InVMNodeManager extends FileBasedNodeManager {
setUUID(UUIDGenerator.getInstance().generateUUID());
}
@Override
public synchronized void start() throws Exception {
if (isStarted()) {
return;
}
super.start();
readNodeActivationSequence();
}
@Override
public void awaitLiveNode() throws InterruptedException {
do {

View File

@ -41,6 +41,7 @@ import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
import org.apache.activemq.artemis.quorum.UnavailableStateException;
import org.jboss.logging.Logger;
import static org.apache.activemq.artemis.core.server.NodeManager.NULL_NODE_ACTIVATION_SEQUENCE;
import static org.apache.activemq.artemis.core.server.impl.ReplicationObserver.ReplicationFailure;
import static org.apache.activemq.artemis.core.server.impl.quorum.ActivationSequenceStateMachine.ensureSequentialAccessToNodeData;
import static org.apache.activemq.artemis.core.server.impl.quorum.ActivationSequenceStateMachine.tryActivate;
@ -74,11 +75,17 @@ public final class ReplicationBackupActivation extends Activation implements Dis
final ReplicationBackupPolicy policy) {
this.activeMQServer = activeMQServer;
if (policy.isTryFailback()) {
final SimpleString serverNodeID = activeMQServer.getNodeID();
if (serverNodeID == null || serverNodeID.isEmpty()) {
throw new IllegalStateException("A failback activation must be biased around a specific NodeID");
// patch expectedNodeID
final String coordinationId = policy.getLivePolicy().getCoordinationId();
if (coordinationId != null) {
expectedNodeID = coordinationId;
} else {
final SimpleString serverNodeID = activeMQServer.getNodeID();
if (serverNodeID == null || serverNodeID.isEmpty()) {
throw new IllegalStateException("A failback activation must be biased around a specific NodeID");
}
this.expectedNodeID = serverNodeID.toString();
}
this.expectedNodeID = serverNodeID.toString();
} else {
this.expectedNodeID = null;
}
@ -142,19 +149,33 @@ public final class ReplicationBackupActivation extends Activation implements Dis
}
}
try {
distributedManager.start();
final long nodeActivationSequence = activeMQServer.getNodeManager().readNodeActivationSequence();
// only a backup with positive local activation sequence could contain valuable data
if (nodeActivationSequence > 0) {
synchronized (activeMQServer) {
activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
}
// restart of server due to unavailable quorum can cause NM restart losing the coordination-id
final String coordinationId = policy.getLivePolicy().getCoordinationId();
if (coordinationId != null) {
final String nodeId = activeMQServer.getNodeManager().getNodeId().toString();
if (!coordinationId.equals(nodeId)) {
ReplicationPrimaryActivation.applyCoordinationId(coordinationId, activeMQServer);
}
}
distributedManager.start();
final NodeManager nodeManager = activeMQServer.getNodeManager();
// only a backup with positive local activation sequence could contain valuable data
if (nodeManager.getNodeActivationSequence() > 0) {
DistributedLock liveLockWithInSyncReplica;
while (true) {
distributedManager.start();
try {
liveLockWithInSyncReplica = tryActivate(nodeId, nodeActivationSequence, distributedManager, LOGGER);
liveLockWithInSyncReplica = tryActivate(activeMQServer.getNodeManager(), distributedManager, LOGGER);
break;
} catch (UnavailableStateException canRecoverEx) {
distributedManager.stop();
} catch (NodeManager.NodeManagerException fatalEx) {
LOGGER.warn("Failed while auto-repairing activation sequence: stop server now", fatalEx);
asyncRestartServer(activeMQServer, false);
return;
}
}
if (liveLockWithInSyncReplica != null) {
@ -162,7 +183,6 @@ public final class ReplicationBackupActivation extends Activation implements Dis
if (!activeMQServer.initialisePart1(false)) {
return;
}
activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
startAsLive(liveLockWithInSyncReplica);
return;
}
@ -174,6 +194,7 @@ public final class ReplicationBackupActivation extends Activation implements Dis
// A primary need to preserve NodeID across runs
activeMQServer.moveServerData(policy.getMaxSavedReplicatedJournalsSize(), policy.isTryFailback());
activeMQServer.getNodeManager().start();
// allow JMX to query Artemis state
if (!activeMQServer.initialisePart1(false)) {
return;
}
@ -190,7 +211,6 @@ public final class ReplicationBackupActivation extends Activation implements Dis
clusterController.awaitConnectionToReplicationCluster();
activeMQServer.getBackupManager().start();
activeMQServer.setState(ActiveMQServerImpl.SERVER_STATE.STARTED);
final DistributedLock liveLock = replicateAndFailover(clusterController);
if (liveLock == null) {
return;
@ -213,8 +233,12 @@ public final class ReplicationBackupActivation extends Activation implements Dis
liveLock.close();
return;
}
final NodeManager nodeManager = activeMQServer.getNodeManager();
try {
ensureSequentialAccessToNodeData(activeMQServer, distributedManager, LOGGER);
// stopBackup is going to write the NodeID and activation sequence previously set on the NodeManager,
// because activeMQServer.resetNodeManager() has created a NodeManager with replicatedBackup == true.
nodeManager.stopBackup();
ensureSequentialAccessToNodeData(activeMQServer.toString(), nodeManager, distributedManager, LOGGER);
} catch (Throwable fatal) {
LOGGER.warn(fatal);
// policy is already live one, but there's no activation yet: we can just stop
@ -222,9 +246,6 @@ public final class ReplicationBackupActivation extends Activation implements Dis
throw new ActiveMQIllegalStateException("This server cannot ensure sequential access to broker data: activation is failed");
}
ActiveMQServerLogger.LOGGER.becomingLive(activeMQServer);
// stopBackup is going to write the NodeID previously set on the NodeManager,
// because activeMQServer.resetNodeManager() has created a NodeManager with replicatedBackup == true.
activeMQServer.getNodeManager().stopBackup();
activeMQServer.getStorageManager().start();
activeMQServer.getBackupManager().activated();
// IMPORTANT:
@ -280,6 +301,9 @@ public final class ReplicationBackupActivation extends Activation implements Dis
return null;
}
}
if (expectedNodeID != null) {
LOGGER.infof("awaiting connecting to any live node with NodeID = %s", expectedNodeID);
}
final ReplicationFailure failure = replicateLive(clusterController, nodeLocator, registrationFailureForwarder);
if (failure == null) {
Thread.sleep(clusterController.getRetryIntervalForReplicatedCluster());
@ -300,19 +324,19 @@ public final class ReplicationBackupActivation extends Activation implements Dis
}
// no more interested into these events: handling it manually from here
distributedManager.removeUnavailableManagerListener(this);
final long nodeActivationSequence = activeMQServer.getNodeManager().readNodeActivationSequence();
final String nodeId = activeMQServer.getNodeManager().getNodeId().toString();
final NodeManager nodeManager = activeMQServer.getNodeManager();
DistributedLock liveLockWithInSyncReplica = null;
if (nodeActivationSequence > 0) {
if (nodeManager.getNodeActivationSequence() > 0) {
try {
liveLockWithInSyncReplica = tryActivate(nodeId, nodeActivationSequence, distributedManager, LOGGER);
liveLockWithInSyncReplica = tryActivate(nodeManager, distributedManager, LOGGER);
} catch (Throwable error) {
// no need to retry here, can just restart as backup that will handle a more resilient tryActivate
LOGGER.warn("Errored while attempting failover", error);
liveLockWithInSyncReplica = null;
}
} else {
LOGGER.warnf("We expect local activation sequence for NodeID = %s to be > 0 on a fail-over, while is %d", nodeId, nodeActivationSequence);
LOGGER.errorf("Expected positive local activation sequence for NodeID = %s during fail-over, but was %d: restarting as backup",
nodeManager.getNodeId(), nodeManager.getNodeActivationSequence());
}
assert stopping.get();
if (liveLockWithInSyncReplica != null) {
@ -340,7 +364,7 @@ public final class ReplicationBackupActivation extends Activation implements Dis
if (activationSequence != 0) {
final SimpleString syncNodeId = activeMQServer.getNodeManager().getNodeId();
try {
activeMQServer.getNodeManager().writeNodeActivationSequence(0);
activeMQServer.getNodeManager().setNodeActivationSequence(NULL_NODE_ACTIVATION_SEQUENCE);
} catch (Throwable fatal) {
LOGGER.errorf(fatal, "Errored while resetting local activation sequence %d for NodeID = %s: stopping broker",
activationSequence, syncNodeId);
@ -356,6 +380,10 @@ public final class ReplicationBackupActivation extends Activation implements Dis
LOGGER.error("Stopping broker because of wrong node ID communication from live: maybe a misbehaving live?");
asyncRestartServer(activeMQServer, false);
return null;
case WrongActivationSequence:
LOGGER.error("Stopping broker because of wrong activation sequence communication from live: maybe a misbehaving live?");
asyncRestartServer(activeMQServer, false);
return null;
default:
throw new AssertionError("Unsupported failure " + failure);
}
@ -560,7 +588,7 @@ public final class ReplicationBackupActivation extends Activation implements Dis
}
private static boolean connectToReplicationEndpoint(final ClusterControl liveControl,
final ReplicationEndpoint replicationEndpoint) {
final ReplicationEndpoint replicationEndpoint) {
final Channel replicationChannel = liveControl.createReplicationChannel();
replicationChannel.setHandler(replicationEndpoint);
replicationEndpoint.setChannel(replicationChannel);

View File

@ -41,7 +41,7 @@ final class ReplicationObserver implements ClusterTopologyListener, SessionFailu
private static final Logger LOGGER = Logger.getLogger(ReplicationObserver.class);
public enum ReplicationFailure {
VoluntaryFailOver, BackupNotInSync, NonVoluntaryFailover, RegistrationError, AlreadyReplicating, ClosedObserver, WrongNodeId;
VoluntaryFailOver, BackupNotInSync, NonVoluntaryFailover, RegistrationError, AlreadyReplicating, ClosedObserver, WrongNodeId, WrongActivationSequence
}
private final NodeManager nodeManager;
@ -244,7 +244,7 @@ final class ReplicationObserver implements ClusterTopologyListener, SessionFailu
}
@Override
public void onRemoteBackupUpToDate() {
public void onRemoteBackupUpToDate(String nodeId, long activationSequence) {
if (backupUpToDate || closed || replicationFailure.isDone()) {
return;
}
@ -252,7 +252,29 @@ final class ReplicationObserver implements ClusterTopologyListener, SessionFailu
if (backupUpToDate || closed || replicationFailure.isDone()) {
return;
}
assert liveID != null;
if (!validateNodeId(nodeId)) {
stopForcedFailoverAfterDelay();
unlistenConnectionFailures();
replicationFailure.complete(ReplicationFailure.WrongNodeId);
return;
}
if (liveID == null) {
liveID = nodeId;
}
if (activationSequence <= 0) {
// NOTE:
// activationSequence == 0 is still illegal,
// because live has to increase the sequence before replicating
stopForcedFailoverAfterDelay();
unlistenConnectionFailures();
LOGGER.errorf("Illegal activation sequence %d from NodeID = %s", activationSequence, nodeId);
replicationFailure.complete(ReplicationFailure.WrongActivationSequence);
return;
}
nodeManager.setNodeID(nodeId);
nodeManager.setNodeActivationSequence(activationSequence);
// persists nodeID and nodeActivationSequence
nodeManager.stopBackup();
backupManager.announceBackup();
backupUpToDate = true;
}
@ -298,8 +320,9 @@ final class ReplicationObserver implements ClusterTopologyListener, SessionFailu
unlistenConnectionFailures();
replicationFailure.complete(ReplicationFailure.WrongNodeId);
} else if (liveID == null) {
// just store it locally: if is stored on the node manager
// it will be persisted on broker's stop while data is not yet in sync
liveID = nodeId;
nodeManager.setNodeID(nodeId);
}
}
}

View File

@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.server.impl;
import javax.annotation.concurrent.GuardedBy;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -110,20 +111,18 @@ public class ReplicationPrimaryActivation extends LiveActivation implements Dist
try {
// we have a common nodeId that we can share and coordinate with between peers
if (policy.getCoordinationId() != null) {
LOGGER.infof("Applying shared peer NodeID=%s to enable coordinated live activation", policy.getCoordinationId());
// REVISIT: this is quite clunky, also in backup activation, we just need new nodeID persisted!
activeMQServer.resetNodeManager();
activeMQServer.getNodeManager().start();
activeMQServer.getNodeManager().setNodeID(policy.getCoordinationId());
activeMQServer.getNodeManager().stopBackup();
applyCoordinationId(policy.getCoordinationId(), activeMQServer);
}
final NodeManager nodeManager = activeMQServer.getNodeManager();
if (nodeManager.getNodeActivationSequence() == NodeManager.NULL_NODE_ACTIVATION_SEQUENCE) {
// persists an initial activation sequence
nodeManager.writeNodeActivationSequence(0);
}
final long nodeActivationSequence = activeMQServer.getNodeManager().readNodeActivationSequence();
final String nodeId = activeMQServer.getNodeManager().readNodeId().toString();
DistributedLock liveLock;
while (true) {
distributedManager.start();
try {
liveLock = tryActivate(nodeId, nodeActivationSequence, distributedManager, LOGGER);
liveLock = tryActivate(nodeManager, distributedManager, LOGGER);
break;
} catch (UnavailableStateException canRecoverEx) {
distributedManager.stop();
@ -131,12 +130,12 @@ public class ReplicationPrimaryActivation extends LiveActivation implements Dist
}
if (liveLock == null) {
distributedManager.stop();
LOGGER.infof("This broker cannot become a live server with NodeID = %s: restarting as backup", nodeId);
LOGGER.infof("This broker cannot become a live server with NodeID = %s: restarting as backup", nodeManager.getNodeId());
activeMQServer.setHAPolicy(policy.getBackupPolicy());
return;
}
ensureSequentialAccessToNodeData(activeMQServer, distributedManager, LOGGER);
ensureSequentialAccessToNodeData(activeMQServer.toString(), nodeManager, distributedManager, LOGGER);
activeMQServer.initialisePart1(false);
@ -166,6 +165,25 @@ public class ReplicationPrimaryActivation extends LiveActivation implements Dist
}
}
public static void applyCoordinationId(final String coordinationId,
final ActiveMQServerImpl activeMQServer) throws Exception {
Objects.requireNonNull(coordinationId);
if (!activeMQServer.getNodeManager().isStarted()) {
throw new IllegalStateException("NodeManager should be started");
}
final long activationSequence = activeMQServer.getNodeManager().getNodeActivationSequence();
LOGGER.infof("Applying shared peer NodeID=%s to enable coordinated live activation", coordinationId);
// REVISIT: this is quite clunky, also in backup activation, we just need new nodeID persisted!
activeMQServer.resetNodeManager();
final NodeManager nodeManager = activeMQServer.getNodeManager();
nodeManager.start();
nodeManager.setNodeID(coordinationId);
nodeManager.stopBackup();
// despite NM is restarted as "replicatedBackup" we need the last written activation sequence value in-memory
final long freshActivationSequence = nodeManager.readNodeActivationSequence();
assert freshActivationSequence == activationSequence;
}
@Override
public ChannelHandler getActivationChannelHandler(final Channel channel, final Acceptor acceptorUsed) {
if (stoppingServer.get()) {
@ -367,7 +385,7 @@ public class ReplicationPrimaryActivation extends LiveActivation implements Dist
// we increment only if we are staying alive
if (!stoppingServer.get() && STARTED.equals(activeMQServer.getState())) {
try {
ensureSequentialAccessToNodeData(activeMQServer, distributedManager, LOGGER);
ensureSequentialAccessToNodeData(activeMQServer.toString(), activeMQServer.getNodeManager(), distributedManager, LOGGER);
} catch (Throwable fatal) {
LOGGER.errorf(fatal, "Unexpected exception: %s on attempted activation sequence increment; stopping server async", fatal.getLocalizedMessage());
asyncStopServer();

View File

@ -465,7 +465,8 @@ public final class SharedNothingBackupActivation extends Activation implements R
}
@Override
public void onRemoteBackupUpToDate() {
public void onRemoteBackupUpToDate(String nodeId, long ignoredActivationSequence) {
backupQuorum.liveIDSet(nodeId);
activeMQServer.getBackupManager().announceBackup();
backupUpToDate = true;
backupSyncLatch.countDown();

View File

@ -23,7 +23,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.quorum.DistributedLock;
import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
@ -40,7 +39,6 @@ import org.jboss.logging.Logger;
public final class ActivationSequenceStateMachine {
private static final long CHECK_ACTIVATION_SEQUENCE_WAIT_MILLIS = 200;
private static final long CHECK_REPAIRED_ACTIVATION_SEQUENCE_WAIT_MILLIS = 2000;
private static final long LIVE_LOCK_ACQUIRE_TIMEOUT_MILLIS = 2000;
private ActivationSequenceStateMachine() {
@ -49,53 +47,56 @@ public final class ActivationSequenceStateMachine {
/**
* It loops if the data of the broker is still valuable, but cannot become live.
* It loops (temporarly) if data is in sync or can self-heal, but cannot yet acquire the live lock.
* It loops (temporarily) if data is in sync or can auto-repair, but cannot yet acquire the live lock.
* <p>
* It stops loop and return:
* <p><ul>
* <li>{@code null}: if data is stale (and there are no rights to become live)
* <li>{@code !=null}: if data is in sync and the {@link DistributedLock} is correctly acquired
* <li>{@code !=null}: if data is in sync/repaired and the {@link DistributedLock} is correctly acquired
* </ul><p>
* <p>
* After successfully returning from this method ie not null return value, a broker should use
* {@link #ensureSequentialAccessToNodeData(ActiveMQServer, DistributedPrimitiveManager, Logger)} to complete
* {@link #ensureSequentialAccessToNodeData} to complete
* the activation and guarantee the initial not-replicated ownership of data.
*/
public static DistributedLock tryActivate(final String nodeId,
final long nodeActivationSequence,
public static DistributedLock tryActivate(final NodeManager nodeManager,
final DistributedPrimitiveManager distributedManager,
final Logger logger) throws InterruptedException, ExecutionException, TimeoutException, UnavailableStateException {
Objects.requireNonNull(nodeManager);
Objects.requireNonNull(distributedManager);
Objects.requireNonNull(logger);
final String nodeId = nodeManager.getNodeId() == null ? null : nodeManager.getNodeId().toString();
Objects.requireNonNull(nodeId);
final long nodeActivationSequence = nodeManager.getNodeActivationSequence();
if (nodeActivationSequence < 0) {
throw new IllegalArgumentException("nodeActivationSequence must be > 0");
}
final DistributedLock activationLock = distributedManager.getDistributedLock(nodeId);
try (MutableLong coordinatedNodeSequence = distributedManager.getMutableLong(nodeId)) {
while (true) {
// dirty read is sufficient to know if we are *not* an in sync replica
// typically the lock owner will increment to signal our data is stale and we are happy without any
// further coordination at this point
long timeout = 0;
switch (validateActivationSequence(coordinatedNodeSequence, activationLock, nodeId, nodeActivationSequence, logger)) {
case Stale:
activationLock.close();
return null;
case SelfRepair:
case InSync:
timeout = LIVE_LOCK_ACQUIRE_TIMEOUT_MILLIS;
break;
case SelfRepair:
case MaybeInSync:
if (activationLock.tryLock()) {
// BAD: where's the broker that should commit it?
activationLock.unlock();
logger.warnf("Cannot assume live role for NodeID = %s: claimed activation sequence need to be repaired",
nodeId);
TimeUnit.MILLISECONDS.sleep(CHECK_REPAIRED_ACTIVATION_SEQUENCE_WAIT_MILLIS);
continue;
}
// quick path while data is still valuable: wait until something change (commit/repair)
TimeUnit.MILLISECONDS.sleep(CHECK_ACTIVATION_SEQUENCE_WAIT_MILLIS);
continue;
break;
}
// SelfRepair, InSync
if (!activationLock.tryLock(LIVE_LOCK_ACQUIRE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
// SelfRepair, MaybeInSync, InSync
if (!activationLock.tryLock(timeout, TimeUnit.MILLISECONDS)) {
logger.debugf("Candidate for Node ID = %s, with local activation sequence: %d, cannot acquire live lock within %dms; retrying",
nodeId, nodeActivationSequence, LIVE_LOCK_ACQUIRE_TIMEOUT_MILLIS);
nodeId, nodeActivationSequence, timeout);
if (timeout == 0) {
Thread.sleep(CHECK_ACTIVATION_SEQUENCE_WAIT_MILLIS);
}
continue;
}
switch (validateActivationSequence(coordinatedNodeSequence, activationLock, nodeId, nodeActivationSequence, logger)) {
@ -103,25 +104,46 @@ public final class ActivationSequenceStateMachine {
case Stale:
activationLock.close();
return null;
case SelfRepair:
// Self-repair sequence ie we were the only one with the most up to date data.
// NOTE: We cannot move the sequence now, let's delay it on ensureSequentialAccessToNodeData
logger.infof("Assuming live role for NodeID = %s: local activation sequence %d matches claimed coordinated activation sequence %d. Repairing sequence", nodeId, nodeActivationSequence, nodeActivationSequence);
return activationLock;
case InSync:
// we are an in_sync_replica, good to go live as UNREPLICATED
logger.infof("Assuming live role for NodeID = %s, local activation sequence %d matches current coordinated activation sequence %d", nodeId, nodeActivationSequence, nodeActivationSequence);
return activationLock;
case SelfRepair:
// Self-repair sequence
logger.infof("Assuming live role for NodeID = %s: local activation sequence %d matches claimed coordinated activation sequence %d. Repairing sequence", nodeId, nodeActivationSequence, nodeActivationSequence);
try {
repairActivationSequence(nodeManager, coordinatedNodeSequence, nodeActivationSequence, true);
return activationLock;
} catch (NodeManager.NodeManagerException | UnavailableStateException ex) {
activationLock.close();
throw ex;
}
case MaybeInSync:
activationLock.unlock();
logger.warnf("Cannot assume live role for NodeID = %s: claimed activation sequence need to be repaired", nodeId);
TimeUnit.MILLISECONDS.sleep(CHECK_REPAIRED_ACTIVATION_SEQUENCE_WAIT_MILLIS);
continue;
// Auto-repair sequence
logger.warnf("Assuming live role for NodeID = %s: repairing claimed activation sequence", nodeId);
try {
repairActivationSequence(nodeManager, coordinatedNodeSequence, nodeActivationSequence, false);
return activationLock;
} catch (NodeManager.NodeManagerException | UnavailableStateException ex) {
activationLock.close();
throw ex;
}
}
}
}
}
private static void repairActivationSequence(final NodeManager nodeManager,
final MutableLong coordinatedNodeSequence,
final long nodeActivationSequence,
final boolean selfHeal) throws UnavailableStateException {
final long coordinatedNodeActivationSequence = selfHeal ? nodeActivationSequence : nodeActivationSequence + 1;
if (!selfHeal) {
nodeManager.writeNodeActivationSequence(coordinatedNodeActivationSequence);
}
coordinatedNodeSequence.set(coordinatedNodeActivationSequence);
}
private enum ValidationResult {
/**
* coordinated activation sequence (claimed/committed) is far beyond the local one: data is not valuable anymore
@ -231,82 +253,64 @@ public final class ActivationSequenceStateMachine {
}
/**
* This is going to increment the coordinated activation sequence while holding the live lock, failing with some exception otherwise.<br>
* <p>
* The acceptable states are {@link ValidationResult#InSync} and {@link ValidationResult#SelfRepair}, throwing some exception otherwise.
* This is going to increment the coordinated activation sequence and the local activation sequence
* (using {@link NodeManager#writeNodeActivationSequence}) while holding the live lock,
* failing with some exception otherwise.<br>
* <p>
* This must be used while holding a live lock to ensure not-exclusive ownership of data ie can be both used
* while loosing connectivity with a replica or after successfully {@link #tryActivate(String, long, DistributedPrimitiveManager, Logger)}.
* while loosing connectivity with a replica or after successfully {@link #tryActivate}.
*/
public static void ensureSequentialAccessToNodeData(ActiveMQServer activeMQServer,
DistributedPrimitiveManager distributedPrimitiveManager,
public static void ensureSequentialAccessToNodeData(final String serverDescription,
final NodeManager nodeManager,
final DistributedPrimitiveManager distributedManager,
final Logger logger) throws ActiveMQException, InterruptedException, UnavailableStateException, ExecutionException, TimeoutException {
final NodeManager nodeManager = activeMQServer.getNodeManager();
final String lockAndLongId = nodeManager.getNodeId().toString();
final DistributedLock liveLock = distributedPrimitiveManager.getDistributedLock(lockAndLongId);
Objects.requireNonNull(serverDescription);
Objects.requireNonNull(nodeManager);
Objects.requireNonNull(distributedManager);
Objects.requireNonNull(logger);
final String lockAndLongId = nodeManager.getNodeId() == null ? null : nodeManager.getNodeId().toString();
Objects.requireNonNull(lockAndLongId);
final long nodeActivationSequence = nodeManager.getNodeActivationSequence();
if (nodeActivationSequence < 0) {
throw new IllegalArgumentException("nodeActivationSequence must be >= 0");
}
final DistributedLock liveLock = distributedManager.getDistributedLock(lockAndLongId);
if (!liveLock.isHeldByCaller()) {
final String message = String.format("Server [%s], live lock for NodeID = %s, not held, activation sequence cannot be safely changed",
activeMQServer, lockAndLongId);
serverDescription, lockAndLongId);
logger.info(message);
throw new UnavailableStateException(message);
}
final long nodeActivationSequence = nodeManager.readNodeActivationSequence();
final MutableLong coordinatedNodeActivationSequence = distributedPrimitiveManager.getMutableLong(lockAndLongId);
final long currentCoordinatedActivationSequence = coordinatedNodeActivationSequence.get();
final long nextActivationSequence;
if (currentCoordinatedActivationSequence < 0) {
// Check Self-Repair
if (nodeActivationSequence != -currentCoordinatedActivationSequence) {
final String message = String.format("Server [%s], cannot assume live role for NodeID = %s, local activation sequence %d does not match current claimed coordinated sequence %d: need repair",
activeMQServer, lockAndLongId, nodeActivationSequence, -currentCoordinatedActivationSequence);
logger.info(message);
throw new ActiveMQException(message);
}
// auto-repair: this is the same server that failed to commit its claimed sequence
nextActivationSequence = nodeActivationSequence;
} else {
// Check InSync
if (nodeActivationSequence != currentCoordinatedActivationSequence) {
final String message = String.format("Server [%s], cannot assume live role for NodeID = %s, local activation sequence %d does not match current coordinated sequence %d",
activeMQServer, lockAndLongId, nodeActivationSequence, currentCoordinatedActivationSequence);
logger.info(message);
throw new ActiveMQException(message);
}
nextActivationSequence = nodeActivationSequence + 1;
}
final MutableLong coordinatedNodeActivationSequence = distributedManager.getMutableLong(lockAndLongId);
final long nextActivationSequence = nodeActivationSequence + 1;
// UN_REPLICATED STATE ENTER: auto-repair doesn't need to claim and write locally
if (nodeActivationSequence != nextActivationSequence) {
// claim
if (!coordinatedNodeActivationSequence.compareAndSet(nodeActivationSequence, -nextActivationSequence)) {
final String message = String.format("Server [%s], cannot assume live role for NodeID = %s, activation sequence claim failed, local activation sequence %d no longer matches current coordinated sequence %d",
activeMQServer, lockAndLongId, nodeActivationSequence, coordinatedNodeActivationSequence.get());
logger.infof(message);
throw new ActiveMQException(message);
}
// claim success: write locally
try {
nodeManager.writeNodeActivationSequence(nextActivationSequence);
} catch (NodeManager.NodeManagerException fatal) {
logger.errorf("Server [%s] failed to set local activation sequence to: %d for NodeId =%s. Cannot continue committing coordinated activation sequence: REQUIRES ADMIN INTERVENTION",
activeMQServer, nextActivationSequence, lockAndLongId);
throw new UnavailableStateException(fatal);
}
logger.infof("Server [%s], incremented local activation sequence to: %d for NodeId = %s",
activeMQServer, nextActivationSequence, lockAndLongId);
} else {
// self-heal need to update the in-memory sequence, because no writes will do it
nodeManager.setNodeActivationSequence(nextActivationSequence);
// claim
if (!coordinatedNodeActivationSequence.compareAndSet(nodeActivationSequence, -nextActivationSequence)) {
final String message = String.format("Server [%s], cannot assume live role for NodeID = %s, activation sequence claim failed, local activation sequence %d no longer matches current coordinated sequence %d",
serverDescription, lockAndLongId, nodeActivationSequence, coordinatedNodeActivationSequence.get());
logger.infof(message);
throw new ActiveMQException(message);
}
// claim success: write locally
try {
nodeManager.writeNodeActivationSequence(nextActivationSequence);
} catch (NodeManager.NodeManagerException fatal) {
logger.errorf("Server [%s] failed to set local activation sequence to: %d for NodeId =%s. Cannot continue committing coordinated activation sequence: REQUIRES ADMIN INTERVENTION",
serverDescription, nextActivationSequence, lockAndLongId);
throw new UnavailableStateException(fatal);
}
logger.infof("Server [%s], incremented local activation sequence to: %d for NodeId = %s",
serverDescription, nextActivationSequence, lockAndLongId);
// commit
if (!coordinatedNodeActivationSequence.compareAndSet(-nextActivationSequence, nextActivationSequence)) {
final String message = String.format("Server [%s], cannot assume live role for NodeID = %s, activation sequence commit failed, local activation sequence %d no longer matches current coordinated sequence %d",
activeMQServer, lockAndLongId, nodeActivationSequence, coordinatedNodeActivationSequence.get());
serverDescription, lockAndLongId, nodeActivationSequence, coordinatedNodeActivationSequence.get());
logger.infof(message);
throw new ActiveMQException(message);
}
logger.infof("Server [%s], incremented coordinated activation sequence to: %d for NodeId = %s",
activeMQServer, nextActivationSequence, lockAndLongId);
serverDescription, nextActivationSequence, lockAndLongId);
}
}

View File

@ -78,6 +78,7 @@
* [CDI Integration](cdi-integration.md)
* [Intercepting Operations](intercepting-operations.md)
* [Data Tools](data-tools.md)
* [Activation Tools](activation-tools.md)
* [Maven Plugin](maven-plugin.md)
* [Unit Testing](unit-testing.md)
* [Troubleshooting and Performance Tuning](perf-tuning.md)

View File

@ -0,0 +1,46 @@
# Activation Sequence Tools
You can use the Artemis CLI to execute activation sequence maintenance/recovery tools
for [Pluggable Quorum Replication](ha.md).
The 2 main commands are `activation list` and `activation set`, which can be used together to recover
from a disaster affecting the local/coordinated activation sequences.
Here is a disaster scenario built around the RI (using [Apache Zookeeper](https://zookeeper.apache.org/)
and [Apache curator](https://curator.apache.org/)) to demonstrate the usage of such commands.
## Troubleshooting Case: Zookeeper Cluster disaster
A proper Zookeeper cluster should use at least 3 nodes, but what happens if all these nodes crash, losing the
activation state information required to run an Artemis replication cluster?
During the disaster, i.e. while the Zookeeper nodes are no longer reachable, brokers behave as follows:
- live ones shut down (and, if restarted by a script, hang awaiting to connect to the Zookeeper cluster again)
- replicas become passive, awaiting to connect to the Zookeeper cluster again
Admin should:
1. stop all Artemis brokers
2. restart Zookeeper cluster
3. search brokers with the highest local activation sequence for their `NodeID`, by running this command from the `bin`
folder of the broker:
```bash
$ ./artemis activation list --local
Local activation sequence for NodeID=7debb3d1-0d4b-11ec-9704-ae9213b68ac4: 1
```
4. from the `bin` folder of the brokers with the highest local activation sequence
```bash
# assuming 1 to be the highest local activation sequence obtained at the previous step
# for NodeID 7debb3d1-0d4b-11ec-9704-ae9213b68ac4
$ ./artemis activation set --remote --to 1
Forced coordinated activation sequence for NodeID=7debb3d1-0d4b-11ec-9704-ae9213b68ac4 from 0 to 1
```
5. restart all brokers: previously live ones should be able to be live again
The higher the number of Zookeeper nodes, the lower the chance that a disaster like this will require Admin
intervention, because it allows the Zookeeper cluster to tolerate more failures.

View File

@ -107,8 +107,7 @@
<mockito.version>3.11.2</mockito.version>
<jctools.version>2.1.2</jctools.version>
<netty.version>4.1.66.Final</netty.version>
<curator.version>5.1.0</curator.version>
<!-- While waiting https://issues.apache.org/jira/browse/CURATOR-595 fix -->
<curator.version>5.2.0</curator.version>
<zookeeper.version>3.6.3</zookeeper.version>
<!-- this is basically for tests -->
@ -219,6 +218,7 @@
<!-- Ignore failed tests by default because there are "known" failures in the full test-suite.
This will be set to false for the "fast-tests" profile as none of those tests should fail. -->
<testFailureIgnore>true</testFailureIgnore>
<fast-tests>false</fast-tests>
</properties>
<scm>
@ -1206,6 +1206,7 @@
This is used on PR checks -->
<id>fast-tests</id>
<properties>
<fast-tests>true</fast-tests>
<skipUnitTests>false</skipUnitTests>
<skipJmsTests>false</skipJmsTests>
<skipJoramTests>false</skipJoramTests>

View File

@ -17,7 +17,7 @@
# Additional logger names to configure (root logger is always configured)
# Root logger option
loggers=org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.core.journal,org.apache.activemq.artemis.jms,org.apache.activemq.artemis.ra,org.apache.activemq.artemis.tests.smoke,org.apache.activemq.artemis.tests.unit,org.apache.activemq.artemis.tests.integration,org.apache.activemq.artemis.jms.tests,org.apache.activemq.cli.test,org.apache.activemq.audit,org.apache.activemq.audit.message
loggers=org.jboss.logging,org.apache.activemq.artemis.core.server,org.apache.activemq.artemis.utils,org.apache.activemq.artemis.core.journal,org.apache.activemq.artemis.jms,org.apache.activemq.artemis.ra,org.apache.activemq.artemis.tests.smoke,org.apache.activemq.artemis.tests.unit,org.apache.activemq.artemis.tests.integration,org.apache.activemq.artemis.jms.tests,org.apache.activemq.cli.test,org.apache.activemq.audit,org.apache.activemq.audit.message,org.apache.curator,org.apache.zookeeper
# Root logger level
logger.level=INFO
@ -40,6 +40,10 @@ logger.org.apache.activemq.artemis.tests.smoke.level=DEBUG
logger.handlers=CONSOLE,TEST
#logger.handlers=CONSOLE,FILE
# quorum logger levels
logger.org.apache.curator.level=WARN
logger.org.apache.zookeeper.level=ERROR
# to enable audit change the level to INFO
logger.org.apache.activemq.audit.level=ERROR
logger.org.apache.activemq.audit.handlers=CONSOLE,FILE,TEST

View File

@ -0,0 +1,236 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.cli;
import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.cli.commands.activation.ActivationSequenceList;
import org.apache.activemq.artemis.cli.commands.activation.ActivationSequenceSet;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.DistributedPrimitiveManagerConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration;
import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.quorum.DistributedLock;
import org.apache.activemq.artemis.quorum.DistributedPrimitiveManager;
import org.apache.activemq.artemis.quorum.MutableLong;
import org.apache.activemq.artemis.quorum.file.FileBasedPrimitiveManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.tests.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
/**
 * Integration tests for the {@code artemis activation} CLI commands
 * ({@code activation list} / {@code activation set}): verifies that a broken
 * coordinated activation sequence can be inspected and restored so that the
 * last live broker (primary, failed-over backup, or peer) can activate again.
 */
public class ActivationSequenceCommandsTest extends ActiveMQTestBase {
   @Rule
   public TemporaryFolder brokersFolder = new TemporaryFolder();
   // pluggable-quorum distributed manager configuration shared by live, backup and CLI commands
   protected DistributedPrimitiveManagerConfiguration managerConfiguration;
   @Before
   @Override
   public void setUp() throws Exception {
      super.setUp();
      // file-based manager: coordinates through a shared locks folder instead of a real ensemble
      managerConfiguration = new DistributedPrimitiveManagerConfiguration(FileBasedPrimitiveManager.class.getName(),
         Collections.singletonMap("locks-folder", temporaryFolder.newFolder("manager").toString()));
   }
   @After
   @Override
   public void tearDown() throws Exception {
      super.tearDown();
   }
   /**
    * Builds a live (primary) broker configuration using pluggable-quorum
    * replication, clustered with a single static "backup" connector.
    */
   protected Configuration createLiveConfiguration() throws Exception {
      Configuration conf = new ConfigurationImpl();
      conf.setName("localhost::live");
      File liveDir = brokersFolder.newFolder("live");
      conf.setBrokerInstance(liveDir);
      conf.addAcceptorConfiguration("live", "tcp://localhost:61616");
      conf.addConnectorConfiguration("backup", "tcp://localhost:61617");
      conf.addConnectorConfiguration("live", "tcp://localhost:61616");
      conf.setClusterUser("mycluster");
      conf.setClusterPassword("mypassword");
      conf.setHAPolicyConfiguration(createReplicationLiveConfiguration());
      ClusterConnectionConfiguration ccconf = new ClusterConnectionConfiguration();
      ccconf.setStaticConnectors(new ArrayList<>()).getStaticConnectors().add("backup");
      ccconf.setName("cluster");
      ccconf.setConnectorName("live");
      conf.addClusterConfiguration(ccconf);
      // MAPPED journal + small file size keeps the test fast; long TTL avoids spurious drops
      conf.setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.MAPPED).setJournalFileSize(1024 * 512).setConnectionTTLOverride(60_000L);
      return conf;
   }
   /**
    * Builds a backup broker configuration, mirroring {@link #createLiveConfiguration()}
    * but pointing its static connector at the live broker.
    */
   protected Configuration createBackupConfiguration() throws Exception {
      Configuration conf = new ConfigurationImpl();
      conf.setName("localhost::backup");
      File backupDir = brokersFolder.newFolder("backup");
      conf.setBrokerInstance(backupDir);
      conf.setHAPolicyConfiguration(createReplicationBackupConfiguration());
      conf.addAcceptorConfiguration("backup", "tcp://localhost:61617");
      conf.addConnectorConfiguration("live", "tcp://localhost:61616");
      conf.addConnectorConfiguration("backup", "tcp://localhost:61617");
      conf.setClusterUser("mycluster");
      conf.setClusterPassword("mypassword");
      ClusterConnectionConfiguration ccconf = new ClusterConnectionConfiguration();
      ccconf.setStaticConnectors(new ArrayList<>()).getStaticConnectors().add("live");
      ccconf.setName("cluster");
      ccconf.setConnectorName("backup");
      conf.addClusterConfiguration(ccconf);
      conf.setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.MAPPED).setJournalFileSize(1024 * 512).setConnectionTTLOverride(60_000L);
      return conf;
   }
   /** Primary HA policy backed by the shared distributed-primitive manager. */
   protected HAPolicyConfiguration createReplicationLiveConfiguration() {
      ReplicationPrimaryPolicyConfiguration haPolicy = ReplicationPrimaryPolicyConfiguration.withDefault();
      haPolicy.setDistributedManagerConfiguration(managerConfiguration);
      return haPolicy;
   }
   /** Backup HA policy backed by the shared distributed-primitive manager. */
   protected HAPolicyConfiguration createReplicationBackupConfiguration() {
      ReplicationBackupPolicyConfiguration haPolicy = ReplicationBackupPolicyConfiguration.withDefault();
      haPolicy.setDistributedManagerConfiguration(managerConfiguration);
      haPolicy.setClusterName("cluster");
      return haPolicy;
   }
   @Test
   public void restorePrimaryCoordinatedSequence() throws Exception {
      // start live: first activation takes the coordinated sequence to 1
      final Configuration liveConfiguration = createLiveConfiguration();
      final ActiveMQServer liveServer = addServer(ActiveMQServers.newActiveMQServer(liveConfiguration));
      liveServer.setIdentity("LIVE");
      liveServer.start();
      Wait.waitFor(liveServer::isStarted);
      final String nodeID = liveServer.getNodeID().toString();
      liveServer.stop();
      restoreCoordinatedSequence(liveConfiguration, liveServer, nodeID, 1);
   }
   @Test
   public void restoreBackupCoordinatedSequence() throws Exception {
      final int timeout = (int) TimeUnit.SECONDS.toMillis(30);
      // start live
      Configuration liveConfiguration = createLiveConfiguration();
      ActiveMQServer primaryInstance = addServer(ActiveMQServers.newActiveMQServer(liveConfiguration));
      primaryInstance.setIdentity("PRIMARY");
      primaryInstance.start();
      // primary initially UN REPLICATED
      Assert.assertEquals(1L, primaryInstance.getNodeManager().getNodeActivationSequence());
      // start backup
      Configuration backupConfiguration = createBackupConfiguration();
      ((ReplicationBackupPolicyConfiguration)backupConfiguration.getHAPolicyConfiguration()).setAllowFailBack(true);
      ActiveMQServer backupServer = addServer(ActiveMQServers.newActiveMQServer(backupConfiguration));
      backupServer.setIdentity("BACKUP");
      backupServer.start();
      Wait.waitFor(backupServer::isStarted);
      org.apache.activemq.artemis.utils.Wait.assertTrue(() -> backupServer.isReplicaSync(), timeout);
      // primary REPLICATED, backup matches (has replicated) activation sequence
      Assert.assertEquals(1L, primaryInstance.getNodeManager().getNodeActivationSequence());
      Assert.assertEquals(1L, backupServer.getNodeManager().getNodeActivationSequence());
      primaryInstance.stop();
      // backup UN REPLICATED (new version): failover bumps the sequence to 2
      org.apache.activemq.artemis.utils.Wait.assertTrue(() -> 2L == backupServer.getNodeManager().getNodeActivationSequence(), timeout);
      final String nodeId = backupServer.getNodeManager().getNodeId().toString();
      backupServer.stop();
      restoreCoordinatedSequence(backupConfiguration, backupServer, nodeId, 2);
   }
   @Test
   public void restorePeerCoordinatedSequence() throws Exception { // start live
      final Configuration liveConfiguration = createLiveConfiguration();
      // peers share a fixed coordination id instead of a generated NodeID
      ((ReplicationPrimaryPolicyConfiguration) liveConfiguration.getHAPolicyConfiguration()).setCoordinationId("peer-id");
      final ActiveMQServer liveServer = addServer(ActiveMQServers.newActiveMQServer(liveConfiguration));
      liveServer.setIdentity("LIVE");
      liveServer.start();
      Wait.waitFor(liveServer::isStarted);
      final String nodeID = liveServer.getNodeID().toString();
      liveServer.stop();
      restoreCoordinatedSequence(liveConfiguration, liveServer, nodeID, 1);
   }
   /**
    * Simulates a quorum disaster (coordinated sequence reset to 0) and uses the
    * {@code activation list}/{@code activation set} commands to restore it, then
    * verifies the broker can activate again with sequence {@code expected + 1}.
    */
   private void restoreCoordinatedSequence(Configuration liveConfiguration,
                                           ActiveMQServer liveServer,
                                           String nodeID,
                                           final long expectedStartCoordinatedSequence) throws Exception {
      // sanity: local and coordinated sequences agree before the "disaster"
      final ActivationSequenceList sequenceList = new ActivationSequenceList();
      ActivationSequenceList.ListResult list = ActivationSequenceList.execute(sequenceList, liveConfiguration, null);
      Assert.assertEquals(expectedStartCoordinatedSequence, list.coordinatedActivationSequence.longValue());
      Assert.assertEquals(expectedStartCoordinatedSequence, list.localActivationSequence.longValue());
      try (DistributedPrimitiveManager distributedPrimitiveManager = DistributedPrimitiveManager
         .newInstanceOf(managerConfiguration.getClassName(), managerConfiguration.getProperties())) {
         distributedPrimitiveManager.start();
         try (DistributedLock lock = distributedPrimitiveManager.getDistributedLock(nodeID);
              MutableLong coordinatedActivationSequence = distributedPrimitiveManager.getMutableLong(nodeID)) {
            Assert.assertTrue(lock.tryLock());
            final long activationSequence = coordinatedActivationSequence.get();
            Assert.assertEquals(expectedStartCoordinatedSequence, activationSequence);
            // simulate disaster: wipe the coordinated sequence
            coordinatedActivationSequence.set(0);
         }
         // activation list --remote must now report the wiped value
         sequenceList.remote = true;
         Assert.assertEquals(0, ActivationSequenceList.execute(sequenceList, liveConfiguration, null)
            .coordinatedActivationSequence.longValue());
         // activation set --remote restores the coordinated sequence
         final ActivationSequenceSet sequenceSet = new ActivationSequenceSet();
         sequenceSet.remote = true;
         sequenceSet.value = expectedStartCoordinatedSequence;
         ActivationSequenceSet.execute(sequenceSet, liveConfiguration, null);
         // broker can activate again; activation bumps the sequence by 1
         liveServer.start();
         Wait.waitFor(liveServer::isStarted);
         Assert.assertTrue(liveServer.isActive());
         Assert.assertEquals(expectedStartCoordinatedSequence + 1, liveServer.getNodeManager().getNodeActivationSequence());
         liveServer.stop();
      }
   }
}

View File

@ -933,13 +933,18 @@ public class FailoverTest extends FailoverTestBase {
beforeRestart(liveServer);
liveServer.start();
Assert.assertTrue("live initialized...", liveServer.getServer().waitForActivation(40, TimeUnit.SECONDS));
int i = 0;
while (!backupServer.isStarted() && i++ < 100) {
Thread.sleep(100);
if (isReplicated) {
// wait until it switch role again
Wait.assertTrue(() -> backupServer.getServer().getHAPolicy().isBackup());
// wait until started
Wait.assertTrue(backupServer::isStarted);
// wait until is an in-sync replica
Wait.assertTrue(backupServer.getServer()::isReplicaSync);
} else {
Wait.assertTrue(backupServer::isStarted);
backupServer.getServer().waitForActivation(5, TimeUnit.SECONDS);
Assert.assertTrue(backupServer.isStarted());
}
backupServer.getServer().waitForActivation(5, TimeUnit.SECONDS);
Assert.assertTrue(backupServer.isStarted());
if (isReplicated) {
FileMoveManager moveManager = new FileMoveManager(backupServer.getServer().getConfiguration().getJournalLocation(), 0);
// backup has not had a chance to restart as a backup and cleanup

View File

@ -428,8 +428,8 @@ public class PluggableQuorumReplicationTest extends SharedNothingReplicationTest
}
liveServer.start();
Wait.waitFor(liveServer::isStarted);
Assert.assertEquals(2, liveServer.getNodeManager().getNodeActivationSequence());
Assert.assertEquals(2, distributedPrimitiveManager.getMutableLong(coordinatedId).get());
Assert.assertEquals(3, liveServer.getNodeManager().getNodeActivationSequence());
Assert.assertEquals(3, distributedPrimitiveManager.getMutableLong(coordinatedId).get());
distributedPrimitiveManager.stop();
@ -438,7 +438,7 @@ public class PluggableQuorumReplicationTest extends SharedNothingReplicationTest
backupServer.setIdentity("BACKUP");
backupServer.start();
Wait.waitFor(backupServer::isReplicaSync);
Assert.assertEquals(2, backupServer.getNodeManager().getNodeActivationSequence());
Assert.assertEquals(3, backupServer.getNodeManager().getNodeActivationSequence());
backupServer.stop();
}
@ -575,20 +575,15 @@ public class PluggableQuorumReplicationTest extends SharedNothingReplicationTest
final MutableLong activationSequence = distributedPrimitiveManager.getMutableLong(coordinatedId);
Assert.assertTrue(activationSequence.compareAndSet(2, -2));
// case: 1, the fail to write locally 2 but the write actually succeeding
// should delay pending resolution of the uncommitted claim
backupServer.start();
// live server should activate after self healing its outstanding claim
liveServer.start();
Wait.waitFor(liveServer::isStarted);
assertTrue(Wait.waitFor(backupServer::isReplicaSync));
assertTrue(liveServer.isReplicaSync());
Assert.assertEquals(3, liveServer.getNodeManager().getNodeActivationSequence());
Assert.assertEquals(3, activationSequence.get());
}
@Test
public void testUnavailableAdminIntervention() throws Exception {
public void testUnavailableAutoRepair() throws Exception {
// start backup
Configuration backupConfiguration = createBackupConfiguration();
ActiveMQServer backupServer = addServer(ActiveMQServers.newActiveMQServer(backupConfiguration));
@ -610,7 +605,6 @@ public class PluggableQuorumReplicationTest extends SharedNothingReplicationTest
final String coordinatedId = liveServer.getNodeID().toString();
System.err.println("coodr id: " + coordinatedId);
backupServer.stop();
TimeUnit.MILLISECONDS.sleep(500);
liveServer.stop();
@ -651,15 +645,14 @@ public class PluggableQuorumReplicationTest extends SharedNothingReplicationTest
TimeUnit.MILLISECONDS.sleep(500);
// both are candidates and one of them failed to commit the claim
// let them compete on retry
Assert.assertTrue(coordinatedActivationSequence.compareAndSet(-2, 1));
// one of the two can activate
Wait.waitFor(() -> liveServer.isStarted() || backupServer.isStarted());
assertTrue(Wait.waitFor(backupServer::isReplicaSync));
assertTrue(liveServer.isReplicaSync());
assertEquals(2, backupServer.getNodeManager().getNodeActivationSequence());
assertEquals(2, liveServer.getNodeManager().getNodeActivationSequence());
assertEquals(3, backupServer.getNodeManager().getNodeActivationSequence());
assertEquals(3, liveServer.getNodeManager().getNodeActivationSequence());
}

View File

@ -137,7 +137,9 @@ public abstract class PluggableQuorumSinglePairTest extends SmokeTestBase {
protected abstract boolean awaitAsyncSetupCompleted(long timeout, TimeUnit unit) throws InterruptedException;
protected abstract void stopMajority() throws Exception;
protected abstract int[] stopMajority() throws Exception;
protected abstract void restart(int[] nodes) throws Exception;
@Before
public void setup() throws Exception {
@ -150,14 +152,33 @@ public abstract class PluggableQuorumSinglePairTest extends SmokeTestBase {
super.after();
}
@Test
public void testCanQueryEmptyBackup() throws Exception {
   final int timeout = (int) TimeUnit.SECONDS.toMillis(30);
   LOGGER.info("starting primary");
   Process live = primary.startServer(this, timeout);
   Assert.assertTrue(awaitAsyncSetupCompleted(timeout, TimeUnit.MILLISECONDS));
   Wait.assertTrue(() -> !primary.isBackup().orElse(true), timeout);
   LOGGER.info("killing primary");
   ServerUtil.killServer(live, forceKill);
   // backup comes up with no live to replicate from: it must stay a (queryable) backup
   LOGGER.info("starting backup");
   backup.startServer(this, 0);
   Wait.assertTrue(() -> backup.isBackup().orElse(false), timeout);
   LOGGER.info("Stopping majority of consensus nodes");
   final int[] stopped = stopMajority();
   LOGGER.info("Waiting until isolated");
   // no event to wait on for quorum loss; give the backup time to notice the isolation
   Thread.sleep(2000);
   LOGGER.info("Restarting majority of consensus nodes");
   restart(stopped);
   // after quorum is restored the empty backup must still answer management queries as a backup
   Wait.assertTrue(() -> backup.isBackup().orElse(false), timeout);
}
@Test
public void testBackupFailoverAndPrimaryFailback() throws Exception {
final int timeout = (int) TimeUnit.SECONDS.toMillis(30);
LOGGER.info("starting primary");
Process primaryInstance = primary.startServer(this, timeout);
Assert.assertTrue(awaitAsyncSetupCompleted(timeout, TimeUnit.MILLISECONDS));
Wait.assertTrue(() -> !primary.isBackup().orElse(true), timeout);
// primary UN REPLICATED
Assert.assertEquals(1L, primary.getActivationSequence().get().longValue());

View File

@ -29,6 +29,8 @@ import org.junit.Assert;
import org.junit.Test;
import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.containsExactNodeIds;
import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.decodeNetworkTopologyJson;
import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.liveOf;
import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.validateNetworkTopology;
import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.withBackup;
import static org.apache.activemq.artemis.tests.smoke.utils.Jmx.withLive;
@ -56,6 +58,44 @@ public class ZookeeperPluggableQuorumPeerTest extends ZookeeperPluggableQuorumSi
Wait.waitFor(this::ensembleHasLeader);
}
@Test
public void testBackupCannotForgetPeerIdOnLostQuorum() throws Exception {
   // see FileLockTest::testCorrelationId to get more info why this is not peer-journal-001 as in broker.xml
   final String coordinationId = "peer.journal.001";
   final int timeout = (int) TimeUnit.SECONDS.toMillis(30);
   LOGGER.info("starting peer a");
   final Process live = primary.startServer(this, 0);
   LOGGER.info("waiting peer a to increase coordinated activation sequence to 1");
   Wait.assertEquals(1L, () -> primary.getActivationSequence().orElse(Long.MAX_VALUE).longValue(), timeout);
   // peers share the fixed coordination id as their NodeID
   Assert.assertEquals(coordinationId, primary.getNodeID().get());
   Wait.waitFor(() -> primary.listNetworkTopology().isPresent(), timeout);
   final String urlPeerA = liveOf(coordinationId, decodeNetworkTopologyJson(primary.listNetworkTopology().get()));
   Assert.assertNotNull(urlPeerA);
   LOGGER.infof("peer a acceptor: %s", urlPeerA);
   LOGGER.info("killing peer a");
   ServerUtil.killServer(live, forceKill);
   LOGGER.info("starting peer b");
   Process emptyBackup = backup.startServer(this, 0);
   LOGGER.info("waiting until peer b act as empty backup");
   Wait.assertTrue(() -> backup.isBackup().orElse(false), timeout);
   LOGGER.info("Stop majority of quorum nodes");
   final int[] majority = stopMajority();
   LOGGER.info("Wait peer b to deactivate");
   // no observable event for quorum loss; give peer b time to notice the isolation
   Thread.sleep(2000);
   LOGGER.info("Restart majority of quorum nodes");
   restart(majority);
   // peer b must NOT have forgotten the peer id while the quorum was down:
   // peer a, as the legit last live, should activate and bump the sequence to 2
   LOGGER.info("Restart peer a as legit last live");
   final Process restartedLive = primary.startServer(this, 0);
   LOGGER.info("waiting peer a to increase coordinated activation sequence to 2");
   Wait.assertEquals(2L, () -> primary.getActivationSequence().orElse(Long.MAX_VALUE).longValue(), timeout);
   Assert.assertEquals(coordinationId, primary.getNodeID().get());
   LOGGER.info("waiting peer b to be a replica");
   Wait.waitFor(() -> backup.isReplicaSync().orElse(false));
   Wait.assertEquals(2L, () -> backup.getActivationSequence().get().longValue());
   // topology must still advertise peer a's original acceptor url for the shared id
   final String expectedUrlPeerA = liveOf(coordinationId, decodeNetworkTopologyJson(primary.listNetworkTopology().get()));
   Assert.assertEquals(urlPeerA, expectedUrlPeerA);
}
@Test
public void testMultiPrimary_Peer() throws Exception {

View File

@ -87,11 +87,22 @@ public class ZookeeperPluggableQuorumSinglePairTest extends PluggableQuorumSingl
}
@Override
protected void stopMajority() throws Exception {
/**
 * Stops a strict majority of the Zookeeper ensemble (enough to break quorum)
 * and returns the indexes of the stopped servers so they can be restarted later.
 */
protected int[] stopMajority() throws Exception {
   // a strict majority of the ensemble must go down to lose the quorum
   final int majority = (nodes / 2) + 1;
   final List<TestingZooKeeperServer> ensemble = testingServer.getServers();
   final int[] stoppedIndexes = new int[majority];
   int next = 0;
   while (next < majority) {
      ensemble.get(next).stop();
      stoppedIndexes[next] = next;
      next++;
   }
   return stoppedIndexes;
}
@Override
protected void restart(int[] nodes) throws Exception {
   // bring back the ensemble members previously stopped by stopMajority
   final List<TestingZooKeeperServer> ensemble = testingServer.getServers();
   for (int i = 0; i < nodes.length; i++) {
      ensemble.get(nodes[i]).restart();
   }
}
}

View File

@ -17,13 +17,24 @@
package org.apache.activemq.artemis.tests.unit.core.server.impl;
import java.io.File;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Set;
import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration;
import org.apache.activemq.artemis.core.server.NodeManager;
import org.apache.activemq.artemis.core.server.impl.FileLockNodeManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.UUID;
import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static java.util.stream.Collectors.toSet;
public class FileLockTest extends ActiveMQTestBase {
@Override
@ -53,6 +64,91 @@ public class FileLockTest extends ActiveMQTestBase {
}
}
@Test
public void testNodeManagerStartPersistence() throws Exception {
   final File managerDirectory = getTestDirfile();
   // replicatedBackup = false: manager must create and persist its own NodeId on start
   FileLockNodeManager manager = new FileLockNodeManager(managerDirectory, false);
   manager.start();
   Set<File> files = Arrays.stream(managerDirectory.listFiles(pathname -> pathname.isFile())).collect(toSet());
   // start must create exactly the server lock file plus the two lock-state files
   final Set<String> expectedFileNames = Arrays.stream(new String[]{FileLockNodeManager.SERVER_LOCK_NAME, "serverlock.1", "serverlock.2"})
      .collect(toSet());
   Assert.assertEquals(expectedFileNames, files.stream().map(File::getName).collect(toSet()));
   final File nodeIdFile = files.stream().filter(file -> file.getName().equals(FileLockNodeManager.SERVER_LOCK_NAME)).findFirst().get();
   final byte[] encodedNodeId = manager.getUUID().asBytes();
   try (FileChannel serverLock = FileChannel.open(nodeIdFile.toPath(), StandardOpenOption.READ)) {
      // 16-byte NodeId stored at offset 3 of the 19-byte server lock file
      Assert.assertEquals(16, encodedNodeId.length);
      Assert.assertEquals(19, serverLock.size());
      final ByteBuffer readNodeId = ByteBuffer.allocate(16);
      serverLock.read(readNodeId, 3);
      readNodeId.flip();
      Assert.assertArrayEquals(encodedNodeId, readNodeId.array());
   }
   // no activation sequence has been written yet, in memory or on disk
   Assert.assertEquals(NodeManager.NULL_NODE_ACTIVATION_SEQUENCE, manager.getNodeActivationSequence());
   Assert.assertEquals(NodeManager.NULL_NODE_ACTIVATION_SEQUENCE, manager.readNodeActivationSequence());
   // reading the sequence must not have created any extra file
   Assert.assertEquals(3, managerDirectory.listFiles(pathname -> pathname.isFile()).length);
   manager.stop();
}
@Test
public void testReplicateBackupNodeManagerStartPersistence() throws Exception {
   final File managerDirectory = getTestDirfile();
   // replicatedBackup = true: manager must NOT create any file nor a NodeId on start
   FileLockNodeManager manager = new FileLockNodeManager(managerDirectory, true);
   manager.start();
   Set<File> files = Arrays.stream(managerDirectory.listFiles(pathname -> pathname.isFile())).collect(toSet());
   Assert.assertTrue(files.isEmpty());
   // the NodeId will be received from the live broker during replication
   Assert.assertNull(manager.getNodeId());
   Assert.assertNull(manager.getUUID());
   Assert.assertEquals(NodeManager.NULL_NODE_ACTIVATION_SEQUENCE, manager.getNodeActivationSequence());
   Assert.assertEquals(NodeManager.NULL_NODE_ACTIVATION_SEQUENCE, manager.readNodeActivationSequence());
   // reading the sequence must not have created any file either
   Assert.assertEquals(0, managerDirectory.listFiles(pathname -> pathname.isFile()).length);
   manager.stop();
}
@Test
public void testReplicatedStopBackupPersistence() throws Exception {
   // non-replicated manager: creates a NodeId and persists activation sequence 1
   final FileLockNodeManager manager = new FileLockNodeManager(getTestDirfile(), false);
   manager.start();
   Assert.assertNotNull(manager.getUUID());
   manager.writeNodeActivationSequence(1);
   final long nodeActivationSequence = manager.getNodeActivationSequence();
   Assert.assertEquals(1, nodeActivationSequence);
   manager.stop();
   // replicated manager reads the activation sequence (if any) but ignores the NodeId
   final FileLockNodeManager replicatedManager = new FileLockNodeManager(getTestDirfile(), true);
   replicatedManager.start();
   Assert.assertNull(replicatedManager.getUUID());
   Assert.assertEquals(1, replicatedManager.getNodeActivationSequence());
   // stopBackup() must persist the in-memory NodeId and sequence set before it...
   UUID storedNodeId = UUIDGenerator.getInstance().generateUUID();
   replicatedManager.setNodeID(storedNodeId.toString());
   replicatedManager.setNodeActivationSequence(2);
   replicatedManager.stopBackup();
   // ...while values set after stopBackup must NOT be persisted by stop()
   replicatedManager.setNodeID(UUIDGenerator.getInstance().generateStringUUID());
   replicatedManager.setNodeActivationSequence(3);
   replicatedManager.stop();
   // restart reads whatever has been persisted by stopBackup
   manager.start();
   Assert.assertEquals(storedNodeId, manager.getUUID());
   Assert.assertEquals(2, manager.getNodeActivationSequence());
   manager.stop();
}
@Test
public void testWriteNodeActivationSequence() throws Exception {
   final FileLockNodeManager manager = new FileLockNodeManager(getTestDirfile(), false);
   manager.start();
   UUID id = manager.getUUID();
   Assert.assertNotNull(manager.getUUID());
   // write must update the in-memory sequence as well as the on-disk state
   manager.writeNodeActivationSequence(1);
   final long nodeActivationSequence = manager.getNodeActivationSequence();
   Assert.assertEquals(1, nodeActivationSequence);
   manager.stop();
   // a fresh manager over the same directory must read back both NodeId and sequence
   final FileLockNodeManager otherManager = new FileLockNodeManager(getTestDirfile(), false);
   otherManager.start();
   Assert.assertEquals(id, otherManager.getUUID());
   Assert.assertEquals(1, otherManager.getNodeActivationSequence());
   otherManager.stop();
}
@Test
public void testNIOLock() throws Exception {
doTestLock(new FileLockNodeManager(getTestDirfile(), false), new FileLockNodeManager(getTestDirfile(), false));