HDFS-15210. EC : File write hanged when DN is shutdown by admin command. Contributed by Surendra Singh Lilhore.
(cherry picked from commit db6252b6c3
)
(cherry picked from commit 7856af2cc65377352db5ebc93a778f373f71d215)
(cherry picked from commit 5315de3d437f78d81dc5fa9f43849de6198220e6)
This commit is contained in:
parent
eb6c08e423
commit
379a7a5726
|
@ -282,6 +282,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
||||||
private ExecutorService flushAllExecutor;
|
private ExecutorService flushAllExecutor;
|
||||||
private CompletionService<Void> flushAllExecutorCompletionService;
|
private CompletionService<Void> flushAllExecutorCompletionService;
|
||||||
private int blockGroupIndex;
|
private int blockGroupIndex;
|
||||||
|
private long datanodeRestartTimeout;
|
||||||
|
|
||||||
/** Construct a new output stream for creating a file. */
|
/** Construct a new output stream for creating a file. */
|
||||||
DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
|
DFSStripedOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
|
||||||
|
@ -321,6 +322,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
||||||
streamers.add(streamer);
|
streamers.add(streamer);
|
||||||
}
|
}
|
||||||
currentPackets = new DFSPacket[streamers.size()];
|
currentPackets = new DFSPacket[streamers.size()];
|
||||||
|
datanodeRestartTimeout = dfsClient.getConf().getDatanodeRestartTimeout();
|
||||||
setCurrentStreamer(0);
|
setCurrentStreamer(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -621,6 +623,11 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
||||||
"streamer: " + streamer);
|
"streamer: " + streamer);
|
||||||
streamer.setExternalError();
|
streamer.setExternalError();
|
||||||
healthySet.add(streamer);
|
healthySet.add(streamer);
|
||||||
|
} else if (!streamer.streamerClosed()
|
||||||
|
&& streamer.getErrorState().hasDatanodeError()
|
||||||
|
&& streamer.getErrorState().doWaitForRestart()) {
|
||||||
|
healthySet.add(streamer);
|
||||||
|
failedStreamers.remove(streamer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return healthySet;
|
return healthySet;
|
||||||
|
@ -685,6 +692,14 @@ public class DFSStripedOutputStream extends DFSOutputStream
|
||||||
for (int i = 0; i < numAllBlocks; i++) {
|
for (int i = 0; i < numAllBlocks; i++) {
|
||||||
coordinator.offerStreamerUpdateResult(i, newFailed.size() == 0);
|
coordinator.offerStreamerUpdateResult(i, newFailed.size() == 0);
|
||||||
}
|
}
|
||||||
|
//wait for get notify to failed stream
|
||||||
|
if (newFailed.size() != 0) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(datanodeRestartTimeout);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// Do nothing
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -143,7 +143,8 @@ public class StripedDataStreamer extends DataStreamer {
|
||||||
|
|
||||||
// set up the pipeline again with the remaining nodes. when a striped
|
// set up the pipeline again with the remaining nodes. when a striped
|
||||||
// data streamer comes here, it must be in external error state.
|
// data streamer comes here, it must be in external error state.
|
||||||
assert getErrorState().hasExternalError();
|
assert getErrorState().hasExternalError()
|
||||||
|
|| getErrorState().doWaitForRestart();
|
||||||
success = createBlockOutputStream(nodes, nodeStorageTypes,
|
success = createBlockOutputStream(nodes, nodeStorageTypes,
|
||||||
nodeStorageIDs, newGS, true);
|
nodeStorageIDs, newGS, true);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue