HDFS-4953. Enable HDFS local reads via mmap. Contributed by Colin Patrick McCabe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4949@1515906 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Wang 2013-08-20 18:07:47 +00:00
parent dd00bb71aa
commit 67f86baab0
30 changed files with 2508 additions and 63 deletions

View File

@ -28,9 +28,9 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable, Closeable,
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead {
implements Seekable, PositionedReadable, Closeable,
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
SupportsZeroCopy {
public FSDataInputStream(InputStream in)
throws IOException {
super(in);
@ -167,4 +167,15 @@ public class FSDataInputStream extends DataInputStream
"support setting the drop-behind caching setting.");
}
}
@Override
public ZeroCopyCursor createZeroCopyCursor()
throws IOException, ZeroCopyUnavailableException {
try {
return ((SupportsZeroCopy)in).createZeroCopyCursor();
}
catch (ClassCastException e) {
throw new ZeroCopyUnavailableException(e);
}
}
}

View File

@ -18,9 +18,11 @@
package org.apache.hadoop.fs;
import java.io.*;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ZeroCopyUnavailableException;
/****************************************************************
* FSInputStream is a generic old InputStream with a little bit
@ -30,7 +32,7 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.LimitedPrivate({"HDFS"})
@InterfaceStability.Unstable
public abstract class FSInputStream extends InputStream
implements Seekable, PositionedReadable {
implements Seekable, PositionedReadable, SupportsZeroCopy {
/**
* Seek to the given offset from the start of the file.
* The next read() will be from that location. Can't
@ -86,4 +88,11 @@ public abstract class FSInputStream extends InputStream
throws IOException {
readFully(position, buffer, 0, buffer.length);
}
@Override
public ZeroCopyCursor createZeroCopyCursor()
throws IOException, ZeroCopyUnavailableException {
throw new ZeroCopyUnavailableException("zero copy is not implemented " +
"for this filesystem type.");
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Supports zero-copy reads.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface SupportsZeroCopy {
/**
* Get a zero-copy cursor to use for zero-copy reads.
*
* @throws IOException
* If there was an error creating the ZeroCopyCursor
* @throws UnsupportedOperationException
* If this stream does not support zero-copy reads.
* This is used, for example, when one stream wraps another
* which may or may not support ZCR.
*/
public ZeroCopyCursor createZeroCopyCursor()
throws IOException, ZeroCopyUnavailableException;
}

View File

