diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 6006d71faf6..9373e98bf42 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -189,6 +189,16 @@ public interface HdfsClientConfigKeys { int THREADPOOL_SIZE_DEFAULT = 18; } + /** dfs.client.write.striped configuration properties */ + interface StripedWrite { + String PREFIX = Write.PREFIX + "striped."; + + String MAX_SECONDS_GET_STRIPED_BLOCK_KEY = PREFIX + "max-seconds-get-striped-block"; + int MAX_SECONDS_GET_STRIPED_BLOCK_DEFAULT = 90; + String MAX_SECONDS_GET_ENDED_BLOCK_KEY = PREFIX + "max-seconds-get-ended-block"; + int MAX_SECONDS_GET_ENDED_BLOCK_DEFAULT = 60; + } + /** dfs.http.client configuration properties */ interface HttpClient { String PREFIX = "dfs.http.client."; diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt old mode 100644 new mode 100755 index 3170e9bc0fb..939ba89b7e7 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -225,3 +225,6 @@ (Yi Liu via jing9) HDFS-8320. Erasure coding: consolidate striping-related terminologies. (zhz) + + HDFS-8366. Erasure Coding: Make the timeout parameter of polling blocking queue + configurable in DFSStripedOutputStream. (Li Bo) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index b99afab9547..a6480238b59 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -33,6 +33,8 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.CreateFlag; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.impl.DfsClientConf; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -61,11 +63,14 @@ import com.google.common.base.Preconditions; public class DFSStripedOutputStream extends DFSOutputStream { /** Coordinate the communication between the streamers. */ static class Coordinator { + private final DfsClientConf conf; private final List> endBlocks; private final List> stripedBlocks; private volatile boolean shouldLocateFollowingBlock = false; - Coordinator(final int numDataBlocks, final int numAllBlocks) { + Coordinator(final DfsClientConf conf, final int numDataBlocks, + final int numAllBlocks) { + this.conf = conf; endBlocks = new ArrayList<>(numDataBlocks); for (int i = 0; i < numDataBlocks; i++) { endBlocks.add(new LinkedBlockingQueue(1)); @@ -91,7 +96,9 @@ public class DFSStripedOutputStream extends DFSOutputStream { ExtendedBlock getEndBlock(int i) throws InterruptedIOException { try { - return endBlocks.get(i).poll(30, TimeUnit.SECONDS); + return endBlocks.get(i).poll( + conf.getStripedWriteMaxSecondsGetEndedBlock(), + TimeUnit.SECONDS); } catch (InterruptedException e) { throw DFSUtil.toInterruptedIOException( "getEndBlock interrupted, i=" + i, e); @@ -121,7 +128,9 @@ public class DFSStripedOutputStream extends DFSOutputStream { LocatedBlock getStripedBlock(int i) throws IOException { final LocatedBlock lb; try { - lb = stripedBlocks.get(i).poll(90, TimeUnit.SECONDS); + lb = stripedBlocks.get(i).poll( + conf.getStripedWriteMaxSecondsGetStripedBlock(), + TimeUnit.SECONDS); } catch (InterruptedException e) { throw DFSUtil.toInterruptedIOException("getStripedBlock interrupted", e); } @@ -133,7 +142,7 @@ public class DFSStripedOutputStream extends DFSOutputStream { } } - /** Buffers for writing the data and parity cells of a strip. */ + /** Buffers for writing the data and parity cells of a stripe. */ class CellBuffers { private final ByteBuffer[] buffers; private final byte[][] checksumArrays; @@ -228,7 +237,7 @@ public class DFSStripedOutputStream extends DFSOutputStream { encoder = new RSRawEncoder(); encoder.initialize(numDataBlocks, numParityBlocks, cellSize); - coordinator = new Coordinator(numDataBlocks, numAllBlocks); + coordinator = new Coordinator(dfsClient.getConf(), numDataBlocks, numAllBlocks); try { cellBuffers = new CellBuffers(numParityBlocks); } catch (InterruptedException ie) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index 32a3da0a7c2..34ec06d9998 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -103,6 +103,9 @@ public class DfsClientConf { private final int hedgedReadThreadpoolSize; private final int stripedReadThreadpoolSize; + private final int stripedWriteMaxSecondsGetStripedBlock; + private final int stripedWriteMaxSecondsGetEndedBlock; + public DfsClientConf(Configuration conf) { // The hdfsTimeout is currently the same as the ipc timeout @@ -225,6 +228,13 @@ public class DfsClientConf { Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " + HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY + " must be greater than 0."); + + stripedWriteMaxSecondsGetStripedBlock = conf.getInt( + HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_STRIPED_BLOCK_KEY, + HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_STRIPED_BLOCK_DEFAULT); + stripedWriteMaxSecondsGetEndedBlock = conf.getInt( + HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_ENDED_BLOCK_KEY, + HdfsClientConfigKeys.StripedWrite.MAX_SECONDS_GET_ENDED_BLOCK_DEFAULT); } private DataChecksum.Type getChecksumType(Configuration conf) { @@ -508,6 +518,20 @@ public class DfsClientConf { return stripedReadThreadpoolSize; } + /** + * @return stripedWriteMaxSecondsGetStripedBlock + */ + public int getStripedWriteMaxSecondsGetStripedBlock() { + return stripedWriteMaxSecondsGetStripedBlock; + } + + /** + * @return stripedWriteMaxSecondsGetEndedBlock + */ + public int getStripedWriteMaxSecondsGetEndedBlock() { + return stripedWriteMaxSecondsGetEndedBlock; + } + /** * @return the shortCircuitConf */