HDFS-4817. Make HDFS advisory caching configurable on a per-file basis. (Colin Patrick McCabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1506188 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2013-07-23 17:54:44 +00:00
parent 673b762364
commit 5ff0774e8a
36 changed files with 947 additions and 106 deletions

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface CanSetDropBehind {
/**
* Configure whether the stream should drop the cache.
*
* @param dropCache Whether to drop the cache. null means to use the
* default value.
* @throws IOException If there was an error changing the dropBehind
* setting.
* UnsupportedOperationException If this stream doesn't support
* setting the drop-behind.
*/
public void setDropBehind(Boolean dropCache)
throws IOException, UnsupportedOperationException;
}

View File

@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Evolving
public interface CanSetReadahead {
/**
* Set the readahead on this stream.
*
* @param readahead The readahead to use. null means to use the default.
* @throws IOException If there was an error changing the dropBehind
* setting.
* UnsupportedOperationException If this stream doesn't support
* setting readahead.
*/
public void setReadahead(Long readahead)
throws IOException, UnsupportedOperationException;
}

View File

@ -28,7 +28,8 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable, Closeable, ByteBufferReadable, HasFileDescriptor {
implements Seekable, PositionedReadable, Closeable,
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead {
public FSDataInputStream(InputStream in)
throws IOException {
@ -143,4 +144,27 @@ public class FSDataInputStream extends DataInputStream
return null;
}
}
@Override
public void setReadahead(Long readahead)
throws IOException, UnsupportedOperationException {
try {
((CanSetReadahead)in).setReadahead(readahead);
} catch (ClassCastException e) {
throw new UnsupportedOperationException(
"this stream does not support setting the readahead " +
"caching strategy.");
}
}
@Override
public void setDropBehind(Boolean dropBehind)
throws IOException, UnsupportedOperationException {
try {
((CanSetDropBehind)in).setDropBehind(dropBehind);
} catch (ClassCastException e) {
throw new UnsupportedOperationException("this stream does not " +
"support setting the drop-behind caching setting.");
}
}
}

View File

@ -18,6 +18,10 @@
package org.apache.hadoop.fs;
import java.io.*;
import java.io.DataOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -26,8 +30,9 @@ import org.apache.hadoop.classification.InterfaceStability;
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FSDataOutputStream extends DataOutputStream implements Syncable {
private OutputStream wrappedStream;
public class FSDataOutputStream extends DataOutputStream
implements Syncable, CanSetDropBehind {
private final OutputStream wrappedStream;
private static class PositionCache extends FilterOutputStream {
private FileSystem.Statistics statistics;
@ -133,4 +138,14 @@ public class FSDataOutputStream extends DataOutputStream implements Syncable {
wrappedStream.flush();
}
}
@Override
public void setDropBehind(Boolean dropBehind) throws IOException {
try {
((CanSetDropBehind)wrappedStream).setDropBehind(dropBehind);
} catch (ClassCastException e) {
throw new UnsupportedOperationException("the wrapped stream does " +
"not support setting the drop-behind caching setting.");
}
}
}

View File

@ -810,7 +810,8 @@ public class HarFileSystem extends FilterFileSystem {
/**
* Create an input stream that fakes all the reads/positions/seeking.
*/
private static class HarFsInputStream extends FSInputStream {
private static class HarFsInputStream extends FSInputStream
implements CanSetDropBehind, CanSetReadahead {
private long position, start, end;
//The underlying data input stream that the
// underlying filesystem will return.
@ -957,7 +958,18 @@ public class HarFileSystem extends FilterFileSystem {
public void readFully(long pos, byte[] b) throws IOException {
readFully(pos, b, 0, b.length);
}
@Override
public void setReadahead(Long readahead)
throws IOException, UnsupportedEncodingException {
underLyingStream.setReadahead(readahead);
}
@Override
public void setDropBehind(Boolean dropBehind)
throws IOException, UnsupportedEncodingException {
underLyingStream.setDropBehind(dropBehind);
}
}
/**

View File

@ -203,7 +203,7 @@ public class ReadaheadPool {
// It's also possible that we'll end up requesting readahead on some
// other FD, which may be wasted work, but won't cause a problem.
try {
NativeIO.POSIX.posixFadviseIfPossible(fd, off, len,
NativeIO.POSIX.posixFadviseIfPossible(identifier, fd, off, len,
NativeIO.POSIX.POSIX_FADV_WILLNEED);
} catch (IOException ioe) {
if (canceled) {

View File

@ -37,6 +37,8 @@ import org.apache.hadoop.util.Shell;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* JNI wrappers for various native IO-related calls not available in Java.
* These functions should generally be used alongside a fallback to another
@ -92,6 +94,9 @@ public class NativeIO {
private static final Log LOG = LogFactory.getLog(NativeIO.class);
@VisibleForTesting
public static CacheTracker cacheTracker = null;
private static boolean nativeLoaded = false;
private static boolean fadvisePossible = true;
private static boolean syncFileRangePossible = true;
@ -102,6 +107,10 @@ public class NativeIO {
private static long cacheTimeout = -1;
public static interface CacheTracker {
public void fadvise(String identifier, long offset, long len, int flags);
}
static {
if (NativeCodeLoader.isNativeCodeLoaded()) {
try {
@ -178,9 +187,12 @@ public class NativeIO {
*
* @throws NativeIOException if there is an error with the syscall
*/
public static void posixFadviseIfPossible(
public static void posixFadviseIfPossible(String identifier,
FileDescriptor fd, long offset, long len, int flags)
throws NativeIOException {
if (cacheTracker != null) {
cacheTracker.fadvise(identifier, offset, len, flags);
}
if (nativeLoaded && fadvisePossible) {
try {
posix_fadvise(fd, offset, len, flags);

View File

@ -27,6 +27,9 @@ Release 2.3.0 - UNRELEASED
HDFS-4278. Log an ERROR when DFS_BLOCK_ACCESS_TOKEN_ENABLE config is
disabled but security is turned on. (Kousuke Saruta via harsh)
HDFS-4817. Make HDFS advisory caching configurable on a per-file basis.
(Colin Patrick McCabe)
OPTIMIZATIONS
BUG FIXES

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.unix.DomainSocket;
@ -85,7 +86,8 @@ public class BlockReaderFactory {
DomainSocketFactory domSockFactory,
PeerCache peerCache,
FileInputStreamCache fisCache,
boolean allowShortCircuitLocalReads)
boolean allowShortCircuitLocalReads,
CachingStrategy cachingStrategy)
throws IOException {
peer.setReadTimeout(conf.socketTimeout);
peer.setWriteTimeout(HdfsServerConstants.WRITE_TIMEOUT);
@ -122,12 +124,14 @@ public class BlockReaderFactory {
@SuppressWarnings("deprecation")
RemoteBlockReader reader = RemoteBlockReader.newBlockReader(file,
block, blockToken, startOffset, len, conf.ioBufferSize,
verifyChecksum, clientName, peer, datanodeID, peerCache);
verifyChecksum, clientName, peer, datanodeID, peerCache,
cachingStrategy);
return reader;
} else {
return RemoteBlockReader2.newBlockReader(
file, block, blockToken, startOffset, len,
verifyChecksum, clientName, peer, datanodeID, peerCache);
verifyChecksum, clientName, peer, datanodeID, peerCache,
cachingStrategy);
}
}

View File

@ -44,6 +44,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_CACHE_EXPIR
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CACHE_READAHEAD;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
@ -135,6 +138,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.io.DataOutputBuffer;
@ -200,6 +204,8 @@ public class DFSClient implements java.io.Closeable {
private SocketAddress[] localInterfaceAddrs;
private DataEncryptionKey encryptionKey;
private boolean shouldUseLegacyBlockReaderLocal;
private final CachingStrategy defaultReadCachingStrategy;
private final CachingStrategy defaultWriteCachingStrategy;
/**
* DFSClient configuration
@ -498,6 +504,16 @@ public class DFSClient implements java.io.Closeable {
}
this.peerCache = PeerCache.getInstance(dfsClientConf.socketCacheCapacity, dfsClientConf.socketCacheExpiry);
Boolean readDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_READS) == null) ?
null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_READS, false);
Long readahead = (conf.get(DFS_CLIENT_CACHE_READAHEAD) == null) ?
null : conf.getLong(DFS_CLIENT_CACHE_READAHEAD, 0);
Boolean writeDropBehind = (conf.get(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES) == null) ?
null : conf.getBoolean(DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, false);
this.defaultReadCachingStrategy =
new CachingStrategy(readDropBehind, readahead);
this.defaultWriteCachingStrategy =
new CachingStrategy(writeDropBehind, readahead);
}
/**
@ -1967,7 +1983,8 @@ public class DFSClient implements java.io.Closeable {
HdfsConstants.SMALL_BUFFER_SIZE));
DataInputStream in = new DataInputStream(pair.in);
new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName, 0, 1, true);
new Sender(out).readBlock(lb.getBlock(), lb.getBlockToken(), clientName,
0, 1, true, CachingStrategy.newDefaultStrategy());
final BlockOpResponseProto reply =
BlockOpResponseProto.parseFrom(PBHelper.vintPrefixed(in));
@ -2466,4 +2483,12 @@ public class DFSClient implements java.io.Closeable {
public boolean useLegacyBlockReaderLocal() {
return shouldUseLegacyBlockReaderLocal;
}
public CachingStrategy getDefaultReadCachingStrategy() {
return defaultReadCachingStrategy;
}
public CachingStrategy getDefaultWriteCachingStrategy() {
return defaultWriteCachingStrategy;
}
}

View File

@ -54,6 +54,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
public static final String DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname";
public static final boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = false;
public static final String DFS_CLIENT_CACHE_DROP_BEHIND_WRITES = "dfs.client.cache.drop.behind.writes";
public static final String DFS_CLIENT_CACHE_DROP_BEHIND_READS = "dfs.client.cache.drop.behind.reads";
public static final String DFS_CLIENT_CACHE_READAHEAD = "dfs.client.cache.readahead";
public static final String DFS_HDFS_BLOCKS_METADATA_ENABLED = "dfs.datanode.hdfs-blocks-metadata.enabled";
public static final boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
public static final String DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS = "dfs.client.file-block-storage-locations.num-threads";

View File

@ -36,6 +36,8 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.UnresolvedLinkException;
@ -50,6 +52,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@ -65,7 +68,8 @@ import com.google.common.annotations.VisibleForTesting;
* negotiation of the namenode and various datanodes as necessary.
****************************************************************/
@InterfaceAudience.Private
public class DFSInputStream extends FSInputStream implements ByteBufferReadable {
public class DFSInputStream extends FSInputStream
implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
@VisibleForTesting
static boolean tcpReadsDisabledForTesting = false;
private final PeerCache peerCache;
@ -80,6 +84,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
private LocatedBlock currentLocatedBlock = null;
private long pos = 0;
private long blockEnd = -1;
private CachingStrategy cachingStrategy;
private final ReadStatistics readStatistics = new ReadStatistics();
public static class ReadStatistics {
@ -185,6 +190,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
this.fileInputStreamCache = new FileInputStreamCache(
dfsClient.getConf().shortCircuitStreamsCacheSize,
dfsClient.getConf().shortCircuitStreamsCacheExpiryMs);
this.cachingStrategy =
dfsClient.getDefaultReadCachingStrategy().duplicate();
openInfo();
}
@ -1035,7 +1042,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache,
allowShortCircuitLocalReads);
allowShortCircuitLocalReads, cachingStrategy);
return reader;
} catch (IOException ex) {
DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
@ -1058,7 +1065,7 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache,
allowShortCircuitLocalReads);
allowShortCircuitLocalReads, cachingStrategy);
return reader;
} catch (IOException e) {
DFSClient.LOG.warn("failed to connect to " + domSock, e);
@ -1081,7 +1088,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
reader = BlockReaderFactory.newBlockReader(
dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache, false);
dsFactory, peerCache, fileInputStreamCache, false,
cachingStrategy);
return reader;
} catch (IOException ex) {
DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
@ -1100,7 +1108,8 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
return BlockReaderFactory.newBlockReader(
dfsClient.getConf(), file, block, blockToken, startOffset,
len, verifyChecksum, clientName, peer, chosenNode,
dsFactory, peerCache, fileInputStreamCache, false);
dsFactory, peerCache, fileInputStreamCache, false,
cachingStrategy);
}
@ -1358,4 +1367,30 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
public synchronized ReadStatistics getReadStatistics() {
return new ReadStatistics(readStatistics);
}
private synchronized void closeCurrentBlockReader() {
if (blockReader == null) return;
// Close the current block reader so that the new caching settings can
// take effect immediately.
try {
blockReader.close();
} catch (IOException e) {
DFSClient.LOG.error("error closing blockReader", e);
}
blockReader = null;
}
@Override
public synchronized void setReadahead(Long readahead)
throws IOException {
this.cachingStrategy.setReadahead(readahead);
closeCurrentBlockReader();
}
@Override
public synchronized void setDropBehind(Boolean dropBehind)
throws IOException {
this.cachingStrategy.setDropBehind(dropBehind);
closeCurrentBlockReader();
}
}