@ -0,0 +1,111 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.Closeable;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* A ZeroCopyCursor allows you to make zero-copy reads.
*
* Cursors should be closed when they are no longer needed.
*
* Example:
* FSDataInputStream fis = fs.open("/file");
* ZeroCopyCursor cursor = fis.createZeroCopyCursor();
* try {
* cursor.read(128);
* ByteBuffer data = cursor.getData();
* processData(data);
* } finally {
* cursor.close();
* }
*/
public interface ZeroCopyCursor extends Closeable {
/**
* Set the fallback buffer used for this zero copy cursor.
* The fallback buffer is used when a true zero-copy read is impossible.
* If there is no fallback buffer, UnsupportedOperationException is thrown
* when a true zero-copy read cannot be done.
*
* @param fallbackBuffer The fallback buffer to set, or null for none.
*/
public void setFallbackBuffer(ByteBuffer fallbackBuffer);
/**
* @return the fallback buffer in use, or null if there is none.
*/
public ByteBuffer getFallbackBuffer();
/**
* @param skipChecksums Whether we should skip checksumming with this
* zero copy cursor.
*/
public void setSkipChecksums(boolean skipChecksums);
/**
* @return Whether we should skip checksumming with this
* zero copy cursor.
*/
public boolean getSkipChecksums();
/**
* @param allowShortReads Whether we should allow short reads.
*/
public void setAllowShortReads(boolean allowShortReads);
/**
* @return Whether we should allow short reads.
*/
public boolean getAllowShortReads();
/**
* Perform a zero-copy read.
*
* @param toRead The minimum number of bytes to read.
* Must not be negative. If we hit EOF before
* reading this many bytes, we will either throw
* EOFException (if allowShortReads = false), or
* return a short read (if allowShortReads = true).
* A short read could be as short as 0 bytes.
* @throws UnsupportedOperationException
* If a true zero-copy read cannot be done, and no fallback
* buffer was set.
* @throws EOFException
* If allowShortReads = false, and we can't read all the bytes
* that were requested. This will never be thrown if
* allowShortReads = true.
* @throws IOException
* If there was an error while reading the data.
*/
public void read(int toRead)
throws UnsupportedOperationException, EOFException, IOException;
/**
* Get the current data buffer.
*
* This buffer will remain valid until either this cursor is closed, or we
* call read() again on this same cursor. You can find the amount of data
* that was read previously by calling ByteBuffer#remaining.
*
* @return The current data buffer.
*/
public ByteBuffer getData();
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
public class ZeroCopyUnavailableException extends IOException {
private static final long serialVersionUID = 0L;
public ZeroCopyUnavailableException(String message) {
super(message);
}
public ZeroCopyUnavailableException(String message, Exception e) {
super(message, e);
}
public ZeroCopyUnavailableException(Exception e) {
super(e);
}
}

View File

@ -330,4 +330,14 @@
<Method name="setDirInternal" />
<Bug pattern="DM_STRING_CTOR" />
</Match>
<Match>
<Class name="org.apache.hadoop.hdfs.client.ClientMmapManager" />
<Method name="create" />
<Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH" />
</Match>
<Match>
<Class name="org.apache.hadoop.hdfs.client.ClientMmapManager" />
<Method name="create" />
<Bug pattern="UL_UNRELEASED_LOCK" />
</Match>
</FindBugsFilter>

View File

@ -142,6 +142,7 @@ target_link_libraries(test_native_mini_dfs
)
add_executable(test_libhdfs_threaded
main/native/libhdfs/expect.c
main/native/libhdfs/test_libhdfs_threaded.c
)
target_link_libraries(test_libhdfs_threaded
@ -150,6 +151,16 @@ target_link_libraries(test_libhdfs_threaded
pthread
)
add_executable(test_libhdfs_zerocopy
main/native/libhdfs/expect.c
main/native/libhdfs/test/test_libhdfs_zerocopy.c
)
target_link_libraries(test_libhdfs_zerocopy
hdfs
native_mini_dfs
pthread
)
IF(REQUIRE_LIBWEBHDFS)
add_subdirectory(contrib/libwebhdfs)
ENDIF(REQUIRE_LIBWEBHDFS)

View File

@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs;
import java.io.IOException;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
/**
* A BlockReader is responsible for reading a single block
@ -81,4 +83,21 @@ public interface BlockReader extends ByteBufferReadable {
* All short-circuit reads are also local.
*/
boolean isShortCircuit();
/**
* Do a zero-copy read with the current block reader.
*
* We assume that the calling code has done bounds checking, and won't ask
* us for more bytes than are supposed to be visible (or are in the file).
*
* @param buffers The zero-copy buffers object.
* @param curBlock The current block.
* @param blockPos Position in the current block to start reading at.
* @param toRead The number of bytes to read from the block.
*
* @return true if the read was done, false otherwise.
*/
boolean readZeroCopy(HdfsZeroCopyCursor buffers,
LocatedBlock curBlock, long blockPos, int toRead,
ClientMmapManager mmapManager);
}

View File

@ -22,11 +22,15 @@ import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.io.IOUtils;
@ -87,6 +91,8 @@ class BlockReaderLocal implements BlockReader {
private final ExtendedBlock block;
private final FileInputStreamCache fisCache;
private ClientMmap clientMmap;
private boolean mmapDisabled;
private static int getSlowReadBufferNumChunks(int bufSize,
int bytesPerChecksum) {
@ -113,6 +119,8 @@ class BlockReaderLocal implements BlockReader {
this.datanodeID = datanodeID;
this.block = block;
this.fisCache = fisCache;
this.clientMmap = null;
this.mmapDisabled = false;
// read and handle the common header here. For now just a version
checksumIn.getChannel().position(0);
@ -487,6 +495,10 @@ class BlockReaderLocal implements BlockReader {
@Override
public synchronized void close() throws IOException {
if (clientMmap != null) {
clientMmap.unref();
clientMmap = null;
}
if (fisCache != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("putting FileInputStream for " + filename +
@ -534,4 +546,48 @@ class BlockReaderLocal implements BlockReader {
public boolean isShortCircuit() {
return true;
}
@Override
public boolean readZeroCopy(HdfsZeroCopyCursor cursor,
LocatedBlock curBlock, long blockPos, int toRead,
ClientMmapManager mmapManager) {
if (clientMmap == null) {
if (mmapDisabled) {
return false;
}
try {
clientMmap = mmapManager.fetch(datanodeID, block, dataIn);
if (clientMmap == null) {
mmapDisabled = true;
return false;
}
} catch (InterruptedException e) {
LOG.error("Interrupted while setting up mmap for " + filename, e);
Thread.currentThread().interrupt();
return false;
} catch (IOException e) {
LOG.error("unable to set up mmap for " + filename, e);
mmapDisabled = true;
return false;
}
}
long limit = blockPos + toRead;
if (limit > Integer.MAX_VALUE) {
/*
* In Java, ByteBuffers use a 32-bit length, capacity, offset, etc.
* This limits our mmap'ed regions to 2 GB in length.
* TODO: we can implement zero-copy for larger blocks by doing multiple
* mmaps.
*/
mmapDisabled = true;
clientMmap.unref();
clientMmap = null;
return false;
}
ByteBuffer mmapBuffer = clientMmap.getMappedByteBuffer().duplicate();
mmapBuffer.position((int)blockPos);
mmapBuffer.limit((int)limit);
cursor.setMmap(clientMmap, mmapBuffer);
return true;
}
}

View File

@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -35,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
@ -701,4 +703,11 @@ class BlockReaderLocalLegacy implements BlockReader {
public boolean isShortCircuit() {
return true;
}
@Override
public boolean readZeroCopy(HdfsZeroCopyCursor buffers,
LocatedBlock curBlock, long blockPos, int toRead,
ClientMmapManager mmapManager) {
return false;
}
}

View File

@ -103,6 +103,7 @@ import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@ -204,7 +205,43 @@ public class DFSClient implements java.io.Closeable {
private boolean shouldUseLegacyBlockReaderLocal;
private final CachingStrategy defaultReadCachingStrategy;
private final CachingStrategy defaultWriteCachingStrategy;
private ClientMmapManager mmapManager;
private static final ClientMmapManagerFactory MMAP_MANAGER_FACTORY =
new ClientMmapManagerFactory();
private static final class ClientMmapManagerFactory {
private ClientMmapManager mmapManager = null;
/**
* Tracks the number of users of mmapManager.
*/
private int refcnt = 0;
synchronized ClientMmapManager get(Configuration conf) {
if (refcnt++ == 0) {
mmapManager = ClientMmapManager.fromConf(conf);
} else {
String mismatches = mmapManager.verifyConfigurationMatches(conf);
if (!mismatches.isEmpty()) {
LOG.warn("The ClientMmapManager settings you specified " +
"have been ignored because another thread created the " +
"ClientMmapManager first. " + mismatches);
}
}
return mmapManager;
}
synchronized void unref(ClientMmapManager mmapManager) {
if (this.mmapManager != mmapManager) {
throw new IllegalArgumentException();
}
if (--refcnt == 0) {
IOUtils.cleanup(LOG, mmapManager);
mmapManager = null;
}
}
}
/**
* DFSClient configuration
*/
@ -513,6 +550,7 @@ public class DFSClient implements java.io.Closeable {
new CachingStrategy(readDropBehind, readahead);
this.defaultWriteCachingStrategy =
new CachingStrategy(writeDropBehind, readahead);
this.mmapManager = MMAP_MANAGER_FACTORY.get(conf);
}
/**
@ -716,9 +754,12 @@ public class DFSClient implements java.io.Closeable {
/** Abort and release resources held. Ignore all errors. */
void abort() {
if (mmapManager != null) {
MMAP_MANAGER_FACTORY.unref(mmapManager);
mmapManager = null;
}
clientRunning = false;
closeAllFilesBeingWritten(true);
try {
// remove reference to this client and stop the renewer,
// if there is no more clients under the renewer.
@ -762,6 +803,10 @@ public class DFSClient implements java.io.Closeable {
*/
@Override
public synchronized void close() throws IOException {
if (mmapManager != null) {
MMAP_MANAGER_FACTORY.unref(mmapManager);
mmapManager = null;
}
if(clientRunning) {
closeAllFilesBeingWritten(false);
clientRunning = false;
@ -2474,4 +2519,8 @@ public class DFSClient implements java.io.Closeable {
public CachingStrategy getDefaultWriteCachingStrategy() {
return defaultWriteCachingStrategy;
}
ClientMmapManager getMmapManager() {
return mmapManager;
}
}

View File

@ -373,6 +373,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic";
public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
public static final String DFS_CLIENT_MMAP_CACHE_SIZE = "dfs.client.mmap.cache.size";
public static final int DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT = 1024;
public static final String DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS = "dfs.client.mmap.cache.timeout.ms";
public static final long DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT = 15 * 60 * 1000;
public static final String DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT = "dfs.client.mmap.cache.timeout.ms";
public static final int DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT = 4;
// property for fsimage compression
public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.ZeroCopyCursor;
import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
@ -92,12 +93,14 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
this.totalBytesRead = 0;
this.totalLocalBytesRead = 0;
this.totalShortCircuitBytesRead = 0;
this.totalZeroCopyBytesRead = 0;
}
public ReadStatistics(ReadStatistics rhs) {
this.totalBytesRead = rhs.getTotalBytesRead();
this.totalLocalBytesRead = rhs.getTotalLocalBytesRead();
this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead();
this.totalZeroCopyBytesRead = rhs.getTotalZeroCopyBytesRead();
}
/**
@ -123,6 +126,13 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
public long getTotalShortCircuitBytesRead() {
return totalShortCircuitBytesRead;
}
/**
* @return The total number of zero-copy bytes read.
*/
public long getTotalZeroCopyBytesRead() {
return totalZeroCopyBytesRead;
}
/**
* @return The total number of bytes read which were not local.
@ -145,12 +155,21 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
this.totalLocalBytesRead += amt;
this.totalShortCircuitBytesRead += amt;
}
void addZeroCopyBytes(long amt) {
this.totalBytesRead += amt;
this.totalLocalBytesRead += amt;
this.totalShortCircuitBytesRead += amt;
this.totalZeroCopyBytesRead += amt;
}
private long totalBytesRead;
private long totalLocalBytesRead;
private long totalShortCircuitBytesRead;
private long totalZeroCopyBytesRead;
}
private final FileInputStreamCache fileInputStreamCache;
@ -1393,4 +1412,67 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
this.cachingStrategy.setDropBehind(dropBehind);
closeCurrentBlockReader();
}
synchronized void readZeroCopy(HdfsZeroCopyCursor zcursor, int toRead)
throws IOException {
assert(toRead > 0);
if (((blockReader == null) || (blockEnd == -1)) &&
(pos < getFileLength())) {
/*
* If we don't have a blockReader, or the one we have has no more bytes
* left to read, we call seekToBlockSource to get a new blockReader and
* recalculate blockEnd. Note that we assume we're not at EOF here
* (we check this above).
*/
if ((!seekToBlockSource(pos)) || (blockReader == null)) {
throw new IOException("failed to allocate new BlockReader " +
"at position " + pos);
}
}
long curPos = pos;
boolean canSkipChecksums = zcursor.getSkipChecksums();
long blockLeft = blockEnd - curPos + 1;
if (zcursor.getAllowShortReads()) {
if (blockLeft < toRead) {
toRead = (int)blockLeft;
}
}
if (canSkipChecksums && (toRead <= blockLeft)) {
long blockStartInFile = currentLocatedBlock.getStartOffset();
long blockPos = curPos - blockStartInFile;
if (blockReader.readZeroCopy(zcursor,
currentLocatedBlock, blockPos, toRead,
dfsClient.getMmapManager())) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("readZeroCopy read " + toRead + " bytes from " +
"offset " + curPos + " via the zero-copy read path. " +
"blockEnd = " + blockEnd);
}
readStatistics.addZeroCopyBytes(toRead);
seek(pos + toRead);
return;
}
}
/*
* Slow path reads.
*
* readStatistics will be updated when we call back into this
* stream's read methods.
*/
long prevBlockEnd = blockEnd;
int slowReadAmount = zcursor.readViaSlowPath(toRead);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("readZeroCopy read " + slowReadAmount + " bytes " +
"from offset " + curPos + " via the fallback read path. " +
"prevBlockEnd = " + prevBlockEnd + ", blockEnd = " + blockEnd +
", canSkipChecksums = " + canSkipChecksums);
}
}
@Override
public ZeroCopyCursor createZeroCopyCursor()
throws IOException, UnsupportedOperationException {
return new HdfsZeroCopyCursor(this,
dfsClient.getConf().skipShortCircuitChecksums);
}
}

View File

@ -0,0 +1,148 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.ZeroCopyCursor;
import org.apache.hadoop.hdfs.client.ClientMmap;
public class HdfsZeroCopyCursor implements ZeroCopyCursor {
public static final Log LOG = LogFactory.getLog(HdfsZeroCopyCursor.class);
private DFSInputStream stream;
private boolean skipChecksums;
private boolean allowShortReads;
private ClientMmap mmap;
private ByteBuffer fallbackBuffer;
private ByteBuffer readBuffer;
HdfsZeroCopyCursor(DFSInputStream stream, boolean skipChecksums) {
this.stream = stream;
this.skipChecksums = skipChecksums;
this.allowShortReads = false;
this.mmap = null;
this.fallbackBuffer = null;
this.readBuffer = null;
}
@Override
public void close() throws IOException {
stream = null;
if (mmap != null) {
mmap.unref();
mmap = null;
}
fallbackBuffer = null;
readBuffer = null;
}
@Override
public void setFallbackBuffer(ByteBuffer fallbackBuffer) {
this.fallbackBuffer = fallbackBuffer;
}
@Override
public ByteBuffer getFallbackBuffer() {
return this.fallbackBuffer;
}
@Override
public void setSkipChecksums(boolean skipChecksums) {
this.skipChecksums = skipChecksums;
}
@Override
public boolean getSkipChecksums() {
return this.skipChecksums;
}
@Override
public void setAllowShortReads(boolean allowShortReads) {
this.allowShortReads = allowShortReads;
}
@Override
public boolean getAllowShortReads() {
return this.allowShortReads;
}
@Override
public void read(int toRead) throws UnsupportedOperationException,
EOFException, IOException {
if (toRead < 0) {
throw new IllegalArgumentException("can't read " + toRead + " bytes.");
}
stream.readZeroCopy(this, toRead);
}
@Override
public ByteBuffer getData() {
return readBuffer;
}
int readViaSlowPath(int toRead) throws EOFException, IOException {
if (fallbackBuffer == null) {
throw new UnsupportedOperationException("unable to read via " +
"the fastpath, and there was no fallback buffer provided.");
}
fallbackBuffer.clear();
fallbackBuffer.limit(toRead); // will throw if toRead is too large
int totalRead = 0;
readBuffer = fallbackBuffer;
try {
while (toRead > 0) {
int nread = stream.read(fallbackBuffer);
if (nread < 0) {
break;
}
toRead -= nread;
totalRead += nread;
if (allowShortReads) {
break;
}
}
} finally {
fallbackBuffer.flip();
}
if ((toRead > 0) && (!allowShortReads)) {
throw new EOFException("only read " + totalRead + " bytes out of " +
"a requested " + toRead + " before hitting EOF");
}
return totalRead;
}
void setMmap(ClientMmap mmap, ByteBuffer readBuffer) {
if (this.mmap != mmap) {
if (this.mmap != null) {
this.mmap.unref();
}
}
this.mmap = mmap;
mmap.ref();
this.readBuffer = readBuffer;
}
ClientMmap getMmap() {
return mmap;
}
}

View File

@ -27,9 +27,11 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@ -485,4 +487,11 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
public boolean isShortCircuit() {
return false;
}
@Override
public boolean readZeroCopy(HdfsZeroCopyCursor buffers,
LocatedBlock curBlock, long blockPos, int toRead,
ClientMmapManager mmapManager) {
return false;
}
}

View File

@ -29,9 +29,11 @@ import java.nio.channels.ReadableByteChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
@ -40,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@ -451,4 +452,11 @@ public class RemoteBlockReader2 implements BlockReader {
public boolean isShortCircuit() {
return false;
}
@Override
public boolean readZeroCopy(HdfsZeroCopyCursor buffers,
LocatedBlock curBlock, long blockPos, int toRead,
ClientMmapManager manager) {
return false;
}
}

View File

@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.client;
import java.io.FileInputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel.MapMode;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* A memory-mapped region used by an HDFS client.
*
* This class includes a reference count and some other information used by
* ClientMmapManager to track and cache mmaps.
*/
@InterfaceAudience.Private
public class ClientMmap {
static final Log LOG = LogFactory.getLog(ClientMmap.class);
/**
* A reference to the manager of this mmap.
*
* This is only a weak reference to help minimize the damange done by
* code which leaks references accidentally.
*/
private final WeakReference<ClientMmapManager> manager;
/**
* The actual mapped memory region.
*/
private final MappedByteBuffer map;
/**
* A reference count tracking how many threads are using this object.
*/
private final AtomicInteger refCount = new AtomicInteger(1);
/**
* Block pertaining to this mmap
*/
private final ExtendedBlock block;
/**
* The DataNode where this mmap came from.
*/
private final DatanodeID datanodeID;
/**
* The monotonic time when this mmap was last evictable.
*/
private long lastEvictableTimeNs;
public static ClientMmap load(ClientMmapManager manager, FileInputStream in,
ExtendedBlock block, DatanodeID datanodeID)
throws IOException {
MappedByteBuffer map =
in.getChannel().map(MapMode.READ_ONLY, 0,
in.getChannel().size());
return new ClientMmap(manager, map, block, datanodeID);
}
private ClientMmap(ClientMmapManager manager, MappedByteBuffer map,
ExtendedBlock block, DatanodeID datanodeID)
throws IOException {
this.manager = new WeakReference<ClientMmapManager>(manager);
this.map = map;
this.block = block;
this.datanodeID = datanodeID;
this.lastEvictableTimeNs = 0;
}
/**
* Decrement the reference count on this object.
* Should be called with the ClientMmapManager lock held.
*/
public void unref() {
int count = refCount.decrementAndGet();
if (count < 0) {
throw new IllegalArgumentException("can't decrement the " +
"reference count on this ClientMmap lower than 0.");
} else if (count == 0) {
ClientMmapManager man = manager.get();
if (man == null) {
unmap();
} else {
man.makeEvictable(this);
}
}
}
/**
* Increment the reference count on this object.
*
* @return The new reference count.
*/
public int ref() {
return refCount.getAndIncrement();
}
@VisibleForTesting
public ExtendedBlock getBlock() {
return block;
}
DatanodeID getDatanodeID() {
return datanodeID;
}
public MappedByteBuffer getMappedByteBuffer() {
return map;
}
public void setLastEvictableTimeNs(long lastEvictableTimeNs) {
this.lastEvictableTimeNs = lastEvictableTimeNs;
}
public long getLastEvictableTimeNs() {
return this.lastEvictableTimeNs;
}
/**
* Unmap the memory region.
*
* There isn't any portable way to unmap a memory region in Java.
* So we use the sun.nio method here.
* Note that unmapping a memory region could cause crashes if code
* continues to reference the unmapped code. However, if we don't
* manually unmap the memory, we are dependent on the finalizer to
* do it, and we have no idea when the finalizer will run.
*/
void unmap() {
assert(refCount.get() == 0);
if (map instanceof sun.nio.ch.DirectBuffer) {
final sun.misc.Cleaner cleaner =
((sun.nio.ch.DirectBuffer) map).cleaner();
cleaner.clean();
}
}
}

View File

@ -0,0 +1,476 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.client;
import java.io.Closeable;
import org.apache.hadoop.classification.InterfaceAudience;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.IOUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ComparisonChain;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Tracks mmap instances used on an HDFS client.
*
* mmaps can be used concurrently by multiple threads at once.
* mmaps cannot be closed while they are in use.
*
* The cache is important for performance, because the first time an mmap is
* created, the page table entries (PTEs) are not yet set up.
* Even when reading data that is entirely resident in memory, reading an
* mmap the second time is faster.
*/
@InterfaceAudience.Private
public class ClientMmapManager implements Closeable {
public static final Log LOG = LogFactory.getLog(ClientMmapManager.class);
private boolean closed = false;
private final int cacheSize;
private final long timeoutNs;
private final int runsPerTimeout;
private final Lock lock = new ReentrantLock();
/**
* Maps block, datanode_id to the client mmap object.
* If the ClientMmap is in the process of being loaded,
* {@link Waitable<ClientMmap>#await()} will block.
*
* Protected by the ClientMmapManager lock.
*/
private final TreeMap<Key, Waitable<ClientMmap>> mmaps =
new TreeMap<Key, Waitable<ClientMmap>>();
/**
* Maps the last use time to the client mmap object.
* We ensure that each last use time is unique by inserting a jitter of a
* nanosecond or two if necessary.
*
* Protected by the ClientMmapManager lock.
* ClientMmap objects that are in use are never evictable.
*/
private final TreeMap<Long, ClientMmap> evictable =
new TreeMap<Long, ClientMmap>();
private final ScheduledThreadPoolExecutor executor =
new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
setDaemon(true).setNameFormat("ClientMmapManager").
build());
/**
* The CacheCleaner for this ClientMmapManager. We don't create this
* and schedule it until it becomes necessary.
*/
private CacheCleaner cacheCleaner;
/**
* Factory method to create a ClientMmapManager from a Hadoop
* configuration.
*/
public static ClientMmapManager fromConf(Configuration conf) {
return new ClientMmapManager(conf.getInt(DFS_CLIENT_MMAP_CACHE_SIZE,
DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT),
conf.getLong(DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT),
conf.getInt(DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT,
DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT));
}
public ClientMmapManager(int cacheSize, long timeoutMs, int runsPerTimeout) {
this.cacheSize = cacheSize;
this.timeoutNs = timeoutMs * 1000000;
this.runsPerTimeout = runsPerTimeout;
}
long getTimeoutMs() {
return this.timeoutNs / 1000000;
}
int getRunsPerTimeout() {
return this.runsPerTimeout;
}
public String verifyConfigurationMatches(Configuration conf) {
StringBuilder bld = new StringBuilder();
int cacheSize = conf.getInt(DFS_CLIENT_MMAP_CACHE_SIZE,
DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT);
if (this.cacheSize != cacheSize) {
bld.append("You specified a cache size of ").append(cacheSize).
append(", but the existing cache size is ").append(this.cacheSize).
append(". ");
}
long timeoutMs = conf.getLong(DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT);
if (getTimeoutMs() != timeoutMs) {
bld.append("You specified a cache timeout of ").append(timeoutMs).
append(" ms, but the existing cache timeout is ").
append(getTimeoutMs()).append("ms").append(". ");
}
int runsPerTimeout = conf.getInt(
DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT,
DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT);
if (getRunsPerTimeout() != runsPerTimeout) {
bld.append("You specified ").append(runsPerTimeout).
append(" runs per timeout, but the existing runs per timeout is ").
append(getTimeoutMs()).append(". ");
}
return bld.toString();
}
private static class Waitable<T> {
private T val;
private final Condition cond;
public Waitable(Condition cond) {
this.val = null;
this.cond = cond;
}
public T await() throws InterruptedException {
while (this.val == null) {
this.cond.await();
}
return this.val;
}
public void provide(T val) {
this.val = val;
this.cond.signalAll();
}
}
private static class Key implements Comparable<Key> {
private final ExtendedBlock block;
private final DatanodeID datanode;
Key(ExtendedBlock block, DatanodeID datanode) {
this.block = block;
this.datanode = datanode;
}
/**
* Compare two ClientMmap regions that we're storing.
*
* When we append to a block, we bump the genstamp. It is important to
* compare the genStamp here. That way, we will not return a shorter
* mmap than required.
*/
@Override
public int compareTo(Key o) {
return ComparisonChain.start().
compare(block.getBlockId(), o.block.getBlockId()).
compare(block.getGenerationStamp(), o.block.getGenerationStamp()).
compare(block.getBlockPoolId(), o.block.getBlockPoolId()).
compare(datanode, o.datanode).
result();
}
@Override
public boolean equals(Object rhs) {
if (rhs == null) {
return false;
}
try {
Key o = (Key)rhs;
return (compareTo(o) == 0);
} catch (ClassCastException e) {
return false;
}
}
@Override
public int hashCode() {
return block.hashCode() ^ datanode.hashCode();
}
}
/**
* Thread which handles expiring mmaps from the cache.
*/
private static class CacheCleaner implements Runnable, Closeable {
private WeakReference<ClientMmapManager> managerRef;
private ScheduledFuture<?> future;
CacheCleaner(ClientMmapManager manager) {
this.managerRef= new WeakReference<ClientMmapManager>(manager);
}
@Override
public void run() {
ClientMmapManager manager = managerRef.get();
if (manager == null) return;
long curTime = System.nanoTime();
try {
manager.lock.lock();
manager.evictStaleEntries(curTime);
} finally {
manager.lock.unlock();
}
}
void setFuture(ScheduledFuture<?> future) {
this.future = future;
}
@Override
public void close() throws IOException {
future.cancel(false);
}
}
/**
* Evict entries which are older than curTime + timeoutNs from the cache.
*
* NOTE: you must call this function with the lock held.
*/
private void evictStaleEntries(long curTime) {
if (closed) {
return;
}
Iterator<Entry<Long, ClientMmap>> iter =
evictable.entrySet().iterator();
while (iter.hasNext()) {
Entry<Long, ClientMmap> entry = iter.next();
if (entry.getKey() + timeoutNs >= curTime) {
return;
}
ClientMmap mmap = entry.getValue();
Key key = new Key(mmap.getBlock(), mmap.getDatanodeID());
mmaps.remove(key);
iter.remove();
mmap.unmap();
}
}
/**
* Evict one mmap object from the cache.
*
* NOTE: you must call this function with the lock held.
*
* @return True if an object was evicted; false if none
* could be evicted.
*/
private boolean evictOne() {
Entry<Long, ClientMmap> entry = evictable.pollFirstEntry();
if (entry == null) {
// We don't want to try creating another mmap region, because the
// cache is full.
return false;
}
ClientMmap evictedMmap = entry.getValue();
Key evictedKey = new Key(evictedMmap.getBlock(),
evictedMmap.getDatanodeID());
mmaps.remove(evictedKey);
evictedMmap.unmap();
return true;
}
/**
* Create a new mmap object.
*
* NOTE: you must call this function with the lock held.
*
* @param key The key which describes this mmap.
* @param in The input stream to use to create the mmap.
* @return The new mmap object, or null if there were
* insufficient resources.
* @throws IOException If there was an I/O error creating the mmap.
*/
private ClientMmap create(Key key, FileInputStream in) throws IOException {
if (mmaps.size() + 1 > cacheSize) {
if (!evictOne()) {
LOG.warn("mmap cache is full (with " + cacheSize + " elements) and " +
"nothing is evictable. Ignoring request for mmap with " +
"datanodeID=" + key.datanode + ", " + "block=" + key.block);
return null;
}
}
// Create the condition variable that other threads may wait on.
Waitable<ClientMmap> waitable =
new Waitable<ClientMmap>(lock.newCondition());
mmaps.put(key, waitable);
// Load the entry
boolean success = false;
ClientMmap mmap = null;
try {
try {
lock.unlock();
mmap = ClientMmap.load(this, in, key.block, key.datanode);
} finally {
lock.lock();
}
if (cacheCleaner == null) {
cacheCleaner = new CacheCleaner(this);
ScheduledFuture<?> future =
executor.scheduleAtFixedRate(cacheCleaner,
timeoutNs, timeoutNs / runsPerTimeout, TimeUnit.NANOSECONDS);
cacheCleaner.setFuture(future);
}
success = true;
} finally {
if (!success) {
LOG.warn("failed to create mmap for datanodeID=" + key.datanode +
", " + "block=" + key.block);
mmaps.remove(key);
}
waitable.provide(mmap);
}
return mmap;
}
/**
* Get or create an mmap region.
*
* @param node The DataNode that owns the block for this mmap region.
* @param block The block ID, block pool ID, and generation stamp of
* the block we want to read.
* @param in An open file for this block. This stream is only used
* if we have to create a new mmap; if we use an
* existing one, it is ignored.
*
* @return The client mmap region.
*/
public ClientMmap fetch(DatanodeID datanodeID, ExtendedBlock block,
FileInputStream in) throws IOException, InterruptedException {
LOG.debug("fetching mmap with datanodeID=" + datanodeID + ", " +
"block=" + block);
Key key = new Key(block, datanodeID);
ClientMmap mmap = null;
try {
lock.lock();
if (closed) {
throw new IOException("ClientMmapManager is closed.");
}
while (mmap == null) {
Waitable<ClientMmap> entry = mmaps.get(key);
if (entry == null) {
return create(key, in);
}
mmap = entry.await();
}
if (mmap.ref() == 1) {
// When going from nobody using the mmap (ref = 0) to somebody
// using the mmap (ref = 1), we must make the mmap un-evictable.
evictable.remove(mmap.getLastEvictableTimeNs());
}
}
finally {
lock.unlock();
}
LOG.debug("reusing existing mmap with datanodeID=" + datanodeID +
", " + "block=" + block);
return mmap;
}
/**
* Make an mmap evictable.
*
* When an mmap is evictable, it may be removed from the cache if necessary.
* mmaps can only be evictable if nobody is using them.
*
* @param mmap The mmap to make evictable.
*/
void makeEvictable(ClientMmap mmap) {
try {
lock.lock();
if (closed) {
// If this ClientMmapManager is closed, then don't bother with the
// cache; just close the mmap.
mmap.unmap();
return;
}
long now = System.nanoTime();
while (evictable.containsKey(now)) {
now++;
}
mmap.setLastEvictableTimeNs(now);
evictable.put(now, mmap);
} finally {
lock.unlock();
}
}
@Override
public void close() throws IOException {
try {
lock.lock();
closed = true;
IOUtils.cleanup(LOG, cacheCleaner);
// Unmap all the mmaps that nobody is using.
// The ones which are in use will be unmapped just as soon as people stop
// using them.
evictStaleEntries(Long.MAX_VALUE);
executor.shutdown();
} finally {
lock.unlock();
}
}
@VisibleForTesting
public interface ClientMmapVisitor {
void accept(ClientMmap mmap);
}
@VisibleForTesting
public synchronized void visitMmaps(ClientMmapVisitor visitor)
throws InterruptedException {
for (Waitable<ClientMmap> entry : mmaps.values()) {
visitor.accept(entry.await());
}
}
public void visitEvictable(ClientMmapVisitor visitor)
throws InterruptedException {
for (ClientMmap mmap : evictable.values()) {
visitor.accept(mmap);
}
}
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "expect.h"
#include "hdfs.h"
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int expectFileStats(hdfsFile file,
uint64_t expectedTotalBytesRead,
uint64_t expectedTotalLocalBytesRead,
uint64_t expectedTotalShortCircuitBytesRead,
uint64_t expectedTotalZeroCopyBytesRead)
{
struct hdfsReadStatistics *stats = NULL;
EXPECT_ZERO(hdfsFileGetReadStatistics(file, &stats));
if (expectedTotalBytesRead != UINT64_MAX) {
EXPECT_INT64_EQ(expectedTotalBytesRead, stats->totalBytesRead);
}
if (expectedTotalLocalBytesRead != UINT64_MAX) {
EXPECT_INT64_EQ(expectedTotalLocalBytesRead,
stats->totalLocalBytesRead);
}
if (expectedTotalShortCircuitBytesRead != UINT64_MAX) {
EXPECT_INT64_EQ(expectedTotalShortCircuitBytesRead,
stats->totalShortCircuitBytesRead);
}
if (expectedTotalZeroCopyBytesRead != UINT64_MAX) {
EXPECT_INT64_EQ(expectedTotalZeroCopyBytesRead,
stats->totalZeroCopyBytesRead);
}
hdfsFileFreeReadStatistics(stats);
return 0;
}

View File

@ -19,16 +19,19 @@
#ifndef LIBHDFS_NATIVE_TESTS_EXPECT_H
#define LIBHDFS_NATIVE_TESTS_EXPECT_H
#include <inttypes.h>
#include <stdio.h>
struct hdfsFile_internal;
#define EXPECT_ZERO(x) \
do { \
int __my_ret__ = x; \
if (__my_ret__) { \
int __my_errno__ = errno; \
fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
"code %d (errno: %d): got nonzero from %s\n", \
__LINE__, __my_ret__, __my_errno__, #x); \
__FILE__, __LINE__, __my_ret__, __my_errno__, #x); \
return __my_ret__; \
} \
} while (0);
@ -38,9 +41,9 @@
void* __my_ret__ = x; \
int __my_errno__ = errno; \
if (__my_ret__ != NULL) { \
fprintf(stderr, "TEST_ERROR: failed on line %d (errno: %d): " \
fprintf(stderr, "TEST_ERROR: failed on %s:%d (errno: %d): " \
"got non-NULL value %p from %s\n", \
__LINE__, __my_errno__, __my_ret__, #x); \
__FILE__, __LINE__, __my_errno__, __my_ret__, #x); \
return -1; \
} \
} while (0);
@ -50,8 +53,8 @@
void* __my_ret__ = x; \
int __my_errno__ = errno; \
if (__my_ret__ == NULL) { \
fprintf(stderr, "TEST_ERROR: failed on line %d (errno: %d): " \
"got NULL from %s\n", __LINE__, __my_errno__, #x); \
fprintf(stderr, "TEST_ERROR: failed on %s:%d (errno: %d): " \
"got NULL from %s\n", __FILE__, __LINE__, __my_errno__, #x); \
return -1; \
} \
} while (0);
@ -61,15 +64,16 @@
int __my_ret__ = x; \
int __my_errno__ = errno; \
if (__my_ret__ != -1) { \
fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
"code %d (errno: %d): expected -1 from %s\n", __LINE__, \
fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
"code %d (errno: %d): expected -1 from %s\n", \
__FILE__, __LINE__, \
__my_ret__, __my_errno__, #x); \
return -1; \
} \
if (__my_errno__ != e) { \
fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
"code %d (errno: %d): expected errno = %d from %s\n", \
__LINE__, __my_ret__, __my_errno__, e, #x); \
__FILE__, __LINE__, __my_ret__, __my_errno__, e, #x); \
return -1; \
} \
} while (0);
@ -79,9 +83,9 @@
int __my_ret__ = x; \
int __my_errno__ = errno; \
if (!__my_ret__) { \
fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
"code %d (errno: %d): got zero from %s\n", __LINE__, \
__my_ret__, __my_errno__, #x); \
fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
"code %d (errno: %d): got zero from %s\n", __FILE__, __LINE__, \
__my_ret__, __my_errno__, #x); \
return -1; \
} \
} while (0);
@ -91,9 +95,9 @@
int __my_ret__ = x; \
int __my_errno__ = errno; \
if (__my_ret__ < 0) { \
fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
"code %d (errno: %d): got negative return from %s\n", \
__LINE__, __my_ret__, __my_errno__, #x); \
__FILE__, __LINE__, __my_ret__, __my_errno__, #x); \
return __my_ret__; \
} \
} while (0);
@ -103,9 +107,21 @@
int __my_ret__ = y; \
int __my_errno__ = errno; \
if (__my_ret__ != (x)) { \
fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
"code %d (errno: %d): expected %d\n", \
__LINE__, __my_ret__, __my_errno__, (x)); \
__FILE__, __LINE__, __my_ret__, __my_errno__, (x)); \
return -1; \
} \
} while (0);
#define EXPECT_INT64_EQ(x, y) \
do { \
int64_t __my_ret__ = y; \
int __my_errno__ = errno; \
if (__my_ret__ != (x)) { \
fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
"value %"PRId64" (errno: %d): expected %"PRId64"\n", \
__FILE__, __LINE__, __my_ret__, __my_errno__, (x)); \
return -1; \
} \
} while (0);
@ -117,4 +133,17 @@
ret = -errno; \
} while (ret == -EINTR);
/**
* Test that an HDFS file has the given statistics.
*
* Any parameter can be set to UINT64_MAX to avoid checking it.
*
* @return 0 on success; error code otherwise
*/
int expectFileStats(struct hdfsFile_internal *file,
uint64_t expectedTotalBytesRead,
uint64_t expectedTotalLocalBytesRead,
uint64_t expectedTotalShortCircuitBytesRead,
uint64_t expectedTotalZeroCopyBytesRead);
#endif

