HDFS-11856. Ability to re-add Upgrading Nodes to pipeline for future pipeline updates. Contributed by Vinayakumar B.

Kihwal Lee 2017-05-25 13:04:09 -05:00 committed by Xiaoyu Yao
parent 48f83077ad
commit 46bad64684
11 changed files with 241 additions and 63 deletions

View File

@@ -57,4 +57,8 @@ public class DFSClientFaultInjector {
public void fetchFromDatanodeException() {}
public void readFromDatanodeDelay() {}
public boolean skipRollingRestartWait() {
return false;
}
}

View File

@@ -327,6 +327,7 @@ class DataStreamer extends Daemon {
static class ErrorState {
ErrorType error = ErrorType.NONE;
private int badNodeIndex = -1;
private boolean waitForRestart = true;
private int restartingNodeIndex = -1;
private long restartingNodeDeadline = 0;
private final long datanodeRestartTimeout;
@@ -342,6 +343,7 @@ class DataStreamer extends Daemon {
badNodeIndex = -1;
restartingNodeIndex = -1;
restartingNodeDeadline = 0;
waitForRestart = true;
}
synchronized void reset() {
@@ -349,6 +351,7 @@ class DataStreamer extends Daemon {
badNodeIndex = -1;
restartingNodeIndex = -1;
restartingNodeDeadline = 0;
waitForRestart = true;
}
synchronized boolean hasInternalError() {
@@ -389,14 +392,19 @@ class DataStreamer extends Daemon {
return restartingNodeIndex;
}
synchronized void initRestartingNode(int i, String message) {
synchronized void initRestartingNode(int i, String message,
boolean shouldWait) {
restartingNodeIndex = i;
restartingNodeDeadline = Time.monotonicNow() + datanodeRestartTimeout;
// If the data streamer has already set the primary node
// bad, clear it. It is likely that the write failed due to
// the DN shutdown. Even if it was a real failure, the pipeline
// recovery will take care of it.
badNodeIndex = -1;
if (shouldWait) {
restartingNodeDeadline = Time.monotonicNow() + datanodeRestartTimeout;
// If the data streamer has already set the primary node
// bad, clear it. It is likely that the write failed due to
// the DN shutdown. Even if it was a real failure, the pipeline
// recovery will take care of it.
badNodeIndex = -1;
} else {
this.waitForRestart = false;
}
LOG.info(message);
}
@@ -405,7 +413,7 @@ class DataStreamer extends Daemon {
}
synchronized boolean isNodeMarked() {
return badNodeIndex >= 0 || isRestartingNode();
return badNodeIndex >= 0 || (isRestartingNode() && doWaitForRestart());
}
/**
@@ -430,7 +438,7 @@ class DataStreamer extends Daemon {
} else if (badNodeIndex < restartingNodeIndex) {
// the node index has shifted.
restartingNodeIndex--;
} else {
} else if (waitForRestart) {
throw new IllegalStateException("badNodeIndex = " + badNodeIndex
+ " = restartingNodeIndex = " + restartingNodeIndex);
}
@@ -472,6 +480,10 @@ class DataStreamer extends Daemon {
}
}
}
boolean doWaitForRestart() {
return waitForRestart;
}
}
private volatile boolean streamerClosed = false;
@@ -491,6 +503,8 @@ class DataStreamer extends Daemon {
/** Nodes have been used in the pipeline before and have failed. */
private final List<DatanodeInfo> failed = new ArrayList<>();
/** Restarting Nodes */
private List<DatanodeInfo> restartingNodes = new ArrayList<>();
/** The times have retried to recover pipeline, for the same packet. */
private volatile int pipelineRecoveryCount = 0;
/** Has the current block been hflushed? */
@@ -1043,6 +1057,13 @@ class DataStreamer extends Daemon {
return true;
}
/*
* Treat all nodes as remote for test when skip enabled.
*/
if (DFSClientFaultInjector.get().skipRollingRestartWait()) {
return false;
}
// Is it a local node?
InetAddress addr = null;
try {
@@ -1110,11 +1131,11 @@ class DataStreamer extends Daemon {
}
// Restart will not be treated differently unless it is
// the local node or the only one in the pipeline.
if (PipelineAck.isRestartOOBStatus(reply) &&
shouldWaitForRestart(i)) {
if (PipelineAck.isRestartOOBStatus(reply)) {
final String message = "Datanode " + i + " is restarting: "
+ targets[i];
errorState.initRestartingNode(i, message);
errorState.initRestartingNode(i, message,
shouldWaitForRestart(i));
throw new IOException(message);
}
// node error
@@ -1492,6 +1513,14 @@ class DataStreamer extends Daemon {
*/
boolean handleRestartingDatanode() {
if (errorState.isRestartingNode()) {
if (!errorState.doWaitForRestart()) {
// If node is restarting and not worth to wait for restart then can go
// ahead with error recovery considering it as bad node for now. Later
// it should be able to re-consider the same node for future pipeline
// updates.
errorState.setBadNodeIndex(errorState.getRestartingNodeIndex());
return true;
}
// 4 seconds or the configured deadline period, whichever is shorter.
// This is the retry interval and recovery will be retried in this
// interval until timeout or success.
@@ -1523,9 +1552,14 @@ class DataStreamer extends Daemon {
return false;
}
String reason = "bad.";
if (errorState.getRestartingNodeIndex() == badNodeIndex) {
reason = "restarting.";
restartingNodes.add(nodes[badNodeIndex]);
}
LOG.warn("Error Recovery for " + block + " in pipeline "
+ Arrays.toString(nodes) + ": datanode " + badNodeIndex
+ "("+ nodes[badNodeIndex] + ") is bad.");
+ "("+ nodes[badNodeIndex] + ") is " + reason);
failed.add(nodes[badNodeIndex]);
DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
@@ -1735,6 +1769,9 @@ class DataStreamer extends Daemon {
blockStream = out;
result = true; // success
errorState.resetInternalError();
// remove all restarting nodes from failed nodes list
failed.removeAll(restartingNodes);
restartingNodes.clear();
} catch (IOException ie) {
if (!errorState.isRestartingNode()) {
LOG.info("Exception in createBlockOutputStream " + this, ie);
@@ -1768,9 +1805,10 @@ class DataStreamer extends Daemon {
final int i = errorState.getBadNodeIndex();
// Check whether there is a restart worth waiting for.
if (checkRestart && shouldWaitForRestart(i)) {
errorState.initRestartingNode(i, "Datanode " + i + " is restarting: "
+ nodes[i]);
if (checkRestart) {
errorState.initRestartingNode(i,
"Datanode " + i + " is restarting: " + nodes[i],
shouldWaitForRestart(i));
}
errorState.setInternalError();
lastException.set(ie);

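Note on the DataStreamer changes above: they boil down to one behavioural shift. A restarting datanode is only waited for when it is the local node; a remote restarting node is treated as a bad node for the current recovery, but remembered so it can be re-added to the pipeline later. The following is a minimal, self-contained sketch of that flow under invented names (RestartingNodeSketch, onNodeRestarting, onPipelineRecovered are illustration only, not the real DataStreamer API):

import java.util.ArrayList;
import java.util.List;

// Illustration only: simplified stand-in for the DataStreamer error handling.
public class RestartingNodeSketch {
  private final List<String> failed = new ArrayList<>();      // excluded from new pipelines
  private final List<String> restarting = new ArrayList<>();  // excluded, but only temporarily

  // Called when a datanode reports a restart OOB ack.
  void onNodeRestarting(String node, boolean isLocal) {
    if (isLocal) {
      // Local node: worth waiting for; the streamer retries until the node
      // comes back or the restart deadline expires.
      return;
    }
    // Remote node: do not wait. Treat it as bad for this recovery,
    // but remember that it was only restarting.
    failed.add(node);
    restarting.add(node);
  }

  // Called after a new block output stream (pipeline) is set up successfully.
  void onPipelineRecovered() {
    // Forget restarting nodes so they are eligible for future pipeline updates.
    failed.removeAll(restarting);
    restarting.clear();
  }

  public static void main(String[] args) {
    RestartingNodeSketch s = new RestartingNodeSketch();
    s.onNodeRestarting("dn2", false);
    System.out.println(s.failed);   // [dn2] -- excluded while it restarts
    s.onPipelineRecovered();
    System.out.println(s.failed);   // []    -- dn2 may be re-added later
  }
}

In the actual patch, the same bookkeeping is done by ErrorState.initRestartingNode(i, message, shouldWait), the restartingNodes list, and the failed.removeAll(restartingNodes) call after createBlockOutputStream succeeds.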
View File

@@ -207,7 +207,7 @@ class BlockReceiver implements Closeable {
//
if (isDatanode) { //replication or move
replicaHandler =
datanode.data.createTemporary(storageType, storageId, block);
datanode.data.createTemporary(storageType, storageId, block, false);
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
@@ -236,8 +236,8 @@
case TRANSFER_RBW:
case TRANSFER_FINALIZED:
// this is a transfer destination
replicaHandler =
datanode.data.createTemporary(storageType, storageId, block);
replicaHandler = datanode.data.createTemporary(storageType, storageId,
block, isTransfer);
break;
default: throw new IOException("Unsupported stage " + stage +
" while receiving block " + block + " from " + inAddr);

View File

@@ -319,7 +319,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* @throws IOException if an error occurs
*/
ReplicaHandler createTemporary(StorageType storageType, String storageId,
ExtendedBlock b) throws IOException;
ExtendedBlock b, boolean isTransfer) throws IOException;
/**
* Creates a RBW replica and returns the meta info of the replica

View File

@@ -227,7 +227,19 @@ class FsDatasetAsyncDiskService {
volumeRef, replicaToDelete, block, trashDirectory);
execute(((FsVolumeImpl) volumeRef.getVolume()), deletionTask);
}
/**
* Delete the block file and meta file from the disk synchronously, adjust
* dfsUsed statistics accordingly.
*/
void deleteSync(FsVolumeReference volumeRef, ReplicaInfo replicaToDelete,
ExtendedBlock block, String trashDirectory) {
LOG.info("Deleting " + block.getLocalBlock() + " replica " + replicaToDelete);
ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(volumeRef,
replicaToDelete, block, trashDirectory);
deletionTask.run();
}
/** A task for deleting a block file and its associated meta file, as well
* as decrement the dfs usage of the volume.
* Optionally accepts a trash directory. If one is specified then the files

View File

@@ -1504,37 +1504,29 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override // FsDatasetSpi
public ReplicaHandler createTemporary(
StorageType storageType, String storageId, ExtendedBlock b)
public ReplicaHandler createTemporary(StorageType storageType,
String storageId, ExtendedBlock b, boolean isTransfer)
throws IOException {
long startTimeMs = Time.monotonicNow();
long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
ReplicaInfo lastFoundReplicaInfo = null;
boolean isInPipeline = false;
do {
try (AutoCloseableLock lock = datasetLock.acquire()) {
ReplicaInfo currentReplicaInfo =
volumeMap.get(b.getBlockPoolId(), b.getBlockId());
if (currentReplicaInfo == lastFoundReplicaInfo) {
if (lastFoundReplicaInfo != null) {
invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo });
}
FsVolumeReference ref =
volumes.getNextVolume(storageType, storageId, b.getNumBytes());
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
ReplicaInPipeline newReplicaInfo;
try {
newReplicaInfo = v.createTemporary(b);
} catch (IOException e) {
IOUtils.cleanup(null, ref);
throw e;
}
volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
return new ReplicaHandler(newReplicaInfo, ref);
break;
} else {
if (!(currentReplicaInfo.getGenerationStamp() < b.getGenerationStamp()
&& (currentReplicaInfo.getState() == ReplicaState.TEMPORARY
|| currentReplicaInfo.getState() == ReplicaState.RBW))) {
isInPipeline = currentReplicaInfo.getState() == ReplicaState.TEMPORARY
|| currentReplicaInfo.getState() == ReplicaState.RBW;
/*
* If the current block is old, reject.
* else If transfer request, then accept it.
* else if state is not RBW/Temporary, then reject
*/
if ((currentReplicaInfo.getGenerationStamp() >= b.getGenerationStamp())
|| (!isTransfer && !isInPipeline)) {
throw new ReplicaAlreadyExistsException("Block " + b
+ " already exists in state " + currentReplicaInfo.getState()
+ " and thus cannot be created.");
@@ -1542,7 +1534,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
lastFoundReplicaInfo = currentReplicaInfo;
}
}
if (!isInPipeline) {
continue;
}
// Hang too long, just bail out. This is not supposed to happen.
long writerStopMs = Time.monotonicNow() - startTimeMs;
if (writerStopMs > writerStopTimeoutMs) {
@@ -1555,6 +1549,29 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Stop the previous writer
((ReplicaInPipeline)lastFoundReplicaInfo).stopWriter(writerStopTimeoutMs);
} while (true);
if (lastFoundReplicaInfo != null) {
// Old blockfile should be deleted synchronously as it might collide
// with the new block if allocated in same volume.
// Do the deletion outside of lock as its DISK IO.
invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo },
false);
}
try (AutoCloseableLock lock = datasetLock.acquire()) {
FsVolumeReference ref = volumes.getNextVolume(storageType, storageId, b
.getNumBytes());
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
ReplicaInPipeline newReplicaInfo;
try {
newReplicaInfo = v.createTemporary(b);
} catch (IOException e) {
IOUtils.cleanup(null, ref);
throw e;
}
volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
return new ReplicaHandler(newReplicaInfo, ref);
}
}
/**
@@ -1877,6 +1894,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/
@Override // FsDatasetSpi
public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
invalidate(bpid, invalidBlks, true);
}
private void invalidate(String bpid, Block[] invalidBlks, boolean async)
throws IOException {
final List<String> errors = new ArrayList<String>();
for (int i = 0; i < invalidBlks.length; i++) {
final ReplicaInfo removing;
@@ -1947,13 +1969,20 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// If the block is cached, start uncaching it.
cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
// Delete the block asynchronously to make sure we can do it fast enough.
// It's ok to unlink the block file before the uncache operation
// finishes.
try {
asyncDiskService.deleteAsync(v.obtainReference(), removing,
new ExtendedBlock(bpid, invalidBlks[i]),
dataStorage.getTrashDirectoryForReplica(bpid, removing));
if (async) {
// Delete the block asynchronously to make sure we can do it fast
// enough.
// It's ok to unlink the block file before the uncache operation
// finishes.
asyncDiskService.deleteAsync(v.obtainReference(), removing,
new ExtendedBlock(bpid, invalidBlks[i]),
dataStorage.getTrashDirectoryForReplica(bpid, removing));
} else {
asyncDiskService.deleteSync(v.obtainReference(), removing,
new ExtendedBlock(bpid, invalidBlks[i]),
dataStorage.getTrashDirectoryForReplica(bpid, removing));
}
} catch (ClosedChannelException e) {
LOG.warn("Volume " + v + " is closed, ignore the deletion task for " +
"block " + invalidBlks[i]);

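Note on the rewritten createTemporary above: it reduces to a small accept/reject rule plus a bounded wait for the previous writer. The predicate below is an illustrative restatement only, with simplified names and types that are not the FsDatasetImpl API: a request whose generation stamp is not newer than the existing replica is always rejected; otherwise a transfer is accepted, and a non-transfer is accepted only while the existing replica is still in the write pipeline (TEMPORARY or RBW).

// Illustration only: restates the accept/reject comment in createTemporary.
public class CreateTemporarySketch {
  enum ReplicaState { TEMPORARY, RBW, RWR, RUR, FINALIZED }

  // Returns true if createTemporary should fail with ReplicaAlreadyExistsException
  // because a replica of the same block already exists.
  static boolean reject(long existingGenStamp, ReplicaState existingState,
      long requestGenStamp, boolean isTransfer) {
    boolean inPipeline = existingState == ReplicaState.TEMPORARY
        || existingState == ReplicaState.RBW;
    // Old request (generation stamp not newer): always reject.
    // Otherwise: a transfer is accepted; anything else is accepted only
    // while the existing replica is still being written (TEMPORARY/RBW).
    return existingGenStamp >= requestGenStamp || (!isTransfer && !inPipeline);
  }

  public static void main(String[] args) {
    System.out.println(reject(5, ReplicaState.RBW, 6, true));        // false: newer transfer over RBW
    System.out.println(reject(5, ReplicaState.FINALIZED, 6, false)); // true: not a transfer, not in pipeline
    System.out.println(reject(6, ReplicaState.RBW, 6, true));        // true: generation stamp not newer
  }
}

A related design choice visible above: once the old replica is to be replaced, it is deleted synchronously (deleteSync) and outside the dataset lock, since an asynchronous delete could still be pending when the new block is allocated on the same volume.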
View File

@@ -17,9 +17,11 @@
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
@@ -33,6 +35,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
@@ -437,6 +440,95 @@ public class TestClientProtocolForPipelineRecovery {
}
}
@Test
public void testPipelineRecoveryOnRemoteDatanodeUpgrade() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, true);
MiniDFSCluster cluster = null;
DFSClientFaultInjector old = DFSClientFaultInjector.get();
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
FileSystem fileSys = cluster.getFileSystem();
Path file = new Path("/testPipelineRecoveryOnDatanodeUpgrade");
DFSTestUtil.createFile(fileSys, file, 10240L, (short) 3, 0L);
// treat all restarting nodes as remote for test.
DFSClientFaultInjector.set(new DFSClientFaultInjector() {
public boolean skipRollingRestartWait() {
return true;
}
});
final DFSOutputStream out = (DFSOutputStream) fileSys.append(file)
.getWrappedStream();
final AtomicBoolean running = new AtomicBoolean(true);
final AtomicBoolean failed = new AtomicBoolean(false);
Thread t = new Thread() {
public void run() {
while (running.get()) {
try {
out.write("test".getBytes());
out.hflush();
// Keep writing data every one second
Thread.sleep(1000);
} catch (IOException | InterruptedException e) {
LOG.error("Exception during write", e);
failed.set(true);
break;
}
}
running.set(false);
}
};
t.start();
// Let write start
Thread.sleep(1000);
DatanodeInfo[] pipeline = out.getPipeline();
for (DatanodeInfo node : pipeline) {
assertFalse("Write should be going on", failed.get());
ArrayList<DataNode> dataNodes = cluster.getDataNodes();
int indexToShutdown = 0;
for (int i = 0; i < dataNodes.size(); i++) {
if (dataNodes.get(i).getIpcPort() == node.getIpcPort()) {
indexToShutdown = i;
break;
}
}
// Note old genstamp to findout pipeline recovery
final long oldGs = out.getBlock().getGenerationStamp();
MiniDFSCluster.DataNodeProperties dnProps = cluster
.stopDataNodeForUpgrade(indexToShutdown);
GenericTestUtils.waitForThreadTermination(
"Async datanode shutdown thread", 100, 10000);
cluster.restartDataNode(dnProps, true);
cluster.waitActive();
// wait pipeline to be recovered
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return out.getBlock().getGenerationStamp() > oldGs;
}
}, 100, 10000);
Assert.assertEquals("The pipeline recovery count shouldn't increase", 0,
out.getStreamer().getPipelineRecoveryCount());
}
assertFalse("Write should be going on", failed.get());
running.set(false);
t.join();
out.write("testagain".getBytes());
assertTrue("There should be atleast 2 nodes in pipeline still", out
.getPipeline().length >= 2);
out.close();
} finally {
DFSClientFaultInjector.set(old);
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Test to make sure the checksum is set correctly after pipeline
* recovery transfers 0 byte partial block. If fails the test case

View File

@@ -1025,12 +1025,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public synchronized ReplicaHandler createRbw(
StorageType storageType, String storageId, ExtendedBlock b,
boolean allowLazyPersist) throws IOException {
return createTemporary(storageType, storageId, b);
return createTemporary(storageType, storageId, b, false);
}
@Override // FsDatasetSpi
public synchronized ReplicaHandler createTemporary(
StorageType storageType, String storageId, ExtendedBlock b)
public synchronized ReplicaHandler createTemporary(StorageType storageType,
String storageId, ExtendedBlock b, boolean isTransfer)
throws IOException {
if (isValidBlock(b)) {
throw new ReplicaAlreadyExistsException("Block " + b +

View File

@@ -368,7 +368,7 @@ public class TestSimulatedFSDataset {
ExtendedBlock block = new ExtendedBlock(newbpid,1);
try {
// it will throw an exception if the block pool is not found
fsdataset.createTemporary(StorageType.DEFAULT, null, block);
fsdataset.createTemporary(StorageType.DEFAULT, null, block, false);
} catch (IOException ioe) {
// JUnit does not capture exception in non-main thread,
// so cache it and then let main thread throw later.

View File

@@ -139,8 +139,7 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
@Override
public ReplicaHandler createTemporary(StorageType t, String i,
ExtendedBlock b)
throws IOException {
ExtendedBlock b, boolean isTransfer) throws IOException {
return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
}

View File

@@ -435,44 +435,48 @@ public class TestWriteToReplica {
private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
try {
dataSet.createTemporary(StorageType.DEFAULT, null, blocks[FINALIZED]);
dataSet.createTemporary(StorageType.DEFAULT, null, blocks[FINALIZED],
false);
Assert.fail("Should not have created a temporary replica that was " +
"finalized " + blocks[FINALIZED]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
dataSet.createTemporary(StorageType.DEFAULT, null, blocks[TEMPORARY]);
dataSet.createTemporary(StorageType.DEFAULT, null, blocks[TEMPORARY],
false);
Assert.fail("Should not have created a replica that had created as" +
"temporary " + blocks[TEMPORARY]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RBW]);
dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RBW], false);
Assert.fail("Should not have created a replica that had created as RBW " +
blocks[RBW]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RWR]);
dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RWR], false);
Assert.fail("Should not have created a replica that was waiting to be " +
"recovered " + blocks[RWR]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RUR]);
dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RUR], false);
Assert.fail("Should not have created a replica that was under recovery " +
blocks[RUR]);
} catch (ReplicaAlreadyExistsException e) {
}
dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]);
dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT],
false);
try {
dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]);
dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT],
false);
Assert.fail("Should not have created a replica that had already been "
+ "created " + blocks[NON_EXISTENT]);
} catch (Exception e) {
@@ -486,7 +490,7 @@ public class TestWriteToReplica {
try {
ReplicaInPipeline replicaInfo =
dataSet.createTemporary(StorageType.DEFAULT, null,
blocks[NON_EXISTENT]).getReplica();
blocks[NON_EXISTENT], false).getReplica();
Assert.assertTrue(replicaInfo.getGenerationStamp() == newGenStamp);
Assert.assertTrue(
replicaInfo.getBlockId() == blocks[NON_EXISTENT].getBlockId());