HDFS-11856. Ability to re-add Upgrading nodes to pipeline for future pipeline updates. Contributed by Vinayakumar B.

This commit is contained in:
Kihwal Lee 2017-05-31 12:42:37 -05:00
parent c112bf6838
commit 212a566088
11 changed files with 244 additions and 66 deletions

View File

@ -57,4 +57,8 @@ public class DFSClientFaultInjector {
public void fetchFromDatanodeException() {}
public void readFromDatanodeDelay() {}
public boolean skipRollingRestartWait() {
return false;
}
}

View File

@ -328,6 +328,7 @@ class DataStreamer extends Daemon {
static class ErrorState {
private boolean error = false;
private int badNodeIndex = -1;
private boolean waitForRestart = true;
private int restartingNodeIndex = -1;
private long restartingNodeDeadline = 0;
private final long datanodeRestartTimeout;
@ -341,6 +342,7 @@ class DataStreamer extends Daemon {
badNodeIndex = -1;
restartingNodeIndex = -1;
restartingNodeDeadline = 0;
waitForRestart = true;
}
synchronized boolean hasError() {
@ -367,14 +369,19 @@ class DataStreamer extends Daemon {
return restartingNodeIndex;
}
synchronized void initRestartingNode(int i, String message) {
synchronized void initRestartingNode(int i, String message,
boolean shouldWait) {
restartingNodeIndex = i;
restartingNodeDeadline = Time.monotonicNow() + datanodeRestartTimeout;
// If the data streamer has already set the primary node
// bad, clear it. It is likely that the write failed due to
// the DN shutdown. Even if it was a real failure, the pipeline
// recovery will take care of it.
badNodeIndex = -1;
if (shouldWait) {
restartingNodeDeadline = Time.monotonicNow() + datanodeRestartTimeout;
// If the data streamer has already set the primary node
// bad, clear it. It is likely that the write failed due to
// the DN shutdown. Even if it was a real failure, the pipeline
// recovery will take care of it.
badNodeIndex = -1;
} else {
this.waitForRestart = false;
}
LOG.info(message);
}
@ -383,7 +390,7 @@ class DataStreamer extends Daemon {
}
synchronized boolean isNodeMarked() {
return badNodeIndex >= 0 || isRestartingNode();
return badNodeIndex >= 0 || (isRestartingNode() && doWaitForRestart());
}
/**
@ -408,7 +415,7 @@ class DataStreamer extends Daemon {
} else if (badNodeIndex < restartingNodeIndex) {
// the node index has shifted.
restartingNodeIndex--;
} else {
} else if (waitForRestart) {
throw new IllegalStateException("badNodeIndex = " + badNodeIndex
+ " = restartingNodeIndex = " + restartingNodeIndex);
}
@ -450,6 +457,10 @@ class DataStreamer extends Daemon {
}
}
}
boolean doWaitForRestart() {
return waitForRestart;
}
}
private volatile boolean streamerClosed = false;
@ -469,6 +480,8 @@ class DataStreamer extends Daemon {
/** Nodes have been used in the pipeline before and have failed. */
private final List<DatanodeInfo> failed = new ArrayList<>();
/** Restarting Nodes */
private List<DatanodeInfo> restartingNodes = new ArrayList<>();
/** The times have retried to recover pipeline, for the same packet. */
private volatile int pipelineRecoveryCount = 0;
/** Has the current block been hflushed? */
@ -1020,6 +1033,13 @@ class DataStreamer extends Daemon {
return true;
}
/*
* Treat all nodes as remote for test when skip enabled.
*/
if (DFSClientFaultInjector.get().skipRollingRestartWait()) {
return false;
}
// Is it a local node?
InetAddress addr = null;
try {
@ -1087,11 +1107,11 @@ class DataStreamer extends Daemon {
}
// Restart will not be treated differently unless it is
// the local node or the only one in the pipeline.
if (PipelineAck.isRestartOOBStatus(reply) &&
shouldWaitForRestart(i)) {
if (PipelineAck.isRestartOOBStatus(reply)) {
final String message = "Datanode " + i + " is restarting: "
+ targets[i];
errorState.initRestartingNode(i, message);
errorState.initRestartingNode(i, message,
shouldWaitForRestart(i));
throw new IOException(message);
}
// node error
@ -1452,6 +1472,14 @@ class DataStreamer extends Daemon {
*/
private boolean handleRestartingDatanode() {
if (errorState.isRestartingNode()) {
if (!errorState.doWaitForRestart()) {
// If node is restarting and not worth to wait for restart then can go
// ahead with error recovery considering it as bad node for now. Later
// it should be able to re-consider the same node for future pipeline
// updates.
errorState.setBadNodeIndex(errorState.getRestartingNodeIndex());
return true;
}
// 4 seconds or the configured deadline period, whichever is shorter.
// This is the retry interval and recovery will be retried in this
// interval until timeout or success.
@ -1483,9 +1511,14 @@ class DataStreamer extends Daemon {
return false;
}
String reason = "bad.";
if (errorState.getRestartingNodeIndex() == badNodeIndex) {
reason = "restarting.";
restartingNodes.add(nodes[badNodeIndex]);
}
LOG.warn("Error Recovery for " + block + " in pipeline "
+ Arrays.toString(nodes) + ": datanode " + badNodeIndex
+ "("+ nodes[badNodeIndex] + ") is bad.");
+ "("+ nodes[badNodeIndex] + ") is " + reason);
failed.add(nodes[badNodeIndex]);
DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
@ -1691,6 +1724,9 @@ class DataStreamer extends Daemon {
blockStream = out;
result = true; // success
errorState.reset();
// remove all restarting nodes from failed nodes list
failed.removeAll(restartingNodes);
restartingNodes.clear();
} catch (IOException ie) {
if (!errorState.isRestartingNode()) {
LOG.info("Exception in createBlockOutputStream", ie);
@ -1724,9 +1760,10 @@ class DataStreamer extends Daemon {
final int i = errorState.getBadNodeIndex();
// Check whether there is a restart worth waiting for.
if (checkRestart && shouldWaitForRestart(i)) {
errorState.initRestartingNode(i, "Datanode " + i + " is restarting: "
+ nodes[i]);
if (checkRestart) {
errorState.initRestartingNode(i,
"Datanode " + i + " is restarting: " + nodes[i],
shouldWaitForRestart(i));
}
errorState.setError(true);
lastException.set(ie);

View File

@ -206,7 +206,7 @@ class BlockReceiver implements Closeable {
// Open local disk out
//
if (isDatanode) { //replication or move
replicaHandler = datanode.data.createTemporary(storageType, block);
replicaHandler = datanode.data.createTemporary(storageType, block, false);
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
@ -235,7 +235,7 @@ class BlockReceiver implements Closeable {
case TRANSFER_FINALIZED:
// this is a transfer destination
replicaHandler =
datanode.data.createTemporary(storageType, block);
datanode.data.createTemporary(storageType, block, isTransfer);
break;
default: throw new IOException("Unsupported stage " + stage +
" while receiving block " + block + " from " + inAddr);

View File

@ -314,13 +314,14 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
/**
* Creates a temporary replica and returns the meta information of the replica
* .
*
* @param b block
* @param isTransfer whether for transfer
*
* @return the meta info of the replica which is being written to
* @throws IOException if an error occurs
*/
ReplicaHandler createTemporary(StorageType storageType,
ExtendedBlock b) throws IOException;
ExtendedBlock b, boolean isTransfer) throws IOException;
/**
* Creates a RBW replica and returns the meta info of the replica

View File

@ -220,7 +220,19 @@ class FsDatasetAsyncDiskService {
volumeRef, blockFile, metaFile, block, trashDirectory);
execute(((FsVolumeImpl) volumeRef.getVolume()).getCurrentDir(), deletionTask);
}
/**
* Delete the block file and meta file from the disk synchronously, adjust
* dfsUsed statistics accordingly.
*/
void deleteSync(FsVolumeReference volumeRef, File blockFile, File metaFile,
ExtendedBlock block, String trashDirectory) {
LOG.info("Deleting " + block.getLocalBlock() + " file " + blockFile);
ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(
volumeRef, blockFile, metaFile, block, trashDirectory);
deletionTask.run();
}
/** A task for deleting a block file and its associated meta file, as well
* as decrement the dfs usage of the volume.
* Optionally accepts a trash directory. If one is specified then the files

View File

@ -1656,38 +1656,28 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override // FsDatasetSpi
public ReplicaHandler createTemporary(
StorageType storageType, ExtendedBlock b) throws IOException {
public ReplicaHandler createTemporary(StorageType storageType,
ExtendedBlock b, boolean isTransfer) throws IOException {
long startTimeMs = Time.monotonicNow();
long writerStopTimeoutMs = datanode.getDnConf().getXceiverStopTimeout();
ReplicaInfo lastFoundReplicaInfo = null;
boolean isInPipeline = false;
do {
try(AutoCloseableLock lock = datasetLock.acquire()) {
ReplicaInfo currentReplicaInfo =
volumeMap.get(b.getBlockPoolId(), b.getBlockId());
if (currentReplicaInfo == lastFoundReplicaInfo) {
if (lastFoundReplicaInfo != null) {
invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo });
}
FsVolumeReference ref =
volumes.getNextVolume(storageType, b.getNumBytes());
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
// create a temporary file to hold block in the designated volume
File f;
try {
f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
} catch (IOException e) {
IOUtils.cleanup(null, ref);
throw e;
}
ReplicaInPipeline newReplicaInfo =
new ReplicaInPipeline(b.getBlockId(), b.getGenerationStamp(), v,
f.getParentFile(), b.getLocalBlock().getNumBytes());
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
return new ReplicaHandler(newReplicaInfo, ref);
break;
} else {
if (!(currentReplicaInfo.getGenerationStamp() < b
.getGenerationStamp() && currentReplicaInfo instanceof ReplicaInPipeline)) {
isInPipeline = currentReplicaInfo.getState() == ReplicaState.TEMPORARY
|| currentReplicaInfo.getState() == ReplicaState.RBW;
/*
* If the current block is old, reject.
* else If transfer request, then accept it.
* else if state is not RBW/Temporary, then reject
*/
if ((currentReplicaInfo.getGenerationStamp() >= b.getGenerationStamp())
|| (!isTransfer && !isInPipeline)) {
throw new ReplicaAlreadyExistsException("Block " + b
+ " already exists in state " + currentReplicaInfo.getState()
+ " and thus cannot be created.");
@ -1695,7 +1685,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
lastFoundReplicaInfo = currentReplicaInfo;
}
}
if (!isInPipeline) {
continue;
}
// Hang too long, just bail out. This is not supposed to happen.
long writerStopMs = Time.monotonicNow() - startTimeMs;
if (writerStopMs > writerStopTimeoutMs) {
@ -1709,6 +1701,32 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
((ReplicaInPipeline) lastFoundReplicaInfo)
.stopWriter(writerStopTimeoutMs);
} while (true);
if (lastFoundReplicaInfo != null) {
// Old blockfile should be deleted synchronously as it might collide
// with the new block if allocated in same volume.
// Do the deletion outside of lock as its DISK IO.
invalidate(b.getBlockPoolId(), new Block[] { lastFoundReplicaInfo },
false);
}
try (AutoCloseableLock lock = datasetLock.acquire()) {
FsVolumeReference ref = volumes.getNextVolume(storageType, b
.getNumBytes());
FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
// create a temporary file to hold block in the designated volume
File f;
try {
f = v.createTmpFile(b.getBlockPoolId(), b.getLocalBlock());
} catch (IOException e) {
IOUtils.cleanup(null, ref);
throw e;
}
ReplicaInPipeline newReplicaInfo = new ReplicaInPipeline(b.getBlockId(), b
.getGenerationStamp(), v, f.getParentFile(), b.getLocalBlock()
.getNumBytes());
volumeMap.add(b.getBlockPoolId(), newReplicaInfo);
return new ReplicaHandler(newReplicaInfo, ref);
}
}
/**
@ -2060,6 +2078,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
*/
@Override // FsDatasetSpi
public void invalidate(String bpid, Block invalidBlks[]) throws IOException {
invalidate(bpid, invalidBlks, true);
}
private void invalidate(String bpid, Block[] invalidBlks, boolean async)
throws IOException {
final List<String> errors = new ArrayList<String>();
for (int i = 0; i < invalidBlks.length; i++) {
final File f;
@ -2125,14 +2148,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// If the block is cached, start uncaching it.
cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
// Delete the block asynchronously to make sure we can do it fast enough.
// It's ok to unlink the block file before the uncache operation
// finishes.
try {
asyncDiskService.deleteAsync(v.obtainReference(), f,
FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
new ExtendedBlock(bpid, invalidBlks[i]),
dataStorage.getTrashDirectoryForBlockFile(bpid, f));
// Delete the block asynchronously to make sure we can do it fast
// enough.
// It's ok to unlink the block file before the uncache operation
// finishes.
if (async) {
asyncDiskService.deleteAsync(v.obtainReference(), f,
FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
new ExtendedBlock(bpid, invalidBlks[i]),
dataStorage.getTrashDirectoryForBlockFile(bpid, f));
} else {
asyncDiskService.deleteSync(v.obtainReference(), f,
FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
new ExtendedBlock(bpid, invalidBlks[i]),
dataStorage.getTrashDirectoryForBlockFile(bpid, f));
}
} catch (ClosedChannelException e) {
LOG.warn("Volume " + v + " is closed, ignore the deletion task for " +
"block " + invalidBlks[i]);

View File

@ -17,9 +17,11 @@
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
@ -33,6 +35,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
@ -437,6 +440,95 @@ public class TestClientProtocolForPipelineRecovery {
}
}
@Test
public void testPipelineRecoveryOnRemoteDatanodeUpgrade() throws Exception {
Configuration conf = new HdfsConfiguration();
conf.setBoolean(BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, true);
MiniDFSCluster cluster = null;
DFSClientFaultInjector old = DFSClientFaultInjector.get();
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
FileSystem fileSys = cluster.getFileSystem();
Path file = new Path("/testPipelineRecoveryOnDatanodeUpgrade");
DFSTestUtil.createFile(fileSys, file, 10240L, (short) 3, 0L);
// treat all restarting nodes as remote for test.
DFSClientFaultInjector.set(new DFSClientFaultInjector() {
public boolean skipRollingRestartWait() {
return true;
}
});
final DFSOutputStream out = (DFSOutputStream) fileSys.append(file)
.getWrappedStream();
final AtomicBoolean running = new AtomicBoolean(true);
final AtomicBoolean failed = new AtomicBoolean(false);
Thread t = new Thread() {
public void run() {
while (running.get()) {
try {
out.write("test".getBytes());
out.hflush();
// Keep writing data every one second
Thread.sleep(1000);
} catch (IOException | InterruptedException e) {
LOG.error("Exception during write", e);
failed.set(true);
break;
}
}
running.set(false);
}
};
t.start();
// Let write start
Thread.sleep(1000);
DatanodeInfo[] pipeline = out.getPipeline();
for (DatanodeInfo node : pipeline) {
assertFalse("Write should be going on", failed.get());
ArrayList<DataNode> dataNodes = cluster.getDataNodes();
int indexToShutdown = 0;
for (int i = 0; i < dataNodes.size(); i++) {
if (dataNodes.get(i).getIpcPort() == node.getIpcPort()) {
indexToShutdown = i;
break;
}
}
// Note old genstamp to findout pipeline recovery
final long oldGs = out.getBlock().getGenerationStamp();
MiniDFSCluster.DataNodeProperties dnProps = cluster
.stopDataNodeForUpgrade(indexToShutdown);
GenericTestUtils.waitForThreadTermination(
"Async datanode shutdown thread", 100, 10000);
cluster.restartDataNode(dnProps, true);
cluster.waitActive();
// wait pipeline to be recovered
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return out.getBlock().getGenerationStamp() > oldGs;
}
}, 100, 10000);
Assert.assertEquals("The pipeline recovery count shouldn't increase", 0,
out.getStreamer().getPipelineRecoveryCount());
}
assertFalse("Write should be going on", failed.get());
running.set(false);
t.join();
out.write("testagain".getBytes());
assertTrue("There should be atleast 2 nodes in pipeline still", out
.getPipeline().length >= 2);
out.close();
} finally {
DFSClientFaultInjector.set(old);
if (cluster != null) {
cluster.shutdown();
}
}
}
/**
* Test to make sure the checksum is set correctly after pipeline
* recovery transfers 0 byte partial block. If fails the test case

View File

@ -991,12 +991,12 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public synchronized ReplicaHandler createRbw(
StorageType storageType, ExtendedBlock b,
boolean allowLazyPersist) throws IOException {
return createTemporary(storageType, b);
return createTemporary(storageType, b, false);
}
@Override // FsDatasetSpi
public synchronized ReplicaHandler createTemporary(
StorageType storageType, ExtendedBlock b) throws IOException {
public synchronized ReplicaHandler createTemporary(StorageType storageType,
ExtendedBlock b, boolean isTransfer) throws IOException {
if (isValidBlock(b)) {
throw new ReplicaAlreadyExistsException("Block " + b +
" is valid, and cannot be written to.");

View File

@ -368,7 +368,7 @@ public class TestSimulatedFSDataset {
ExtendedBlock block = new ExtendedBlock(newbpid,1);
try {
// it will throw an exception if the block pool is not found
fsdataset.createTemporary(StorageType.DEFAULT, block);
fsdataset.createTemporary(StorageType.DEFAULT, block, false);
} catch (IOException ioe) {
// JUnit does not capture exception in non-main thread,
// so cache it and then let main thread throw later.

View File

@ -137,8 +137,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
}
@Override
public ReplicaHandler createTemporary(StorageType t, ExtendedBlock b)
throws IOException {
public ReplicaHandler createTemporary(StorageType t, ExtendedBlock b,
boolean isTransfer) throws IOException {
return new ReplicaHandler(new ExternalReplicaInPipeline(), null);
}

View File

@ -435,44 +435,44 @@ public class TestWriteToReplica {
private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
try {
dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED]);
dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED], false);
Assert.fail("Should not have created a temporary replica that was " +
"finalized " + blocks[FINALIZED]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY]);
dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY], false);
Assert.fail("Should not have created a replica that had created as" +
"temporary " + blocks[TEMPORARY]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW]);
dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW], false);
Assert.fail("Should not have created a replica that had created as RBW " +
blocks[RBW]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR]);
dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR], false);
Assert.fail("Should not have created a replica that was waiting to be " +
"recovered " + blocks[RWR]);
} catch (ReplicaAlreadyExistsException e) {
}
try {
dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR]);
dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR], false);
Assert.fail("Should not have created a replica that was under recovery " +
blocks[RUR]);
} catch (ReplicaAlreadyExistsException e) {
}
dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
try {
dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]);
dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
Assert.fail("Should not have created a replica that had already been "
+ "created " + blocks[NON_EXISTENT]);
} catch (Exception e) {
@ -485,7 +485,8 @@ public class TestWriteToReplica {
blocks[NON_EXISTENT].setGenerationStamp(newGenStamp);
try {
ReplicaInPipelineInterface replicaInfo =
dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]).getReplica();
dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT],
false).getReplica();
Assert.assertTrue(replicaInfo.getGenerationStamp() == newGenStamp);
Assert.assertTrue(
replicaInfo.getBlockId() == blocks[NON_EXISTENT].getBlockId());