From e30ce01ddce1cfd1e9d49c4784eb4a6bc87e36ca Mon Sep 17 00:00:00 2001
From: Jing Zhao <jing9@apache.org>
Date: Mon, 1 Feb 2016 13:02:58 -0800
Subject: [PATCH] HDFS-9494. Parallel optimization of DFSStripedOutputStream#flushAllInternals. Contributed by Gao Rui.

---
 .../apache/hadoop/hdfs/DFSOutputStream.java   | 28 ++++---
 .../hadoop/hdfs/DFSStripedOutputStream.java   | 73 +++++++++++++++----
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |  3 +
 3 files changed, 79 insertions(+), 25 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index f6a8981a489..1c58b28fbfb 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -685,18 +685,7 @@ public class DFSOutputStream extends FSOutputSummer
    * received from datanodes.
    */
   protected void flushInternal() throws IOException {
-    long toWaitFor;
-    synchronized (this) {
-      dfsClient.checkOpen();
-      checkClosed();
-      //
-      // If there is data in the current buffer, send it across
-      //
-      getStreamer().queuePacket(currentPacket);
-      currentPacket = null;
-      toWaitFor = getStreamer().getLastQueuedSeqno();
-    }
-
+    long toWaitFor = flushInternalWithoutWaitingAck();
     getStreamer().waitForAckedSeqno(toWaitFor);
   }
 
@@ -864,6 +853,21 @@ public class DFSOutputStream extends FSOutputSummer
     return getStreamer().getBlockToken();
   }
 
+  protected long flushInternalWithoutWaitingAck() throws IOException {
+    long toWaitFor;
+    synchronized (this) {
+      dfsClient.checkOpen();
+      checkClosed();
+      //
+      // If there is data in the current buffer, send it across
+      //
+      getStreamer().queuePacket(currentPacket);
+      currentPacket = null;
+      toWaitFor = getStreamer().getLastQueuedSeqno();
+    }
+    return toWaitFor;
+  }
+
   @Override
   public void setDropBehind(Boolean dropBehind) throws IOException {
     CachingStrategy prevStrategy, nextStrategy;
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index e1ff84461f8..8292d0a0262 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -34,6 +34,13 @@ import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -245,7 +252,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   private final List<StripedDataStreamer> streamers;
   private final DFSPacket[] currentPackets; // current Packet of each streamer
 
-  /** Size of each striping cell, must be a multiple of bytesPerChecksum */
+  // Size of each striping cell, must be a multiple of bytesPerChecksum.
   private final int cellSize;
   private final int numAllBlocks;
   private final int numDataBlocks;
@@ -253,6 +260,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   private final String[] favoredNodes;
   private final List<StripedDataStreamer> failedStreamers;
   private final Map<Integer, Integer> corruptBlockCountMap;
+  private ExecutorService flushAllExecutor;
+  private CompletionService<Void> flushAllExecutorCompletionService;
   private int blockGroupIndex;
 
   /** Construct a new output stream for creating a file. */
@@ -273,6 +282,9 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     this.favoredNodes = favoredNodes;
     failedStreamers = new ArrayList<>();
     corruptBlockCountMap = new LinkedHashMap<>();
+    flushAllExecutor = Executors.newFixedThreadPool(numAllBlocks);
+    flushAllExecutorCompletionService = new
+        ExecutorCompletionService<>(flushAllExecutor);
 
     encoder = CodecUtil.createRSRawEncoder(dfsClient.getConfiguration(),
         numDataBlocks, numParityBlocks);
@@ -368,13 +380,19 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     return newFailed;
   }
 
-  private void handleStreamerFailure(String err, Exception e)
+  private void handleCurrentStreamerFailure(String err, Exception e)
       throws IOException {
-    LOG.warn("Failed: " + err + ", " + this, e);
-    getCurrentStreamer().getErrorState().setInternalError();
-    getCurrentStreamer().close(true);
-    checkStreamers();
     currentPacket = null;
+    handleStreamerFailure(err, e, getCurrentStreamer());
+  }
+
+  private void handleStreamerFailure(String err, Exception e,
+      StripedDataStreamer streamer) throws IOException {
+    LOG.warn("Failed: " + err + ", " + this, e);
+    streamer.getErrorState().setInternalError();
+    streamer.close(true);
+    checkStreamers();
+    currentPackets[streamer.getIndex()] = null;
   }
 
   private void replaceFailedStreamers() {
@@ -495,7 +513,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       try {
         super.writeChunk(bytes, offset, len, checksum, ckoff, cklen);
       } catch(Exception e) {
-        handleStreamerFailure("offset=" + offset + ", length=" + len, e);
+        handleCurrentStreamerFailure("offset=" + offset + ", length=" + len, e);
       }
     }
   }
@@ -804,7 +822,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
           streamer.closeSocket();
         } catch (Exception e) {
           try {
-            handleStreamerFailure("force=" + force, e);
+            handleCurrentStreamerFailure("force=" + force, e);
           } catch (IOException ioe) {
             b.add(ioe);
           }
@@ -894,7 +912,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
             getChecksumSize());
       }
     } catch(Exception e) {
-      handleStreamerFailure("oldBytes=" + oldBytes + ", len=" + len, e);
+      handleCurrentStreamerFailure("oldBytes=" + oldBytes + ", len=" + len,
+          e);
     }
   }
 }
@@ -968,6 +987,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
     } catch (ClosedChannelException ignored) {
     } finally {
       setClosed();
+      // shutdown executor of flushAll tasks
+      flushAllExecutor.shutdownNow();
     }
   }
 
@@ -980,7 +1001,7 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       try {
         enqueueCurrentPacket();
       } catch (IOException e) {
-        handleStreamerFailure("enqueueAllCurrentPackets, i=" + i, e);
+        handleCurrentStreamerFailure("enqueueAllCurrentPackets, i=" + i, e);
       }
     }
   }
@@ -988,6 +1009,8 @@ public class DFSStripedOutputStream extends DFSOutputStream {
   }
 
   void flushAllInternals() throws IOException {
+    Map<Future<Void>, Integer> flushAllFuturesMap = new HashMap<>();
+    Future<Void> future = null;
     int current = getCurrentIndex();
 
     for (int i = 0; i < numAllBlocks; i++) {
@@ -995,13 +1018,37 @@ public class DFSStripedOutputStream extends DFSOutputStream {
       if (s.isHealthy()) {
         try {
           // flush all data to Datanode
-          flushInternal();
-        } catch(Exception e) {
-          handleStreamerFailure("flushInternal " + s, e);
+          final long toWaitFor = flushInternalWithoutWaitingAck();
+          future = flushAllExecutorCompletionService.submit(
+              new Callable<Void>() {
+                @Override
+                public Void call() throws Exception {
+                  s.waitForAckedSeqno(toWaitFor);
+                  return null;
+                }
+              });
+          flushAllFuturesMap.put(future, i);
+        } catch (Exception e) {
+          handleCurrentStreamerFailure("flushInternal " + s, e);
         }
       }
     }
     setCurrentStreamer(current);
+    for (int i = 0; i < flushAllFuturesMap.size(); i++) {
+      try {
+        future = flushAllExecutorCompletionService.take();
+        future.get();
+      } catch (InterruptedException ie) {
+        throw DFSUtilClient.toInterruptedIOException(
+            "Interrupted during waiting all streamer flush, ", ie);
+      } catch (ExecutionException ee) {
+        LOG.warn(
+            "Caught ExecutionException while waiting all streamer flush, ", ee);
+        StripedDataStreamer s = streamers.get(flushAllFuturesMap.get(future));
+        handleStreamerFailure("flushInternal " + s,
+            (Exception) ee.getCause(), s);
+      }
+    }
   }
 
   static void sleep(long ms, String op) throws InterruptedIOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 1141df14e8f..18716e095d8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -217,6 +217,9 @@ Trunk (Unreleased)
     HDFS-9582. TestLeaseRecoveryStriped file missing Apache License header
     and not well formatted. (umamahesh)
 
+    HDFS-9494. Parallel optimization of DFSStripedOutputStream#flushAllInternals.
+    (Gao Rui via jing9)
+
   OPTIMIZATIONS
 
   BUG FIXES
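Illustrative note (not part of the patch above): the change splits the old
flushInternal() into flushInternalWithoutWaitingAck(), which queues the last
packet and returns its sequence number, and the ack wait itself, so that
flushAllInternals() can overlap the ack waits of all internal streamers
through an ExecutorCompletionService instead of flushing them one by one.
The standalone Java sketch below shows that same pattern under stated
assumptions: Writer, queuePendingData(), waitForAck(), flushAll(), and
ParallelFlushSketch are hypothetical stand-ins, not HDFS APIs.

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ParallelFlushSketch {

      /** Hypothetical stand-in for StripedDataStreamer. */
      interface Writer {
        /** Queue any buffered data; returns the sequence number to wait for. */
        long queuePendingData() throws IOException;
        /** Block until the given sequence number has been acknowledged. */
        void waitForAck(long seqno) throws Exception;
      }

      static void flushAll(Writer[] writers) throws IOException {
        // One thread per writer, so no ack wait queues behind another.
        ExecutorService executor = Executors.newFixedThreadPool(writers.length);
        CompletionService<Void> acks = new ExecutorCompletionService<>(executor);
        Map<Future<Void>, Integer> futureToIndex = new HashMap<>();
        try {
          // Phase 1: queue data on every writer, then submit one
          // wait-for-ack task per writer; the waits run concurrently.
          for (int i = 0; i < writers.length; i++) {
            final Writer w = writers[i];
            final long seqno = w.queuePendingData();
            futureToIndex.put(acks.submit(new Callable<Void>() {
              @Override
              public Void call() throws Exception {
                w.waitForAck(seqno);
                return null;
              }
            }), i);
          }
          // Phase 2: harvest results as they finish; total time is set by
          // the slowest writer rather than the sum over all writers.
          for (int i = 0; i < futureToIndex.size(); i++) {
            Future<Void> done = null;
            try {
              done = acks.take();
              done.get();
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt();
              throw new IOException("Interrupted waiting for acks", ie);
            } catch (ExecutionException ee) {
              // The map recovers which writer the failed future belongs to,
              // as flushAllFuturesMap does in the patch.
              throw new IOException(
                  "Writer " + futureToIndex.get(done) + " failed", ee.getCause());
            }
          }
        } finally {
          executor.shutdownNow();
        }
      }
    }

Sizing the pool to writers.length mirrors the patch's
Executors.newFixedThreadPool(numAllBlocks). One difference: the sketch shuts
its executor down in a finally block because it is single-use, while the
patch keeps the executor for the lifetime of the stream and shuts it down in
closeImpl(), since flushAllInternals() is called repeatedly.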