diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 8d651d855c0..aad4a00bdeb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -283,6 +283,7 @@ public class DFSStripedOutputStream extends DFSOutputStream private ExecutorService flushAllExecutor; private CompletionService flushAllExecutorCompletionService; private int blockGroupIndex; + private long datanodeRestartTimeout; /** Construct a new output stream for creating a file. */ DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, @@ -322,6 +323,7 @@ public class DFSStripedOutputStream extends DFSOutputStream streamers.add(streamer); } currentPackets = new DFSPacket[streamers.size()]; + datanodeRestartTimeout = dfsClient.getConf().getDatanodeRestartTimeout(); setCurrentStreamer(0); } @@ -637,6 +639,11 @@ public class DFSStripedOutputStream extends DFSOutputStream "streamer: " + streamer); streamer.setExternalError(); healthySet.add(streamer); + } else if (!streamer.streamerClosed() + && streamer.getErrorState().hasDatanodeError() + && streamer.getErrorState().doWaitForRestart()) { + healthySet.add(streamer); + failedStreamers.remove(streamer); } } return healthySet; @@ -701,6 +708,14 @@ public class DFSStripedOutputStream extends DFSOutputStream for (int i = 0; i < numAllBlocks; i++) { coordinator.offerStreamerUpdateResult(i, newFailed.size() == 0); } + //wait for get notify to failed stream + if (newFailed.size() != 0) { + try { + Thread.sleep(datanodeRestartTimeout); + } catch (InterruptedException e) { + // Do nothing + } + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java index d920f18e247..09dd8cb48ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java @@ -143,7 +143,8 @@ public class StripedDataStreamer extends DataStreamer { // set up the pipeline again with the remaining nodes. when a striped // data streamer comes here, it must be in external error state. - assert getErrorState().hasExternalError(); + assert getErrorState().hasExternalError() + || getErrorState().doWaitForRestart(); success = createBlockOutputStream(nodes, nodeStorageTypes, nodeStorageIDs, newGS, true);