View File

@ -39,6 +39,7 @@
#define JAVA_NET_ISA "java/net/InetSocketAddress"
#define JAVA_NET_URI "java/net/URI"
#define JAVA_STRING "java/lang/String"
#define HADOOP_ZERO_COPY_CURSOR "org/apache/hadoop/fs/ZeroCopyCursor"
#define JAVA_VOID "V"
@ -143,6 +144,15 @@ int hdfsFileGetReadStatistics(hdfsFile file,
goto done;
}
s->totalShortCircuitBytesRead = jVal.j;
jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
"org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
"getTotalZeroCopyBytesRead", "()J");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsFileGetReadStatistics: getTotalZeroCopyBytesRead failed");
goto done;
}
s->totalZeroCopyBytesRead = jVal.j;
*stats = s;
s = NULL;
ret = 0;
@ -183,6 +193,25 @@ void hdfsFileDisableDirectRead(hdfsFile file)
file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ;
}
int hdfsDisableDomainSocketSecurity(void)
{
jthrowable jthr;
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jthr = invokeMethod(env, NULL, STATIC, NULL,
"org/apache/hadoop/net/unix/DomainSocket",
"disableBindPathValidation", "()V");
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"DomainSocket#disableBindPathValidation");
return -1;
}
return 0;
}
/**
* hdfsJniEnv: A wrapper struct to be used as 'value'
* while saving thread -> JNIEnv* mappings
@ -220,40 +249,6 @@ static jthrowable constructNewObjectOfPath(JNIEnv *env, const char *path,
return NULL;
}
/**
* Set a configuration value.
*
* @param env The JNI environment
* @param jConfiguration The configuration object to modify
* @param key The key to modify
* @param value The value to set the key to
*
* @return NULL on success; exception otherwise
*/
static jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
const char *key, const char *value)
{
jthrowable jthr;
jstring jkey = NULL, jvalue = NULL;
jthr = newJavaStr(env, key, &jkey);
if (jthr)
goto done;
jthr = newJavaStr(env, value, &jvalue);
if (jthr)
goto done;
jthr = invokeMethod(env, NULL, INSTANCE, jConfiguration,
HADOOP_CONF, "set", JMETHOD2(JPARAM(JAVA_STRING),
JPARAM(JAVA_STRING), JAVA_VOID),
jkey, jvalue);
if (jthr)
goto done;
done:
destroyLocalReference(env, jkey);
destroyLocalReference(env, jvalue);
return jthr;
}
static jthrowable hadoopConfGetStr(JNIEnv *env, jobject jConfiguration,
const char *key, char **val)
{
@ -2108,6 +2103,248 @@ int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime)
return 0;
}
struct hadoopZeroCopyCursor* hadoopZeroCopyCursorAlloc(hdfsFile file)
{
int ret;
jobject zcursor = NULL;
jvalue jVal;
jthrowable jthr;
JNIEnv* env;
env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return NULL;
}
if (file->type != INPUT) {
ret = EINVAL;
goto done;
}
jthr = invokeMethod(env, &jVal, INSTANCE, (jobject)file->file, HADOOP_ISTRM,
"createZeroCopyCursor", "()L"HADOOP_ZERO_COPY_CURSOR";");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopZeroCopyCursorAlloc: createZeroCopyCursor");
goto done;
}
zcursor = (*env)->NewGlobalRef(env, jVal.l);
if (!zcursor) {
ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hadoopZeroCopyCursorAlloc: NewGlobalRef");
}
ret = 0;
done:
if (ret) {
errno = ret;
}
return (struct hadoopZeroCopyCursor*)zcursor;
}
int hadoopZeroCopyCursorSetFallbackBuffer(struct hadoopZeroCopyCursor* zcursor,
void *cbuf, uint32_t size)
{
int ret;
jobject buffer = NULL;
jthrowable jthr;
JNIEnv* env;
env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
buffer = (*env)->NewDirectByteBuffer(env, cbuf, size);
if (!buffer) {
ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hadoopZeroCopyCursorSetFallbackBuffer: NewDirectByteBuffer("
"size=%"PRId32"):", size);
goto done;
}
jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
HADOOP_ZERO_COPY_CURSOR, "setFallbackBuffer",
"(Ljava/nio/ByteBuffer;)V", buffer);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopZeroCopyCursorSetFallbackBuffer: "
"FileSystem#setFallbackBuffer");
goto done;
}
ret = 0;
done:
if (ret) {
(*env)->DeleteLocalRef(env, buffer);
errno = ret;
return -1;
}
return 0;
}
int hadoopZeroCopyCursorSetSkipChecksums(struct hadoopZeroCopyCursor* zcursor,
int skipChecksums)
{
JNIEnv* env;
jthrowable jthr;
jboolean shouldSkipChecksums = skipChecksums ? JNI_TRUE : JNI_FALSE;
env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
HADOOP_ZERO_COPY_CURSOR, "setSkipChecksums", "(Z)V",
shouldSkipChecksums);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopZeroCopyCursorSetSkipChecksums(): setSkipChecksums failed");
return -1;
}
return 0;
}
int hadoopZeroCopyCursorSetAllowShortReads(
struct hadoopZeroCopyCursor* zcursor, int allowShort)
{
JNIEnv* env;
jthrowable jthr;
jboolean shouldAllowShort = allowShort ? JNI_TRUE : JNI_FALSE;
env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
HADOOP_ZERO_COPY_CURSOR, "setAllowShortReads", "(Z)V",
shouldAllowShort);
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopZeroCopyCursorSetAllowShortReads(): setAllowShortReads "
"failed");
return -1;
}
return 0;
}
void hadoopZeroCopyCursorFree(struct hadoopZeroCopyCursor *zcursor)
{
JNIEnv* env;
jthrowable jthr;
env = getJNIEnv();
if (env == NULL) {
return;
}
jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
HADOOP_ZERO_COPY_CURSOR, "close", "()V");
if (jthr) {
printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopZeroCopyCursorFree(): close failed");
}
(*env)->DeleteGlobalRef(env, (jobject)zcursor);
}
/**
* Translate an exception from ZeroCopyCursor#read, translate it into a return
* code.
*/
static int translateZCRException(JNIEnv *env, jthrowable exc)
{
int ret;
char *className = NULL;
jthrowable jthr = classNameOfObject(exc, env, &className);
if (jthr) {
fprintf(stderr, "hadoopZeroCopyRead: unknown "
"exception from read().\n");
destroyLocalReference(env, jthr);
destroyLocalReference(env, jthr);
ret = EIO;
goto done;
}
if (!strcmp(className, "java.io.EOFException")) {
ret = 0; // EOF
goto done;
}
if (!strcmp(className, "java.lang.UnsupportedOperationException")) {
ret = EPROTONOSUPPORT;
goto done;
}
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopZeroCopyRead: ZeroCopyCursor#read failed");
done:
free(className);
return ret;
}
int32_t hadoopZeroCopyRead(struct hadoopZeroCopyCursor *zcursor,
int32_t toRead, const void **data)
{
int32_t ret, nRead = -1;
JNIEnv* env;
jthrowable jthr;
jobject byteBuffer = NULL;
uint8_t *addr;
jint position;
jvalue jVal;
env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jthr = invokeMethod(env, NULL, INSTANCE, (jobject)zcursor,
HADOOP_ZERO_COPY_CURSOR, "read", "(I)V", toRead);
if (jthr) {
ret = translateZCRException(env, jthr);
goto done;
}
jthr = invokeMethod(env, &jVal, INSTANCE, (jobject)zcursor,
HADOOP_ZERO_COPY_CURSOR, "getData",
"()Ljava/nio/ByteBuffer;");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopZeroCopyRead(toRead=%"PRId32"): getData failed",
toRead);
goto done;
}
byteBuffer = jVal.l;
addr = (*env)->GetDirectBufferAddress(env, byteBuffer);
if (!addr) {
fprintf(stderr, "hadoopZeroCopyRead(toRead=%"PRId32"): "
"failed to get direct buffer address.\n", toRead);
ret = EIO;
goto done;
}
jthr = invokeMethod(env, &jVal, INSTANCE, byteBuffer,
"java/nio/ByteBuffer", "position", "()I");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopZeroCopyRead(toRead=%"PRId32"): ByteBuffer#position "
"failed", toRead);
goto done;
}
position = jVal.i;
jthr = invokeMethod(env, &jVal, INSTANCE, byteBuffer,
"java/nio/ByteBuffer", "remaining", "()I");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopZeroCopyRead(toRead=%"PRId32"): ByteBuffer#remaining "
"failed", toRead);
goto done;
}
ret = 0;
nRead = jVal.i;
*data = addr + position;
done:
(*env)->DeleteLocalRef(env, byteBuffer);
if (nRead == -1) {
errno = ret;
return -1;
}
return nRead;
}
char***
hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length)
{

View File

@ -85,6 +85,7 @@ extern "C" {
uint64_t totalBytesRead;
uint64_t totalLocalBytesRead;
uint64_t totalShortCircuitBytesRead;
uint64_t totalZeroCopyBytesRead;
};
/**
@ -680,7 +681,89 @@ extern "C" {
* @return 0 on success else -1
*/
int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime);
/**
* Create a zero-copy cursor object.
*
* @param file The file to use for zero-copy reads.
*
* @return The zero-copy cursor, or NULL + errno on failure.
*/
struct hadoopZeroCopyCursor* hadoopZeroCopyCursorAlloc(hdfsFile file);
/**
* Set the fallback buffer which will be used by the zero copy object.
*
* You are responsible for ensuring that this buffer stays valid until you
* either set a different buffer by calling this function again, or free the
* zero-copy cursor.
*
* @param zcursor The zero-copy cursor.
* @param cbuf The buffer to use.
* @param size Size of the buffer.
*
* @return 0 on success. -1 on error. Errno will be set on
* error.
*/
int hadoopZeroCopyCursorSetFallbackBuffer(
struct hadoopZeroCopyCursor* zcursor, void *cbuf, uint32_t size);
/**
* Set whether our cursor should skip checksums or not.
*
* @param zcursor The cursor
* @param skipChecksums Nonzero to skip checksums.
*
* @return -1 on error, 0 otherwise.
*/
int hadoopZeroCopyCursorSetSkipChecksums(
struct hadoopZeroCopyCursor* zcursor, int skipChecksums);
/**
* Set whether our cursor should allow short reads to occur.
* Short reads will always occur if there is not enough data to read
* (i.e., at EOF), but normally we don't return them when reading other
* parts of the file.
*
* @param zcursor The cursor
* @param skipChecksums Nonzero to skip checksums.
*
* @return -1 on error, 0 otherwise.
*/
int hadoopZeroCopyCursorSetAllowShortReads(
struct hadoopZeroCopyCursor* zcursor, int allowShort);
/**
* Free zero-copy cursor.
*
* This will dispose of the cursor allocated by hadoopZeroCopyCursorAlloc, as
* well as any memory map that we have created. You must be done with the
* data returned from hadoopZeroCopyRead before calling this.
*
* @param zcursor The zero-copy cursor.
*/
void hadoopZeroCopyCursorFree(struct hadoopZeroCopyCursor *zcursor);
/*
* Perform a zero-copy read.
*
* @param zcursor The zero-copy cursor object.
* @param toRead The maximum amount to read.
* @param data (out param) on succesful return, a pointer to the
* data. This pointer will remain valid until the next
* call to hadoopZeroCopyRead, or until
* hadoopZeroCopyCursorFree is called on zcursor.
*
* @return -2 if zero-copy is unavailable, and
* -1 if there was an error. errno will be the error.
* 0 if we hit end-of-file without reading anything.
* The positive number of bytes read otherwise. Short
* reads will happen only if EOF is reached.
* The amount read otherwise.
*/
int32_t hadoopZeroCopyRead(struct hadoopZeroCopyCursor *zcursor,
int32_t toRead, const void **data);
#ifdef __cplusplus
}
#endif

