From 354e63b366e22b04d8bfe72aa0eb2bb3e24f09b3 Mon Sep 17 00:00:00 2001 From: Ayush Saxena Date: Wed, 25 Dec 2019 11:07:25 +0530 Subject: [PATCH] HDFS-12999. When reach the end of the block group, it may not need to flush all the data packets(flushAllInternals) twice. Contributed by lufei and Fei Hui. (cherry picked from commit df622cf4a32ee172ded6c4b3b97a1e49befc4f10) --- .../hadoop/hdfs/DFSStripedOutputStream.java | 17 ++++++++++------- 1 file changed, 10 insertions(+), 7 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 97310ee8adf..c94c9dafc88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -556,7 +556,7 @@ public class DFSStripedOutputStream extends DFSOutputStream // if this is the end of the block group, end each internal block if (shouldEndBlockGroup()) { flushAllInternals(); - checkStreamerFailures(); + checkStreamerFailures(false); for (int i = 0; i < numAllBlocks; i++) { final StripedDataStreamer s = setCurrentStreamer(i); if (s.isHealthy()) { @@ -567,7 +567,7 @@ public class DFSStripedOutputStream extends DFSOutputStream } } else { // check failure state for all the streamers. Bump GS if necessary - checkStreamerFailures(); + checkStreamerFailures(true); } } setCurrentStreamer(next); @@ -623,15 +623,18 @@ public class DFSStripedOutputStream extends DFSOutputStream * written a full stripe (i.e., enqueue all packets for a full stripe), or * when we're closing the outputstream. */ - private void checkStreamerFailures() throws IOException { + private void checkStreamerFailures(boolean isNeedFlushAllPackets) + throws IOException { Set newFailed = checkStreamers(); if (newFailed.size() == 0) { return; } - // for healthy streamers, wait till all of them have fetched the new block - // and flushed out all the enqueued packets. - flushAllInternals(); + if (isNeedFlushAllPackets) { + // for healthy streamers, wait till all of them have fetched the new block + // and flushed out all the enqueued packets. + flushAllInternals(); + } // recheck failed streamers again after the flush newFailed = checkStreamers(); while (newFailed.size() > 0) { @@ -1188,7 +1191,7 @@ public class DFSStripedOutputStream extends DFSOutputStream // flush all the data packets flushAllInternals(); // check failures - checkStreamerFailures(); + checkStreamerFailures(false); for (int i = 0; i < numAllBlocks; i++) { final StripedDataStreamer s = setCurrentStreamer(i);