HDFS-15210. EC: File write hung when DN is shut down by admin command. Contributed by Surendra Singh Lilhore.
This commit is contained in:
parent
816042e62b
commit
db6252b6c3
|
@ -283,6 +283,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
|||
private ExecutorService flushAllExecutor;
|
||||
private CompletionService<Void> flushAllExecutorCompletionService;
|
||||
private int blockGroupIndex;
|
||||
private long datanodeRestartTimeout;
|
||||
|
||||
/** Construct a new output stream for creating a file. */
|
||||
DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
|
||||
|
@ -322,6 +323,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
|||
streamers.add(streamer);
|
||||
}
|
||||
currentPackets = new DFSPacket[streamers.size()];
|
||||
datanodeRestartTimeout = dfsClient.getConf().getDatanodeRestartTimeout();
|
||||
setCurrentStreamer(0);
|
||||
}
|
||||
|
||||
|
@ -637,6 +639,11 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
|||
"streamer: " + streamer);
|
||||
streamer.setExternalError();
|
||||
healthySet.add(streamer);
|
||||
} else if (!streamer.streamerClosed()
|
||||
&& streamer.getErrorState().hasDatanodeError()
|
||||
&& streamer.getErrorState().doWaitForRestart()) {
|
||||
healthySet.add(streamer);
|
||||
failedStreamers.remove(streamer);
|
||||
}
|
||||
}
|
||||
return healthySet;
|
||||
|
@ -701,6 +708,14 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
|||
for (int i = 0; i < numAllBlocks; i++) {
|
||||
coordinator.offerStreamerUpdateResult(i, newFailed.size() == 0);
|
||||
}
|
||||
// Wait for the failed streamers to be notified.
|
||||
if (newFailed.size() != 0) {
|
||||
try {
|
||||
Thread.sleep(datanodeRestartTimeout);
|
||||
} catch (InterruptedException e) {
|
||||
// Do nothing
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -143,7 +143,8 @@ public class StripedDataStreamer extends DataStreamer {
|
|||
|
||||
// set up the pipeline again with the remaining nodes. when a striped
|
||||
// data streamer comes here, it must be in external error state.
|
||||
assert getErrorState().hasExternalError();
|
||||
assert getErrorState().hasExternalError()
|
||||
|| getErrorState().doWaitForRestart();
|
||||
success = createBlockOutputStream(nodes, nodeStorageTypes,
|
||||
nodeStorageIDs, newGS, true);
|
||||
|
||||
|
|
Loading…
Reference in New Issue