View File

@ -48,6 +48,15 @@ extern "C" {
* @param file The HDFS file
*/
void hdfsFileDisableDirectRead(struct hdfsFile_internal *file);
/**
* Disable domain socket security checks.
*
* @param 0 if domain socket security was disabled;
* -1 if not.
*/
int hdfsDisableDomainSocketSecurity(void);
#ifdef __cplusplus
}
#endif

View File

@ -608,3 +608,42 @@ JNIEnv* getJNIEnv(void)
return env;
}
int javaObjectIsOfClass(JNIEnv *env, jobject obj, const char *name)
{
jclass clazz;
int ret;
clazz = (*env)->FindClass(env, name);
if (!clazz) {
printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"javaObjectIsOfClass(%s)", name);
return -1;
}
ret = (*env)->IsInstanceOf(env, obj, clazz);
(*env)->DeleteLocalRef(env, clazz);
return ret == JNI_TRUE ? 1 : 0;
}
jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
const char *key, const char *value)
{
jthrowable jthr;
jstring jkey = NULL, jvalue = NULL;
jthr = newJavaStr(env, key, &jkey);
if (jthr)
goto done;
jthr = newJavaStr(env, value, &jvalue);
if (jthr)
goto done;
jthr = invokeMethod(env, NULL, INSTANCE, jConfiguration,
"org/apache/hadoop/conf/Configuration", "set",
"(Ljava/lang/String;Ljava/lang/String;)V",
jkey, jvalue);
if (jthr)
goto done;
done:
(*env)->DeleteLocalRef(env, jkey);
(*env)->DeleteLocalRef(env, jvalue);
return jthr;
}

