HADOOP-15012. Add readahead, dropbehind, and unbuffer to StreamCapabilities. Contributed by John Zhuge.

This commit is contained in:
John Zhuge 2017-11-06 23:54:27 -08:00
parent 0a72c2f56c
commit bf6a660232
7 changed files with 139 additions and 39 deletions

View File

@ -38,7 +38,7 @@ import org.apache.hadoop.util.IdentityHashStore;
public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable,
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
HasEnhancedByteBufferAccess, CanUnbuffer {
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
/**
* Map ByteBuffers that we have handed out to readers to ByteBufferPool
* objects
@ -227,12 +227,15 @@ public class FSDataInputStream extends DataInputStream
@Override
public void unbuffer() {
try {
((CanUnbuffer)in).unbuffer();
} catch (ClassCastException e) {
throw new UnsupportedOperationException("this stream " +
in.getClass().getName() + " does not " + "support unbuffering.");
StreamCapabilitiesPolicy.unbuffer(in);
}
@Override
public boolean hasCapability(String capability) {
  // Delegate the query to the wrapped stream when it can answer it;
  // every other stream type reports no capabilities at all.
  return (in instanceof StreamCapabilities)
      && ((StreamCapabilities) in).hasCapability(capability);
}
/**

View File

@ -23,27 +23,49 @@ import org.apache.hadoop.classification.InterfaceStability;
/**
* Interface to query streams for supported capabilities.
*
* Capability strings must be in lower case.
*
* Constant strings are chosen over enums in order to allow other file systems
* to define their own capabilities.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface StreamCapabilities {
/**
* Stream hflush capability implemented by {@link Syncable#hflush()}.
*/
String HFLUSH = "hflush";
/**
* Stream hsync capability implemented by {@link Syncable#hsync()}.
*/
String HSYNC = "hsync";
/**
* Stream setReadahead capability implemented by
* {@link CanSetReadahead#setReadahead(Long)}.
*/
String READAHEAD = "in:readahead";
/**
* Stream setDropBehind capability implemented by
* {@link CanSetDropBehind#setDropBehind(Boolean)}.
*/
String DROPBEHIND = "dropbehind";
/**
* Stream unbuffer capability implemented by {@link CanUnbuffer#unbuffer()}.
*/
String UNBUFFER = "in:unbuffer";
/**
* Capabilities that a stream can support and be queried for.
*/
@Deprecated
enum StreamCapability {
/**
* Stream hflush capability to flush out the data in client's buffer.
* Streams with this capability implement {@link Syncable} and support
* {@link Syncable#hflush()}.
*/
HFLUSH("hflush"),
/**
* Stream hsync capability to flush out the data in client's buffer and
* the disk device. Streams with this capability implement {@link Syncable}
* and support {@link Syncable#hsync()}.
*/
HSYNC("hsync");
HFLUSH(StreamCapabilities.HFLUSH),
HSYNC(StreamCapabilities.HSYNC);
private final String capability;

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.InputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
 * Static methods to implement policies for {@link StreamCapabilities}.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class StreamCapabilitiesPolicy {

  private StreamCapabilitiesPolicy() {
    // Utility class: only static helpers, never instantiated.
  }

  /**
   * Implement the policy for {@link CanUnbuffer#unbuffer()}.
   *
   * The stream is unbuffered only when it advertises the
   * {@link StreamCapabilities#UNBUFFER} capability; streams that do not
   * claim the capability are silently left alone. A stream that claims
   * the capability but does not actually implement {@link CanUnbuffer}
   * is a programming error in that stream and is reported to the caller.
   *
   * @param in the input stream to unbuffer, if it supports it
   * @throws UnsupportedOperationException if {@code in} claims the
   *         unbuffer capability but does not implement
   *         {@link CanUnbuffer}
   */
  public static void unbuffer(InputStream in) {
    try {
      if (in instanceof StreamCapabilities
          && ((StreamCapabilities) in).hasCapability(
          StreamCapabilities.UNBUFFER)) {
        ((CanUnbuffer) in).unbuffer();
      }
    } catch (ClassCastException e) {
      // Preserve the cause so the offending stream class is debuggable.
      throw new UnsupportedOperationException("this stream " +
          in.getClass().getName() +
          " claims to unbuffer but forgets to implement CanUnbuffer", e);
    }
  }
}

View File

@ -1359,7 +1359,8 @@ problems were not considered during the implementation of these loops.
## <a name="StreamCapability"></a> interface `StreamCapabilities`
The `StreamCapabilities` provides a way to programmatically query the
capabilities that an `OutputStream` supports.
capabilities that `OutputStream`, `InputStream`, or other FileSystem class
supports.
```java
public interface StreamCapabilities {
@ -1369,12 +1370,16 @@ public interface StreamCapabilities {
### `boolean hasCapability(capability)`
Return true if the `OutputStream` has the desired capability.
Return true if the `OutputStream`, `InputStream`, or other FileSystem class
has the desired capability.
The caller can query the capabilities of a stream using a string value.
It currently supports querying:
Here is a table of possible string values:
* `StreamCapabilities.HFLUSH` ("*hflush*"): the capability to flush out the data
in client's buffer.
* `StreamCapabilities.HSYNC` ("*hsync*"): capability to flush out the data in
client's buffer and the disk device.
String | Constant | Implements | Description
-------------|------------|------------------|-------------------------------
hflush | HFLUSH | Syncable | Flush out the data in client's user buffer. After the return of this call, new readers will see the data.
hsync | HSYNC | Syncable | Flush out the data in client's user buffer all the way to the disk device (but the disk may have it in its cache). Similar to POSIX fsync.
in:readahead | READAHEAD | CanSetReadahead | Set the readahead on the input stream.
dropbehind | DROPBEHIND | CanSetDropBehind | Drop the cache.
in:unbuffer | UNBUFFER | CanUnbuffer | Reduce the buffering on the input stream.

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.hdfs.DFSUtilClient.CorruptedBlocks;
import org.apache.hadoop.hdfs.client.impl.BlockReaderFactory;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
@ -81,6 +82,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.IdentityHashStore;
import org.apache.hadoop.util.StopWatch;
import org.apache.hadoop.util.StringUtils;
import org.apache.htrace.core.SpanId;
import org.apache.htrace.core.TraceScope;
import org.apache.htrace.core.Tracer;
@ -96,7 +98,7 @@ import javax.annotation.Nonnull;
@InterfaceAudience.Private
public class DFSInputStream extends FSInputStream
implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
HasEnhancedByteBufferAccess, CanUnbuffer {
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
@VisibleForTesting
public static boolean tcpReadsDisabledForTesting = false;
private long hedgedReadOpsLoopNumForTesting = 0;
@ -1779,4 +1781,16 @@ public class DFSInputStream extends FSInputStream
public synchronized void unbuffer() {
closeCurrentBlockReaders();
}
@Override
public boolean hasCapability(String capability) {
  // Capability names are defined in lower case; normalize the query
  // before comparing against the supported read-side capabilities.
  final String normalized = StringUtils.toLowerCase(capability);
  return StreamCapabilities.READAHEAD.equals(normalized)
      || StreamCapabilities.DROPBEHIND.equals(normalized)
      || StreamCapabilities.UNBUFFER.equals(normalized);
}
}

View File

@ -17,9 +17,6 @@
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HFLUSH;
import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HSYNC;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InterruptedIOException;
@ -69,6 +66,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DataChecksum.Type;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.htrace.core.TraceScope;
import org.slf4j.Logger;
@ -552,11 +550,13 @@ public class DFSOutputStream extends FSOutputSummer
@Override
public boolean hasCapability(String capability) {
if (capability.equalsIgnoreCase(HSYNC.getValue()) ||
capability.equalsIgnoreCase((HFLUSH.getValue()))) {
switch (StringUtils.toLowerCase(capability)) {
case StreamCapabilities.HSYNC:
case StreamCapabilities.HFLUSH:
return true;
default:
return false;
}
return false;
}
/**

View File

@ -27,6 +27,7 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import java.util.UUID;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedDeque;
@ -63,9 +64,6 @@ import com.microsoft.azure.storage.blob.BlockEntry;
import com.microsoft.azure.storage.blob.BlockListingFilter;
import com.microsoft.azure.storage.blob.BlockSearchMode;
import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HFLUSH;
import static org.apache.hadoop.fs.StreamCapabilities.StreamCapability.HSYNC;
/**
* Stream object that implements append for Block Blobs in WASB.
*
@ -550,9 +548,16 @@ public class BlockBlobAppendStream extends OutputStream implements Syncable,
*/
@Override
public boolean hasCapability(String capability) {
return compactionEnabled
&& (capability.equalsIgnoreCase(HSYNC.getValue())
|| capability.equalsIgnoreCase((HFLUSH.getValue())));
if (!compactionEnabled) {
return false;
}
switch (capability.toLowerCase(Locale.ENGLISH)) {
case StreamCapabilities.HSYNC:
case StreamCapabilities.HFLUSH:
return true;
default:
return false;
}
}
/**