From 902345de66b7ee4ceb03ae4a61ea96c4b6b6eaa7 Mon Sep 17 00:00:00 2001 From: Nanda kumar Date: Mon, 29 Oct 2018 19:53:52 +0530 Subject: [PATCH] HDDS-728. Datanodes should use different ContainerStateMachine for each pipeline. Contributed by Mukul Kumar Singh. --- .../statemachine/DatanodeStateMachine.java | 3 + .../transport/server/ratis/CSMMetrics.java | 5 +- .../server/ratis/ContainerStateMachine.java | 21 ++- .../server/ratis/XceiverServerRatis.java | 26 ++-- hadoop-hdds/pom.xml | 2 +- .../scm/container/SCMContainerManager.java | 8 +- .../hdds/scm/pipeline/TestNodeFailure.java | 2 +- .../apache/hadoop/ozone/MiniOzoneCluster.java | 8 +- .../hadoop/ozone/MiniOzoneClusterImpl.java | 20 +-- .../hadoop/ozone/client/rpc/TestBCSID.java | 2 +- .../commandhandler/TestBlockDeletion.java | 4 +- .../hadoop/ozone/web/client/TestKeys.java | 2 +- hadoop-ozone/pom.xml | 2 +- .../TestFreonWithDatanodeFastRestart.java | 130 ++++++++++++++++++ .../freon/TestFreonWithDatanodeRestart.java | 53 +++---- 15 files changed, 208 insertions(+), 80 deletions(-) create mode 100644 hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 85fa304396e..4768cf8ff69 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -25,6 +25,8 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto; import org.apache.hadoop.hdds.protocol.proto @@ -120,6 +122,7 @@ public class DatanodeStateMachine implements Closeable { .addPublisherFor(NodeReportProto.class) .addPublisherFor(ContainerReportsProto.class) .addPublisherFor(CommandStatusReportsProto.class) + .addPublisherFor(PipelineReportsProto.class) .build(); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java index b6aed605a68..9ccf88ac777 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java @@ -24,6 +24,7 @@ import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.ratis.protocol.RaftGroupId; /** * This class is for maintaining Container State Machine statistics. @@ -47,9 +48,9 @@ public class CSMMetrics { public CSMMetrics() { } - public static CSMMetrics create() { + public static CSMMetrics create(RaftGroupId gid) { MetricsSystem ms = DefaultMetricsSystem.instance(); - return ms.register(SOURCE_NAME, + return ms.register(SOURCE_NAME + gid.toString(), "Container State Machine", new CSMMetrics()); } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index bcbf93f1587..ac0833bb9a6 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -66,7 +66,6 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; import java.util.stream.Collectors; @@ -112,6 +111,7 @@ public class ContainerStateMachine extends BaseStateMachine { LoggerFactory.getLogger(ContainerStateMachine.class); private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); + private final RaftGroupId gid; private final ContainerDispatcher dispatcher; private ThreadPoolExecutor chunkExecutor; private final XceiverServerRatis ratisServer; @@ -127,21 +127,19 @@ public class ContainerStateMachine extends BaseStateMachine { */ private final CSMMetrics metrics; - public ContainerStateMachine(ContainerDispatcher dispatcher, + public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher, ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer, - int numOfExecutors) { + List executors) { + this.gid = gid; this.dispatcher = dispatcher; this.chunkExecutor = chunkExecutor; this.ratisServer = ratisServer; + metrics = CSMMetrics.create(gid); + this.numExecutors = executors.size(); + this.executors = executors.toArray(new ExecutorService[numExecutors]); this.writeChunkFutureMap = new ConcurrentHashMap<>(); - metrics = CSMMetrics.create(); this.createContainerFutureMap = new ConcurrentHashMap<>(); - this.numExecutors = numOfExecutors; - executors = new ExecutorService[numExecutors]; containerCommandCompletionMap = new ConcurrentHashMap<>(); - for (int i = 0; i < numExecutors; i++) { - executors[i] = Executors.newSingleThreadExecutor(); - } } @Override @@ -207,7 +205,7 @@ public class ContainerStateMachine extends BaseStateMachine { throws IOException { final ContainerCommandRequestProto proto = getRequestProto(request.getMessage().getContent()); - + Preconditions.checkArgument(request.getRaftGroupId().equals(gid)); final StateMachineLogEntryProto log; if (proto.getCmdType() == Type.WriteChunk) { final WriteChunkRequestProto write = proto.getWriteChunk(); @@ -557,8 +555,5 @@ public class ContainerStateMachine extends BaseStateMachine { @Override public void close() throws IOException { - for (int i = 0; i < numExecutors; i++) { - executors[i].shutdown(); - } } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index b5092d9a364..599f821b8ea 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -76,6 +76,8 @@ import java.util.Objects; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -94,11 +96,12 @@ public final class XceiverServerRatis implements XceiverServerSpi { private final int port; private final RaftServer server; private ThreadPoolExecutor chunkExecutor; + private final List executors; + private final ContainerDispatcher dispatcher; private ClientId clientId = ClientId.randomId(); private final StateContext context; private final ReplicationLevel replicationLevel; private long nodeFailureTimeoutMs; - private ContainerStateMachine stateMachine; private XceiverServerRatis(DatanodeDetails dd, int port, ContainerDispatcher dispatcher, Configuration conf, StateContext context) @@ -121,18 +124,22 @@ public final class XceiverServerRatis implements XceiverServerSpi { this.replicationLevel = conf.getEnum(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY, OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT); - stateMachine = new ContainerStateMachine(dispatcher, chunkExecutor, this, - numContainerOpExecutors); + this.executors = new ArrayList<>(); + this.dispatcher = dispatcher; + for (int i = 0; i < numContainerOpExecutors; i++) { + executors.add(Executors.newSingleThreadExecutor()); + } + this.server = RaftServer.newBuilder() .setServerId(RatisHelper.toRaftPeerId(dd)) .setProperties(serverProperties) - .setStateMachine(stateMachine) + .setStateMachineRegistry(this::getStateMachine) .build(); } - @VisibleForTesting - public ContainerStateMachine getStateMachine() { - return stateMachine; + private ContainerStateMachine getStateMachine(RaftGroupId gid) { + return new ContainerStateMachine(gid, dispatcher, chunkExecutor, + this, Collections.unmodifiableList(executors)); } private RaftProperties newRaftProperties(Configuration conf) { @@ -310,8 +317,11 @@ public final class XceiverServerRatis implements XceiverServerSpi { @Override public void stop() { try { - chunkExecutor.shutdown(); + // shutdown server before the executors as while shutting down, + // some of the tasks would be executed using the executors. server.close(); + chunkExecutor.shutdown(); + executors.forEach(ExecutorService::shutdown); } catch (IOException e) { throw new RuntimeException(e); } diff --git a/hadoop-hdds/pom.xml b/hadoop-hdds/pom.xml index bedf78dc744..f960e9052a4 100644 --- a/hadoop-hdds/pom.xml +++ b/hadoop-hdds/pom.xml @@ -45,7 +45,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> 0.4.0-SNAPSHOT - 0.3.0-aa38160-SNAPSHOT + 0.3.0-2272086-SNAPSHOT 1.60 diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java index 1666b7cec6f..0f980dc18c9 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java @@ -523,9 +523,13 @@ public class SCMContainerManager implements ContainerManager { try { containerStateManager.updateContainerReplica(id, replica); ContainerInfo currentInfo = containerStateManager.getContainer(id); - if (newInfo.getState() == LifeCycleState.CLOSING - && currentInfo.getState() == LifeCycleState.CLOSED) { + if (newInfo.getState() == LifeCycleState.CLOSED + && currentInfo.getState() == LifeCycleState.CLOSING) { currentInfo = updateContainerStateInternal(id, LifeCycleEvent.CLOSE); + if (!currentInfo.isOpen()) { + pipelineManager.removeContainerFromPipeline( + currentInfo.getPipelineID(), id); + } } HddsProtos.SCMContainerInfo newState = diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java index 45886c66f9e..9a1c70536c4 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java @@ -118,7 +118,7 @@ public class TestNodeFailure { pipelineManager.getPipeline(ratisContainer2.getPipeline().getId()) .getPipelineState()); // Now restart the datanode and make sure that a new pipeline is created. - cluster.restartHddsDatanode(dnToFail); + cluster.restartHddsDatanode(dnToFail, true); ContainerWithPipeline ratisContainer3 = containerManager.allocateContainer(RATIS, THREE, "testOwner"); //Assert that new container is not created from the ratis 2 pipeline diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java index d13efb4343e..3aad7f747f2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java @@ -156,16 +156,16 @@ public interface MiniOzoneCluster { * * @param i index of HddsDatanode in the MiniOzoneCluster */ - void restartHddsDatanode(int i) throws InterruptedException, - TimeoutException; + void restartHddsDatanode(int i, boolean waitForDatanode) + throws InterruptedException, TimeoutException; /** * Restart a particular HddsDatanode. * * @param dn HddsDatanode in the MiniOzoneCluster */ - void restartHddsDatanode(DatanodeDetails dn) throws InterruptedException, - TimeoutException, IOException; + void restartHddsDatanode(DatanodeDetails dn, boolean waitForDatanode) + throws InterruptedException, TimeoutException, IOException; /** * Shutdown a particular HddsDatanode. * diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java index ae5245194d3..11bc0e05fd4 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java @@ -232,8 +232,8 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster { } @Override - public void restartHddsDatanode(int i) throws InterruptedException, - TimeoutException { + public void restartHddsDatanode(int i, boolean waitForDatanode) + throws InterruptedException, TimeoutException { HddsDatanodeService datanodeService = hddsDatanodes.get(i); datanodeService.stop(); datanodeService.join(); @@ -248,20 +248,24 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster { conf.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort); conf.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false); hddsDatanodes.remove(i); - // wait for node to be removed from SCM healthy node list. - waitForClusterToBeReady(); + if (waitForDatanode) { + // wait for node to be removed from SCM healthy node list. + waitForClusterToBeReady(); + } HddsDatanodeService service = HddsDatanodeService.createHddsDatanodeService(conf); hddsDatanodes.add(i, service); service.start(null); - // wait for the node to be identified as a healthy node again. - waitForClusterToBeReady(); + if (waitForDatanode) { + // wait for the node to be identified as a healthy node again. + waitForClusterToBeReady(); + } } @Override - public void restartHddsDatanode(DatanodeDetails dn) + public void restartHddsDatanode(DatanodeDetails dn, boolean waitForDatanode) throws InterruptedException, TimeoutException, IOException { - restartHddsDatanode(getHddsDatanodeIndex(dn)); + restartHddsDatanode(getHddsDatanodeIndex(dn), waitForDatanode); } @Override diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java index ed4629c9da6..98099be9429 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java @@ -137,7 +137,7 @@ public class TestBCSID { omKeyLocationInfo.getBlockCommitSequenceId()); // verify that on restarting the datanode, it reloads the BCSID correctly. - cluster.restartHddsDatanode(0); + cluster.restartHddsDatanode(0, true); Assert.assertEquals(blockCommitSequenceId, cluster.getHddsDatanodes().get(0).getDatanodeStateMachine() .getContainer().getContainerSet() diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java index e4cbad5e2ce..63346d24aee 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java @@ -177,7 +177,7 @@ public class TestBlockDeletion { // Containers in the DN and SCM should have same delete transactionIds // after DN restart. The assertion is just to verify that the state of // containerInfos in dn and scm is consistent after dn restart. - cluster.restartHddsDatanode(0); + cluster.restartHddsDatanode(0, true); matchContainerTransactionIds(); // verify PENDING_DELETE_STATUS event is fired @@ -210,7 +210,7 @@ public class TestBlockDeletion { GenericTestUtils.waitFor(() -> logCapturer.getOutput() .contains("RetriableDatanodeCommand type=deleteBlocksCommand"), 500, 5000); - cluster.restartHddsDatanode(0); + cluster.restartHddsDatanode(0, true); } private void verifyTransactionsCommitted() throws IOException { diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java index 1ecedcc6172..08905ebe0b2 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java @@ -326,7 +326,7 @@ public class TestKeys { private static void restartDatanode(MiniOzoneCluster cluster, int datanodeIdx) throws Exception { - cluster.restartHddsDatanode(datanodeIdx); + cluster.restartHddsDatanode(datanodeIdx, true); } @Test diff --git a/hadoop-ozone/pom.xml b/hadoop-ozone/pom.xml index 5e531340a16..2fcffab68cf 100644 --- a/hadoop-ozone/pom.xml +++ b/hadoop-ozone/pom.xml @@ -33,7 +33,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> 3.2.1-SNAPSHOT 0.4.0-SNAPSHOT 0.4.0-SNAPSHOT - 0.3.0-aa38160-SNAPSHOT + 0.3.0-2272086-SNAPSHOT 1.60 Badlands ${ozone.version} diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java new file mode 100644 index 00000000000..44f6f1d04e0 --- /dev/null +++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeFastRestart.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.freon; + +import org.apache.hadoop.hdds.client.ReplicationFactor; +import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.ozone.MiniOzoneCluster; +import org.apache.hadoop.ozone.container.common.transport + .server.XceiverServerSpi; +import org.apache.hadoop.ozone.container.common.transport.server.ratis + .XceiverServerRatis; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.server.impl.RaftServerImpl; +import org.apache.ratis.server.impl.RaftServerProxy; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.statemachine.StateMachine; +import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; +import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Tests Freon with Datanode restarts without waiting for pipeline to close. + */ +public class TestFreonWithDatanodeFastRestart { + + private static MiniOzoneCluster cluster; + private static OzoneConfiguration conf; + + /** + * Create a MiniDFSCluster for testing. + *

+ * Ozone is made active by setting OZONE_ENABLED = true + * + */ + @BeforeClass + public static void init() throws Exception { + conf = new OzoneConfiguration(); + cluster = MiniOzoneCluster.newBuilder(conf) + .setHbProcessorInterval(1000) + .setHbInterval(1000) + .setNumDatanodes(3) + .build(); + cluster.waitForClusterToBeReady(); + } + + /** + * Shutdown MiniDFSCluster. + */ + @AfterClass + public static void shutdown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + @Test + public void testRestart() throws Exception { + startFreon(); + StateMachine sm = getStateMachine(); + TermIndex termIndexBeforeRestart = sm.getLastAppliedTermIndex(); + cluster.restartHddsDatanode(0, false); + sm = getStateMachine(); + SimpleStateMachineStorage storage = + (SimpleStateMachineStorage)sm.getStateMachineStorage(); + SingleFileSnapshotInfo snapshotInfo = storage.getLatestSnapshot(); + TermIndex termInSnapshot = snapshotInfo.getTermIndex(); + String expectedSnapFile = + storage.getSnapshotFile(termIndexBeforeRestart.getTerm(), + termIndexBeforeRestart.getIndex()).getAbsolutePath(); + Assert.assertEquals(snapshotInfo.getFile().getPath().toString(), + expectedSnapFile); + Assert.assertEquals(termInSnapshot, termIndexBeforeRestart); + + // After restart the term index might have progressed to apply pending + // transactions. + TermIndex termIndexAfterRestart = sm.getLastAppliedTermIndex(); + Assert.assertTrue(termIndexAfterRestart.getIndex() >= + termIndexBeforeRestart.getIndex()); + startFreon(); + } + + private void startFreon() throws Exception { + RandomKeyGenerator randomKeyGenerator = + new RandomKeyGenerator((OzoneConfiguration) cluster.getConf()); + randomKeyGenerator.setNumOfVolumes(1); + randomKeyGenerator.setNumOfBuckets(1); + randomKeyGenerator.setNumOfKeys(1); + randomKeyGenerator.setType(ReplicationType.RATIS); + randomKeyGenerator.setFactor(ReplicationFactor.THREE); + randomKeyGenerator.setKeySize(20971520); + randomKeyGenerator.setValidateWrites(true); + randomKeyGenerator.call(); + Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated()); + Assert.assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated()); + Assert.assertEquals(1, randomKeyGenerator.getNumberOfKeysAdded()); + Assert.assertEquals(0, randomKeyGenerator.getUnsuccessfulValidationCount()); + } + + private StateMachine getStateMachine() throws Exception { + XceiverServerSpi server = + cluster.getHddsDatanodes().get(0).getDatanodeStateMachine(). + getContainer().getServer(HddsProtos.ReplicationType.RATIS); + RaftServerProxy proxy = + (RaftServerProxy)(((XceiverServerRatis)server).getServer()); + RaftGroupId groupId = proxy.getGroupIds().iterator().next(); + RaftServerImpl impl = proxy.getImpl(groupId); + return impl.getStateMachine(); + } +} diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java index a1c50b673fa..7cb53d39a33 100644 --- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java +++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java @@ -18,17 +18,11 @@ package org.apache.hadoop.ozone.freon; +import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; import org.apache.hadoop.hdds.conf.OzoneConfiguration; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.ozone.MiniOzoneCluster; -import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi; -import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine; -import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis; -import org.apache.ratis.server.protocol.TermIndex; -import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; -import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -36,7 +30,10 @@ import org.junit.Test; import java.util.concurrent.TimeUnit; -import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys + .OZONE_SCM_STALENODE_INTERVAL; /** * Tests Freon with Datanode restarts. @@ -56,6 +53,12 @@ public class TestFreonWithDatanodeRestart { public static void init() throws Exception { conf = new OzoneConfiguration(); conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 5, TimeUnit.SECONDS); + conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1, + TimeUnit.SECONDS); + conf.setTimeDuration(HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL, 1, + TimeUnit.SECONDS); + conf.setTimeDuration(OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT, 5, + TimeUnit.SECONDS); cluster = MiniOzoneCluster.newBuilder(conf) .setHbProcessorInterval(1000) .setHbInterval(1000) @@ -76,6 +79,12 @@ public class TestFreonWithDatanodeRestart { @Test public void testRestart() throws Exception { + startFreon(); + cluster.restartHddsDatanode(0, true); + startFreon(); + } + + private void startFreon() throws Exception { RandomKeyGenerator randomKeyGenerator = new RandomKeyGenerator((OzoneConfiguration) cluster.getConf()); randomKeyGenerator.setNumOfVolumes(1); @@ -90,33 +99,5 @@ public class TestFreonWithDatanodeRestart { Assert.assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated()); Assert.assertEquals(1, randomKeyGenerator.getNumberOfKeysAdded()); Assert.assertEquals(0, randomKeyGenerator.getUnsuccessfulValidationCount()); - - ContainerStateMachine sm = getStateMachine(); - TermIndex termIndexBeforeRestart = sm.getLastAppliedTermIndex(); - cluster.restartHddsDatanode(0); - sm = getStateMachine(); - SimpleStateMachineStorage storage = - (SimpleStateMachineStorage)sm.getStateMachineStorage(); - SingleFileSnapshotInfo snapshotInfo = storage.getLatestSnapshot(); - TermIndex termInSnapshot = snapshotInfo.getTermIndex(); - String expectedSnapFile = - storage.getSnapshotFile(termIndexBeforeRestart.getTerm(), - termIndexBeforeRestart.getIndex()).getAbsolutePath(); - Assert.assertEquals(snapshotInfo.getFile().getPath().toString(), - expectedSnapFile); - Assert.assertEquals(termInSnapshot, termIndexBeforeRestart); - - // After restart the term index might have progressed to apply pending - // transactions. - TermIndex termIndexAfterRestart = sm.getLastAppliedTermIndex(); - Assert.assertTrue(termIndexAfterRestart.getIndex() >= - termIndexBeforeRestart.getIndex()); - } - - private ContainerStateMachine getStateMachine() { - XceiverServerSpi server = - cluster.getHddsDatanodes().get(0).getDatanodeStateMachine(). - getContainer().getServer(HddsProtos.ReplicationType.RATIS); - return ((XceiverServerRatis)server).getStateMachine(); } }