View File

@ -114,6 +114,32 @@ jthrowable classNameOfObject(jobject jobj, JNIEnv *env, char **name);
* */
JNIEnv* getJNIEnv(void);
/**
* Figure out if a Java object is an instance of a particular class.
*
* @param env The Java environment.
* @param obj The object to check.
* @param name The class name to check.
*
* @return -1 if we failed to find the referenced class name.
* 0 if the object is not of the given class.
* 1 if the object is of the given class.
*/
int javaObjectIsOfClass(JNIEnv *env, jobject obj, const char *name);
/**
* Set a value in a configuration object.
*
* @param env The JNI environment
* @param jConfiguration The configuration object to modify
* @param key The key to modify
* @param value The value to set the key to
*
* @return NULL on success; exception otherwise
*/
jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
const char *key, const char *value);
#endif /*LIBHDFS_JNI_HELPER_H*/
/**

View File

@ -17,14 +17,19 @@
*/
#include "exception.h"
#include "hdfs.h"
#include "hdfs_test.h"
#include "jni_helper.h"
#include "native_mini_dfs.h"
#include <errno.h>
#include <jni.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#define MINIDFS_CLUSTER_BUILDER "org/apache/hadoop/hdfs/MiniDFSCluster$Builder"
#define MINIDFS_CLUSTER "org/apache/hadoop/hdfs/MiniDFSCluster"
@ -39,8 +44,44 @@ struct NativeMiniDfsCluster {
* The NativeMiniDfsCluster object
*/
jobject obj;
/**
* Path to the domain socket, or the empty string if there is none.
*/
char domainSocketPath[PATH_MAX];
};
static jthrowable nmdConfigureShortCircuit(JNIEnv *env,
struct NativeMiniDfsCluster *cl, jobject cobj)
{
jthrowable jthr;
char *tmpDir;
int ret = hdfsDisableDomainSocketSecurity();
if (ret) {
return newRuntimeError(env, "failed to disable hdfs domain "
"socket security: error %d", ret);
}
jthr = hadoopConfSetStr(env, cobj, "dfs.client.read.shortcircuit", "true");
if (jthr) {
return jthr;
}
tmpDir = getenv("TMPDIR");
if (!tmpDir) {
tmpDir = "/tmp";
}
snprintf(cl->domainSocketPath, PATH_MAX, "%s/native_mini_dfs.sock.%d.%d",
tmpDir, getpid(), rand());
snprintf(cl->domainSocketPath, PATH_MAX, "%s/native_mini_dfs.sock.%d.%d",
tmpDir, getpid(), rand());
jthr = hadoopConfSetStr(env, cobj, "dfs.domain.socket.path",
cl->domainSocketPath);
if (jthr) {
return jthr;
}
return NULL;
}
struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
{
struct NativeMiniDfsCluster* cl = NULL;
@ -81,6 +122,28 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
goto error;
}
}
if (jthr) {
printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"nmdCreate: Configuration::setBoolean");
goto error;
}
// Disable 'minimum block size' -- it's annoying in tests.
(*env)->DeleteLocalRef(env, jconfStr);
jconfStr = NULL;
jthr = newJavaStr(env, "dfs.namenode.fs-limits.min-block-size", &jconfStr);
if (jthr) {
printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"nmdCreate: new String");
goto error;
}
jthr = invokeMethod(env, NULL, INSTANCE, cobj, HADOOP_CONF,
"setLong", "(Ljava/lang/String;J)V", jconfStr, 0LL);
if (jthr) {
printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"nmdCreate: Configuration::setLong");
goto error;
}
// Creae MiniDFSCluster object
jthr = constructNewObjectOfClass(env, &bld, MINIDFS_CLUSTER_BUILDER,
"(L"HADOOP_CONF";)V", cobj);
if (jthr) {
@ -88,6 +151,14 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
"nmdCreate: NativeMiniDfsCluster#Builder#Builder");
goto error;
}
if (conf->configureShortCircuit) {
jthr = nmdConfigureShortCircuit(env, cl, cobj);
if (jthr) {
printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"nmdCreate: nmdConfigureShortCircuit error");
goto error;
}
}
jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
"format", "(Z)L" MINIDFS_CLUSTER_BUILDER ";", conf->doFormat);
if (jthr) {
@ -272,3 +343,29 @@ error_dlr_nn:
return ret;
}
int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl,
struct hdfsBuilder *bld)
{
int port, ret;
hdfsBuilderSetNameNode(bld, "localhost");
port = nmdGetNameNodePort(cl);
if (port < 0) {
fprintf(stderr, "nmdGetNameNodePort failed with error %d\n", -port);
return EIO;
}
hdfsBuilderSetNameNodePort(bld, port);
if (cl->domainSocketPath[0]) {
ret = hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit", "true");
if (ret) {
return ret;
}
ret = hdfsBuilderConfSetStr(bld, "dfs.domain.socket.path",
cl->domainSocketPath);
if (ret) {
return ret;
}
}
return 0;
}

