From 52adaa43dbf9c18c86932bcd1d895ce98f9bd567 Mon Sep 17 00:00:00 2001
From: Xiao Chen
Date: Thu, 7 Dec 2017 21:11:29 -0800
Subject: [PATCH] HADOOP-15012. Add readahead, dropbehind, and unbuffer to
 StreamCapabilities. Contributed by John Zhuge.

(cherry picked from commit c9d48352c232ac9e031559cb99a18ff249d2b6c1)
---
 .../apache/hadoop/fs/FSDataInputStream.java   | 15 +++---
 .../apache/hadoop/fs/StreamCapabilities.java  | 48 ++++++++++++-----
 .../hadoop/fs/StreamCapabilitiesPolicy.java   | 51 +++++++++++++++++++
 .../site/markdown/filesystem/filesystem.md    | 19 ++++---
 .../apache/hadoop/hdfs/DFSInputStream.java    | 16 +++++-
 .../apache/hadoop/hdfs/DFSOutputStream.java   | 12 ++---
 .../fs/azure/BlockBlobAppendStream.java       | 17 ++++---
 7 files changed, 139 insertions(+), 39 deletions(-)
 create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilitiesPolicy.java

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
index a80279db525..08d71f16c07 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.util.IdentityHashStore;
 public class FSDataInputStream extends DataInputStream
     implements Seekable, PositionedReadable,
       ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
-      HasEnhancedByteBufferAccess, CanUnbuffer {
+      HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
   /**
    * Map ByteBuffers that we have handed out to readers to ByteBufferPool
    * objects
@@ -227,12 +227,15 @@ public class FSDataInputStream extends DataInputStream
 
   @Override
   public void unbuffer() {
-    try {
-      ((CanUnbuffer)in).unbuffer();
-    } catch (ClassCastException e) {
-      throw new UnsupportedOperationException("this stream " +
-          in.getClass().getName() + " does not " + "support unbuffering.");
+    StreamCapabilitiesPolicy.unbuffer(in);
+  }
+
+  @Override
+  public boolean hasCapability(String capability) {
+    if (in instanceof StreamCapabilities) {
+      return ((StreamCapabilities) in).hasCapability(capability);
     }
+    return false;
   }
 
   /**
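As context for the delegation above: `FSDataInputStream.hasCapability()` now forwards to the wrapped stream and returns `false` for streams that do not implement `StreamCapabilities`. The following is a minimal sketch of how a caller might probe a stream; the configuration and path are illustrative assumptions, not part of the patch.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;

public class CapabilityProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // "/tmp/example.txt" is a placeholder path for this sketch.
    try (FSDataInputStream in = fs.open(new Path("/tmp/example.txt"))) {
      // hasCapability() returns true only when the wrapped stream itself
      // implements StreamCapabilities and reports the capability; it never
      // throws for an unsupported stream.
      if (in.hasCapability(StreamCapabilities.UNBUFFER)) {
        in.unbuffer(); // safe: the stream advertised support
      }
    }
  }
}
```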
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
index 65aa67988a8..3549cdc4fa3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
@@ -23,27 +23,49 @@ import org.apache.hadoop.classification.InterfaceStability;
 
 /**
  * Interface to query streams for supported capabilities.
+ *
+ * Capability strings must be in lower case.
+ *
+ * Constant strings are chosen over enums in order to allow other file systems
+ * to define their own capabilities.
  */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public interface StreamCapabilities {
+  /**
+   * Stream hflush capability implemented by {@link Syncable#hflush()}.
+   */
+  String HFLUSH = "hflush";
+
+  /**
+   * Stream hsync capability implemented by {@link Syncable#hsync()}.
+   */
+  String HSYNC = "hsync";
+
+  /**
+   * Stream setReadahead capability implemented by
+   * {@link CanSetReadahead#setReadahead(Long)}.
+   */
+  String READAHEAD = "in:readahead";
+
+  /**
+   * Stream setDropBehind capability implemented by
+   * {@link CanSetDropBehind#setDropBehind(Boolean)}.
+   */
+  String DROPBEHIND = "dropbehind";
+
+  /**
+   * Stream unbuffer capability implemented by {@link CanUnbuffer#unbuffer()}.
+   */
+  String UNBUFFER = "in:unbuffer";
+
   /**
    * Capabilities that a stream can support and be queried for.
    */
+  @Deprecated
   enum StreamCapability {
-    /**
-     * Stream hflush capability to flush out the data in client's buffer.
-     * Streams with this capability implement {@link Syncable} and support
-     * {@link Syncable#hflush()}.
-     */
-    HFLUSH("hflush"),
-
-    /**
-     * Stream hsync capability to flush out the data in client's buffer and
-     * the disk device. Streams with this capability implement {@link Syncable}
-     * and support {@link Syncable#hsync()}.
-     */
-    HSYNC("hsync");
+    HFLUSH(StreamCapabilities.HFLUSH),
+    HSYNC(StreamCapabilities.HSYNC);
 
     private final String capability;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilitiesPolicy.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilitiesPolicy.java
new file mode 100644
index 00000000000..3080780ddab
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilitiesPolicy.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.InputStream;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Static methods to implement policies for {@link StreamCapabilities}.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class StreamCapabilitiesPolicy {
+  /**
+   * Implement the policy for {@link CanUnbuffer#unbuffer()}.
+   *
+   * @param in the input stream
+   */
+  public static void unbuffer(InputStream in) {
+    try {
+      if (in instanceof StreamCapabilities
+          && ((StreamCapabilities) in).hasCapability(
+          StreamCapabilities.UNBUFFER)) {
+        ((CanUnbuffer) in).unbuffer();
+      }
+    } catch (ClassCastException e) {
+      throw new UnsupportedOperationException("this stream " +
+          in.getClass().getName() +
+          " claims to unbuffer but forgets to implement CanUnbuffer");
+    }
+  }
+}
+
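A hedged sketch of the new policy helper's contract: a stream that advertises nothing is skipped silently, while one that advertises `in:unbuffer` without implementing `CanUnbuffer` now fails with a descriptive message. `ByteArrayInputStream` is used here purely as a stand-in for an arbitrary non-capable stream.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;

import org.apache.hadoop.fs.StreamCapabilitiesPolicy;

public class UnbufferPolicyDemo {
  public static void main(String[] args) {
    // A stream that implements neither StreamCapabilities nor CanUnbuffer.
    InputStream plain = new ByteArrayInputStream(new byte[] {1, 2, 3});

    // Silent no-op: the stream does not advertise the "in:unbuffer"
    // capability, so the policy skips it instead of throwing, as the old
    // cast-based code in FSDataInputStream.unbuffer() did.
    StreamCapabilitiesPolicy.unbuffer(plain);

    // Only a stream that advertises UNBUFFER *and* implements CanUnbuffer is
    // unbuffered; advertising without implementing now raises an
    // UnsupportedOperationException.
  }
}
```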
diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
index 21209bd50f7..fdac610afbb 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/filesystem/filesystem.md
@@ -1204,7 +1204,8 @@ problems were not considered during the implementation of these loops.
 
 ## interface `StreamCapabilities`
 
 The `StreamCapabilities` provides a way to programmatically query the
-capabilities that an `OutputStream` supports.
+capabilities that an `OutputStream`, `InputStream`, or other FileSystem
+class supports.
 
 ```java
 public interface StreamCapabilities {
@@ -1214,12 +1215,16 @@ public interface StreamCapabilities {
 
 ### `boolean hasCapability(capability)`
 
-Return true if the `OutputStream` has the desired capability.
+Return true if the `OutputStream`, `InputStream`, or other FileSystem class
+has the desired capability.
 
 The caller can query the capabilities of a stream using a string value.
-It currently supports to query:
+Here is a table of possible string values:
 
- * `StreamCapabilties.HFLUSH` ("*hflush*"): the capability to flush out the data
-  in client's buffer.
- * `StreamCapabilities.HSYNC` ("*hsync*"): capability to flush out the data in
-  client's buffer and the disk device.
\ No newline at end of file
+String       | Constant   | Implements       | Description
+-------------|------------|------------------|-------------------------------
+hflush       | HFLUSH     | Syncable         | Flush out the data in the client's user buffer. After the return of this call, new readers will see the data.
+hsync        | HSYNC      | Syncable         | Flush out the data in the client's user buffer all the way to the disk device (but the disk may have it in its cache). Similar to POSIX fsync.
+in:readahead | READAHEAD  | CanSetReadahead  | Set the readahead on the input stream.
+dropbehind   | DROPBEHIND | CanSetDropBehind | Drop the cache.
+in:unbuffer  | UNBUFFER   | CanUnbuffer      | Reduce the buffering on the input stream.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 3e50d333bea..cff259f1651 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.fs.FileEncryptionInfo;
 import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
 import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
 import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
@@ -83,6 +84,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.IdentityHashStore;
 import org.apache.hadoop.util.StopWatch;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.htrace.core.SpanId;
 import org.apache.htrace.core.TraceScope;
 import org.apache.htrace.core.Tracer;
@@ -98,7 +100,7 @@ import javax.annotation.Nonnull;
 @InterfaceAudience.Private
 public class DFSInputStream extends FSInputStream
     implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
-    HasEnhancedByteBufferAccess, CanUnbuffer {
+    HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
   @VisibleForTesting
   public static boolean tcpReadsDisabledForTesting = false;
   private long hedgedReadOpsLoopNumForTesting = 0;
@@ -1991,4 +1993,16 @@ public class DFSInputStream extends FSInputStream
   public synchronized void unbuffer() {
     closeCurrentBlockReaders();
   }
+
+  @Override
+  public boolean hasCapability(String capability) {
+    switch (StringUtils.toLowerCase(capability)) {
+    case StreamCapabilities.READAHEAD:
+    case StreamCapabilities.DROPBEHIND:
+    case StreamCapabilities.UNBUFFER:
+      return true;
+    default:
+      return false;
+    }
+  }
 }
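An illustrative helper (not part of the patch) that enumerates the five capability strings from the table above against any object. For a `DFSInputStream` this prints `true` for `in:readahead`, `dropbehind`, and `in:unbuffer`, and `false` for `hflush` and `hsync`, matching the `switch` just added.

```java
import org.apache.hadoop.fs.StreamCapabilities;

public final class CapabilityReport {
  private static final String[] CAPABILITIES = {
      StreamCapabilities.HFLUSH, StreamCapabilities.HSYNC,
      StreamCapabilities.READAHEAD, StreamCapabilities.DROPBEHIND,
      StreamCapabilities.UNBUFFER,
  };

  /** Print each capability string and whether the given stream reports it. */
  public static void print(Object stream) {
    for (String c : CAPABILITIES) {
      boolean supported = stream instanceof StreamCapabilities
          && ((StreamCapabilities) stream).hasCapability(c);
      System.out.println(c + " -> " + supported);
    }
  }
}
```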
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index d608f347208..9f16af5c30c 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -17,9 +17,6 @@
  */
 package org.apache.hadoop.hdfs;
 
-import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HFLUSH;
-import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HSYNC;
-
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -67,6 +64,7 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DataChecksum.Type;
 import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.htrace.core.TraceScope;
 import org.slf4j.Logger;
@@ -560,11 +558,13 @@ public class DFSOutputStream extends FSOutputSummer
 
   @Override
   public boolean hasCapability(String capability) {
-    if (capability.equalsIgnoreCase(HSYNC.getValue()) ||
-        capability.equalsIgnoreCase((HFLUSH.getValue()))) {
+    switch (StringUtils.toLowerCase(capability)) {
+    case StreamCapabilities.HSYNC:
+    case StreamCapabilities.HFLUSH:
       return true;
+    default:
+      return false;
     }
-    return false;
   }
 
   /**
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
index eaada1b6c19..1cd798aa501 100644
--- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java
@@ -27,6 +27,7 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Locale;
 import java.util.UUID;
 import java.util.Random;
 import java.util.concurrent.ConcurrentLinkedDeque;
@@ -63,9 +64,6 @@ import com.microsoft.azure.storage.blob.BlockEntry;
 import com.microsoft.azure.storage.blob.BlockListingFilter;
 import com.microsoft.azure.storage.blob.BlockSearchMode;
 
-import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HFLUSH;
-import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HSYNC;
-
 /**
  * Stream object that implements append for Block Blobs in WASB.
  *
@@ -562,9 +560,16 @@ public class BlockBlobAppendStream extends OutputStream implements Syncable,
    */
   @Override
   public boolean hasCapability(String capability) {
-    return compactionEnabled
-        && (capability.equalsIgnoreCase(HSYNC.getValue())
-        || capability.equalsIgnoreCase((HFLUSH.getValue())));
+    if (!compactionEnabled) {
+      return false;
+    }
+    switch (capability.toLowerCase(Locale.ENGLISH)) {
+    case StreamCapabilities.HSYNC:
+    case StreamCapabilities.HFLUSH:
+      return true;
+    default:
+      return false;
+    }
   }
 
   /**
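Finally, a hedged sketch of guarding a durability call with the capability check, assuming `FSDataOutputStream` already implements `StreamCapabilities` (as it does upstream). For a `DFSOutputStream` the guard passes; for a `BlockBlobAppendStream` it passes only when block compaction is enabled, as the hunk above shows.

```java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.StreamCapabilities;

public final class SafeSync {
  /** Flush durably when the stream supports it, else fall back to flush(). */
  public static void syncIfPossible(FSDataOutputStream out) throws IOException {
    if (out.hasCapability(StreamCapabilities.HSYNC)) {
      out.hsync();  // data forced out of the client buffer toward the disk
    } else {
      out.flush();  // best-effort fallback for streams without hsync
    }
  }
}
```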