From a3224ff799b59e75dc50890c4b49b7113e57a53c Mon Sep 17 00:00:00 2001 From: Surendra Singh Lilhore Date: Sun, 15 Mar 2020 20:44:32 +0530 Subject: [PATCH] HDFS-15211. EC: File write hangs during close in case of Exception during updatePipeline. Contributed by Ayush Saxena. --- .../hadoop/hdfs/DFSStripedOutputStream.java | 60 +++++++++++-------- 1 file changed, 35 insertions(+), 25 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index c94c9dafc88..8fc7eaa955b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -393,6 +393,7 @@ public class DFSStripedOutputStream extends DFSOutputStream LOG.debug("newly failed streamers: " + newFailed); } if (failCount > (numAllBlocks - numDataBlocks)) { + closeAllStreamers(); throw new IOException("Failed: the number of failed blocks = " + failCount + " > the number of parity blocks = " + (numAllBlocks - numDataBlocks)); @@ -400,6 +401,13 @@ public class DFSStripedOutputStream extends DFSOutputStream return newFailed; } + private void closeAllStreamers() { + // The write has failed, Close all the streamers. + for (StripedDataStreamer streamer : streamers) { + streamer.close(true); + } + } + private void handleCurrentStreamerFailure(String err, Exception e) throws IOException { currentPacket = null; @@ -654,6 +662,8 @@ public class DFSStripedOutputStream extends DFSOutputStream newFailed = waitCreatingStreamers(healthySet); if (newFailed.size() + failedStreamers.size() > numAllBlocks - numDataBlocks) { + // The write has failed, Close all the streamers. + closeAllStreamers(); throw new IOException( "Data streamers failed while creating new block streams: " + newFailed + ". There are not enough healthy streamers."); @@ -1153,32 +1163,32 @@ public class DFSStripedOutputStream extends DFSOutputStream @Override protected synchronized void closeImpl() throws IOException { - if (isClosed()) { - exceptionLastSeen.check(true); - - // Writing to at least {dataUnits} replicas can be considered as success, - // and the rest of data can be recovered. - final int minReplication = ecPolicy.getNumDataUnits(); - int goodStreamers = 0; - final MultipleIOException.Builder b = new MultipleIOException.Builder(); - for (final StripedDataStreamer si : streamers) { - try { - si.getLastException().check(true); - goodStreamers++; - } catch (IOException e) { - b.add(e); - } - } - if (goodStreamers < minReplication) { - final IOException ioe = b.build(); - if (ioe != null) { - throw ioe; - } - } - return; - } - try { + if (isClosed()) { + exceptionLastSeen.check(true); + + // Writing to at least {dataUnits} replicas can be considered as + // success, and the rest of data can be recovered. + final int minReplication = ecPolicy.getNumDataUnits(); + int goodStreamers = 0; + final MultipleIOException.Builder b = new MultipleIOException.Builder(); + for (final StripedDataStreamer si : streamers) { + try { + si.getLastException().check(true); + goodStreamers++; + } catch (IOException e) { + b.add(e); + } + } + if (goodStreamers < minReplication) { + final IOException ioe = b.build(); + if (ioe != null) { + throw ioe; + } + } + return; + } + try { // flush from all upper layers flushBuffer();