HDFS-15210. EC : File write hanged when DN is shutdown by admin command. Contributed by Surendra Singh Lilhore.

(cherry picked from commit db6252b6c3)
This commit is contained in:
Surendra Singh Lilhore 2020-04-29 10:58:35 +05:30 committed by Wei-Chiu Chuang
parent be6e99963d
commit a6f86af39f
2 changed files with 17 additions and 1 deletions

View File

@ -283,6 +283,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
private ExecutorService flushAllExecutor; private ExecutorService flushAllExecutor;
private CompletionService<Void> flushAllExecutorCompletionService; private CompletionService<Void> flushAllExecutorCompletionService;
private int blockGroupIndex; private int blockGroupIndex;
private long datanodeRestartTimeout;
/** Construct a new output stream for creating a file. */ /** Construct a new output stream for creating a file. */
DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat, DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
@ -322,6 +323,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
streamers.add(streamer); streamers.add(streamer);
} }
currentPackets = new DFSPacket[streamers.size()]; currentPackets = new DFSPacket[streamers.size()];
datanodeRestartTimeout = dfsClient.getConf().getDatanodeRestartTimeout();
setCurrentStreamer(0); setCurrentStreamer(0);
} }
@ -643,6 +645,11 @@ public class DFSStripedOutputStream extends DFSOutputStream
"streamer: " + streamer); "streamer: " + streamer);
streamer.setExternalError(); streamer.setExternalError();
healthySet.add(streamer); healthySet.add(streamer);
} else if (!streamer.streamerClosed()
&& streamer.getErrorState().hasDatanodeError()
&& streamer.getErrorState().doWaitForRestart()) {
healthySet.add(streamer);
failedStreamers.remove(streamer);
} }
} }
return healthySet; return healthySet;
@ -707,6 +714,14 @@ public class DFSStripedOutputStream extends DFSOutputStream
for (int i = 0; i < numAllBlocks; i++) { for (int i = 0; i < numAllBlocks; i++) {
coordinator.offerStreamerUpdateResult(i, newFailed.size() == 0); coordinator.offerStreamerUpdateResult(i, newFailed.size() == 0);
} }
//wait for get notify to failed stream
if (newFailed.size() != 0) {
try {
Thread.sleep(datanodeRestartTimeout);
} catch (InterruptedException e) {
// Do nothing
}
}
} }
} }

View File

@ -143,7 +143,8 @@ public class StripedDataStreamer extends DataStreamer {
// set up the pipeline again with the remaining nodes. when a striped // set up the pipeline again with the remaining nodes. when a striped
// data streamer comes here, it must be in external error state. // data streamer comes here, it must be in external error state.
assert getErrorState().hasExternalError(); assert getErrorState().hasExternalError()
|| getErrorState().doWaitForRestart();
success = createBlockOutputStream(nodes, nodeStorageTypes, success = createBlockOutputStream(nodes, nodeStorageTypes,
nodeStorageIDs, newGS, true); nodeStorageIDs, newGS, true);