View File

@ -40,6 +40,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSOutputSummer;
import org.apache.hadoop.fs.FileAlreadyExistsException;
@ -71,6 +72,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
import org.apache.hadoop.io.EnumSetWritable;
@ -83,6 +85,7 @@ import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Time;
import org.mortbay.log.Log;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.CacheBuilder;
@ -115,7 +118,8 @@ import com.google.common.cache.RemovalNotification;
* starts sending packets from the dataQueue.
****************************************************************/
@InterfaceAudience.Private
public class DFSOutputStream extends FSOutputSummer implements Syncable {
public class DFSOutputStream extends FSOutputSummer
implements Syncable, CanSetDropBehind {
private final DFSClient dfsClient;
private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
private Socket s;
@ -147,6 +151,7 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
private Progressable progress;
private final short blockReplication; // replication factor of file
private boolean shouldSyncBlock = false; // force blocks to disk upon close
private CachingStrategy cachingStrategy;
private class Packet {
long seqno; // sequencenumber of buffer in block
@ -1143,7 +1148,8 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
// send the request
new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
cachingStrategy);
// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@ -1340,6 +1346,8 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
this.blockSize = stat.getBlockSize();
this.blockReplication = stat.getReplication();
this.progress = progress;
this.cachingStrategy =
dfsClient.getDefaultWriteCachingStrategy().duplicate();
if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug(
"Set non-null progress callback on DFSOutputStream " + src);
@ -1937,4 +1945,8 @@ public class DFSOutputStream extends FSOutputSummer implements Syncable {
return streamer.getBlockToken();
}
@Override
public void setDropBehind(Boolean dropBehind) throws IOException {
this.cachingStrategy.setDropBehind(dropBehind);
}
}

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumIn
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
@ -381,13 +382,14 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
int bufferSize, boolean verifyChecksum,
String clientName, Peer peer,
DatanodeID datanodeID,
PeerCache peerCache)
throws IOException {
PeerCache peerCache,
CachingStrategy cachingStrategy)
throws IOException {
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
verifyChecksum);
verifyChecksum, cachingStrategy);
//
// Get bytes in block, set streams

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@ -375,12 +376,13 @@ public class RemoteBlockReader2 implements BlockReader {
boolean verifyChecksum,
String clientName,
Peer peer, DatanodeID datanodeID,
PeerCache peerCache) throws IOException {
PeerCache peerCache,
CachingStrategy cachingStrategy) throws IOException {
// in and out will be closed when sock is closed (by the caller)
final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
peer.getOutputStream()));
new Sender(out).readBlock(block, blockToken, clientName, startOffset, len,
verifyChecksum);
verifyChecksum, cachingStrategy);
//
// Get bytes in block