View File

@ -21,6 +21,7 @@
#include <jni.h> /* for jboolean */
struct hdfsBuilder;
struct NativeMiniDfsCluster;
/**
@ -28,17 +29,24 @@ struct NativeMiniDfsCluster;
*/
struct NativeMiniDfsConf {
/**
* Nonzero if the cluster should be formatted prior to startup
* Nonzero if the cluster should be formatted prior to startup.
*/
jboolean doFormat;
/**
* Whether or not to enable webhdfs in MiniDfsCluster
*/
jboolean webhdfsEnabled;
/**
* The http port of the namenode in MiniDfsCluster
*/
jint namenodeHttpPort;
/**
* Nonzero if we should configure short circuit.
*/
jboolean configureShortCircuit;
};
/**
@ -84,7 +92,7 @@ void nmdFree(struct NativeMiniDfsCluster* cl);
*
* @return the port, or a negative error code
*/
int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);
int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);
/**
* Get the http address that's in use by the given (non-HA) nativeMiniDfs
@ -101,4 +109,14 @@ int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);
int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
int *port, const char **hostName);
/**
* Configure the HDFS builder appropriately to connect to this cluster.
*
* @param bld The hdfs builder
*
* @return the port, or a negative error code
*/
int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl,
struct hdfsBuilder *bld);
#endif

