HDFS-12999. When reach the end of the block group, it may not need to flush all the data packets(flushAllInternals) twice. Contributed by lufei and Fei Hui.

(cherry picked from commit df622cf4a3)
This commit is contained in:
Ayush Saxena 2019-12-25 11:07:25 +05:30 committed by Wei-Chiu Chuang
parent e1ff3b2f97
commit 9acbbd885c
1 changed files with 10 additions and 7 deletions

View File

@ -556,7 +556,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
// if this is the end of the block group, end each internal block // if this is the end of the block group, end each internal block
if (shouldEndBlockGroup()) { if (shouldEndBlockGroup()) {
flushAllInternals(); flushAllInternals();
checkStreamerFailures(); checkStreamerFailures(false);
for (int i = 0; i < numAllBlocks; i++) { for (int i = 0; i < numAllBlocks; i++) {
final StripedDataStreamer s = setCurrentStreamer(i); final StripedDataStreamer s = setCurrentStreamer(i);
if (s.isHealthy()) { if (s.isHealthy()) {
@ -567,7 +567,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
} }
} else { } else {
// check failure state for all the streamers. Bump GS if necessary // check failure state for all the streamers. Bump GS if necessary
checkStreamerFailures(); checkStreamerFailures(true);
} }
} }
setCurrentStreamer(next); setCurrentStreamer(next);
@ -623,15 +623,18 @@ public class DFSStripedOutputStream extends DFSOutputStream
* written a full stripe (i.e., enqueue all packets for a full stripe), or * written a full stripe (i.e., enqueue all packets for a full stripe), or
* when we're closing the outputstream. * when we're closing the outputstream.
*/ */
private void checkStreamerFailures() throws IOException { private void checkStreamerFailures(boolean isNeedFlushAllPackets)
throws IOException {
Set<StripedDataStreamer> newFailed = checkStreamers(); Set<StripedDataStreamer> newFailed = checkStreamers();
if (newFailed.size() == 0) { if (newFailed.size() == 0) {
return; return;
} }
// for healthy streamers, wait till all of them have fetched the new block if (isNeedFlushAllPackets) {
// and flushed out all the enqueued packets. // for healthy streamers, wait till all of them have fetched the new block
flushAllInternals(); // and flushed out all the enqueued packets.
flushAllInternals();
}
// recheck failed streamers again after the flush // recheck failed streamers again after the flush
newFailed = checkStreamers(); newFailed = checkStreamers();
while (newFailed.size() > 0) { while (newFailed.size() > 0) {
@ -1188,7 +1191,7 @@ public class DFSStripedOutputStream extends DFSOutputStream
// flush all the data packets // flush all the data packets
flushAllInternals(); flushAllInternals();
// check failures // check failures
checkStreamerFailures(); checkStreamerFailures(false);
for (int i = 0; i < numAllBlocks; i++) { for (int i = 0; i < numAllBlocks; i++) {
final StripedDataStreamer s = setCurrentStreamer(i); final StripedDataStreamer s = setCurrentStreamer(i);