View File

@ -26,6 +26,7 @@ import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@ -57,13 +58,15 @@ public interface DataTransferProtocol {
* @param length maximum number of bytes for this read.
* @param sendChecksum if false, the DN should skip reading and sending
* checksums
* @param cachingStrategy The caching strategy to use.
*/
public void readBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final long blockOffset,
final long length,
final boolean sendChecksum) throws IOException;
final boolean sendChecksum,
final CachingStrategy cachingStrategy) throws IOException;
/**
* Write a block to a datanode pipeline.
@ -89,7 +92,8 @@ public interface DataTransferProtocol {
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp,
final DataChecksum requestedChecksum) throws IOException;
final DataChecksum requestedChecksum,
final CachingStrategy cachingStrategy) throws IOException;
/**
* Transfer a block to another datanode.

View File

@ -31,8 +31,10 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
/** Receiver */
@InterfaceAudience.Private
@ -85,6 +87,14 @@ public abstract class Receiver implements DataTransferProtocol {
}
}
static private CachingStrategy getCachingStrategy(CachingStrategyProto strategy) {
Boolean dropBehind = strategy.hasDropBehind() ?
strategy.getDropBehind() : null;
Long readahead = strategy.hasReadahead() ?
strategy.getReadahead() : null;
return new CachingStrategy(dropBehind, readahead);
}
/** Receive OP_READ_BLOCK */
private void opReadBlock() throws IOException {
OpReadBlockProto proto = OpReadBlockProto.parseFrom(vintPrefixed(in));
@ -93,7 +103,10 @@ public abstract class Receiver implements DataTransferProtocol {
proto.getHeader().getClientName(),
proto.getOffset(),
proto.getLen(),
proto.getSendChecksums());
proto.getSendChecksums(),
(proto.hasCachingStrategy() ?
getCachingStrategy(proto.getCachingStrategy()) :
CachingStrategy.newDefaultStrategy()));
}
/** Receive OP_WRITE_BLOCK */
@ -108,7 +121,10 @@ public abstract class Receiver implements DataTransferProtocol {
proto.getPipelineSize(),
proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
proto.getLatestGenerationStamp(),
fromProto(proto.getRequestedChecksum()));
fromProto(proto.getRequestedChecksum()),
(proto.hasCachingStrategy() ?
getCachingStrategy(proto.getCachingStrategy()) :
CachingStrategy.newDefaultStrategy()));
}
/** Receive {@link Op#TRANSFER_BLOCK} */

View File