View File

@ -0,0 +1,225 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "expect.h"
#include "hdfs.h"
#include "native_mini_dfs.h"
#include <errno.h>
#include <inttypes.h>
#include <semaphore.h>
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#define TO_STR_HELPER(X) #X
#define TO_STR(X) TO_STR_HELPER(X)
#define TEST_FILE_NAME_LENGTH 128
#define TEST_ZEROCOPY_FULL_BLOCK_SIZE 4096
#define TEST_ZEROCOPY_LAST_BLOCK_SIZE 3215
#define TEST_ZEROCOPY_NUM_BLOCKS 6
#define SMALL_READ_LEN 16
#define ZC_BUF_LEN 32768
static uint8_t *getZeroCopyBlockData(int blockIdx)
{
uint8_t *buf = malloc(TEST_ZEROCOPY_FULL_BLOCK_SIZE);
int i;
if (!buf) {
fprintf(stderr, "malloc(%d) failed\n", TEST_ZEROCOPY_FULL_BLOCK_SIZE);
exit(1);
}
for (i = 0; i < TEST_ZEROCOPY_FULL_BLOCK_SIZE; i++) {
buf[i] = blockIdx + (i % 17);
}
return buf;
}
static int getZeroCopyBlockLen(int blockIdx)
{
if (blockIdx >= TEST_ZEROCOPY_NUM_BLOCKS) {
return 0;
} else if (blockIdx == (TEST_ZEROCOPY_NUM_BLOCKS - 1)) {
return TEST_ZEROCOPY_LAST_BLOCK_SIZE;
} else {
return TEST_ZEROCOPY_FULL_BLOCK_SIZE;
}
}
static void printBuf(const uint8_t *buf, size_t len) __attribute__((unused));
static void printBuf(const uint8_t *buf, size_t len)
{
size_t i;
for (i = 0; i < len; i++) {
fprintf(stderr, "%02x", buf[i]);
}
fprintf(stderr, "\n");
}
static int doTestZeroCopyReads(hdfsFS fs, const char *fileName)
{
hdfsFile file = NULL;
struct hadoopZeroCopyCursor *zcursor = NULL;
uint8_t *backingBuffer = NULL, *block;
const void *zcPtr;
file = hdfsOpenFile(fs, fileName, O_RDONLY, 0, 0, 0);
EXPECT_NONNULL(file);
zcursor = hadoopZeroCopyCursorAlloc(file);
EXPECT_NONNULL(zcursor);
/* haven't read anything yet */
EXPECT_ZERO(expectFileStats(file, 0LL, 0LL, 0LL, 0LL));
block = getZeroCopyBlockData(0);
EXPECT_NONNULL(block);
/* first read is half of a block. */
EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
hadoopZeroCopyRead(zcursor,
TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2, &zcPtr));
EXPECT_ZERO(memcmp(zcPtr, block, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
/* read the next half of the block */
EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
hadoopZeroCopyRead(zcursor,
TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2, &zcPtr));
EXPECT_ZERO(memcmp(zcPtr, block + (TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2),
TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
free(block);
EXPECT_ZERO(expectFileStats(file, TEST_ZEROCOPY_FULL_BLOCK_SIZE,
TEST_ZEROCOPY_FULL_BLOCK_SIZE,
TEST_ZEROCOPY_FULL_BLOCK_SIZE,
TEST_ZEROCOPY_FULL_BLOCK_SIZE));
/* Now let's read just a few bytes. */
EXPECT_INT_EQ(SMALL_READ_LEN,
hadoopZeroCopyRead(zcursor, SMALL_READ_LEN, &zcPtr));
block = getZeroCopyBlockData(1);
EXPECT_NONNULL(block);
EXPECT_ZERO(memcmp(block, zcPtr, SMALL_READ_LEN));
EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
hdfsTell(fs, file));
EXPECT_ZERO(expectFileStats(file,
TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
/* Try to read a full block's worth of data. This will cross the block
* boundary, which means we have to fall back to non-zero-copy reads.
* However, because we don't have a backing buffer, the fallback will fail
* with EPROTONOSUPPORT. */
EXPECT_INT_EQ(-1,
hadoopZeroCopyRead(zcursor, TEST_ZEROCOPY_FULL_BLOCK_SIZE, &zcPtr));
EXPECT_INT_EQ(EPROTONOSUPPORT, errno);
/* Now set a backing buffer and try again. It should succeed this time. */
backingBuffer = malloc(ZC_BUF_LEN);
EXPECT_NONNULL(backingBuffer);
EXPECT_ZERO(hadoopZeroCopyCursorSetFallbackBuffer(zcursor,
backingBuffer, ZC_BUF_LEN));
EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE,
hadoopZeroCopyRead(zcursor, TEST_ZEROCOPY_FULL_BLOCK_SIZE, &zcPtr));
EXPECT_ZERO(expectFileStats(file,
(2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
(2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
(2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
EXPECT_ZERO(memcmp(block + SMALL_READ_LEN, zcPtr,
TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN));
free(block);
block = getZeroCopyBlockData(2);
EXPECT_NONNULL(block);
EXPECT_ZERO(memcmp(block, zcPtr +
(TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN), SMALL_READ_LEN));
free(block);
hadoopZeroCopyCursorFree(zcursor);
EXPECT_ZERO(hdfsCloseFile(fs, file));
free(backingBuffer);
return 0;
}
static int createZeroCopyTestFile(hdfsFS fs, char *testFileName,
size_t testFileNameLen)
{
int blockIdx, blockLen;
hdfsFile file;
uint8_t *data;
snprintf(testFileName, testFileNameLen, "/zeroCopyTestFile.%d.%d",
getpid(), rand());
file = hdfsOpenFile(fs, testFileName, O_WRONLY, 0, 1,
TEST_ZEROCOPY_FULL_BLOCK_SIZE);
EXPECT_NONNULL(file);
for (blockIdx = 0; blockIdx < TEST_ZEROCOPY_NUM_BLOCKS; blockIdx++) {
blockLen = getZeroCopyBlockLen(blockIdx);
data = getZeroCopyBlockData(blockIdx);
EXPECT_NONNULL(data);
EXPECT_INT_EQ(blockLen, hdfsWrite(fs, file, data, blockLen));
}
EXPECT_ZERO(hdfsCloseFile(fs, file));
return 0;
}
/**
* Test that we can write a file with libhdfs and then read it back
*/
int main(void)
{
int port;
struct NativeMiniDfsConf conf = {
.doFormat = 1,
.configureShortCircuit = 1,
};
char testFileName[TEST_FILE_NAME_LENGTH];
hdfsFS fs;
struct NativeMiniDfsCluster* cl;
struct hdfsBuilder *bld;
cl = nmdCreate(&conf);
EXPECT_NONNULL(cl);
EXPECT_ZERO(nmdWaitClusterUp(cl));
port = nmdGetNameNodePort(cl);
if (port < 0) {
fprintf(stderr, "TEST_ERROR: test_zerocopy: "
"nmdGetNameNodePort returned error %d\n", port);
return EXIT_FAILURE;
}
bld = hdfsNewBuilder();
EXPECT_NONNULL(bld);
EXPECT_ZERO(nmdConfigureHdfsBuilder(cl, bld));
hdfsBuilderSetForceNewInstance(bld);
hdfsBuilderConfSetStr(bld, "dfs.block.size",
TO_STR(TEST_ZEROCOPY_FULL_BLOCK_SIZE));
/* ensure that we'll always get our mmaps */
hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit.skip.checksum",
"true");
fs = hdfsBuilderConnect(bld);
EXPECT_NONNULL(fs);
EXPECT_ZERO(createZeroCopyTestFile(fs, testFileName,
TEST_FILE_NAME_LENGTH));
EXPECT_ZERO(doTestZeroCopyReads(fs, testFileName));
EXPECT_ZERO(hdfsDisconnect(fs));
EXPECT_ZERO(nmdShutdown(cl));
nmdFree(cl);
fprintf(stderr, "TEST_SUCCESS\n");
return EXIT_SUCCESS;
}

View File

@ -1391,4 +1391,32 @@
linearly increases.
</description>
</property>
<property>
<name>dfs.client.mmap.cache.size</name>
<value>1024</value>
<description>
When zero-copy reads are used, the DFSClient keeps a cache of recently used
memory mapped regions. This parameter controls the maximum number of
entries that we will keep in that cache.
If this is set to 0, we will not allow mmap.
The larger this number is, the more file descriptors we will potentially
use for memory-mapped files. mmaped files also use virtual address space.
You may need to increase your ulimit virtual address space limits before
increasing the client mmap cache size.
</description>
</property>
<property>
<name>dfs.client.mmap.cache.timeout.ms</name>
<value>900000</value>
<description>
The minimum length of time that we will keep an mmap entry in the cache
between uses. If an entry is in the cache longer than this, and nobody
uses it, it will be removed by a background thread.
</description>
</property>
</configuration>

View File

@ -23,24 +23,44 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
import org.apache.commons.lang.SystemUtils;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ZeroCopyCursor;
import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestBlockReaderLocal {
private static TemporarySocketDirectory sockDir;
@BeforeClass
public static void init() {
sockDir = new TemporarySocketDirectory();
DomainSocket.disableBindPathValidation();
}
@AfterClass
public static void shutdown() throws IOException {
sockDir.close();
}
public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
int off2, int len) {
for (int i = 0; i < len; i++) {
@ -100,10 +120,11 @@ public class TestBlockReaderLocal {
FSDataInputStream fsIn = null;
byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
FileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
try {
@ -138,6 +159,7 @@ public class TestBlockReaderLocal {
test.doTest(blockReaderLocal, original);
} finally {
if (fsIn != null) fsIn.close();
if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
if (dataIn != null) dataIn.close();
if (checkIn != null) checkIn.close();
@ -382,10 +404,11 @@ public class TestBlockReaderLocal {
final long RANDOM_SEED = 4567L;
FSDataInputStream fsIn = null;
byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
FileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
try {
@ -417,8 +440,327 @@ public class TestBlockReaderLocal {
} finally {
DFSInputStream.tcpReadsDisabledForTesting = false;
if (fsIn != null) fsIn.close();
if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
if (sockDir != null) sockDir.close();
}
}
private static byte[] byteBufferToArray(ByteBuffer buf) {
byte resultArray[] = new byte[buf.remaining()];
buf.get(resultArray);
return resultArray;
}
public static HdfsConfiguration initZeroCopyTest() {
Assume.assumeTrue(NativeIO.isAvailable());
Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
sockDir = new TemporarySocketDirectory();
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3);
conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(),
"TestRequestMmapAccess._PORT.sock").getAbsolutePath());
conf.setBoolean(DFSConfigKeys.
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
return conf;
}
@Test
public void testZeroCopyReads() throws Exception {
HdfsConfiguration conf = initZeroCopyTest();
MiniDFSCluster cluster = null;
final Path TEST_PATH = new Path("/a");
FSDataInputStream fsIn = null;
ZeroCopyCursor zcursor = null;
FileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
BlockReaderLocalTest.TEST_LENGTH, (short)1, 7567L);
try {
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
} catch (InterruptedException e) {
Assert.fail("unexpected InterruptedException during " +
"waitReplication: " + e);
} catch (TimeoutException e) {
Assert.fail("unexpected TimeoutException during " +
"waitReplication: " + e);
}
fsIn = fs.open(TEST_PATH);
byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
IOUtils.readFully(fsIn, original, 0,
BlockReaderLocalTest.TEST_LENGTH);
fsIn.close();
fsIn = fs.open(TEST_PATH);
zcursor = fsIn.createZeroCopyCursor();
zcursor.setFallbackBuffer(ByteBuffer.
allocateDirect(1024 * 1024 * 4));
HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
zcursor.read(4096);
ByteBuffer result = zcursor.getData();
Assert.assertEquals(4096, result.remaining());
Assert.assertEquals(4096,
dfsIn.getReadStatistics().getTotalBytesRead());
Assert.assertEquals(4096,
dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
byteBufferToArray(result));
} finally {
if (zcursor != null) zcursor.close();
if (fsIn != null) fsIn.close();
if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
}
}
@Test
public void testShortZeroCopyReads() throws Exception {
HdfsConfiguration conf = initZeroCopyTest();
MiniDFSCluster cluster = null;
final Path TEST_PATH = new Path("/a");
FSDataInputStream fsIn = null;
ZeroCopyCursor zcursor = null;
FileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
BlockReaderLocalTest.TEST_LENGTH, (short)1, 7567L);
try {
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
} catch (InterruptedException e) {
Assert.fail("unexpected InterruptedException during " +
"waitReplication: " + e);
} catch (TimeoutException e) {
Assert.fail("unexpected TimeoutException during " +
"waitReplication: " + e);
}
fsIn = fs.open(TEST_PATH);
byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
IOUtils.readFully(fsIn, original, 0,
BlockReaderLocalTest.TEST_LENGTH);
fsIn.close();
fsIn = fs.open(TEST_PATH);
zcursor = fsIn.createZeroCopyCursor();
zcursor.setFallbackBuffer(ByteBuffer.
allocateDirect(1024 * 1024 * 4));
zcursor.setAllowShortReads(true);
HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
zcursor.read(8192);
ByteBuffer result = zcursor.getData();
Assert.assertEquals(4096, result.remaining());
Assert.assertEquals(4096,
dfsIn.getReadStatistics().getTotalBytesRead());
Assert.assertEquals(4096,
dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
byteBufferToArray(result));
zcursor.read(4097);
result = zcursor.getData();
Assert.assertEquals(4096, result.remaining());
Assert.assertArrayEquals(Arrays.copyOfRange(original, 4096, 8192),
byteBufferToArray(result));
zcursor.setAllowShortReads(false);
zcursor.read(4100);
result = zcursor.getData();
Assert.assertEquals(4100, result.remaining());
Assert.assertArrayEquals(Arrays.copyOfRange(original, 8192, 12292),
byteBufferToArray(result));
} finally {
if (zcursor != null) zcursor.close();
if (fsIn != null) fsIn.close();
if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
}
}
@Test
public void testZeroCopyReadsNoBackingBuffer() throws Exception {
HdfsConfiguration conf = initZeroCopyTest();
MiniDFSCluster cluster = null;
final Path TEST_PATH = new Path("/a");
FSDataInputStream fsIn = null;
ZeroCopyCursor zcursor = null;
FileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
BlockReaderLocalTest.TEST_LENGTH, (short)1, 7567L);
try {
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
} catch (InterruptedException e) {
Assert.fail("unexpected InterruptedException during " +
"waitReplication: " + e);
} catch (TimeoutException e) {
Assert.fail("unexpected TimeoutException during " +
"waitReplication: " + e);
}
fsIn = fs.open(TEST_PATH);
byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
IOUtils.readFully(fsIn, original, 0,
BlockReaderLocalTest.TEST_LENGTH);
fsIn.close();
fsIn = fs.open(TEST_PATH);
zcursor = fsIn.createZeroCopyCursor();
zcursor.setAllowShortReads(false);
HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
// This read is longer than the file, and we do not have short reads enabled.
try {
zcursor.read(8192);
Assert.fail("expected UnsupportedOperationException");
} catch (UnsupportedOperationException e) {
// expected
}
// This read is longer than the block, and we do not have short reads enabled.
try {
zcursor.read(4097);
Assert.fail("expected UnsupportedOperationException");
} catch (UnsupportedOperationException e) {
// expected
}
// This read should succeed.
zcursor.read(4096);
ByteBuffer result = zcursor.getData();
Assert.assertEquals(4096, result.remaining());
Assert.assertEquals(4096,
dfsIn.getReadStatistics().getTotalBytesRead());
Assert.assertEquals(4096,
dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
byteBufferToArray(result));
} finally {
if (zcursor != null) zcursor.close();
if (fsIn != null) fsIn.close();
if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
}
}
private static class CountingVisitor
implements ClientMmapManager.ClientMmapVisitor {
int count = 0;
@Override
public void accept(ClientMmap mmap) {
count++;
}
public void reset() {
count = 0;
}
}
@Test
public void testZeroCopyMmapCache() throws Exception {
HdfsConfiguration conf = initZeroCopyTest();
MiniDFSCluster cluster = null;
final Path TEST_PATH = new Path("/a");
final int TEST_FILE_LENGTH = 16385;
final int RANDOM_SEED = 23453;
FSDataInputStream fsIn = null;
ZeroCopyCursor zcursor[] = { null, null, null, null, null };
DistributedFileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
try {
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
} catch (InterruptedException e) {
Assert.fail("unexpected InterruptedException during " +
"waitReplication: " + e);
} catch (TimeoutException e) {
Assert.fail("unexpected TimeoutException during " +
"waitReplication: " + e);
}
fsIn = fs.open(TEST_PATH);
byte original[] = new byte[TEST_FILE_LENGTH];
IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
fsIn.close();
fsIn = fs.open(TEST_PATH);
for (int i = 0; i < zcursor.length; i++) {
zcursor[i] = fsIn.createZeroCopyCursor();
zcursor[i].setAllowShortReads(false);
}
ClientMmapManager mmapManager = fs.getClient().getMmapManager();
CountingVisitor countingVisitor = new CountingVisitor();
mmapManager.visitMmaps(countingVisitor);
Assert.assertEquals(0, countingVisitor.count);
mmapManager.visitEvictable(countingVisitor);
Assert.assertEquals(0, countingVisitor.count);
zcursor[0].read(4096);
fsIn.seek(0);
zcursor[1].read(4096);
mmapManager.visitMmaps(countingVisitor);
Assert.assertEquals(1, countingVisitor.count);
countingVisitor.reset();
mmapManager.visitEvictable(countingVisitor);
Assert.assertEquals(0, countingVisitor.count);
countingVisitor.reset();
// The mmaps should be of the first block of the file.
final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
mmapManager.visitMmaps(new ClientMmapManager.ClientMmapVisitor() {
@Override
public void accept(ClientMmap mmap) {
Assert.assertEquals(firstBlock, mmap.getBlock());
}
});
// Read more blocks.
zcursor[2].read(4096);
zcursor[3].read(4096);
try {
zcursor[4].read(4096);
Assert.fail("expected UnsupportedOperationException");
} catch (UnsupportedOperationException e) {
// expected
}
// we should have 3 mmaps, 0 evictable
mmapManager.visitMmaps(countingVisitor);
Assert.assertEquals(3, countingVisitor.count);
countingVisitor.reset();
mmapManager.visitEvictable(countingVisitor);
Assert.assertEquals(0, countingVisitor.count);
// After we close the cursors, the mmaps should be evictable for
// a brief period of time. Then, they should be closed (we're
// using a very quick timeout)
for (int i = 0; i < zcursor.length; i++) {
IOUtils.closeStream(zcursor[i]);
}
while (true) {
countingVisitor.reset();
mmapManager.visitEvictable(countingVisitor);
if (0 == countingVisitor.count) {
break;
}
}
countingVisitor.reset();
mmapManager.visitMmaps(countingVisitor);
Assert.assertEquals(0, countingVisitor.count);
} finally {
if (fsIn != null) fsIn.close();
if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
}
}
}