HDDS-638. Enable ratis snapshots for HDDS datanodes. Contributed by Mukul Kumar Singh.

This commit is contained in:
Mukul Kumar Singh 2018-10-22 19:59:35 +05:30
parent 52cb766ad0
commit 82919a1e7a
7 changed files with 233 additions and 9 deletions

View File

@ -104,6 +104,10 @@ public final class ScmConfigKeys {
DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT = DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT =
TimeDuration.valueOf(1, TimeUnit.SECONDS); TimeDuration.valueOf(1, TimeUnit.SECONDS);
public static final String DFS_RATIS_SNAPSHOT_THRESHOLD_KEY =
"dfs.ratis.snapshot.threshold";
public static final long DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT = 10000;
public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY = public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY =
"dfs.ratis.server.failure.duration"; "dfs.ratis.server.failure.duration";
public static final TimeDuration public static final TimeDuration

View File

@ -266,6 +266,10 @@ public final class OzoneConfigKeys {
public static final TimeDuration public static final TimeDuration
DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT = DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT =
ScmConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT; ScmConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT;
public static final String DFS_RATIS_SNAPSHOT_THRESHOLD_KEY =
ScmConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY;
public static final long DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT =
ScmConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT;
public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY = public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY =
ScmConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY; ScmConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY;

View File

@ -118,6 +118,14 @@
etc. This picks one of those for this cluster. etc. This picks one of those for this cluster.
</description> </description>
</property> </property>
<property>
<name>dfs.ratis.snapshot.threshold</name>
<value>10000</value>
<tag>OZONE, RATIS</tag>
<description>Number of transactions after which a ratis snapshot should be
taken.
</description>
</property>
<property> <property>
<name>dfs.container.ratis.num.write.chunk.threads</name> <name>dfs.container.ratis.num.write.chunk.threads</name>
<value>60</value> <value>60</value>

View File

@ -25,6 +25,9 @@ import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.RaftServerConstants;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.apache.ratis.thirdparty.com.google.protobuf import org.apache.ratis.thirdparty.com.google.protobuf
.InvalidProtocolBufferException; .InvalidProtocolBufferException;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Type;
@ -55,8 +58,10 @@ import org.apache.ratis.statemachine.impl.TransactionContextImpl;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor;
@ -115,6 +120,7 @@ public class ContainerStateMachine extends BaseStateMachine {
createContainerFutureMap; createContainerFutureMap;
private ExecutorService[] executors; private ExecutorService[] executors;
private final int numExecutors; private final int numExecutors;
private final Map<Long, Long> containerCommandCompletionMap;
/** /**
* CSM metrics. * CSM metrics.
*/ */
@ -131,6 +137,7 @@ public class ContainerStateMachine extends BaseStateMachine {
this.createContainerFutureMap = new ConcurrentHashMap<>(); this.createContainerFutureMap = new ConcurrentHashMap<>();
this.numExecutors = numOfExecutors; this.numExecutors = numOfExecutors;
executors = new ExecutorService[numExecutors]; executors = new ExecutorService[numExecutors];
containerCommandCompletionMap = new ConcurrentHashMap<>();
for (int i = 0; i < numExecutors; i++) { for (int i = 0; i < numExecutors; i++) {
executors[i] = Executors.newSingleThreadExecutor(); executors[i] = Executors.newSingleThreadExecutor();
} }
@ -151,10 +158,47 @@ public class ContainerStateMachine extends BaseStateMachine {
throws IOException { throws IOException {
super.initialize(server, id, raftStorage); super.initialize(server, id, raftStorage);
storage.init(raftStorage); storage.init(raftStorage);
// TODO handle snapshots
// TODO: Add a flag that tells you that initialize has been called. loadSnapshot(storage.getLatestSnapshot());
// Check with Ratis if this feature is done in Ratis. }
private long loadSnapshot(SingleFileSnapshotInfo snapshot) {
if (snapshot == null) {
TermIndex empty = TermIndex.newTermIndex(0, 0);
LOG.info("The snapshot info is null." +
"Setting the last applied index to:" + empty);
setLastAppliedTermIndex(empty);
return RaftServerConstants.INVALID_LOG_INDEX;
}
final TermIndex last =
SimpleStateMachineStorage.getTermIndexFromSnapshotFile(
snapshot.getFile().getPath().toFile());
LOG.info("Setting the last applied index to " + last);
setLastAppliedTermIndex(last);
return last.getIndex();
}
@Override
public long takeSnapshot() throws IOException {
TermIndex ti = getLastAppliedTermIndex();
LOG.info("Taking snapshot at termIndex:" + ti);
if (ti != null) {
final File snapshotFile =
storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
LOG.info("Taking a snapshot to file {}", snapshotFile);
try {
//TODO: For now, just create the file to save the term index,
//persist open container info to snapshot later.
snapshotFile.createNewFile();
} catch(IOException ioe) {
LOG.warn("Failed to write snapshot file \"" + snapshotFile
+ "\", last applied index=" + ti);
throw ioe;
}
return ti.getIndex();
}
return -1;
} }
@Override @Override
@ -353,10 +397,9 @@ public class ContainerStateMachine extends BaseStateMachine {
public CompletableFuture<Void> flushStateMachineData(long index) { public CompletableFuture<Void> flushStateMachineData(long index) {
List<CompletableFuture<Message>> futureList = List<CompletableFuture<Message>> futureList =
writeChunkFutureMap.entrySet().stream().filter(x -> x.getKey() <= index) writeChunkFutureMap.entrySet().stream().filter(x -> x.getKey() <= index)
.map(x -> x.getValue()).collect(Collectors.toList()); .map(Map.Entry::getValue).collect(Collectors.toList());
CompletableFuture<Void> combinedFuture = CompletableFuture.allOf( return CompletableFuture.allOf(
futureList.toArray(new CompletableFuture[futureList.size()])); futureList.toArray(new CompletableFuture[futureList.size()]));
return combinedFuture;
} }
/* /*
* This api is used by the leader while appending logs to the follower * This api is used by the leader while appending logs to the follower
@ -394,11 +437,28 @@ public class ContainerStateMachine extends BaseStateMachine {
} }
} }
private void updateLastApplied() {
Long appliedTerm = null;
long appliedIndex = -1;
for(long i = getLastAppliedTermIndex().getIndex() + 1;; i++) {
final Long removed = containerCommandCompletionMap.remove(i);
if (removed == null) {
break;
}
appliedTerm = removed;
appliedIndex = i;
}
if (appliedTerm != null) {
updateLastAppliedTermIndex(appliedIndex, appliedTerm);
}
}
/* /*
* ApplyTransaction calls in Ratis are sequential. * ApplyTransaction calls in Ratis are sequential.
*/ */
@Override @Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) { public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
long index = trx.getLogEntry().getIndex();
try { try {
metrics.incNumApplyTransactionsOps(); metrics.incNumApplyTransactionsOps();
ContainerCommandRequestProto requestProto = ContainerCommandRequestProto requestProto =
@ -418,7 +478,7 @@ public class ContainerStateMachine extends BaseStateMachine {
blockDataProto.getBlockID()); blockDataProto.getBlockID());
return completeExceptionally(ioe); return completeExceptionally(ioe);
} }
blockData.setBlockCommitSequenceId(trx.getLogEntry().getIndex()); blockData.setBlockCommitSequenceId(index);
final ContainerProtos.PutBlockRequestProto putBlockRequestProto = final ContainerProtos.PutBlockRequestProto putBlockRequestProto =
ContainerProtos.PutBlockRequestProto ContainerProtos.PutBlockRequestProto
.newBuilder(requestProto.getPutBlock()) .newBuilder(requestProto.getPutBlock())
@ -440,6 +500,14 @@ public class ContainerStateMachine extends BaseStateMachine {
future.thenApply( future.thenApply(
r -> createContainerFutureMap.remove(containerID).complete(null)); r -> createContainerFutureMap.remove(containerID).complete(null));
} }
future.thenAccept(m -> {
final Long previous =
containerCommandCompletionMap
.put(index, trx.getLogEntry().getTerm());
Preconditions.checkState(previous == null);
updateLastApplied();
});
return future; return future;
} catch (IOException e) { } catch (IOException e) {
metrics.incNumApplyTransactionsFails(); metrics.incNumApplyTransactionsFails();
@ -466,7 +534,8 @@ public class ContainerStateMachine extends BaseStateMachine {
@Override @Override
public void close() throws IOException { public void close() throws IOException {
for (int i = 0; i < numExecutors; i++){ takeSnapshot();
for (int i = 0; i < numExecutors; i++) {
executors[i].shutdown(); executors[i].shutdown();
} }
} }

View File

@ -130,6 +130,10 @@ public final class XceiverServerRatis implements XceiverServerSpi {
.build(); .build();
} }
@VisibleForTesting
public ContainerStateMachine getStateMachine() {
return stateMachine;
}
private RaftProperties newRaftProperties(Configuration conf) { private RaftProperties newRaftProperties(Configuration conf) {
final RaftProperties properties = new RaftProperties(); final RaftProperties properties = new RaftProperties();
@ -254,6 +258,15 @@ public final class XceiverServerRatis implements XceiverServerSpi {
} else if (rpc == SupportedRpcType.NETTY) { } else if (rpc == SupportedRpcType.NETTY) {
NettyConfigKeys.Server.setPort(properties, port); NettyConfigKeys.Server.setPort(properties, port);
} }
long snapshotThreshold =
conf.getLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY,
OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_DEFAULT);
RaftServerConfigKeys.Snapshot.
setAutoTriggerEnabled(properties, true);
RaftServerConfigKeys.Snapshot.
setAutoTriggerThreshold(properties, snapshotThreshold);
return properties; return properties;
} }
@ -298,7 +311,6 @@ public final class XceiverServerRatis implements XceiverServerSpi {
public void stop() { public void stop() {
try { try {
chunkExecutor.shutdown(); chunkExecutor.shutdown();
stateMachine.close();
server.close(); server.close();
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@ -230,6 +230,11 @@ public class OzoneContainer {
return this.hddsDispatcher; return this.hddsDispatcher;
} }
@VisibleForTesting
public XceiverServerSpi getServer(ReplicationType replicationType) {
return servers.get(replicationType);
}
public VolumeSet getVolumeSet() { public VolumeSet getVolumeSet() {
return volumeSet; return volumeSet;
} }

View File

@ -0,0 +1,122 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.freon;
import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.container.common.transport.server.XceiverServerSpi;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.ContainerStateMachine;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
import org.apache.ratis.statemachine.impl.SingleFileSnapshotInfo;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
/**
* Tests Freon with Datanode restarts.
*/
public class TestFreonWithDatanodeRestart {
private static MiniOzoneCluster cluster;
private static OzoneConfiguration conf;
/**
* Create a MiniDFSCluster for testing.
* <p>
* Ozone is made active by setting OZONE_ENABLED = true
*
*/
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 5, TimeUnit.SECONDS);
cluster = MiniOzoneCluster.newBuilder(conf)
.setHbProcessorInterval(1000)
.setHbInterval(1000)
.setNumDatanodes(3)
.build();
cluster.waitForClusterToBeReady();
}
/**
* Shutdown MiniDFSCluster.
*/
@AfterClass
public static void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
}
@Test
public void testRestart() throws Exception {
RandomKeyGenerator randomKeyGenerator =
new RandomKeyGenerator((OzoneConfiguration) cluster.getConf());
randomKeyGenerator.setNumOfVolumes(1);
randomKeyGenerator.setNumOfBuckets(1);
randomKeyGenerator.setNumOfKeys(1);
randomKeyGenerator.setType(ReplicationType.RATIS);
randomKeyGenerator.setFactor(ReplicationFactor.THREE);
randomKeyGenerator.setKeySize(20971520);
randomKeyGenerator.setValidateWrites(true);
randomKeyGenerator.call();
Assert.assertEquals(1, randomKeyGenerator.getNumberOfVolumesCreated());
Assert.assertEquals(1, randomKeyGenerator.getNumberOfBucketsCreated());
Assert.assertEquals(1, randomKeyGenerator.getNumberOfKeysAdded());
Assert.assertEquals(0, randomKeyGenerator.getUnsuccessfulValidationCount());
ContainerStateMachine sm = getStateMachine();
TermIndex termIndexBeforeRestart = sm.getLastAppliedTermIndex();
cluster.restartHddsDatanode(0);
sm = getStateMachine();
SimpleStateMachineStorage storage =
(SimpleStateMachineStorage)sm.getStateMachineStorage();
SingleFileSnapshotInfo snapshotInfo = storage.getLatestSnapshot();
TermIndex termInSnapshot = snapshotInfo.getTermIndex();
String expectedSnapFile =
storage.getSnapshotFile(termIndexBeforeRestart.getTerm(),
termIndexBeforeRestart.getIndex()).getAbsolutePath();
Assert.assertEquals(snapshotInfo.getFile().getPath().toString(),
expectedSnapFile);
Assert.assertEquals(termInSnapshot, termIndexBeforeRestart);
// After restart the term index might have progressed to apply pending
// transactions.
TermIndex termIndexAfterRestart = sm.getLastAppliedTermIndex();
Assert.assertTrue(termIndexAfterRestart.getIndex() >=
termIndexBeforeRestart.getIndex());
}
private ContainerStateMachine getStateMachine() {
XceiverServerSpi server =
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine().
getContainer().getServer(HddsProtos.ReplicationType.RATIS);
return ((XceiverServerRatis)server).getStateMachine();
}
}