@ -35,9 +35,11 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpTransferBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@ -72,19 +74,32 @@ public class Sender implements DataTransferProtocol {
out.flush();
}
static private CachingStrategyProto getCachingStrategy(CachingStrategy cachingStrategy) {
CachingStrategyProto.Builder builder = CachingStrategyProto.newBuilder();
if (cachingStrategy.getReadahead() != null) {
builder.setReadahead(cachingStrategy.getReadahead().longValue());
}
if (cachingStrategy.getDropBehind() != null) {
builder.setDropBehind(cachingStrategy.getDropBehind().booleanValue());
}
return builder.build();
}
@Override
public void readBlock(final ExtendedBlock blk,
final Token<BlockTokenIdentifier> blockToken,
final String clientName,
final long blockOffset,
final long length,
final boolean sendChecksum) throws IOException {
final boolean sendChecksum,
final CachingStrategy cachingStrategy) throws IOException {
OpReadBlockProto proto = OpReadBlockProto.newBuilder()
.setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken))
.setOffset(blockOffset)
.setLen(length)
.setSendChecksums(sendChecksum)
.setCachingStrategy(getCachingStrategy(cachingStrategy))
.build();
send(out, Op.READ_BLOCK, proto);
@ -102,7 +117,8 @@ public class Sender implements DataTransferProtocol {
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp,
DataChecksum requestedChecksum) throws IOException {
DataChecksum requestedChecksum,
final CachingStrategy cachingStrategy) throws IOException {
ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
blk, clientName, blockToken);
@ -117,7 +133,8 @@ public class Sender implements DataTransferProtocol {
.setMinBytesRcvd(minBytesRcvd)
.setMaxBytesRcvd(maxBytesRcvd)
.setLatestGenerationStamp(latestGenerationStamp)
.setRequestedChecksum(checksumProto);
.setRequestedChecksum(checksumProto)
.setCachingStrategy(getCachingStrategy(cachingStrategy));
if (source != null) {
proto.setSource(PBHelper.convertDatanodeInfo(source));

View File

@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
import org.apache.hadoop.hdfs.web.resources.DelegationParam;
@ -217,7 +218,7 @@ public class JspHelper {
"JspHelper", TcpPeerServer.peerFromSocketAndKey(s, encryptionKey),
new DatanodeID(addr.getAddress().getHostAddress(),
addr.getHostName(), poolId, addr.getPort(), 0, 0), null,
null, null, false);
null, null, false, CachingStrategy.newDefaultStrategy());
final byte[] buf = new byte[amtToRead];
int readOffset = 0;

View File

@ -417,7 +417,7 @@ class BlockPoolSliceScanner {
adjustThrottler();
blockSender = new BlockSender(block, 0, -1, false, true, true,
datanode, null);
datanode, null, CachingStrategy.newDropBehind());
DataOutputStream out =
new DataOutputStream(new IOUtils.NullOutputStream());

View File

@ -52,6 +52,8 @@ import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting;
/** A class that receives a block and writes to its own disk, meanwhile
* may copies it to another site. If a throttler is provided,
* streaming throttling is also supported.
@ -60,7 +62,8 @@ class BlockReceiver implements Closeable {
public static final Log LOG = DataNode.LOG;
static final Log ClientTraceLog = DataNode.ClientTraceLog;
private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
@VisibleForTesting
static long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
private DataInputStream in = null; // from where data are read
private DataChecksum clientChecksum; // checksum used by client
@ -96,8 +99,8 @@ class BlockReceiver implements Closeable {
// Cache management state
private boolean dropCacheBehindWrites;
private long lastCacheManagementOffset = 0;
private boolean syncBehindWrites;
private long lastCacheDropOffset = 0;
/** The client name. It is empty if a datanode is the client */
private final String clientname;
@ -119,8 +122,8 @@ class BlockReceiver implements Closeable {
final BlockConstructionStage stage,
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
final String clientname, final DatanodeInfo srcDataNode,
final DataNode datanode, DataChecksum requestedChecksum)
throws IOException {
final DataNode datanode, DataChecksum requestedChecksum,
CachingStrategy cachingStrategy) throws IOException {
try{
this.block = block;
this.in = in;
@ -145,6 +148,7 @@ class BlockReceiver implements Closeable {
+ "\n isClient =" + isClient + ", clientname=" + clientname
+ "\n isDatanode=" + isDatanode + ", srcDataNode=" + srcDataNode
+ "\n inAddr=" + inAddr + ", myAddr=" + myAddr
+ "\n cachingStrategy = " + cachingStrategy
);
}
@ -191,7 +195,9 @@ class BlockReceiver implements Closeable {
" while receiving block " + block + " from " + inAddr);
}
}
this.dropCacheBehindWrites = datanode.getDnConf().dropCacheBehindWrites;
this.dropCacheBehindWrites = (cachingStrategy.getDropBehind() == null) ?
datanode.getDnConf().dropCacheBehindWrites :
cachingStrategy.getDropBehind();
this.syncBehindWrites = datanode.getDnConf().syncBehindWrites;
final boolean isCreate = isDatanode || isTransfer
@ -597,7 +603,7 @@ class BlockReceiver implements Closeable {
datanode.metrics.incrBytesWritten(len);
dropOsCacheBehindWriter(offsetInBlock);
manageWriterOsCache(offsetInBlock);
}
} catch (IOException iex) {
datanode.checkDiskError(iex);
@ -619,25 +625,44 @@ class BlockReceiver implements Closeable {
return lastPacketInBlock?-1:len;
}
private void dropOsCacheBehindWriter(long offsetInBlock) {
private void manageWriterOsCache(long offsetInBlock) {
try {
if (outFd != null &&
offsetInBlock > lastCacheDropOffset + CACHE_DROP_LAG_BYTES) {
long twoWindowsAgo = lastCacheDropOffset - CACHE_DROP_LAG_BYTES;
if (twoWindowsAgo > 0 && dropCacheBehindWrites) {
NativeIO.POSIX.posixFadviseIfPossible(outFd, 0, lastCacheDropOffset,
NativeIO.POSIX.POSIX_FADV_DONTNEED);
}
offsetInBlock > lastCacheManagementOffset + CACHE_DROP_LAG_BYTES) {
//
// For SYNC_FILE_RANGE_WRITE, we want to sync from
// lastCacheManagementOffset to a position "two windows ago"
//
// <========= sync ===========>
// +-----------------------O--------------------------X
// start last curPos
// of file
//
if (syncBehindWrites) {
NativeIO.POSIX.syncFileRangeIfPossible(outFd, lastCacheDropOffset, CACHE_DROP_LAG_BYTES,
NativeIO.POSIX.syncFileRangeIfPossible(outFd,
lastCacheManagementOffset,
offsetInBlock - lastCacheManagementOffset,
NativeIO.POSIX.SYNC_FILE_RANGE_WRITE);
}
lastCacheDropOffset += CACHE_DROP_LAG_BYTES;
//
// For POSIX_FADV_DONTNEED, we want to drop from the beginning
// of the file to a position prior to the current position.
//
// <=== drop =====>
// <---W--->
// +--------------+--------O--------------------------X
// start dropPos last curPos
// of file
//
long dropPos = lastCacheManagementOffset - CACHE_DROP_LAG_BYTES;
if (dropPos > 0 && dropCacheBehindWrites) {
NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
outFd, 0, dropPos, NativeIO.POSIX.POSIX_FADV_DONTNEED);
}
lastCacheManagementOffset = offsetInBlock;
}
} catch (Throwable t) {
LOG.warn("Couldn't drop os cache behind writer for " + block, t);
LOG.warn("Error managing cache for writer of block " + block, t);
}
}

View File

@ -45,6 +45,7 @@ import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.SocketOutputStream;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
@ -141,13 +142,22 @@ class BlockSender implements java.io.Closeable {
// Cache-management related fields
private final long readaheadLength;
private boolean shouldDropCacheBehindRead;
private ReadaheadRequest curReadahead;
private final boolean alwaysReadahead;
private final boolean dropCacheBehindLargeReads;
private final boolean dropCacheBehindAllReads;
private long lastCacheDropOffset;
private static final long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
@VisibleForTesting
static long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
/**
* Minimum length of read below which management of the OS
* buffer cache is disabled.
* See {{@link BlockSender#isLongRead()}
*/
private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
@ -167,16 +177,42 @@ class BlockSender implements java.io.Closeable {
*/
BlockSender(ExtendedBlock block, long startOffset, long length,
boolean corruptChecksumOk, boolean verifyChecksum,
boolean sendChecksum,
DataNode datanode, String clientTraceFmt)
boolean sendChecksum, DataNode datanode, String clientTraceFmt,
CachingStrategy cachingStrategy)
throws IOException {
try {
this.block = block;
this.corruptChecksumOk = corruptChecksumOk;
this.verifyChecksum = verifyChecksum;
this.clientTraceFmt = clientTraceFmt;
this.readaheadLength = datanode.getDnConf().readaheadLength;
this.shouldDropCacheBehindRead = datanode.getDnConf().dropCacheBehindReads;
/*
* If the client asked for the cache to be dropped behind all reads,
* we honor that. Otherwise, we use the DataNode defaults.
* When using DataNode defaults, we use a heuristic where we only
* drop the cache for large reads.
*/
if (cachingStrategy.getDropBehind() == null) {
this.dropCacheBehindAllReads = false;
this.dropCacheBehindLargeReads =
datanode.getDnConf().dropCacheBehindReads;
} else {
this.dropCacheBehindAllReads =
this.dropCacheBehindLargeReads =
cachingStrategy.getDropBehind().booleanValue();
}
/*
* Similarly, if readahead was explicitly requested, we always do it.
* Otherwise, we read ahead based on the DataNode settings, and only
* when the reads are large.
*/
if (cachingStrategy.getReadahead() == null) {
this.alwaysReadahead = false;
this.readaheadLength = datanode.getDnConf().readaheadLength;
} else {
this.alwaysReadahead = true;
this.readaheadLength = cachingStrategy.getReadahead().longValue();
}
this.datanode = datanode;
if (verifyChecksum) {
@ -335,10 +371,11 @@ class BlockSender implements java.io.Closeable {
*/
@Override
public void close() throws IOException {
if (blockInFd != null && shouldDropCacheBehindRead && isLongRead()) {
// drop the last few MB of the file from cache
if (blockInFd != null &&
((dropCacheBehindAllReads) ||
(dropCacheBehindLargeReads && isLongRead()))) {
try {
NativeIO.POSIX.posixFadviseIfPossible(
NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset,
NativeIO.POSIX.POSIX_FADV_DONTNEED);
} catch (Exception e) {
@ -637,7 +674,7 @@ class BlockSender implements java.io.Closeable {
if (isLongRead() && blockInFd != null) {
// Advise that this file descriptor will be accessed sequentially.
NativeIO.POSIX.posixFadviseIfPossible(
NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
blockInFd, 0, 0, NativeIO.POSIX.POSIX_FADV_SEQUENTIAL);
}
@ -705,37 +742,47 @@ class BlockSender implements java.io.Closeable {
* and drop-behind.
*/
private void manageOsCache() throws IOException {
if (!isLongRead() || blockInFd == null) {
// don't manage cache manually for short-reads, like
// HBase random read workloads.
return;
}
// We can't manage the cache for this block if we don't have a file
// descriptor to work with.
if (blockInFd == null) return;
// Perform readahead if necessary
if (readaheadLength > 0 && datanode.readaheadPool != null) {
if ((readaheadLength > 0) && (datanode.readaheadPool != null) &&
(alwaysReadahead || isLongRead())) {
curReadahead = datanode.readaheadPool.readaheadStream(
clientTraceFmt, blockInFd,
offset, readaheadLength, Long.MAX_VALUE,
clientTraceFmt, blockInFd, offset, readaheadLength, Long.MAX_VALUE,
curReadahead);
}
// Drop what we've just read from cache, since we aren't
// likely to need it again
long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
if (shouldDropCacheBehindRead &&
offset >= nextCacheDropOffset) {
long dropLength = offset - lastCacheDropOffset;
if (dropLength >= 1024) {
NativeIO.POSIX.posixFadviseIfPossible(blockInFd,
lastCacheDropOffset, dropLength,
if (dropCacheBehindAllReads ||
(dropCacheBehindLargeReads && isLongRead())) {
long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
if (offset >= nextCacheDropOffset) {
long dropLength = offset - lastCacheDropOffset;
NativeIO.POSIX.posixFadviseIfPossible(block.getBlockName(),
blockInFd, lastCacheDropOffset, dropLength,
NativeIO.POSIX.POSIX_FADV_DONTNEED);
lastCacheDropOffset = offset;
}
lastCacheDropOffset += CACHE_DROP_INTERVAL_BYTES;
}
}
/**
* Returns true if we have done a long enough read for this block to qualify
* for the DataNode-wide cache management defaults. We avoid applying the
* cache management defaults to smaller reads because the overhead would be
* too high.
*
* Note that if the client explicitly asked for dropBehind, we will do it
* even on short reads.
*
* This is also used to determine when to invoke
* posix_fadvise(POSIX_FADV_SEQUENTIAL).
*/
private boolean isLongRead() {
return (endOffset - offset) > LONG_READ_THRESHOLD_BYTES;
return (endOffset - initialOffset) > LONG_READ_THRESHOLD_BYTES;
}
/**

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
/**
* The caching strategy we should use for an HDFS read or write operation.
*/
public class CachingStrategy {
private Boolean dropBehind; // null = use server defaults
private Long readahead; // null = use server defaults
public static CachingStrategy newDefaultStrategy() {
return new CachingStrategy(null, null);
}
public static CachingStrategy newDropBehind() {
return new CachingStrategy(true, null);
}
public CachingStrategy duplicate() {
return new CachingStrategy(this.dropBehind, this.readahead);
}
public CachingStrategy(Boolean dropBehind, Long readahead) {
this.dropBehind = dropBehind;
this.readahead = readahead;
}
public Boolean getDropBehind() {
return dropBehind;
}
public void setDropBehind(Boolean dropBehind) {
this.dropBehind = dropBehind;
}
public Long getReadahead() {
return readahead;
}
public void setReadahead(Long readahead) {
this.readahead = readahead;
}
public String toString() {
return "CachingStrategy(dropBehind=" + dropBehind +
", readahead=" + readahead + ")";
}
}

View File

@ -1519,6 +1519,7 @@ public class DataNode extends Configured
final BlockConstructionStage stage;
final private DatanodeRegistration bpReg;
final String clientname;
final CachingStrategy cachingStrategy;
/**
* Connect to the first item in the target list. Pass along the
@ -1539,6 +1540,8 @@ public class DataNode extends Configured
BPOfferService bpos = blockPoolManager.get(b.getBlockPoolId());
bpReg = bpos.bpRegistration;
this.clientname = clientname;
this.cachingStrategy =
new CachingStrategy(true, getDnConf().readaheadLength);
}
/**
@ -1581,7 +1584,7 @@ public class DataNode extends Configured
HdfsConstants.SMALL_BUFFER_SIZE));
in = new DataInputStream(unbufIn);
blockSender = new BlockSender(b, 0, b.getNumBytes(),
false, false, true, DataNode.this, null);
false, false, true, DataNode.this, null, cachingStrategy);
DatanodeInfo srcNode = new DatanodeInfo(bpReg);
//
@ -1594,7 +1597,7 @@ public class DataNode extends Configured
}
new Sender(out).writeBlock(b, accessToken, clientname, targets, srcNode,
stage, 0, 0, 0, 0, blockSender.getChecksum());
stage, 0, 0, 0, 0, blockSender.getChecksum(), cachingStrategy);
// send data & checksum
blockSender.sendBlock(out, unbufOut, null);

View File

@ -299,7 +299,8 @@ class DataXceiver extends Receiver implements Runnable {
final String clientName,
final long blockOffset,
final long length,
final boolean sendChecksum) throws IOException {
final boolean sendChecksum,
final CachingStrategy cachingStrategy) throws IOException {
previousOpClientName = clientName;
OutputStream baseStream = getOutputStream();
@ -324,7 +325,8 @@ class DataXceiver extends Receiver implements Runnable {
try {
try {
blockSender = new BlockSender(block, blockOffset, length,
true, false, sendChecksum, datanode, clientTraceFmt);
true, false, sendChecksum, datanode, clientTraceFmt,
cachingStrategy);
} catch(IOException e) {
String msg = "opReadBlock " + block + " received exception " + e;
LOG.info(msg);
@ -393,7 +395,8 @@ class DataXceiver extends Receiver implements Runnable {
final long minBytesRcvd,
final long maxBytesRcvd,
final long latestGenerationStamp,
DataChecksum requestedChecksum) throws IOException {
DataChecksum requestedChecksum,
CachingStrategy cachingStrategy) throws IOException {
previousOpClientName = clientname;
updateCurrentThreadName("Receiving block " + block);
final boolean isDatanode = clientname.length() == 0;
@ -452,7 +455,8 @@ class DataXceiver extends Receiver implements Runnable {
peer.getRemoteAddressString(),
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum);
clientname, srcDataNode, datanode, requestedChecksum,
cachingStrategy);
} else {
datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
}
@ -497,7 +501,8 @@ class DataXceiver extends Receiver implements Runnable {
new Sender(mirrorOut).writeBlock(originalBlock, blockToken,
clientname, targets, srcDataNode, stage, pipelineSize,
minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum);
minBytesRcvd, maxBytesRcvd, latestGenerationStamp, requestedChecksum,
cachingStrategy);
mirrorOut.flush();
@ -715,7 +720,7 @@ class DataXceiver extends Receiver implements Runnable {
try {
// check if the block exists or not
blockSender = new BlockSender(block, 0, -1, false, false, true, datanode,
null);
null, CachingStrategy.newDropBehind());
// set up response stream
OutputStream baseStream = getOutputStream();
@ -846,7 +851,8 @@ class DataXceiver extends Receiver implements Runnable {
blockReceiver = new BlockReceiver(
block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
null, 0, 0, 0, "", null, datanode, remoteChecksum);
null, 0, 0, 0, "", null, datanode, remoteChecksum,
CachingStrategy.newDropBehind());
// receive a block
blockReceiver.receiveBlock(null, null, null, null,

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.NodeBase;
@ -569,8 +570,8 @@ public class NamenodeFsck {
blockReader = BlockReaderFactory.newBlockReader(dfs.getConf(),
file, block, lblock.getBlockToken(), 0, -1, true, "fsck",
TcpPeerServer.peerFromSocketAndKey(s, namenode.getRpcServer().
getDataEncryptionKey()),
chosenNode, null, null, null, false);
getDataEncryptionKey()), chosenNode, null, null, null,
false, CachingStrategy.newDropBehind());
} catch (IOException ex) {
// Put chosen node into dead list, continue

View File

@ -54,11 +54,17 @@ message ClientOperationHeaderProto {
required string clientName = 2;
}
message CachingStrategyProto {
optional bool dropBehind = 1;
optional int64 readahead = 2;
}
message OpReadBlockProto {
required ClientOperationHeaderProto header = 1;
required uint64 offset = 2;
required uint64 len = 3;
optional bool sendChecksums = 4 [default = true];
optional CachingStrategyProto cachingStrategy = 5;
}
@ -100,6 +106,7 @@ message OpWriteBlockProto {
* The requested checksum mechanism for this block write.
*/
required ChecksumProto requestedChecksum = 9;
optional CachingStrategyProto cachingStrategy = 10;
}
message OpTransferBlockProto {

View File

@ -1307,4 +1307,49 @@
</description>
</property>
<property>
<name>dfs.client.cache.drop.behind.writes</name>
<value></value>
<description>
Just like dfs.datanode.drop.cache.behind.writes, this setting causes the
page cache to be dropped behind HDFS writes, potentially freeing up more
memory for other uses. Unlike dfs.datanode.drop.cache.behind.writes, this
is a client-side setting rather than a setting for the entire datanode.
If present, this setting will override the DataNode default.
If the native libraries are not available to the DataNode, this
configuration has no effect.
</description>
</property>
<property>
<name>dfs.client.cache.drop.behind.reads</name>
<value></value>
<description>
Just like dfs.datanode.drop.cache.behind.reads, this setting causes the
page cache to be dropped behind HDFS reads, potentially freeing up more
memory for other uses. Unlike dfs.datanode.drop.cache.behind.reads, this
is a client-side setting rather than a setting for the entire datanode. If
present, this setting will override the DataNode default.
If the native libraries are not available to the DataNode, this
configuration has no effect.
</description>
</property>
<property>
<name>dfs.client.cache.readahead</name>
<value></value>
<description>
Just like dfs.datanode.readahead.bytes, this setting causes the datanode to
read ahead in the block file using posix_fadvise, potentially decreasing
I/O wait times. Unlike dfs.datanode.readahead.bytes, this is a client-side
setting rather than a setting for the entire datanode. If present, this
setting will override the DataNode default.
If the native libraries are not available to the DataNode, this
configuration has no effect.
</description>
</property>
</configuration>

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.net.NetUtils;
@ -155,7 +156,7 @@ public class BlockReaderTestUtil {
testBlock.getBlockToken(),
offset, lenToRead,
true, "BlockReaderTestUtil", TcpPeerServer.peerFromSocket(sock),
nodes[0], null, null, null, false);
nodes[0], null, null, null, false, CachingStrategy.newDefaultStrategy());
}
/**

View File

@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumIn
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
@ -194,7 +195,7 @@ public class TestDataTransferProtocol {
sender.writeBlock(block, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], null, stage,
0, block.getNumBytes(), block.getNumBytes(), newGS,
DEFAULT_CHECKSUM);
DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
if (eofExcepted) {
sendResponse(Status.ERROR, null, null, recvOut);
sendRecvData(description, true);
@ -391,7 +392,7 @@ public class TestDataTransferProtocol {
new DatanodeInfo[1], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE,
0, 0L, 0L, 0L,
badChecksum);
badChecksum, CachingStrategy.newDefaultStrategy());
recvBuf.reset();
sendResponse(Status.ERROR, null, null, recvOut);
sendRecvData("wrong bytesPerChecksum while writing", true);
@ -402,7 +403,7 @@ public class TestDataTransferProtocol {
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
DEFAULT_CHECKSUM);
DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
PacketHeader hdr = new PacketHeader(
4, // size of packet
@ -425,7 +426,7 @@ public class TestDataTransferProtocol {
BlockTokenSecretManager.DUMMY_TOKEN, "cl",
new DatanodeInfo[1], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 0, 0L, 0L, 0L,
DEFAULT_CHECKSUM);
DEFAULT_CHECKSUM, CachingStrategy.newDefaultStrategy());
hdr = new PacketHeader(
8, // size of packet
@ -452,21 +453,21 @@ public class TestDataTransferProtocol {
recvBuf.reset();
blk.setBlockId(blkid-1);
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, fileLen, true);
0L, fileLen, true, CachingStrategy.newDefaultStrategy());
sendRecvData("Wrong block ID " + newBlockId + " for read", false);
// negative block start offset -1L
sendBuf.reset();
blk.setBlockId(blkid);
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
-1L, fileLen, true);
-1L, fileLen, true, CachingStrategy.newDefaultStrategy());
sendRecvData("Negative start-offset for read for block " +
firstBlock.getBlockId(), false);
// bad block start offset
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
fileLen, fileLen, true);
fileLen, fileLen, true, CachingStrategy.newDefaultStrategy());
sendRecvData("Wrong start-offset for reading block " +
firstBlock.getBlockId(), false);
@ -483,7 +484,8 @@ public class TestDataTransferProtocol {
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, -1L-random.nextInt(oneMil), true);
0L, -1L-random.nextInt(oneMil), true,
CachingStrategy.newDefaultStrategy());
sendRecvData("Negative length for reading block " +
firstBlock.getBlockId(), false);
@ -496,14 +498,14 @@ public class TestDataTransferProtocol {
recvOut);
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, fileLen+1, true);
0L, fileLen+1, true, CachingStrategy.newDefaultStrategy());
sendRecvData("Wrong length for reading block " +
firstBlock.getBlockId(), false);
//At the end of all this, read the file to make sure that succeeds finally.
sendBuf.reset();
sender.readBlock(blk, BlockTokenSecretManager.DUMMY_TOKEN, "cl",
0L, fileLen, true);
0L, fileLen, true, CachingStrategy.newDefaultStrategy());
readFile(fileSys, file, fileLen);
} finally {
cluster.shutdown();

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.security.token.block.SecurityTestUtil;
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.net.NetUtils;
@ -148,7 +149,8 @@ public class TestBlockTokenWithDFS {
blockReader = BlockReaderFactory.newBlockReader(
new DFSClient.Conf(conf), file, block, lblock.getBlockToken(), 0, -1,
true, "TestBlockTokenWithDFS", TcpPeerServer.peerFromSocket(s),
nodes[0], null, null, null, false);
nodes[0], null, null, null, false,
CachingStrategy.newDefaultStrategy());
} catch (IOException ex) {
if (ex instanceof InvalidBlockTokenException) {

View File

@ -0,0 +1,369 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheTracker;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestCachingStrategy {
private static final Log LOG = LogFactory.getLog(TestCachingStrategy.class);
private static int MAX_TEST_FILE_LEN = 1024 * 1024;
private static int WRITE_PACKET_SIZE = DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
private final static TestRecordingCacheTracker tracker =
new TestRecordingCacheTracker();
@BeforeClass
public static void setupTest() {
EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
// Track calls to posix_fadvise.
NativeIO.POSIX.cacheTracker = tracker;
// Normally, we wait for a few megabytes of data to be read or written
// before dropping the cache. This is to avoid an excessive number of
// JNI calls to the posix_fadvise function. However, for the purpose
// of this test, we want to use small files and see all fadvise calls
// happen.
BlockSender.CACHE_DROP_INTERVAL_BYTES = 4096;
BlockReceiver.CACHE_DROP_LAG_BYTES = 4096;
}
private static class Stats {
private final String fileName;
private final boolean dropped[] = new boolean[MAX_TEST_FILE_LEN];
Stats(String fileName) {
this.fileName = fileName;
}
synchronized void fadvise(int offset, int len, int flags) {
LOG.debug("got fadvise(offset=" + offset + ", len=" + len +
",flags=" + flags + ")");
if (flags == NativeIO.POSIX.POSIX_FADV_DONTNEED) {
for (int i = 0; i < (int)len; i++) {
dropped[(int)(offset + i)] = true;
}
}
}
synchronized void assertNotDroppedInRange(int start, int end) {
for (int i = start; i < end; i++) {
if (dropped[i]) {
throw new RuntimeException("in file " + fileName + ", we " +
"dropped the cache at offset " + i);
}
}
}
synchronized void assertDroppedInRange(int start, int end) {
for (int i = start; i < end; i++) {
if (!dropped[i]) {
throw new RuntimeException("in file " + fileName + ", we " +
"did not drop the cache at offset " + i);
}
}
}
synchronized void clear() {
Arrays.fill(dropped, false);
}
}
private static class TestRecordingCacheTracker implements CacheTracker {
private final Map<String, Stats> map = new TreeMap<String, Stats>();
@Override
synchronized public void fadvise(String name,
long offset, long len, int flags) {
if ((len < 0) || (len > Integer.MAX_VALUE)) {
throw new RuntimeException("invalid length of " + len +
" passed to posixFadviseIfPossible");
}
if ((offset < 0) || (offset > Integer.MAX_VALUE)) {
throw new RuntimeException("invalid offset of " + offset +
" passed to posixFadviseIfPossible");
}
Stats stats = map.get(name);
if (stats == null) {
stats = new Stats(name);
map.put(name, stats);
}
stats.fadvise((int)offset, (int)len, flags);
}
synchronized void clear() {
map.clear();
}
synchronized Stats getStats(String fileName) {
return map.get(fileName);
}
synchronized public String toString() {
StringBuilder bld = new StringBuilder();
bld.append("TestRecordingCacheManipulator{");
String prefix = "";
for (String fileName : map.keySet()) {
bld.append(prefix);
prefix = ", ";
bld.append(fileName);
}
bld.append("}");
return bld.toString();
}
}
static void createHdfsFile(FileSystem fs, Path p, long length,
Boolean dropBehind) throws Exception {
FSDataOutputStream fos = null;
try {
// create file with replication factor of 1
fos = fs.create(p, (short)1);
if (dropBehind != null) {
fos.setDropBehind(dropBehind);
}
byte buf[] = new byte[8196];
while (length > 0) {
int amt = (length > buf.length) ? (int)buf.length : (int)length;
fos.write(buf, 0, amt);
length -= amt;
}
} catch (IOException e) {
LOG.error("ioexception", e);
} finally {
if (fos != null) {
fos.close();
}
}
}
static long readHdfsFile(FileSystem fs, Path p, long length,
Boolean dropBehind) throws Exception {
FSDataInputStream fis = null;
long totalRead = 0;
try {
fis = fs.open(p);
if (dropBehind != null) {
fis.setDropBehind(dropBehind);
}
byte buf[] = new byte[8196];
while (length > 0) {
int amt = (length > buf.length) ? (int)buf.length : (int)length;
int ret = fis.read(buf, 0, amt);
if (ret == -1) {
return totalRead;
}
totalRead += ret;
length -= ret;
}
} catch (IOException e) {
LOG.error("ioexception", e);
} finally {
if (fis != null) {
fis.close();
}
}
throw new RuntimeException("unreachable");
}
@Test(timeout=120000)
public void testFadviseAfterWriteThenRead() throws Exception {
// start a cluster
LOG.info("testFadviseAfterWriteThenRead");
tracker.clear();
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
String TEST_PATH = "/test";
int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
// create new file
createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, true);
// verify that we dropped everything from the cache during file creation.
ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
Stats stats = tracker.getStats(fadvisedFileName);
stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
stats.clear();
// read file
readHdfsFile(fs, new Path(TEST_PATH), Long.MAX_VALUE, true);
// verify that we dropped everything from the cache.
Assert.assertNotNull(stats);
stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
/***
* Test the scenario where the DataNode defaults to not dropping the cache,
* but our client defaults are set.
*/
@Test(timeout=120000)
public void testClientDefaults() throws Exception {
// start a cluster
LOG.info("testClientDefaults");
tracker.clear();
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, false);
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, false);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_READS, true);
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_CACHE_DROP_BEHIND_WRITES, true);
MiniDFSCluster cluster = null;
String TEST_PATH = "/test";
int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
// create new file
createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, null);
// verify that we dropped everything from the cache during file creation.
ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
Stats stats = tracker.getStats(fadvisedFileName);
stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
stats.clear();
// read file
readHdfsFile(fs, new Path(TEST_PATH), Long.MAX_VALUE, null);
// verify that we dropped everything from the cache.
Assert.assertNotNull(stats);
stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test(timeout=120000)
public void testFadviseSkippedForSmallReads() throws Exception {
// start a cluster
LOG.info("testFadviseSkippedForSmallReads");
tracker.clear();
Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, true);
MiniDFSCluster cluster = null;
String TEST_PATH = "/test";
int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
FSDataInputStream fis = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
// create new file
createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, null);
// Since the DataNode was configured with drop-behind, and we didn't
// specify any policy, we should have done drop-behind.
ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
Stats stats = tracker.getStats(fadvisedFileName);
stats.assertDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
stats.clear();
stats.assertNotDroppedInRange(0, TEST_PATH_LEN);
// read file
fis = fs.open(new Path(TEST_PATH));
byte buf[] = new byte[17];
fis.readFully(4096, buf, 0, buf.length);
// we should not have dropped anything because of the small read.
stats = tracker.getStats(fadvisedFileName);
stats.assertNotDroppedInRange(0, TEST_PATH_LEN - WRITE_PACKET_SIZE);
} finally {
IOUtils.cleanup(null, fis);
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test(timeout=120000)
public void testNoFadviseAfterWriteThenRead() throws Exception {
// start a cluster
LOG.info("testNoFadviseAfterWriteThenRead");
tracker.clear();
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = null;
String TEST_PATH = "/test";
int TEST_PATH_LEN = MAX_TEST_FILE_LEN;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1)
.build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
// create new file
createHdfsFile(fs, new Path(TEST_PATH), TEST_PATH_LEN, false);
// verify that we did not drop everything from the cache during file creation.
ExtendedBlock block = cluster.getNameNode().getRpcServer().getBlockLocations(
TEST_PATH, 0, Long.MAX_VALUE).get(0).getBlock();
String fadvisedFileName = MiniDFSCluster.getBlockFile(0, block).getName();
Stats stats = tracker.getStats(fadvisedFileName);
Assert.assertNull(stats);
// read file
readHdfsFile(fs, new Path(TEST_PATH), Long.MAX_VALUE, false);
// verify that we dropped everything from the cache.
Assert.assertNull(stats);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
}

View File

@ -287,7 +287,8 @@ public class TestDataNodeVolumeFailure {
BlockReader blockReader =
BlockReaderFactory.newBlockReader(new DFSClient.Conf(conf), file, block,
lblock.getBlockToken(), 0, -1, true, "TestDataNodeVolumeFailure",
TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false);
TcpPeerServer.peerFromSocket(s), datanode, null, null, null, false,
CachingStrategy.newDefaultStrategy());
blockReader.close();
}

View File

@ -148,7 +148,7 @@ public class TestDiskError {
BlockTokenSecretManager.DUMMY_TOKEN, "",
new DatanodeInfo[0], null,
BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L,
checksum);
checksum, CachingStrategy.newDefaultStrategy());
out.flush();
// close the connection before sending the content of the block

View File

@ -69,7 +69,7 @@ public class FadvisedChunkedFile extends ChunkedFile {
}
if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
try {
NativeIO.POSIX.posixFadviseIfPossible(
NativeIO.POSIX.posixFadviseIfPossible(identifier,
fd,
getStartOffset(), getEndOffset() - getStartOffset(),
NativeIO.POSIX.POSIX_FADV_DONTNEED);

View File

@ -71,7 +71,7 @@ public class FadvisedFileRegion extends DefaultFileRegion {
}
if (manageOsCache && getCount() > 0) {
try {
NativeIO.POSIX.posixFadviseIfPossible(
NativeIO.POSIX.posixFadviseIfPossible(identifier,
fd, getPosition(), getCount(),
NativeIO.POSIX.POSIX_FADV_DONTNEED);
} catch (Throwable t) {