HDDS-728. Datanodes should use different ContainerStateMachine for each pipeline.

Contributed by Mukul Kumar Singh.
This commit is contained in:
Nanda kumar 2018-10-29 19:53:52 +05:30
parent bfb720ebc8
commit 902345de66
15 changed files with 208 additions and 80 deletions

View File

@ -25,6 +25,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto
@ -120,6 +122,7 @@ public DatanodeStateMachine(DatanodeDetails datanodeDetails,
.addPublisherFor(NodeReportProto.class)
.addPublisherFor(ContainerReportsProto.class)
.addPublisherFor(CommandStatusReportsProto.class)
.addPublisherFor(PipelineReportsProto.class)
.build();
}

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.ratis.protocol.RaftGroupId;
/**
* This class is for maintaining Container State Machine statistics.
@ -47,9 +48,9 @@ public class CSMMetrics {
public CSMMetrics() {
}
public static CSMMetrics create() {
public static CSMMetrics create(RaftGroupId gid) {
MetricsSystem ms = DefaultMetricsSystem.instance();
return ms.register(SOURCE_NAME,
return ms.register(SOURCE_NAME + gid.toString(),
"Container State Machine",
new CSMMetrics());
}

View File

@ -66,7 +66,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
@ -112,6 +111,7 @@ public class ContainerStateMachine extends BaseStateMachine {
LoggerFactory.getLogger(ContainerStateMachine.class);
private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage();
private final RaftGroupId gid;
private final ContainerDispatcher dispatcher;
private ThreadPoolExecutor chunkExecutor;
private final XceiverServerRatis ratisServer;
@ -127,21 +127,19 @@ public class ContainerStateMachine extends BaseStateMachine {
*/
private final CSMMetrics metrics;
public ContainerStateMachine(ContainerDispatcher dispatcher,
public ContainerStateMachine(RaftGroupId gid, ContainerDispatcher dispatcher,
ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer,
int numOfExecutors) {
List<ExecutorService> executors) {
this.gid = gid;
this.dispatcher = dispatcher;
this.chunkExecutor = chunkExecutor;
this.ratisServer = ratisServer;
metrics = CSMMetrics.create(gid);
this.numExecutors = executors.size();
this.executors = executors.toArray(new ExecutorService[numExecutors]);
this.writeChunkFutureMap = new ConcurrentHashMap<>();
metrics = CSMMetrics.create();
this.createContainerFutureMap = new ConcurrentHashMap<>();
this.numExecutors = numOfExecutors;
executors = new ExecutorService[numExecutors];
containerCommandCompletionMap = new ConcurrentHashMap<>();
for (int i = 0; i < numExecutors; i++) {
executors[i] = Executors.newSingleThreadExecutor();
}
}
@Override
@ -207,7 +205,7 @@ public TransactionContext startTransaction(RaftClientRequest request)
throws IOException {
final ContainerCommandRequestProto proto =
getRequestProto(request.getMessage().getContent());
Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
final StateMachineLogEntryProto log;
if (proto.getCmdType() == Type.WriteChunk) {
final WriteChunkRequestProto write = proto.getWriteChunk();
@ -557,8 +555,5 @@ public void notifyExtendedNoLeader(RaftGroup group,
@Override
public void close() throws IOException {
for (int i = 0; i < numExecutors; i++) {
executors[i].shutdown();
}
}
}

View File

@ -76,6 +76,8 @@
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ -94,11 +96,12 @@ private static long nextCallId() {
private final int port;
private final RaftServer server;
private ThreadPoolExecutor chunkExecutor;
private final List<ExecutorService> executors;
private final ContainerDispatcher dispatcher;
private ClientId clientId = ClientId.randomId();
private final StateContext context;
private final ReplicationLevel replicationLevel;
private long nodeFailureTimeoutMs;
private ContainerStateMachine stateMachine;
private XceiverServerRatis(DatanodeDetails dd, int port,
ContainerDispatcher dispatcher, Configuration conf, StateContext context)
@ -121,18 +124,22 @@ private XceiverServerRatis(DatanodeDetails dd, int port,
this.replicationLevel =
conf.getEnum(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY,
OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT);
stateMachine = new ContainerStateMachine(dispatcher, chunkExecutor, this,
numContainerOpExecutors);
this.executors = new ArrayList<>();
this.dispatcher = dispatcher;
for (int i = 0; i < numContainerOpExecutors; i++) {
executors.add(Executors.newSingleThreadExecutor());
}
this.server = RaftServer.newBuilder()
.setServerId(RatisHelper.toRaftPeerId(dd))
.setProperties(serverProperties)
.setStateMachine(stateMachine)
.setStateMachineRegistry(this::getStateMachine)
.build();
}
@VisibleForTesting
public ContainerStateMachine getStateMachine() {
return stateMachine;
private ContainerStateMachine getStateMachine(RaftGroupId gid) {
return new ContainerStateMachine(gid, dispatcher, chunkExecutor,
this, Collections.unmodifiableList(executors));
}
private RaftProperties newRaftProperties(Configuration conf) {
@ -310,8 +317,11 @@ public void start() throws IOException {
@Override
public void stop() {
try {
chunkExecutor.shutdown();
// shutdown server before the executors as while shutting down,
// some of the tasks would be executed using the executors.
server.close();
chunkExecutor.shutdown();
executors.forEach(ExecutorService::shutdown);
} catch (IOException e) {
throw new RuntimeException(e);
}

View File

@ -45,7 +45,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<hdds.version>0.4.0-SNAPSHOT</hdds.version>
<!-- Apache Ratis version -->
<ratis.version>0.3.0-aa38160-SNAPSHOT</ratis.version>
<ratis.version>0.3.0-2272086-SNAPSHOT</ratis.version>
<bouncycastle.version>1.60</bouncycastle.version>

View File

@ -523,9 +523,13 @@ public void processContainerReports(DatanodeDetails datanodeDetails,
try {
containerStateManager.updateContainerReplica(id, replica);
ContainerInfo currentInfo = containerStateManager.getContainer(id);
if (newInfo.getState() == LifeCycleState.CLOSING
&& currentInfo.getState() == LifeCycleState.CLOSED) {
if (newInfo.getState() == LifeCycleState.CLOSED
&& currentInfo.getState() == LifeCycleState.CLOSING) {
currentInfo = updateContainerStateInternal(id, LifeCycleEvent.CLOSE);
if (!currentInfo.isOpen()) {
pipelineManager.removeContainerFromPipeline(
currentInfo.getPipelineID(), id);
}
}
HddsProtos.SCMContainerInfo newState =

View File

@ -118,7 +118,7 @@ public void testPipelineFail() throws InterruptedException, IOException,
pipelineManager.getPipeline(ratisContainer2.getPipeline().getId())
.getPipelineState());
// Now restart the datanode and make sure that a new pipeline is created.
cluster.restartHddsDatanode(dnToFail);
cluster.restartHddsDatanode(dnToFail, true);
ContainerWithPipeline ratisContainer3 =
containerManager.allocateContainer(RATIS, THREE, "testOwner");
//Assert that new container is not created from the ratis 2 pipeline

View File

@ -156,16 +156,16 @@ void restartStorageContainerManager() throws InterruptedException,
*
* @param i index of HddsDatanode in the MiniOzoneCluster
*/
void restartHddsDatanode(int i) throws InterruptedException,
TimeoutException;
void restartHddsDatanode(int i, boolean waitForDatanode)
throws InterruptedException, TimeoutException;
/**
* Restart a particular HddsDatanode.
*
* @param dn HddsDatanode in the MiniOzoneCluster
*/
void restartHddsDatanode(DatanodeDetails dn) throws InterruptedException,
TimeoutException, IOException;
void restartHddsDatanode(DatanodeDetails dn, boolean waitForDatanode)
throws InterruptedException, TimeoutException, IOException;
/**
* Shutdown a particular HddsDatanode.
*

View File

@ -232,8 +232,8 @@ public void restartOzoneManager() throws IOException {
}
@Override
public void restartHddsDatanode(int i) throws InterruptedException,
TimeoutException {
public void restartHddsDatanode(int i, boolean waitForDatanode)
throws InterruptedException, TimeoutException {
HddsDatanodeService datanodeService = hddsDatanodes.get(i);
datanodeService.stop();
datanodeService.join();
@ -248,20 +248,24 @@ public void restartHddsDatanode(int i) throws InterruptedException,
conf.setInt(DFS_CONTAINER_RATIS_IPC_PORT, ratisPort);
conf.setBoolean(DFS_CONTAINER_RATIS_IPC_RANDOM_PORT, false);
hddsDatanodes.remove(i);
// wait for node to be removed from SCM healthy node list.
waitForClusterToBeReady();
if (waitForDatanode) {
// wait for node to be removed from SCM healthy node list.
waitForClusterToBeReady();
}
HddsDatanodeService service =
HddsDatanodeService.createHddsDatanodeService(conf);
hddsDatanodes.add(i, service);
service.start(null);
// wait for the node to be identified as a healthy node again.
waitForClusterToBeReady();
if (waitForDatanode) {
// wait for the node to be identified as a healthy node again.
waitForClusterToBeReady();
}
}
@Override
public void restartHddsDatanode(DatanodeDetails dn)
public void restartHddsDatanode(DatanodeDetails dn, boolean waitForDatanode)
throws InterruptedException, TimeoutException, IOException {
restartHddsDatanode(getHddsDatanodeIndex(dn));
restartHddsDatanode(getHddsDatanodeIndex(dn), waitForDatanode);
}
@Override

View File

@ -137,7 +137,7 @@ public void testBCSID() throws Exception {
omKeyLocationInfo.getBlockCommitSequenceId());
// verify that on restarting the datanode, it reloads the BCSID correctly.
cluster.restartHddsDatanode(0);
cluster.restartHddsDatanode(0, true);
Assert.assertEquals(blockCommitSequenceId,
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer().getContainerSet()

View File

@ -177,7 +177,7 @@ public void testBlockDeletion()
// Containers in the DN and SCM should have same delete transactionIds
// after DN restart. The assertion is just to verify that the state of
// containerInfos in dn and scm is consistent after dn restart.
cluster.restartHddsDatanode(0);
cluster.restartHddsDatanode(0, true);
matchContainerTransactionIds();
// verify PENDING_DELETE_STATUS event is fired
@ -210,7 +210,7 @@ private void waitForDatanodeCommandRetry()
GenericTestUtils.waitFor(() -> logCapturer.getOutput()
.contains("RetriableDatanodeCommand type=deleteBlocksCommand"),
500, 5000);
cluster.restartHddsDatanode(0);
cluster.restartHddsDatanode(0, true);
}
private void verifyTransactionsCommitted() throws IOException {

View File

@ -326,7 +326,7 @@ static void runTestPutKey(PutHelper helper) throws Exception {
private static void restartDatanode(MiniOzoneCluster cluster, int datanodeIdx)
throws Exception {
cluster.restartHddsDatanode(datanodeIdx);
cluster.restartHddsDatanode(datanodeIdx, true);
}
@Test

View File

@ -33,7 +33,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
<hadoop.version>3.2.1-SNAPSHOT</hadoop.version>
<hdds.version>0.4.0-SNAPSHOT</hdds.version>
<ozone.version>0.4.0-SNAPSHOT</ozone.version>
<ratis.version>0.3.0-aa38160-SNAPSHOT</ratis.version>
<ratis.version>0.3.0-2272086-SNAPSHOT</ratis.version>
<bouncycastle.version>1.60</bouncycastle.version>
<ozone.release>Badlands</ozone.release>
<declared.ozone.version>${ozone.version}</declared.ozone.version>

View File

@ -0,0 +1,130 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.freon;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.container.common.transport
.server.XceiverServerSpi;
import org.apache.hadoop.ozone.container.common.transport.server.ratis
.XceiverServerRatis;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* Tests Freon with Datanode restarts without waiting for pipeline to close.
*/
public class TestFreonWithDatanodeFastRestart {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
/**
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true
*
*/
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
cluster = MiniOzoneCluster.newBuilder(conf)
.setHbProcessorInterval(1000)
.setHbInterval(1000)
.setNumDatanodes(3)
.build();
cluster.waitForClusterToBeReady();
}
/**
* Shutdown MiniDFSCluster.
*/
@AfterClass
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testRestart() throws Exception {
startFreon();
StateMachine sm = getStateMachine();
TermIndex termIndexBeforeRestart = sm.getLastAppliedTermIndex();
cluster.restartHddsDatanode(0, false);
sm = getStateMachine();
SimpleStateMachineStorage storage =
(SimpleStateMachineStorage)sm.getStateMachineStorage();
SingleFileSnapshotInfo snapshotInfo = storage.getLatestSnapshot();
TermIndex termInSnapshot = snapshotInfo.getTermIndex();
String expectedSnapFile =
storage.getSnapshotFile(termIndexBeforeRestart.getTerm(),
termIndexBeforeRestart.getIndex()).getAbsolutePath();
Assert.assertEquals(snapshotInfo.getFile().getPath().toString(),
expectedSnapFile);
Assert.assertEquals(termInSnapshot, termIndexBeforeRestart);
// After restart the term index might have progressed to apply pending
// transactions.
TermIndex termIndexAfterRestart = sm.getLastAppliedTermIndex();
Assert.assertTrue(termIndexAfterRestart.getIndex() >=
termIndexBeforeRestart.getIndex());
startFreon();
}
private void startFreon() throws Exception {
RandomKeyGenerator randomKeyGenerator =
new RandomKeyGenerator((OzoneConfiguration) cluster.getConf());
randomKeyGenerator.setNumOfVolumes(1);
randomKeyGenerator.setNumOfBuckets(1);
randomKeyGenerator.setNumOfKeys(1);
randomKeyGenerator.setType(ReplicationType.RATIS);
randomKeyGenerator.setFactor(ReplicationFactor.THREE);
randomKeyGenerator.setKeySize(20971520);
randomKeyGenerator.setValidateWrites(true);
randomKeyGenerator.call();
Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated());
Assert.assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated());
Assert.assertEquals(1, randomKeyGenerator.getNumberOfKeysAdded());
Assert.assertEquals(0, randomKeyGenerator.getUnsuccessfulValidationCount());
}
private StateMachine getStateMachine() throws Exception {
XceiverServerSpi server =
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
getContainer().getServer(HddsProtos.ReplicationType.RATIS);
RaftServerProxy proxy =
(RaftServerProxy)(((XceiverServerRatis)server).getServer());
RaftGroupId groupId = proxy.getGroupIds().iterator().next();
RaftServerImpl impl = proxy.getImpl(groupId);
return impl.getStateMachine();
}
}

View File

@ -18,17 +18,11 @@
package org.apache.hadoop.ozone.freon;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@ -36,7 +30,10 @@
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
.OZONE_SCM_STALENODE_INTERVAL;
/**
* Tests Freon with Datanode restarts.
@ -56,6 +53,12 @@ public class TestFreonWithDatanodeRestart {
public static void init() throws Exception {
conf = new OzoneConfiguration();
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 5, TimeUnit.SECONDS);
conf.setTimeDuration(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, 1,
TimeUnit.SECONDS);
conf.setTimeDuration(HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL, 1,
TimeUnit.SECONDS);
conf.setTimeDuration(OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT, 5,
TimeUnit.SECONDS);
cluster = MiniOzoneCluster.newBuilder(conf)
.setHbProcessorInterval(1000)
.setHbInterval(1000)
@ -76,6 +79,12 @@ public static void shutdown() {
@Test
public void testRestart() throws Exception {
startFreon();
cluster.restartHddsDatanode(0, true);
startFreon();
}
private void startFreon() throws Exception {
RandomKeyGenerator randomKeyGenerator =
new RandomKeyGenerator((OzoneConfiguration) cluster.getConf());
randomKeyGenerator.setNumOfVolumes(1);
@ -90,33 +99,5 @@ public void testRestart() throws Exception {
Assert.assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated());
Assert.assertEquals(1, randomKeyGenerator.getNumberOfKeysAdded());
Assert.assertEquals(0, randomKeyGenerator.getUnsuccessfulValidationCount());
ContainerStateMachine sm = getStateMachine();
TermIndex termIndexBeforeRestart = sm.getLastAppliedTermIndex();
cluster.restartHddsDatanode(0);
sm = getStateMachine();
SimpleStateMachineStorage storage =
(SimpleStateMachineStorage)sm.getStateMachineStorage();
SingleFileSnapshotInfo snapshotInfo = storage.getLatestSnapshot();
TermIndex termInSnapshot = snapshotInfo.getTermIndex();
String expectedSnapFile =
storage.getSnapshotFile(termIndexBeforeRestart.getTerm(),
termIndexBeforeRestart.getIndex()).getAbsolutePath();
Assert.assertEquals(snapshotInfo.getFile().getPath().toString(),
expectedSnapFile);
Assert.assertEquals(termInSnapshot, termIndexBeforeRestart);
// After restart the term index might have progressed to apply pending
// transactions.
TermIndex termIndexAfterRestart = sm.getLastAppliedTermIndex();
Assert.assertTrue(termIndexAfterRestart.getIndex() >=
termIndexBeforeRestart.getIndex());
}
private ContainerStateMachine getStateMachine() {
XceiverServerSpi server =
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
getContainer().getServer(HddsProtos.ReplicationType.RATIS);
return ((XceiverServerRatis)server).getStateMachine();
}
}