HDFS-5260. Merge zero-copy memory-mapped HDFS client reads to trunk and branch-2. Contributed by Chris Nauroth.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1527113 13f79535-47bb-0310-9956-ffa450edef68
Chris Nauroth 2013-09-27 22:51:12 +00:00
parent b2e174c9a5
commit eccdb9aa8b
36 changed files with 3493 additions and 62 deletions

View File

@ -0,0 +1,113 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.ByteBufferPool;
import com.google.common.base.Preconditions;
@InterfaceAudience.Private
@InterfaceStability.Evolving
public final class ByteBufferUtil {
/**
* Determine if a stream can do a byte buffer read via read(ByteBuffer buf)
*/
private static boolean streamHasByteBufferRead(InputStream stream) {
if (!(stream instanceof ByteBufferReadable)) {
return false;
}
if (!(stream instanceof FSDataInputStream)) {
return true;
}
return ((FSDataInputStream)stream).getWrappedStream()
instanceof ByteBufferReadable;
}
/**
* Perform a fallback read.
*/
public static ByteBuffer fallbackRead(
InputStream stream, ByteBufferPool bufferPool, int maxLength)
throws IOException {
if (bufferPool == null) {
throw new UnsupportedOperationException("zero-copy reads " +
"were not available, and you did not provide a fallback " +
"ByteBufferPool.");
}
boolean useDirect = streamHasByteBufferRead(stream);
ByteBuffer buffer = bufferPool.getBuffer(useDirect, maxLength);
if (buffer == null) {
throw new UnsupportedOperationException("zero-copy reads " +
"were not available, and the ByteBufferPool did not provide " +
"us with " + (useDirect ? "a direct" : "an indirect") +
"buffer.");
}
Preconditions.checkState(buffer.capacity() > 0);
Preconditions.checkState(buffer.isDirect() == useDirect);
maxLength = Math.min(maxLength, buffer.capacity());
boolean success = false;
try {
if (useDirect) {
buffer.clear();
buffer.limit(maxLength);
ByteBufferReadable readable = (ByteBufferReadable)stream;
int totalRead = 0;
while (true) {
if (totalRead >= maxLength) {
success = true;
break;
}
int nRead = readable.read(buffer);
if (nRead < 0) {
if (totalRead > 0) {
success = true;
}
break;
}
totalRead += nRead;
}
buffer.flip();
} else {
buffer.clear();
int nRead = stream.read(buffer.array(),
buffer.arrayOffset(), maxLength);
if (nRead >= 0) {
buffer.limit(nRead);
success = true;
}
}
} finally {
if (!success) {
// If we got an error while reading, or if we are at EOF, we
// don't need the buffer any more. We can give it back to the
// bufferPool.
bufferPool.putBuffer(buffer);
buffer = null;
}
}
return buffer;
}
}
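
To illustrate how the class above is meant to be driven, here is a minimal sketch (not part of this change set) that runs fallbackRead against a plain InputStream, using the ElasticByteBufferPool added elsewhere in this commit; the class name, file path, and read size are illustrative only.

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import org.apache.hadoop.fs.ByteBufferUtil;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;

public class FallbackReadSketch {
  public static void main(String[] args) throws IOException {
    ByteBufferPool pool = new ElasticByteBufferPool();
    // A plain FileInputStream is not ByteBufferReadable, so fallbackRead
    // asks the pool for a heap (indirect) buffer and reads via byte[].
    InputStream in = new FileInputStream("/tmp/example.dat"); // illustrative path
    try {
      ByteBuffer buf;
      // fallbackRead returns null only at EOF.
      while ((buf = ByteBufferUtil.fallbackRead(in, pool, 8192)) != null) {
        System.out.println("read " + buf.remaining() + " bytes");
        pool.putBuffer(buf); // hand the buffer back so the pool can cache it
      }
    } finally {
      in.close();
    }
  }
}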

View File

@ -1,4 +1,5 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -19,17 +20,29 @@ package org.apache.hadoop.fs;
import java.io.*;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.fs.ByteBufferUtil;
import org.apache.hadoop.util.IdentityHashStore;
/** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream}
* and buffers input through a {@link BufferedInputStream}. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable, Closeable,
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead {
implements Seekable, PositionedReadable, Closeable,
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
HasEnhancedByteBufferAccess {
/**
* Map ByteBuffers that we have handed out to readers to ByteBufferPool
* objects
*/
private final IdentityHashStore<ByteBuffer, ByteBufferPool>
extendedReadBuffers
= new IdentityHashStore<ByteBuffer, ByteBufferPool>(0);
public FSDataInputStream(InputStream in)
throws IOException {
@ -167,4 +180,45 @@ public class FSDataInputStream extends DataInputStream
"support setting the drop-behind caching setting.");
}
}
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
EnumSet<ReadOption> opts)
throws IOException, UnsupportedOperationException {
try {
return ((HasEnhancedByteBufferAccess)in).read(bufferPool,
maxLength, opts);
}
catch (ClassCastException e) {
ByteBuffer buffer = ByteBufferUtil.
fallbackRead(this, bufferPool, maxLength);
if (buffer != null) {
extendedReadBuffers.put(buffer, bufferPool);
}
return buffer;
}
}
private static final EnumSet<ReadOption> EMPTY_READ_OPTIONS_SET =
EnumSet.noneOf(ReadOption.class);
final public ByteBuffer read(ByteBufferPool bufferPool, int maxLength)
throws IOException, UnsupportedOperationException {
return read(bufferPool, maxLength, EMPTY_READ_OPTIONS_SET);
}
@Override
public void releaseBuffer(ByteBuffer buffer) {
try {
((HasEnhancedByteBufferAccess)in).releaseBuffer(buffer);
}
catch (ClassCastException e) {
ByteBufferPool bufferPool = extendedReadBuffers.remove(buffer);
if (bufferPool == null) {
throw new IllegalArgumentException("tried to release a buffer " +
"that was not created by this stream.");
}
bufferPool.putBuffer(buffer);
}
}
}

View File

@ -18,9 +18,11 @@
package org.apache.hadoop.fs;
import java.io.*;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.ZeroCopyUnavailableException;
/****************************************************************
* FSInputStream is a generic old InputStream with a little bit

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.ByteBufferPool;
/**
* FSDataInputStreams implement this interface to provide enhanced
* byte buffer access. Usually this takes the form of mmap support.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface HasEnhancedByteBufferAccess {
/**
* Get a ByteBuffer containing file data.
*
* This ByteBuffer may come from the stream itself, via a call like mmap,
* or it may come from the ByteBufferPool which is passed in as the
* factory argument.
*
* @param factory
* If this is non-null, it will be used to create a fallback
* ByteBuffer when the stream itself cannot create one.
* @param maxLength
* The maximum length of buffer to return. We may return a buffer
* which is shorter than this.
* @param opts
* Options to use when reading.
*
* @return
* We will return null on EOF (and only on EOF).
* Otherwise, we will return a ByteBuffer containing at
* least one byte. You must free this ByteBuffer when you are
* done with it by calling releaseBuffer on it.
* The buffer will continue to be readable until it is released
* in this manner. However, the input stream's close method may
* warn about unclosed buffers.
* @throws
* IOException: if there was an error reading.
* UnsupportedOperationException: if factory was null, and we
* needed an external byte buffer. UnsupportedOperationException
* will never be thrown unless the factory argument is null.
*/
public ByteBuffer read(ByteBufferPool factory, int maxLength,
EnumSet<ReadOption> opts)
throws IOException, UnsupportedOperationException;
/**
* Release a ByteBuffer which was created by the enhanced ByteBuffer read
* function. You must not continue using the ByteBuffer after calling this
* function.
*
* @param buffer
* The ByteBuffer to release.
*/
public void releaseBuffer(ByteBuffer buffer);
}
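
As a usage sketch (assuming any stream implementing this interface, with the ElasticByteBufferPool added elsewhere in this commit as the fallback factory), the calling pattern described in the javadoc above is to read until null and release every buffer that was handed out:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;

public class EnhancedReadSketch {
  // Drains a stream through the enhanced byte buffer API and returns the byte count.
  static long drain(HasEnhancedByteBufferAccess stream) throws IOException {
    ByteBufferPool pool = new ElasticByteBufferPool();
    long total = 0;
    while (true) {
      ByteBuffer buf = stream.read(pool, 1024 * 1024,
          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      if (buf == null) {
        break;                     // null is returned on EOF, and only on EOF
      }
      try {
        total += buf.remaining();  // consume the data here
      } finally {
        stream.releaseBuffer(buf); // every buffer must be released
      }
    }
    return total;
  }
}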

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Options that can be used when reading from a FileSystem.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public enum ReadOption {
/**
* Skip checksums when reading. This option may be useful when reading a file
* format that has built-in checksums, or for testing purposes.
*/
SKIP_CHECKSUMS,
}

View File

@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.IOException;
public class ZeroCopyUnavailableException extends IOException {
private static final long serialVersionUID = 0L;
public ZeroCopyUnavailableException(String message) {
super(message);
}
public ZeroCopyUnavailableException(String message, Exception e) {
super(message, e);
}
public ZeroCopyUnavailableException(Exception e) {
super(e);
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io;
import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface ByteBufferPool {
/**
* Get a new ByteBuffer. The pool can provide this by removing a
* buffer from its internal cache, or by allocating a
* new buffer.
*
* @param direct Whether the buffer should be direct.
* @param length The requested length of the buffer.
* @return A new ByteBuffer. The buffer will be direct if and only
* if a direct buffer was requested.
* Its capacity can be less than what was requested, but
* must be at least 1 byte.
*/
ByteBuffer getBuffer(boolean direct, int length);
/**
* Release a buffer back to the pool.
* The pool may choose to put this buffer into its cache.
*
* @param buffer A ByteBuffer to return to the pool.
*/
void putBuffer(ByteBuffer buffer);
}
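
For comparison with the caching pool that follows, a hypothetical minimal implementation of this contract would simply allocate on every request and let released buffers be garbage collected (a sketch, not part of this commit):

import java.nio.ByteBuffer;
import org.apache.hadoop.io.ByteBufferPool;

// Hypothetical non-caching pool: allocate on demand, discard on release.
public class SimpleByteBufferPool implements ByteBufferPool {
  @Override
  public ByteBuffer getBuffer(boolean direct, int length) {
    return direct ? ByteBuffer.allocateDirect(length)
                  : ByteBuffer.allocate(length);
  }

  @Override
  public void putBuffer(ByteBuffer buffer) {
    // Nothing to do; the garbage collector reclaims the buffer.
  }
}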

View File

@ -0,0 +1,118 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.io;
import com.google.common.collect.ComparisonChain;
import org.apache.commons.lang.builder.HashCodeBuilder;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* This is a simple ByteBufferPool which just creates ByteBuffers as needed.
* It also caches ByteBuffers after they're released. It will always return
* the smallest cached buffer with at least the capacity you request.
* We don't try to do anything clever here, such as limiting the maximum
* cache size.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public final class ElasticByteBufferPool implements ByteBufferPool {
private static final class Key implements Comparable<Key> {
private final int capacity;
private final long insertionTime;
Key(int capacity, long insertionTime) {
this.capacity = capacity;
this.insertionTime = insertionTime;
}
@Override
public int compareTo(Key other) {
return ComparisonChain.start().
compare(capacity, other.capacity).
compare(insertionTime, other.insertionTime).
result();
}
@Override
public boolean equals(Object rhs) {
if (rhs == null) {
return false;
}
try {
Key o = (Key)rhs;
return (compareTo(o) == 0);
} catch (ClassCastException e) {
return false;
}
}
@Override
public int hashCode() {
return new HashCodeBuilder().
append(capacity).
append(insertionTime).
toHashCode();
}
}
private final TreeMap<Key, ByteBuffer> buffers =
new TreeMap<Key, ByteBuffer>();
private final TreeMap<Key, ByteBuffer> directBuffers =
new TreeMap<Key, ByteBuffer>();
private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
return direct ? directBuffers : buffers;
}
@Override
public synchronized ByteBuffer getBuffer(boolean direct, int length) {
TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
Map.Entry<Key, ByteBuffer> entry =
tree.ceilingEntry(new Key(length, 0));
if (entry == null) {
return direct ? ByteBuffer.allocateDirect(length) :
ByteBuffer.allocate(length);
}
tree.remove(entry.getKey());
return entry.getValue();
}
@Override
public synchronized void putBuffer(ByteBuffer buffer) {
TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
while (true) {
Key key = new Key(buffer.capacity(), System.nanoTime());
if (!tree.containsKey(key)) {
tree.put(key, buffer);
return;
}
// Buffers are indexed by (capacity, time).
// If our key is not unique on the first try, we try again, since the
// time will be different. Since we use nanoseconds, it's pretty
// unlikely that we'll loop even once, unless the system clock has a
// poor granularity.
}
}
}
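
A short usage sketch of the pool above: a released buffer is cached under its (capacity, time) key and handed back for any later request of equal or smaller size (the sizes here are arbitrary).

import java.nio.ByteBuffer;
import org.apache.hadoop.io.ElasticByteBufferPool;

public class ElasticPoolSketch {
  public static void main(String[] args) {
    ElasticByteBufferPool pool = new ElasticByteBufferPool();
    ByteBuffer first = pool.getBuffer(false, 4096);   // freshly allocated heap buffer
    pool.putBuffer(first);                            // cached under (4096, nanoTime)
    ByteBuffer second = pool.getBuffer(false, 1024);  // smallest cached buffer >= 1024
    System.out.println(second == first);              // true: the 4096-byte buffer is reused
    System.out.println(pool.getBuffer(false, 8192).capacity()); // 8192: cache miss, new allocation
  }
}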

View File

@ -0,0 +1,197 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.base.Preconditions;
/**
* The IdentityHashStore stores (key, value) mappings in an array.
* It is similar to java.util.Hashtable, but much more lightweight.
* Neither inserting nor removing an element ever leads to any garbage
* getting created (assuming the array doesn't need to be enlarged).
*
* Unlike Hashtable, it compares keys using
* {@link System#identityHashCode(Object)} and the identity operator.
* This is useful for types like ByteBuffer which have expensive hashCode
* and equals operators.
*
* We use linear probing to resolve collisions. This avoids the need for
* the overhead of linked list data structures. It also means that it is
* expensive to attempt to remove an element that isn't there, since we
* have to look at the entire array to be sure that it doesn't exist.
*
* @param <K> The key type to use.
* @param <V> The value type to use.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
@SuppressWarnings("unchecked")
public final class IdentityHashStore<K, V> {
/**
* Even elements are keys; odd elements are values.
* The array has size 4 * capacity: two slots per entry, at a load factor of 0.50.
*/
private Object buffer[];
private int numInserted = 0;
private int capacity;
/**
* The default maxCapacity value to use.
*/
private static final int DEFAULT_MAX_CAPACITY = 2;
public IdentityHashStore(int capacity) {
Preconditions.checkArgument(capacity >= 0);
if (capacity == 0) {
this.capacity = 0;
this.buffer = null;
} else {
// Round the capacity we need up to a power of 2.
realloc((int)Math.pow(2,
Math.ceil(Math.log(capacity) / Math.log(2))));
}
}
private void realloc(int newCapacity) {
Preconditions.checkArgument(newCapacity > 0);
Object prevBuffer[] = buffer;
this.capacity = newCapacity;
// Each element takes two array slots -- one for the key,
// and another for the value. We also want a load factor
// of 0.50. Combine those together and you get 4 * newCapacity.
this.buffer = new Object[4 * newCapacity];
this.numInserted = 0;
if (prevBuffer != null) {
for (int i = 0; i < prevBuffer.length; i += 2) {
if (prevBuffer[i] != null) {
putInternal(prevBuffer[i], prevBuffer[i + 1]);
}
}
}
}
private void putInternal(Object k, Object v) {
int hash = System.identityHashCode(k);
final int numEntries = buffer.length / 2;
int index = hash % numEntries;
while (true) {
if (buffer[2 * index] == null) {
buffer[2 * index] = k;
buffer[1 + (2 * index)] = v;
numInserted++;
return;
}
index = (index + 1) % numEntries;
}
}
/**
* Add a new (key, value) mapping.
*
* Inserting a new (key, value) never overwrites a previous one.
* In other words, you can insert the same key multiple times and it will
* lead to multiple entries.
*/
public void put(K k, V v) {
Preconditions.checkNotNull(k);
if (buffer == null) {
realloc(DEFAULT_MAX_CAPACITY);
} else if (numInserted + 1 > capacity) {
realloc(capacity * 2);
}
putInternal(k, v);
}
private int getElementIndex(K k) {
if (buffer == null) {
return -1;
}
final int numEntries = buffer.length / 2;
int hash = System.identityHashCode(k);
int index = hash % numEntries;
int firstIndex = index;
do {
if (buffer[2 * index] == k) {
return index;
}
index = (index + 1) % numEntries;
} while (index != firstIndex);
return -1;
}
/**
* Retrieve a value associated with a given key.
*/
public V get(K k) {
int index = getElementIndex(k);
if (index < 0) {
return null;
}
return (V)buffer[1 + (2 * index)];
}
/**
* Retrieve a value associated with a given key, and delete the
* relevant entry.
*/
public V remove(K k) {
int index = getElementIndex(k);
if (index < 0) {
return null;
}
V val = (V)buffer[1 + (2 * index)];
buffer[2 * index] = null;
buffer[1 + (2 * index)] = null;
numInserted--;
return val;
}
public boolean isEmpty() {
return numInserted == 0;
}
public int numElements() {
return numInserted;
}
public int capacity() {
return capacity;
}
public interface Visitor<K, V> {
void accept(K k, V v);
}
/**
* Visit all key, value pairs in the IdentityHashStore.
*/
public void visitAll(Visitor<K, V> visitor) {
int length = buffer == null ? 0 : buffer.length;
for (int i = 0; i < length; i += 2) {
if (buffer[i] != null) {
visitor.accept((K)buffer[i], (V)buffer[i + 1]);
}
}
}
}
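
The test class that follows exercises this store in depth; as a quick orientation, this sketch shows the identity (==) semantics and the no-overwrite behaviour described above (the key strings are illustrative):

import org.apache.hadoop.util.IdentityHashStore;

public class IdentityHashStoreSketch {
  public static void main(String[] args) {
    IdentityHashStore<String, Integer> store =
        new IdentityHashStore<String, Integer>(1);
    String key = new String("block-1");               // illustrative key
    String equalButDistinct = new String("block-1");  // equals(), but a different object
    store.put(key, 42);
    System.out.println(store.get(key));               // 42
    System.out.println(store.get(equalButDistinct));  // null: keys are compared with ==
    store.put(key, 43);                               // does not overwrite the first entry
    System.out.println(store.numElements());          // 2
  }
}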

View File

@ -0,0 +1,159 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.IdentityHashStore;
import org.apache.hadoop.util.IdentityHashStore.Visitor;
import org.junit.Test;
public class TestIdentityHashStore {
private static final Log LOG = LogFactory.getLog(TestIdentityHashStore.class.getName());
private static class Key {
private final String name;
Key(String name) {
this.name = name;
}
@Override
public int hashCode() {
throw new RuntimeException("should not be used!");
}
@Override
public boolean equals(Object o) {
if (!(o instanceof Key)) {
return false;
}
Key other = (Key)o;
return name.equals(other.name);
}
}
@Test(timeout=60000)
public void testStartingWithZeroCapacity() {
IdentityHashStore<Key, Integer> store =
new IdentityHashStore<Key, Integer>(0);
store.visitAll(new Visitor<Key, Integer>() {
@Override
public void accept(Key k, Integer v) {
Assert.fail("found key " + k + " in empty IdentityHashStore.");
}
});
Assert.assertTrue(store.isEmpty());
final Key key1 = new Key("key1");
Integer value1 = new Integer(100);
store.put(key1, value1);
Assert.assertTrue(!store.isEmpty());
Assert.assertEquals(value1, store.get(key1));
store.visitAll(new Visitor<Key, Integer>() {
@Override
public void accept(Key k, Integer v) {
Assert.assertEquals(key1, k);
}
});
Assert.assertEquals(value1, store.remove(key1));
Assert.assertTrue(store.isEmpty());
}
@Test(timeout=60000)
public void testDuplicateInserts() {
IdentityHashStore<Key, Integer> store =
new IdentityHashStore<Key, Integer>(4);
store.visitAll(new Visitor<Key, Integer>() {
@Override
public void accept(Key k, Integer v) {
Assert.fail("found key " + k + " in empty IdentityHashStore.");
}
});
Assert.assertTrue(store.isEmpty());
Key key1 = new Key("key1");
Integer value1 = new Integer(100);
Integer value2 = new Integer(200);
Integer value3 = new Integer(300);
store.put(key1, value1);
Key equalToKey1 = new Key("key1");
// IdentityHashStore compares by object equality, not equals()
Assert.assertNull(store.get(equalToKey1));
Assert.assertTrue(!store.isEmpty());
Assert.assertEquals(value1, store.get(key1));
store.put(key1, value2);
store.put(key1, value3);
final List<Integer> allValues = new LinkedList<Integer>();
store.visitAll(new Visitor<Key, Integer>() {
@Override
public void accept(Key k, Integer v) {
allValues.add(v);
}
});
Assert.assertEquals(3, allValues.size());
for (int i = 0; i < 3; i++) {
Integer value = store.remove(key1);
Assert.assertTrue(allValues.remove(value));
}
Assert.assertNull(store.remove(key1));
Assert.assertTrue(store.isEmpty());
}
@Test(timeout=60000)
public void testAdditionsAndRemovals() {
IdentityHashStore<Key, Integer> store =
new IdentityHashStore<Key, Integer>(0);
final int NUM_KEYS = 1000;
LOG.debug("generating " + NUM_KEYS + " keys");
final List<Key> keys = new ArrayList<Key>(NUM_KEYS);
for (int i = 0; i < NUM_KEYS; i++) {
keys.add(new Key("key " + i));
}
for (int i = 0; i < NUM_KEYS; i++) {
store.put(keys.get(i), i);
}
store.visitAll(new Visitor<Key, Integer>() {
@Override
public void accept(Key k, Integer v) {
Assert.assertTrue(keys.contains(k));
}
});
for (int i = 0; i < NUM_KEYS; i++) {
Assert.assertEquals(Integer.valueOf(i),
store.remove(keys.get(i)));
}
store.visitAll(new Visitor<Key, Integer>() {
@Override
public void accept(Key k, Integer v) {
Assert.fail("expected all entries to be removed");
}
});
Assert.assertTrue("expected the store to be " +
"empty, but found " + store.numElements() + " elements.",
store.isEmpty());
Assert.assertEquals(1024, store.capacity());
}
}

View File

@ -250,6 +250,9 @@ Release 2.3.0 - UNRELEASED
HDFS-5122. Support failover and retry in WebHdfsFileSystem for NN HA.
(Haohui Mai via jing9)
HDFS-4953. Enable HDFS local reads via mmap.
(Colin Patrick McCabe via wang).
IMPROVEMENTS
HDFS-4657. Limit the number of blocks logged by the NN after a block
@ -291,6 +294,12 @@ Release 2.3.0 - UNRELEASED
HDFS-5240. Separate formatting from logging in the audit logger API (daryn)
HDFS-5191. Revisit zero-copy API in FSDataInputStream to make it more
intuitive. (Contributed by Colin Patrick McCabe)
HDFS-5260. Merge zero-copy memory-mapped HDFS client reads to trunk and
branch-2. (cnauroth)
OPTIMIZATIONS
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
@ -316,6 +325,8 @@ Release 2.3.0 - UNRELEASED
HDFS-5031. BlockScanner scans the block multiple times. (Vinay via Arpit
Agarwal)
HDFS-5266. ElasticByteBufferPool#Key does not implement equals. (cnauroth)
Release 2.2.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -330,4 +330,14 @@
<Method name="setDirInternal" />
<Bug pattern="DM_STRING_CTOR" />
</Match>
<Match>
<Class name="org.apache.hadoop.hdfs.client.ClientMmapManager" />
<Method name="create" />
<Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH" />
</Match>
<Match>
<Class name="org.apache.hadoop.hdfs.client.ClientMmapManager" />
<Method name="create" />
<Bug pattern="UL_UNRELEASED_LOCK" />
</Match>
</FindBugsFilter>

View File

@ -142,6 +142,7 @@ target_link_libraries(test_native_mini_dfs
)
add_executable(test_libhdfs_threaded
main/native/libhdfs/expect.c
main/native/libhdfs/test_libhdfs_threaded.c
)
target_link_libraries(test_libhdfs_threaded
@ -150,6 +151,16 @@ target_link_libraries(test_libhdfs_threaded
pthread
)
add_executable(test_libhdfs_zerocopy
main/native/libhdfs/expect.c
main/native/libhdfs/test/test_libhdfs_zerocopy.c
)
target_link_libraries(test_libhdfs_zerocopy
hdfs
native_mini_dfs
pthread
)
IF(REQUIRE_LIBWEBHDFS)
add_subdirectory(contrib/libwebhdfs)
ENDIF(REQUIRE_LIBWEBHDFS)

View File

@ -20,12 +20,16 @@ package org.apache.hadoop.hdfs;
import java.io.IOException;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
/**
* A BlockReader is responsible for reading a single block
* from a single datanode.
*/
public interface BlockReader extends ByteBufferReadable {
/* same interface as inputStream java.io.InputStream#read()
* used by DFSInputStream#read()
@ -81,4 +85,14 @@ public interface BlockReader extends ByteBufferReadable {
* All short-circuit reads are also local.
*/
boolean isShortCircuit();
/**
* Get a ClientMmap object for this BlockReader.
*
* @param curBlock The current block.
* @return The ClientMmap object, or null if mmap is not
* supported.
*/
ClientMmap getClientMmap(LocatedBlock curBlock,
ClientMmapManager mmapManager);
}

View File

@ -22,11 +22,15 @@ import java.io.DataInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.io.IOUtils;
@ -87,6 +91,8 @@ class BlockReaderLocal implements BlockReader {
private final ExtendedBlock block;
private final FileInputStreamCache fisCache;
private ClientMmap clientMmap;
private boolean mmapDisabled;
private static int getSlowReadBufferNumChunks(int bufSize,
int bytesPerChecksum) {
@ -113,6 +119,8 @@ class BlockReaderLocal implements BlockReader {
this.datanodeID = datanodeID;
this.block = block;
this.fisCache = fisCache;
this.clientMmap = null;
this.mmapDisabled = false;
// read and handle the common header here. For now just a version
checksumIn.getChannel().position(0);
@ -487,6 +495,10 @@ class BlockReaderLocal implements BlockReader {
@Override
public synchronized void close() throws IOException {
if (clientMmap != null) {
clientMmap.unref();
clientMmap = null;
}
if (fisCache != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("putting FileInputStream for " + filename +
@ -534,4 +546,30 @@ class BlockReaderLocal implements BlockReader {
public boolean isShortCircuit() {
return true;
}
@Override
public ClientMmap getClientMmap(LocatedBlock curBlock,
ClientMmapManager mmapManager) {
if (clientMmap == null) {
if (mmapDisabled) {
return null;
}
try {
clientMmap = mmapManager.fetch(datanodeID, block, dataIn);
if (clientMmap == null) {
mmapDisabled = true;
return null;
}
} catch (InterruptedException e) {
LOG.error("Interrupted while setting up mmap for " + filename, e);
Thread.currentThread().interrupt();
return null;
} catch (IOException e) {
LOG.error("unable to set up mmap for " + filename, e);
mmapDisabled = true;
return null;
}
}
return clientMmap;
}
}

View File

@ -28,6 +28,8 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -35,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
@ -701,4 +704,10 @@ class BlockReaderLocalLegacy implements BlockReader {
public boolean isShortCircuit() {
return true;
}
@Override
public ClientMmap getClientMmap(LocatedBlock curBlock,
ClientMmapManager mmapManager) {
return null;
}
}

View File

@ -104,6 +104,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
@ -206,7 +207,43 @@ public class DFSClient implements java.io.Closeable {
private boolean shouldUseLegacyBlockReaderLocal;
private final CachingStrategy defaultReadCachingStrategy;
private final CachingStrategy defaultWriteCachingStrategy;
private ClientMmapManager mmapManager;
private static final ClientMmapManagerFactory MMAP_MANAGER_FACTORY =
new ClientMmapManagerFactory();
private static final class ClientMmapManagerFactory {
private ClientMmapManager mmapManager = null;
/**
* Tracks the number of users of mmapManager.
*/
private int refcnt = 0;
synchronized ClientMmapManager get(Configuration conf) {
if (refcnt++ == 0) {
mmapManager = ClientMmapManager.fromConf(conf);
} else {
String mismatches = mmapManager.verifyConfigurationMatches(conf);
if (!mismatches.isEmpty()) {
LOG.warn("The ClientMmapManager settings you specified " +
"have been ignored because another thread created the " +
"ClientMmapManager first. " + mismatches);
}
}
return mmapManager;
}
synchronized void unref(ClientMmapManager mmapManager) {
if (this.mmapManager != mmapManager) {
throw new IllegalArgumentException();
}
if (--refcnt == 0) {
IOUtils.cleanup(LOG, mmapManager);
this.mmapManager = null;
}
}
}
/**
* DFSClient configuration
*/
@ -534,6 +571,7 @@ public class DFSClient implements java.io.Closeable {
new CachingStrategy(readDropBehind, readahead);
this.defaultWriteCachingStrategy =
new CachingStrategy(writeDropBehind, readahead);
this.mmapManager = MMAP_MANAGER_FACTORY.get(conf);
}
/**
@ -738,9 +776,12 @@ public class DFSClient implements java.io.Closeable {
/** Abort and release resources held. Ignore all errors. */
void abort() {
if (mmapManager != null) {
MMAP_MANAGER_FACTORY.unref(mmapManager);
mmapManager = null;
}
clientRunning = false;
closeAllFilesBeingWritten(true);
try {
// remove reference to this client and stop the renewer,
// if there is no more clients under the renewer.
@ -784,6 +825,10 @@ public class DFSClient implements java.io.Closeable {
*/
@Override
public synchronized void close() throws IOException {
if (mmapManager != null) {
MMAP_MANAGER_FACTORY.unref(mmapManager);
mmapManager = null;
}
if(clientRunning) {
closeAllFilesBeingWritten(false);
clientRunning = false;
@ -2496,4 +2541,9 @@ public class DFSClient implements java.io.Closeable {
public CachingStrategy getDefaultWriteCachingStrategy() {
return defaultWriteCachingStrategy;
}
@VisibleForTesting
public ClientMmapManager getMmapManager() {
return mmapManager;
}
}

View File

@ -376,6 +376,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024;
public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic";
public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false;
public static final String DFS_CLIENT_MMAP_CACHE_SIZE = "dfs.client.mmap.cache.size";
public static final int DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT = 1024;
public static final String DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS = "dfs.client.mmap.cache.timeout.ms";
public static final long DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT = 15 * 60 * 1000;
public static final String DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT = "dfs.client.mmap.cache.thread.runs.per.timeout";
public static final int DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT = 4;
// property for fsimage compression
public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
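
A hedged sketch of tuning the new mmap cache settings through a Configuration before creating a client; the class name and values shown are arbitrary examples for illustration, not recommended settings.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class MmapCacheConfigSketch {
  public static Configuration mmapTunedConf() {
    Configuration conf = new Configuration();
    // Cache at most 256 mmaps per client (the default above is 1024).
    conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 256);
    // Expire unused mmaps after 5 minutes (the default above is 15 minutes).
    conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 5 * 60 * 1000L);
    return conf;
  }
}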

View File

@ -24,6 +24,7 @@ import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@ -36,11 +37,15 @@ import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.ByteBufferUtil;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.HasEnhancedByteBufferAccess;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
@ -54,12 +59,14 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.IdentityHashStore;
import com.google.common.annotations.VisibleForTesting;
@ -69,7 +76,8 @@ import com.google.common.annotations.VisibleForTesting;
****************************************************************/
@InterfaceAudience.Private
public class DFSInputStream extends FSInputStream
implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
HasEnhancedByteBufferAccess {
@VisibleForTesting
static boolean tcpReadsDisabledForTesting = false;
private final PeerCache peerCache;
@ -87,17 +95,28 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
private CachingStrategy cachingStrategy;
private final ReadStatistics readStatistics = new ReadStatistics();
/**
* Track the ByteBuffers that we have handed out to readers.
*
* The value type can be either ByteBufferPool or ClientMmap, depending on
* whether this is a memory-mapped buffer or not.
*/
private final IdentityHashStore<ByteBuffer, Object>
extendedReadBuffers = new IdentityHashStore<ByteBuffer, Object>(0);
public static class ReadStatistics {
public ReadStatistics() {
this.totalBytesRead = 0;
this.totalLocalBytesRead = 0;
this.totalShortCircuitBytesRead = 0;
this.totalZeroCopyBytesRead = 0;
}
public ReadStatistics(ReadStatistics rhs) {
this.totalBytesRead = rhs.getTotalBytesRead();
this.totalLocalBytesRead = rhs.getTotalLocalBytesRead();
this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead();
this.totalZeroCopyBytesRead = rhs.getTotalZeroCopyBytesRead();
}
/**
@ -123,6 +142,13 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
public long getTotalShortCircuitBytesRead() {
return totalShortCircuitBytesRead;
}
/**
* @return The total number of zero-copy bytes read.
*/
public long getTotalZeroCopyBytesRead() {
return totalZeroCopyBytesRead;
}
/**
* @return The total number of bytes read which were not local.
@ -145,12 +171,21 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
this.totalLocalBytesRead += amt;
this.totalShortCircuitBytesRead += amt;
}
void addZeroCopyBytes(long amt) {
this.totalBytesRead += amt;
this.totalLocalBytesRead += amt;
this.totalShortCircuitBytesRead += amt;
this.totalZeroCopyBytesRead += amt;
}
private long totalBytesRead;
private long totalLocalBytesRead;
private long totalShortCircuitBytesRead;
private long totalZeroCopyBytesRead;
}
private final FileInputStreamCache fileInputStreamCache;
@ -587,6 +622,20 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
}
dfsClient.checkOpen();
if (!extendedReadBuffers.isEmpty()) {
final StringBuilder builder = new StringBuilder();
extendedReadBuffers.visitAll(new IdentityHashStore.Visitor<ByteBuffer, Object>() {
private String prefix = "";
@Override
public void accept(ByteBuffer k, Object v) {
builder.append(prefix).append(k);
prefix = ", ";
}
});
DFSClient.LOG.warn("closing file " + src + ", but there are still " +
"unreleased ByteBuffers allocated by read(). " +
"Please release " + builder.toString() + ".");
}
if (blockReader != null) {
blockReader.close();
blockReader = null;
@ -1393,4 +1442,100 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead {
this.cachingStrategy.setDropBehind(dropBehind);
closeCurrentBlockReader();
}
@Override
public synchronized ByteBuffer read(ByteBufferPool bufferPool,
int maxLength, EnumSet<ReadOption> opts)
throws IOException, UnsupportedOperationException {
assert(maxLength > 0);
if (((blockReader == null) || (blockEnd == -1)) &&
(pos < getFileLength())) {
/*
* If we don't have a blockReader, or the one we have has no more bytes
* left to read, we call seekToBlockSource to get a new blockReader and
* recalculate blockEnd. Note that we assume we're not at EOF here
* (we check this above).
*/
if ((!seekToBlockSource(pos)) || (blockReader == null)) {
throw new IOException("failed to allocate new BlockReader " +
"at position " + pos);
}
}
boolean canSkipChecksums = opts.contains(ReadOption.SKIP_CHECKSUMS);
if (canSkipChecksums) {
ByteBuffer buffer = tryReadZeroCopy(maxLength);
if (buffer != null) {
return buffer;
}
}
ByteBuffer buffer = ByteBufferUtil.
fallbackRead(this, bufferPool, maxLength);
if (buffer != null) {
extendedReadBuffers.put(buffer, bufferPool);
}
return buffer;
}
private synchronized ByteBuffer tryReadZeroCopy(int maxLength)
throws IOException {
// Java ByteBuffers can't be longer than 2 GB, because they use
// 4-byte signed integers to represent capacity, etc.
// So we can't mmap the parts of the block higher than the 2 GB offset.
// FIXME: we could work around this with multiple memory maps.
// See HDFS-5101.
long blockEnd32 = Math.min(Integer.MAX_VALUE, blockEnd);
long curPos = pos;
long blockLeft = blockEnd32 - curPos + 1;
if (blockLeft <= 0) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
curPos + " of " + src + "; blockLeft = " + blockLeft +
"; blockEnd32 = " + blockEnd32 + ", blockEnd = " + blockEnd +
"; maxLength = " + maxLength);
}
return null;
}
int length = Math.min((int)blockLeft, maxLength);
long blockStartInFile = currentLocatedBlock.getStartOffset();
long blockPos = curPos - blockStartInFile;
long limit = blockPos + length;
ClientMmap clientMmap =
blockReader.getClientMmap(currentLocatedBlock,
dfsClient.getMmapManager());
if (clientMmap == null) {
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +
curPos + " of " + src + "; BlockReader#getClientMmap returned " +
"null.");
}
return null;
}
seek(pos + length);
ByteBuffer buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer();
buffer.position((int)blockPos);
buffer.limit((int)limit);
clientMmap.ref();
extendedReadBuffers.put(buffer, clientMmap);
readStatistics.addZeroCopyBytes(length);
if (DFSClient.LOG.isDebugEnabled()) {
DFSClient.LOG.debug("readZeroCopy read " + maxLength + " bytes from " +
"offset " + curPos + " via the zero-copy read path. " +
"blockEnd = " + blockEnd);
}
return buffer;
}
@Override
public synchronized void releaseBuffer(ByteBuffer buffer) {
Object val = extendedReadBuffers.remove(buffer);
if (val == null) {
throw new IllegalArgumentException("tried to release a buffer " +
"that was not created by this stream, " + buffer);
}
if (val instanceof ClientMmap) {
((ClientMmap)val).unref();
} else if (val instanceof ByteBufferPool) {
((ByteBufferPool)val).putBuffer(buffer);
}
}
}
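
Putting the client-side pieces together, a hedged end-to-end sketch of issuing zero-copy reads through FSDataInputStream: with SKIP_CHECKSUMS the mmap path above is eligible, and otherwise the read falls back to the supplied pool. The class name, path, and buffer size are illustrative.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.io.ElasticByteBufferPool;

public class ZeroCopyReadSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    ElasticByteBufferPool pool = new ElasticByteBufferPool();
    FSDataInputStream in = fs.open(new Path("/user/example/data.bin")); // illustrative path
    try {
      long total = 0;
      while (true) {
        ByteBuffer buf = in.read(pool, 1024 * 1024,
            EnumSet.of(ReadOption.SKIP_CHECKSUMS));
        if (buf == null) {
          break;                 // EOF
        }
        total += buf.remaining();
        in.releaseBuffer(buf);   // required, or close() warns about unreleased buffers
      }
      System.out.println("read " + total + " bytes");
    } finally {
      in.close();
    }
  }
}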

View File

@ -27,9 +27,12 @@ import java.nio.ByteBuffer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
@ -485,4 +488,10 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader {
public boolean isShortCircuit() {
return false;
}
@Override
public ClientMmap getClientMmap(LocatedBlock curBlock,
ClientMmapManager mmapManager) {
return null;
}
}

View File

@ -29,9 +29,12 @@ import java.nio.channels.ReadableByteChannel;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver;
@ -40,7 +43,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@ -451,4 +453,10 @@ public class RemoteBlockReader2 implements BlockReader {
public boolean isShortCircuit() {
return false;
}
@Override
public ClientMmap getClientMmap(LocatedBlock curBlock,
ClientMmapManager manager) {
return null;
}
}

View File

@ -0,0 +1,166 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.client;
import java.io.FileInputStream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel.MapMode;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* A memory-mapped region used by an HDFS client.
*
* This class includes a reference count and some other information used by
* ClientMmapManager to track and cache mmaps.
*/
@InterfaceAudience.Private
public class ClientMmap {
static final Log LOG = LogFactory.getLog(ClientMmap.class);
/**
* A reference to the manager of this mmap.
*
* This is only a weak reference to help minimize the damage done by
* code which leaks references accidentally.
*/
private final WeakReference<ClientMmapManager> manager;
/**
* The actual mapped memory region.
*/
private final MappedByteBuffer map;
/**
* A reference count tracking how many threads are using this object.
*/
private final AtomicInteger refCount = new AtomicInteger(1);
/**
* Block pertaining to this mmap
*/
private final ExtendedBlock block;
/**
* The DataNode where this mmap came from.
*/
private final DatanodeID datanodeID;
/**
* The monotonic time when this mmap was last evictable.
*/
private long lastEvictableTimeNs;
public static ClientMmap load(ClientMmapManager manager, FileInputStream in,
ExtendedBlock block, DatanodeID datanodeID)
throws IOException {
MappedByteBuffer map =
in.getChannel().map(MapMode.READ_ONLY, 0,
in.getChannel().size());
return new ClientMmap(manager, map, block, datanodeID);
}
private ClientMmap(ClientMmapManager manager, MappedByteBuffer map,
ExtendedBlock block, DatanodeID datanodeID)
throws IOException {
this.manager = new WeakReference<ClientMmapManager>(manager);
this.map = map;
this.block = block;
this.datanodeID = datanodeID;
this.lastEvictableTimeNs = 0;
}
/**
* Decrement the reference count on this object.
* Should be called with the ClientMmapManager lock held.
*/
public void unref() {
int count = refCount.decrementAndGet();
if (count < 0) {
throw new IllegalArgumentException("can't decrement the " +
"reference count on this ClientMmap lower than 0.");
} else if (count == 0) {
ClientMmapManager man = manager.get();
if (man == null) {
unmap();
} else {
man.makeEvictable(this);
}
}
}
/**
* Increment the reference count on this object.
*
* @return The new reference count.
*/
public int ref() {
return refCount.getAndIncrement();
}
@VisibleForTesting
public ExtendedBlock getBlock() {
return block;
}
DatanodeID getDatanodeID() {
return datanodeID;
}
public MappedByteBuffer getMappedByteBuffer() {
return map;
}
public void setLastEvictableTimeNs(long lastEvictableTimeNs) {
this.lastEvictableTimeNs = lastEvictableTimeNs;
}
public long getLastEvictableTimeNs() {
return this.lastEvictableTimeNs;
}
/**
* Unmap the memory region.
*
* There isn't any portable way to unmap a memory region in Java.
* So we use the sun.nio method here.
* Note that unmapping a memory region could cause crashes if code
* continues to reference the unmapped memory. However, if we don't
* manually unmap the memory, we are dependent on the finalizer to
* do it, and we have no idea when the finalizer will run.
*/
void unmap() {
assert(refCount.get() == 0);
if (map instanceof sun.nio.ch.DirectBuffer) {
final sun.misc.Cleaner cleaner =
((sun.nio.ch.DirectBuffer) map).cleaner();
cleaner.clean();
}
}
}

View File

@ -0,0 +1,482 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.client;
import java.io.Closeable;
import org.apache.hadoop.classification.InterfaceAudience;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.ref.WeakReference;
import java.util.Iterator;
import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.IOUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ComparisonChain;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Tracks mmap instances used on an HDFS client.
*
* mmaps can be used concurrently by multiple threads at once.
* mmaps cannot be closed while they are in use.
*
* The cache is important for performance, because the first time an mmap is
* created, the page table entries (PTEs) are not yet set up.
* Even when reading data that is entirely resident in memory, reading an
* mmap the second time is faster.
*/
@InterfaceAudience.Private
public class ClientMmapManager implements Closeable {
public static final Log LOG = LogFactory.getLog(ClientMmapManager.class);
private boolean closed = false;
private final int cacheSize;
private final long timeoutNs;
private final int runsPerTimeout;
private final Lock lock = new ReentrantLock();
/**
* Maps block, datanode_id to the client mmap object.
* If the ClientMmap is in the process of being loaded,
* {@link Waitable<ClientMmap>#await()} will block.
*
* Protected by the ClientMmapManager lock.
*/
private final TreeMap<Key, Waitable<ClientMmap>> mmaps =
new TreeMap<Key, Waitable<ClientMmap>>();
/**
* Maps the last use time to the client mmap object.
* We ensure that each last use time is unique by inserting a jitter of a
* nanosecond or two if necessary.
*
* Protected by the ClientMmapManager lock.
* ClientMmap objects that are in use are never evictable.
*/
private final TreeMap<Long, ClientMmap> evictable =
new TreeMap<Long, ClientMmap>();
private final ScheduledThreadPoolExecutor executor =
new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
setDaemon(true).setNameFormat("ClientMmapManager").
build());
/**
* The CacheCleaner for this ClientMmapManager. We don't create this
* and schedule it until it becomes necessary.
*/
private CacheCleaner cacheCleaner;
/**
* Factory method to create a ClientMmapManager from a Hadoop
* configuration.
*/
public static ClientMmapManager fromConf(Configuration conf) {
return new ClientMmapManager(conf.getInt(DFS_CLIENT_MMAP_CACHE_SIZE,
DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT),
conf.getLong(DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT),
conf.getInt(DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT,
DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT));
}
public ClientMmapManager(int cacheSize, long timeoutMs, int runsPerTimeout) {
this.cacheSize = cacheSize;
this.timeoutNs = timeoutMs * 1000000;
this.runsPerTimeout = runsPerTimeout;
}
long getTimeoutMs() {
return this.timeoutNs / 1000000;
}
int getRunsPerTimeout() {
return this.runsPerTimeout;
}
public String verifyConfigurationMatches(Configuration conf) {
StringBuilder bld = new StringBuilder();
int cacheSize = conf.getInt(DFS_CLIENT_MMAP_CACHE_SIZE,
DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT);
if (this.cacheSize != cacheSize) {
bld.append("You specified a cache size of ").append(cacheSize).
append(", but the existing cache size is ").append(this.cacheSize).
append(". ");
}
long timeoutMs = conf.getLong(DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS,
DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT);
if (getTimeoutMs() != timeoutMs) {
bld.append("You specified a cache timeout of ").append(timeoutMs).
append(" ms, but the existing cache timeout is ").
append(getTimeoutMs()).append("ms").append(". ");
}
int runsPerTimeout = conf.getInt(
DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT,
DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT);
if (getRunsPerTimeout() != runsPerTimeout) {
bld.append("You specified ").append(runsPerTimeout).
append(" runs per timeout, but the existing runs per timeout is ").
append(getTimeoutMs()).append(". ");
}
return bld.toString();
}
private static class Waitable<T> {
private T val;
private final Condition cond;
public Waitable(Condition cond) {
this.val = null;
this.cond = cond;
}
public T await() throws InterruptedException {
while (this.val == null) {
this.cond.await();
}
return this.val;
}
public void provide(T val) {
this.val = val;
this.cond.signalAll();
}
}
private static class Key implements Comparable<Key> {
private final ExtendedBlock block;
private final DatanodeID datanode;
Key(ExtendedBlock block, DatanodeID datanode) {
this.block = block;
this.datanode = datanode;
}
/**
* Compare two ClientMmap regions that we're storing.
*
* When we append to a block, we bump the genstamp. It is important to
* compare the genStamp here. That way, we will not return a shorter
* mmap than required.
*/
@Override
public int compareTo(Key o) {
return ComparisonChain.start().
compare(block.getBlockId(), o.block.getBlockId()).
compare(block.getGenerationStamp(), o.block.getGenerationStamp()).
compare(block.getBlockPoolId(), o.block.getBlockPoolId()).
compare(datanode, o.datanode).
result();
}
@Override
public boolean equals(Object rhs) {
if (rhs == null) {
return false;
}
try {
Key o = (Key)rhs;
return (compareTo(o) == 0);
} catch (ClassCastException e) {
return false;
}
}
@Override
public int hashCode() {
return block.hashCode() ^ datanode.hashCode();
}
}
/**
* Thread which handles expiring mmaps from the cache.
*/
private static class CacheCleaner implements Runnable, Closeable {
private WeakReference<ClientMmapManager> managerRef;
private ScheduledFuture<?> future;
CacheCleaner(ClientMmapManager manager) {
this.managerRef= new WeakReference<ClientMmapManager>(manager);
}
@Override
public void run() {
ClientMmapManager manager = managerRef.get();
if (manager == null) return;
long curTime = System.nanoTime();
try {
manager.lock.lock();
manager.evictStaleEntries(curTime);
} finally {
manager.lock.unlock();
}
}
void setFuture(ScheduledFuture<?> future) {
this.future = future;
}
@Override
public void close() throws IOException {
future.cancel(false);
}
}
/**
 * Evict cache entries whose last use was more than timeoutNs nanoseconds
 * before curTime.
*
* NOTE: you must call this function with the lock held.
*/
private void evictStaleEntries(long curTime) {
if (closed) {
return;
}
Iterator<Entry<Long, ClientMmap>> iter =
evictable.entrySet().iterator();
while (iter.hasNext()) {
Entry<Long, ClientMmap> entry = iter.next();
if (entry.getKey() + timeoutNs >= curTime) {
return;
}
ClientMmap mmap = entry.getValue();
Key key = new Key(mmap.getBlock(), mmap.getDatanodeID());
mmaps.remove(key);
iter.remove();
mmap.unmap();
}
}
/**
* Evict one mmap object from the cache.
*
* NOTE: you must call this function with the lock held.
*
* @return True if an object was evicted; false if none
* could be evicted.
*/
private boolean evictOne() {
Entry<Long, ClientMmap> entry = evictable.pollFirstEntry();
if (entry == null) {
// We don't want to try creating another mmap region, because the
// cache is full.
return false;
}
ClientMmap evictedMmap = entry.getValue();
Key evictedKey = new Key(evictedMmap.getBlock(),
evictedMmap.getDatanodeID());
mmaps.remove(evictedKey);
evictedMmap.unmap();
return true;
}
/**
* Create a new mmap object.
*
* NOTE: you must call this function with the lock held.
*
* @param key The key which describes this mmap.
* @param in The input stream to use to create the mmap.
* @return The new mmap object, or null if there were
* insufficient resources.
* @throws IOException If there was an I/O error creating the mmap.
*/
private ClientMmap create(Key key, FileInputStream in) throws IOException {
if (mmaps.size() + 1 > cacheSize) {
if (!evictOne()) {
LOG.warn("mmap cache is full (with " + cacheSize + " elements) and " +
"nothing is evictable. Ignoring request for mmap with " +
"datanodeID=" + key.datanode + ", " + "block=" + key.block);
return null;
}
}
// Create the condition variable that other threads may wait on.
Waitable<ClientMmap> waitable =
new Waitable<ClientMmap>(lock.newCondition());
mmaps.put(key, waitable);
// Load the entry
boolean success = false;
ClientMmap mmap = null;
try {
try {
lock.unlock();
mmap = ClientMmap.load(this, in, key.block, key.datanode);
} finally {
lock.lock();
}
if (cacheCleaner == null) {
cacheCleaner = new CacheCleaner(this);
ScheduledFuture<?> future =
executor.scheduleAtFixedRate(cacheCleaner,
timeoutNs, timeoutNs / runsPerTimeout, TimeUnit.NANOSECONDS);
cacheCleaner.setFuture(future);
}
success = true;
} finally {
if (!success) {
LOG.warn("failed to create mmap for datanodeID=" + key.datanode +
", " + "block=" + key.block);
mmaps.remove(key);
}
waitable.provide(mmap);
}
if (LOG.isDebugEnabled()) {
LOG.info("created a new ClientMmap for block " + key.block +
" on datanode " + key.datanode);
}
return mmap;
}
/**
* Get or create an mmap region.
*
 * @param datanodeID  The DataNode that owns the block for this mmap region.
* @param block The block ID, block pool ID, and generation stamp of
* the block we want to read.
* @param in An open file for this block. This stream is only used
* if we have to create a new mmap; if we use an
* existing one, it is ignored.
*
* @return The client mmap region.
*/
public ClientMmap fetch(DatanodeID datanodeID, ExtendedBlock block,
FileInputStream in) throws IOException, InterruptedException {
LOG.debug("fetching mmap with datanodeID=" + datanodeID + ", " +
"block=" + block);
Key key = new Key(block, datanodeID);
ClientMmap mmap = null;
try {
lock.lock();
if (closed) {
throw new IOException("ClientMmapManager is closed.");
}
while (mmap == null) {
Waitable<ClientMmap> entry = mmaps.get(key);
if (entry == null) {
return create(key, in);
}
mmap = entry.await();
}
if (mmap.ref() == 1) {
// When going from nobody using the mmap (ref = 0) to somebody
// using the mmap (ref = 1), we must make the mmap un-evictable.
evictable.remove(mmap.getLastEvictableTimeNs());
}
}
finally {
lock.unlock();
}
if (LOG.isDebugEnabled()) {
LOG.debug("reusing existing mmap with datanodeID=" + datanodeID +
", " + "block=" + block);
}
return mmap;
}
/**
* Make an mmap evictable.
*
* When an mmap is evictable, it may be removed from the cache if necessary.
* mmaps can only be evictable if nobody is using them.
*
* @param mmap The mmap to make evictable.
*/
void makeEvictable(ClientMmap mmap) {
try {
lock.lock();
if (closed) {
// If this ClientMmapManager is closed, then don't bother with the
// cache; just close the mmap.
mmap.unmap();
return;
}
long now = System.nanoTime();
while (evictable.containsKey(now)) {
now++;
}
mmap.setLastEvictableTimeNs(now);
evictable.put(now, mmap);
} finally {
lock.unlock();
}
}
@Override
public void close() throws IOException {
try {
lock.lock();
closed = true;
IOUtils.cleanup(LOG, cacheCleaner);
// Unmap all the mmaps that nobody is using.
// The ones which are in use will be unmapped just as soon as people stop
// using them.
evictStaleEntries(Long.MAX_VALUE);
executor.shutdown();
} finally {
lock.unlock();
}
}
@VisibleForTesting
public interface ClientMmapVisitor {
void accept(ClientMmap mmap);
}
@VisibleForTesting
public synchronized void visitMmaps(ClientMmapVisitor visitor)
throws InterruptedException {
for (Waitable<ClientMmap> entry : mmaps.values()) {
visitor.accept(entry.await());
}
}
public void visitEvictable(ClientMmapVisitor visitor)
throws InterruptedException {
for (ClientMmap mmap : evictable.values()) {
visitor.accept(mmap);
}
}
}

View File

@ -0,0 +1,68 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "expect.h"
#include "hdfs.h"
#include <inttypes.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
int expectFileStats(hdfsFile file,
uint64_t expectedTotalBytesRead,
uint64_t expectedTotalLocalBytesRead,
uint64_t expectedTotalShortCircuitBytesRead,
uint64_t expectedTotalZeroCopyBytesRead)
{
struct hdfsReadStatistics *stats = NULL;
EXPECT_ZERO(hdfsFileGetReadStatistics(file, &stats));
fprintf(stderr, "expectFileStats(expectedTotalBytesRead=%"PRId64", "
"expectedTotalLocalBytesRead=%"PRId64", "
"expectedTotalShortCircuitBytesRead=%"PRId64", "
"expectedTotalZeroCopyBytesRead=%"PRId64", "
"totalBytesRead=%"PRId64", "
"totalLocalBytesRead=%"PRId64", "
"totalShortCircuitBytesRead=%"PRId64", "
"totalZeroCopyBytesRead=%"PRId64")\n",
expectedTotalBytesRead,
expectedTotalLocalBytesRead,
expectedTotalShortCircuitBytesRead,
expectedTotalZeroCopyBytesRead,
stats->totalBytesRead,
stats->totalLocalBytesRead,
stats->totalShortCircuitBytesRead,
stats->totalZeroCopyBytesRead);
if (expectedTotalBytesRead != UINT64_MAX) {
EXPECT_INT64_EQ(expectedTotalBytesRead, stats->totalBytesRead);
}
if (expectedTotalLocalBytesRead != UINT64_MAX) {
EXPECT_INT64_EQ(expectedTotalLocalBytesRead,
stats->totalLocalBytesRead);
}
if (expectedTotalShortCircuitBytesRead != UINT64_MAX) {
EXPECT_INT64_EQ(expectedTotalShortCircuitBytesRead,
stats->totalShortCircuitBytesRead);
}
if (expectedTotalZeroCopyBytesRead != UINT64_MAX) {
EXPECT_INT64_EQ(expectedTotalZeroCopyBytesRead,
stats->totalZeroCopyBytesRead);
}
hdfsFileFreeReadStatistics(stats);
return 0;
}

View File

@ -19,16 +19,19 @@
#ifndef LIBHDFS_NATIVE_TESTS_EXPECT_H
#define LIBHDFS_NATIVE_TESTS_EXPECT_H
#include <inttypes.h>
#include <stdio.h>
struct hdfsFile_internal;
#define EXPECT_ZERO(x) \
do { \
int __my_ret__ = x; \
if (__my_ret__) { \
int __my_errno__ = errno; \
fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
"code %d (errno: %d): got nonzero from %s\n", \
__LINE__, __my_ret__, __my_errno__, #x); \
__FILE__, __LINE__, __my_ret__, __my_errno__, #x); \
return __my_ret__; \
} \
} while (0);
@ -38,9 +41,9 @@
void* __my_ret__ = x; \
int __my_errno__ = errno; \
if (__my_ret__ != NULL) { \
fprintf(stderr, "TEST_ERROR: failed on line %d (errno: %d): " \
fprintf(stderr, "TEST_ERROR: failed on %s:%d (errno: %d): " \
"got non-NULL value %p from %s\n", \
__LINE__, __my_errno__, __my_ret__, #x); \
__FILE__, __LINE__, __my_errno__, __my_ret__, #x); \
return -1; \
} \
} while (0);
@ -50,8 +53,8 @@
void* __my_ret__ = x; \
int __my_errno__ = errno; \
if (__my_ret__ == NULL) { \
fprintf(stderr, "TEST_ERROR: failed on line %d (errno: %d): " \
"got NULL from %s\n", __LINE__, __my_errno__, #x); \
fprintf(stderr, "TEST_ERROR: failed on %s:%d (errno: %d): " \
"got NULL from %s\n", __FILE__, __LINE__, __my_errno__, #x); \
return -1; \
} \
} while (0);
@ -61,15 +64,16 @@
int __my_ret__ = x; \
int __my_errno__ = errno; \
if (__my_ret__ != -1) { \
fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
"code %d (errno: %d): expected -1 from %s\n", __LINE__, \
fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
"code %d (errno: %d): expected -1 from %s\n", \
__FILE__, __LINE__, \
__my_ret__, __my_errno__, #x); \
return -1; \
} \
if (__my_errno__ != e) { \
fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
"code %d (errno: %d): expected errno = %d from %s\n", \
__LINE__, __my_ret__, __my_errno__, e, #x); \
__FILE__, __LINE__, __my_ret__, __my_errno__, e, #x); \
return -1; \
} \
} while (0);
@ -79,9 +83,9 @@
int __my_ret__ = x; \
int __my_errno__ = errno; \
if (!__my_ret__) { \
fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
"code %d (errno: %d): got zero from %s\n", __LINE__, \
__my_ret__, __my_errno__, #x); \
fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
"code %d (errno: %d): got zero from %s\n", __FILE__, __LINE__, \
__my_ret__, __my_errno__, #x); \
return -1; \
} \
} while (0);
@ -91,9 +95,9 @@
int __my_ret__ = x; \
int __my_errno__ = errno; \
if (__my_ret__ < 0) { \
fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
"code %d (errno: %d): got negative return from %s\n", \
__LINE__, __my_ret__, __my_errno__, #x); \
__FILE__, __LINE__, __my_ret__, __my_errno__, #x); \
return __my_ret__; \
} \
} while (0);
@ -103,9 +107,21 @@
int __my_ret__ = y; \
int __my_errno__ = errno; \
if (__my_ret__ != (x)) { \
fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
"code %d (errno: %d): expected %d\n", \
__LINE__, __my_ret__, __my_errno__, (x)); \
__FILE__, __LINE__, __my_ret__, __my_errno__, (x)); \
return -1; \
} \
} while (0);
#define EXPECT_INT64_EQ(x, y) \
do { \
int64_t __my_ret__ = y; \
int __my_errno__ = errno; \
if (__my_ret__ != (x)) { \
fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
"value %"PRId64" (errno: %d): expected %"PRId64"\n", \
__FILE__, __LINE__, __my_ret__, __my_errno__, (x)); \
return -1; \
} \
} while (0);
@ -117,4 +133,17 @@
ret = -errno; \
} while (ret == -EINTR);
/**
* Test that an HDFS file has the given statistics.
*
* Any parameter can be set to UINT64_MAX to avoid checking it.
*
* @return 0 on success; error code otherwise
*/
int expectFileStats(struct hdfsFile_internal *file,
uint64_t expectedTotalBytesRead,
uint64_t expectedTotalLocalBytesRead,
uint64_t expectedTotalShortCircuitBytesRead,
uint64_t expectedTotalZeroCopyBytesRead);
#endif
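As an illustration only (not part of this patch): because any argument may be
UINT64_MAX, a test that only cares about one counter can wildcard the rest.
The file handle and the expected byte count below are hypothetical.

    /* Assert only totalZeroCopyBytesRead; ignore the other statistics. */
    EXPECT_ZERO(expectFileStats(file, UINT64_MAX, UINT64_MAX, UINT64_MAX, 4096));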

View File

@ -39,6 +39,7 @@
#define JAVA_NET_ISA "java/net/InetSocketAddress"
#define JAVA_NET_URI "java/net/URI"
#define JAVA_STRING "java/lang/String"
#define READ_OPTION "org/apache/hadoop/fs/ReadOption"
#define JAVA_VOID "V"
@ -143,6 +144,15 @@ int hdfsFileGetReadStatistics(hdfsFile file,
goto done;
}
s->totalShortCircuitBytesRead = jVal.j;
jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
"org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
"getTotalZeroCopyBytesRead", "()J");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hdfsFileGetReadStatistics: getTotalZeroCopyBytesRead failed");
goto done;
}
s->totalZeroCopyBytesRead = jVal.j;
*stats = s;
s = NULL;
ret = 0;
@ -183,6 +193,25 @@ void hdfsFileDisableDirectRead(hdfsFile file)
file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ;
}
int hdfsDisableDomainSocketSecurity(void)
{
jthrowable jthr;
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jthr = invokeMethod(env, NULL, STATIC, NULL,
"org/apache/hadoop/net/unix/DomainSocket",
"disableBindPathValidation", "()V");
if (jthr) {
errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"DomainSocket#disableBindPathValidation");
return -1;
}
return 0;
}
/**
* hdfsJniEnv: A wrapper struct to be used as 'value'
* while saving thread -> JNIEnv* mappings
@ -220,40 +249,6 @@ static jthrowable constructNewObjectOfPath(JNIEnv *env, const char *path,
return NULL;
}
/**
* Set a configuration value.
*
* @param env The JNI environment
* @param jConfiguration The configuration object to modify
* @param key The key to modify
* @param value The value to set the key to
*
* @return NULL on success; exception otherwise
*/
static jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
const char *key, const char *value)
{
jthrowable jthr;
jstring jkey = NULL, jvalue = NULL;
jthr = newJavaStr(env, key, &jkey);
if (jthr)
goto done;
jthr = newJavaStr(env, value, &jvalue);
if (jthr)
goto done;
jthr = invokeMethod(env, NULL, INSTANCE, jConfiguration,
HADOOP_CONF, "set", JMETHOD2(JPARAM(JAVA_STRING),
JPARAM(JAVA_STRING), JAVA_VOID),
jkey, jvalue);
if (jthr)
goto done;
done:
destroyLocalReference(env, jkey);
destroyLocalReference(env, jvalue);
return jthr;
}
static jthrowable hadoopConfGetStr(JNIEnv *env, jobject jConfiguration,
const char *key, char **val)
{
@ -2108,6 +2103,395 @@ int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime)
return 0;
}
/**
* Zero-copy options.
*
* We cache the EnumSet of ReadOptions which has to be passed into every
* readZero call, to avoid reconstructing it each time. This cache is cleared
* whenever an element changes.
*/
struct hadoopRzOptions
{
JNIEnv *env;
int skipChecksums;
jobject byteBufferPool;
jobject cachedEnumSet;
};
struct hadoopRzOptions *hadoopRzOptionsAlloc(void)
{
struct hadoopRzOptions *opts;
JNIEnv *env;
env = getJNIEnv();
if (!env) {
// Check to make sure the JNI environment is set up properly.
errno = EINTERNAL;
return NULL;
}
opts = calloc(1, sizeof(struct hadoopRzOptions));
if (!opts) {
errno = ENOMEM;
return NULL;
}
return opts;
}
static void hadoopRzOptionsClearCached(JNIEnv *env,
struct hadoopRzOptions *opts)
{
if (!opts->cachedEnumSet) {
return;
}
(*env)->DeleteGlobalRef(env, opts->cachedEnumSet);
opts->cachedEnumSet = NULL;
}
int hadoopRzOptionsSetSkipChecksum(
struct hadoopRzOptions *opts, int skip)
{
JNIEnv *env;
env = getJNIEnv();
if (!env) {
errno = EINTERNAL;
return -1;
}
hadoopRzOptionsClearCached(env, opts);
opts->skipChecksums = !!skip;
return 0;
}
int hadoopRzOptionsSetByteBufferPool(
struct hadoopRzOptions *opts, const char *className)
{
JNIEnv *env;
jthrowable jthr;
jobject byteBufferPool = NULL;
env = getJNIEnv();
if (!env) {
errno = EINTERNAL;
return -1;
}
// Note: we don't have to call hadoopRzOptionsClearCached in this
// function, since the ByteBufferPool is passed separately from the
// EnumSet of ReadOptions.
jthr = constructNewObjectOfClass(env, &byteBufferPool, className, "()V");
if (jthr) {
printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopRzOptionsSetByteBufferPool(className=%s): ", className);
errno = EINVAL;
return -1;
}
if (opts->byteBufferPool) {
// Delete any previous ByteBufferPool we had.
(*env)->DeleteGlobalRef(env, opts->byteBufferPool);
}
opts->byteBufferPool = byteBufferPool;
return 0;
}
void hadoopRzOptionsFree(struct hadoopRzOptions *opts)
{
JNIEnv *env;
env = getJNIEnv();
if (!env) {
return;
}
hadoopRzOptionsClearCached(env, opts);
if (opts->byteBufferPool) {
(*env)->DeleteGlobalRef(env, opts->byteBufferPool);
opts->byteBufferPool = NULL;
}
free(opts);
}
struct hadoopRzBuffer
{
jobject byteBuffer;
uint8_t *ptr;
int32_t length;
int direct;
};
static jthrowable hadoopRzOptionsGetEnumSet(JNIEnv *env,
struct hadoopRzOptions *opts, jobject *enumSet)
{
jthrowable jthr = NULL;
jobject enumInst = NULL, enumSetObj = NULL;
jvalue jVal;
if (opts->cachedEnumSet) {
// If we cached the value, return it now.
*enumSet = opts->cachedEnumSet;
goto done;
}
if (opts->skipChecksums) {
jthr = fetchEnumInstance(env, READ_OPTION,
"SKIP_CHECKSUMS", &enumInst);
if (jthr) {
goto done;
}
jthr = invokeMethod(env, &jVal, STATIC, NULL,
"java/util/EnumSet", "of",
"(Ljava/lang/Enum;)Ljava/util/EnumSet;", enumInst);
if (jthr) {
goto done;
}
enumSetObj = jVal.l;
} else {
jclass clazz = (*env)->FindClass(env, READ_OPTION);
if (!clazz) {
jthr = newRuntimeError(env, "failed "
"to find class for %s", READ_OPTION);
goto done;
}
jthr = invokeMethod(env, &jVal, STATIC, NULL,
"java/util/EnumSet", "noneOf",
"(Ljava/lang/Class;)Ljava/util/EnumSet;", clazz);
enumSetObj = jVal.l;
}
// create global ref
opts->cachedEnumSet = (*env)->NewGlobalRef(env, enumSetObj);
if (!opts->cachedEnumSet) {
jthr = getPendingExceptionAndClear(env);
goto done;
}
*enumSet = opts->cachedEnumSet;
jthr = NULL;
done:
(*env)->DeleteLocalRef(env, enumInst);
(*env)->DeleteLocalRef(env, enumSetObj);
return jthr;
}
static int hadoopReadZeroExtractBuffer(JNIEnv *env,
const struct hadoopRzOptions *opts, struct hadoopRzBuffer *buffer)
{
int ret;
jthrowable jthr;
jvalue jVal;
uint8_t *directStart;
void *mallocBuf = NULL;
jint position;
jarray array = NULL;
jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
"java/nio/ByteBuffer", "remaining", "()I");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopReadZeroExtractBuffer: ByteBuffer#remaining failed: ");
goto done;
}
buffer->length = jVal.i;
jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
"java/nio/ByteBuffer", "position", "()I");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopReadZeroExtractBuffer: ByteBuffer#position failed: ");
goto done;
}
position = jVal.i;
directStart = (*env)->GetDirectBufferAddress(env, buffer->byteBuffer);
if (directStart) {
// Handle direct buffers.
buffer->ptr = directStart + position;
buffer->direct = 1;
ret = 0;
goto done;
}
// Handle indirect buffers.
// The JNI docs don't say that GetDirectBufferAddress throws any exceptions
// when it fails. However, they also don't clearly say that it doesn't. It
// seems safest to clear any pending exceptions here, to prevent problems on
// various JVMs.
(*env)->ExceptionClear(env);
if (!opts->byteBufferPool) {
fputs("hadoopReadZeroExtractBuffer: we read through the "
"zero-copy path, but failed to get the address of the buffer via "
"GetDirectBufferAddress. Please make sure your JVM supports "
"GetDirectBufferAddress.\n", stderr);
ret = ENOTSUP;
goto done;
}
// Get the backing array object of this buffer.
jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
"java/nio/ByteBuffer", "array", "()[B");
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopReadZeroExtractBuffer: ByteBuffer#array failed: ");
goto done;
}
array = jVal.l;
if (!array) {
fputs("hadoopReadZeroExtractBuffer: ByteBuffer#array returned NULL.",
stderr);
ret = EIO;
goto done;
}
mallocBuf = malloc(buffer->length);
if (!mallocBuf) {
fprintf(stderr, "hadoopReadZeroExtractBuffer: failed to allocate %d bytes of memory\n",
buffer->length);
ret = ENOMEM;
goto done;
}
(*env)->GetByteArrayRegion(env, array, position, buffer->length, mallocBuf);
jthr = (*env)->ExceptionOccurred(env);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopReadZeroExtractBuffer: GetByteArrayRegion failed: ");
goto done;
}
    buffer->ptr = mallocBuf;
    buffer->direct = 0;
    // Ownership of mallocBuf has passed to the buffer; don't free it below.
    mallocBuf = NULL;
    ret = 0;
done:
free(mallocBuf);
(*env)->DeleteLocalRef(env, array);
return ret;
}
static int translateZCRException(JNIEnv *env, jthrowable exc)
{
int ret;
char *className = NULL;
jthrowable jthr = classNameOfObject(exc, env, &className);
if (jthr) {
fputs("hadoopReadZero: failed to get class name of "
"exception from read().\n", stderr);
destroyLocalReference(env, exc);
destroyLocalReference(env, jthr);
ret = EIO;
goto done;
}
if (!strcmp(className, "java.lang.UnsupportedOperationException")) {
ret = EPROTONOSUPPORT;
goto done;
}
    ret = printExceptionAndFree(env, exc, PRINT_EXC_ALL,
"hadoopZeroCopyRead: ZeroCopyCursor#read failed");
done:
free(className);
return ret;
}
struct hadoopRzBuffer* hadoopReadZero(hdfsFile file,
struct hadoopRzOptions *opts, int32_t maxLength)
{
JNIEnv *env;
jthrowable jthr = NULL;
jvalue jVal;
jobject enumSet = NULL, byteBuffer = NULL;
struct hadoopRzBuffer* buffer = NULL;
int ret;
env = getJNIEnv();
if (!env) {
errno = EINTERNAL;
return NULL;
}
if (file->type != INPUT) {
fputs("Cannot read from a non-InputStream object!\n", stderr);
ret = EINVAL;
goto done;
}
buffer = calloc(1, sizeof(struct hadoopRzBuffer));
if (!buffer) {
ret = ENOMEM;
goto done;
}
jthr = hadoopRzOptionsGetEnumSet(env, opts, &enumSet);
if (jthr) {
ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopReadZero: hadoopRzOptionsGetEnumSet failed: ");
goto done;
}
jthr = invokeMethod(env, &jVal, INSTANCE, file->file, HADOOP_ISTRM, "read",
"(Lorg/apache/hadoop/io/ByteBufferPool;ILjava/util/EnumSet;)"
"Ljava/nio/ByteBuffer;", opts->byteBufferPool, maxLength, enumSet);
if (jthr) {
ret = translateZCRException(env, jthr);
goto done;
}
byteBuffer = jVal.l;
if (!byteBuffer) {
buffer->byteBuffer = NULL;
buffer->length = 0;
buffer->ptr = NULL;
} else {
buffer->byteBuffer = (*env)->NewGlobalRef(env, byteBuffer);
if (!buffer->byteBuffer) {
ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"hadoopReadZero: failed to create global ref to ByteBuffer");
goto done;
}
ret = hadoopReadZeroExtractBuffer(env, opts, buffer);
if (ret) {
goto done;
}
}
ret = 0;
done:
(*env)->DeleteLocalRef(env, byteBuffer);
if (ret) {
if (buffer) {
if (buffer->byteBuffer) {
(*env)->DeleteGlobalRef(env, buffer->byteBuffer);
}
free(buffer);
}
errno = ret;
return NULL;
} else {
errno = 0;
}
return buffer;
}
int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer)
{
return buffer->length;
}
const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer)
{
return buffer->ptr;
}
void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer)
{
jvalue jVal;
jthrowable jthr;
JNIEnv* env;
env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return;
}
if (buffer->byteBuffer) {
jthr = invokeMethod(env, &jVal, INSTANCE, file->file,
HADOOP_ISTRM, "releaseBuffer",
"(Ljava/nio/ByteBuffer;)V", buffer->byteBuffer);
if (jthr) {
printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"hadoopRzBufferFree: releaseBuffer failed: ");
// even on error, we have to delete the reference.
}
(*env)->DeleteGlobalRef(env, buffer->byteBuffer);
}
if (!buffer->direct) {
free(buffer->ptr);
}
memset(buffer, 0, sizeof(*buffer));
free(buffer);
}
char***
hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length)
{

View File

@ -36,6 +36,8 @@
#define EINTERNAL 255
#endif
#define ELASTIC_BYTE_BUFFER_POOL_CLASS \
"org/apache/hadoop/io/ElasticByteBufferPool"
/** All APIs set errno to meaningful values */
@ -65,6 +67,10 @@ extern "C" {
struct hdfsFile_internal;
typedef struct hdfsFile_internal* hdfsFile;
struct hadoopRzOptions;
struct hadoopRzBuffer;
/**
* Determine if a file is open for read.
*
@ -85,6 +91,7 @@ extern "C" {
uint64_t totalBytesRead;
uint64_t totalLocalBytesRead;
uint64_t totalShortCircuitBytesRead;
uint64_t totalZeroCopyBytesRead;
};
/**
@ -680,7 +687,107 @@ extern "C" {
* @return 0 on success else -1
*/
int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime);
/**
* Allocate a zero-copy options structure.
*
* You must free all options structures allocated with this function using
* hadoopRzOptionsFree.
*
* @return A zero-copy options structure, or NULL if one could
* not be allocated. If NULL is returned, errno will
* contain the error number.
*/
struct hadoopRzOptions *hadoopRzOptionsAlloc(void);
/**
 * Determine whether we should skip checksums in hadoopReadZero.
*
* @param opts The options structure.
* @param skip Nonzero to skip checksums sometimes; zero to always
* check them.
*
* @return 0 on success; -1 plus errno on failure.
*/
int hadoopRzOptionsSetSkipChecksum(
struct hadoopRzOptions *opts, int skip);
/**
 * Set the ByteBufferPool to use with hadoopReadZero.
*
* @param opts The options structure.
* @param className If this is NULL, we will not use any
* ByteBufferPool. If this is non-NULL, it will be
* treated as the name of the pool class to use.
* For example, you can use
* ELASTIC_BYTE_BUFFER_POOL_CLASS.
*
* @return 0 if the ByteBufferPool class was found and
* instantiated;
* -1 plus errno otherwise.
*/
int hadoopRzOptionsSetByteBufferPool(
struct hadoopRzOptions *opts, const char *className);
/**
 * Free a hadoopRzOptions structure.
*
* @param opts The options structure to free.
* Any associated ByteBufferPool will also be freed.
*/
void hadoopRzOptionsFree(struct hadoopRzOptions *opts);
/**
* Perform a byte buffer read.
* If possible, this will be a zero-copy (mmap) read.
*
* @param file The file to read from.
* @param opts An options structure created by hadoopRzOptionsAlloc.
* @param maxLength The maximum length to read. We may read fewer bytes
* than this length.
*
* @return On success, returns a new hadoopRzBuffer.
* This buffer will continue to be valid and readable
 * until it is released by hadoopRzBufferFree. Failure to
* release a buffer will lead to a memory leak.
*
* NULL plus an errno code on an error.
 * errno = EPROTONOSUPPORT indicates that we could not do a
* zero-copy read, and there was no ByteBufferPool
* supplied.
*/
struct hadoopRzBuffer* hadoopReadZero(hdfsFile file,
struct hadoopRzOptions *opts, int32_t maxLength);
/**
 * Determine the length of the buffer returned from hadoopReadZero.
*
 * @param buffer a buffer returned from hadoopReadZero.
* @return the length of the buffer.
*/
int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer);
/**
* Get a pointer to the raw buffer returned from readZero.
*
* To find out how many bytes this buffer contains, call
* hadoopRzBufferLength.
*
 * @param buffer a buffer returned from hadoopReadZero.
* @return a pointer to the start of the buffer. This will be
* NULL when end-of-file has been reached.
*/
const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer);
/**
 * Release a buffer obtained through hadoopReadZero.
*
* @param file The hdfs stream that created this buffer. This must be
* the same stream you called hadoopReadZero on.
* @param buffer The buffer to release.
*/
void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer);
#ifdef __cplusplus
}
#endif
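To make the intended call sequence concrete, here is a minimal usage sketch of
the zero-copy API declared above. It is illustrative only and not part of the
patch: the function name readFileZeroCopy, the already-connected fs handle, and
the path argument are assumptions, and error handling is abbreviated.

static int readFileZeroCopy(hdfsFS fs, const char *path)
{
    struct hadoopRzOptions *opts = NULL;
    struct hadoopRzBuffer *buf;
    hdfsFile file = NULL;
    int ret = -1;

    file = hdfsOpenFile(fs, path, O_RDONLY, 0, 0, 0);
    if (!file)
        goto done;
    opts = hadoopRzOptionsAlloc();
    if (!opts)
        goto done;
    /* Fall back to a pooled buffer whenever an mmap read is not possible. */
    if (hadoopRzOptionsSetByteBufferPool(opts, ELASTIC_BYTE_BUFFER_POOL_CLASS))
        goto done;
    if (hadoopRzOptionsSetSkipChecksum(opts, 1))
        goto done;
    for (;;) {
        buf = hadoopReadZero(file, opts, 1024 * 1024);
        if (!buf)
            goto done;                     /* errno describes the failure */
        if (hadoopRzBufferLength(buf) == 0) {
            hadoopRzBufferFree(file, buf); /* end of file */
            break;
        }
        /* Consume hadoopRzBufferLength(buf) bytes at hadoopRzBufferGet(buf),
         * then release the buffer back to the stream. */
        hadoopRzBufferFree(file, buf);
    }
    ret = 0;
done:
    if (opts)
        hadoopRzOptionsFree(opts);
    if (file)
        hdfsCloseFile(fs, file);
    return ret;
}

Note that if checksum verification is required and no ByteBufferPool has been
set, hadoopReadZero returns NULL with errno set, which is what the
test_libhdfs_zerocopy test later in this change exercises.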

View File

@ -48,6 +48,15 @@ extern "C" {
* @param file The HDFS file
*/
void hdfsFileDisableDirectRead(struct hdfsFile_internal *file);
/**
* Disable domain socket security checks.
*
 * @return 0 if domain socket security was disabled;
* -1 if not.
*/
int hdfsDisableDomainSocketSecurity(void);
#ifdef __cplusplus
}
#endif

View File

@ -608,3 +608,73 @@ JNIEnv* getJNIEnv(void)
return env;
}
int javaObjectIsOfClass(JNIEnv *env, jobject obj, const char *name)
{
jclass clazz;
int ret;
clazz = (*env)->FindClass(env, name);
if (!clazz) {
printPendingExceptionAndFree(env, PRINT_EXC_ALL,
"javaObjectIsOfClass(%s)", name);
return -1;
}
ret = (*env)->IsInstanceOf(env, obj, clazz);
(*env)->DeleteLocalRef(env, clazz);
return ret == JNI_TRUE ? 1 : 0;
}
jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
const char *key, const char *value)
{
jthrowable jthr;
jstring jkey = NULL, jvalue = NULL;
jthr = newJavaStr(env, key, &jkey);
if (jthr)
goto done;
jthr = newJavaStr(env, value, &jvalue);
if (jthr)
goto done;
jthr = invokeMethod(env, NULL, INSTANCE, jConfiguration,
"org/apache/hadoop/conf/Configuration", "set",
"(Ljava/lang/String;Ljava/lang/String;)V",
jkey, jvalue);
if (jthr)
goto done;
done:
(*env)->DeleteLocalRef(env, jkey);
(*env)->DeleteLocalRef(env, jvalue);
return jthr;
}
jthrowable fetchEnumInstance(JNIEnv *env, const char *className,
const char *valueName, jobject *out)
{
jclass clazz;
jfieldID fieldId;
jobject jEnum;
char prettyClass[256];
clazz = (*env)->FindClass(env, className);
if (!clazz) {
return newRuntimeError(env, "fetchEnum(%s, %s): failed to find class.",
className, valueName);
}
if (snprintf(prettyClass, sizeof(prettyClass), "L%s;", className)
>= sizeof(prettyClass)) {
return newRuntimeError(env, "fetchEnum(%s, %s): class name too long.",
className, valueName);
}
fieldId = (*env)->GetStaticFieldID(env, clazz, valueName, prettyClass);
if (!fieldId) {
return getPendingExceptionAndClear(env);
}
jEnum = (*env)->GetStaticObjectField(env, clazz, fieldId);
if (!jEnum) {
return getPendingExceptionAndClear(env);
}
*out = jEnum;
return NULL;
}

View File

@ -114,6 +114,47 @@ jthrowable classNameOfObject(jobject jobj, JNIEnv *env, char **name);
* */
JNIEnv* getJNIEnv(void);
/**
* Figure out if a Java object is an instance of a particular class.
*
* @param env The Java environment.
* @param obj The object to check.
* @param name The class name to check.
*
* @return -1 if we failed to find the referenced class name.
* 0 if the object is not of the given class.
* 1 if the object is of the given class.
*/
int javaObjectIsOfClass(JNIEnv *env, jobject obj, const char *name);
/**
* Set a value in a configuration object.
*
* @param env The JNI environment
* @param jConfiguration The configuration object to modify
* @param key The key to modify
* @param value The value to set the key to
*
* @return NULL on success; exception otherwise
*/
jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
const char *key, const char *value);
/**
* Fetch an instance of an Enum.
*
* @param env The JNI environment.
* @param className The enum class name.
* @param valueName The name of the enum value
* @param out (out param) on success, a local reference to an
* instance of the enum object. (Since Java enums are
 * singletons, this is also the only instance.)
*
* @return NULL on success; exception otherwise
*/
jthrowable fetchEnumInstance(JNIEnv *env, const char *className,
const char *valueName, jobject *out);
#endif /*LIBHDFS_JNI_HELPER_H*/
/**

View File

@ -17,14 +17,19 @@
*/
#include "exception.h"
#include "hdfs.h"
#include "hdfs_test.h"
#include "jni_helper.h"
#include "native_mini_dfs.h"
#include <errno.h>
#include <jni.h>
#include <limits.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>
#define MINIDFS_CLUSTER_BUILDER "org/apache/hadoop/hdfs/MiniDFSCluster$Builder"
#define MINIDFS_CLUSTER "org/apache/hadoop/hdfs/MiniDFSCluster"
@ -39,8 +44,44 @@ struct NativeMiniDfsCluster {
* The NativeMiniDfsCluster object
*/
jobject obj;
/**
* Path to the domain socket, or the empty string if there is none.
*/
char domainSocketPath[PATH_MAX];
};
static jthrowable nmdConfigureShortCircuit(JNIEnv *env,
struct NativeMiniDfsCluster *cl, jobject cobj)
{
jthrowable jthr;
char *tmpDir;
int ret = hdfsDisableDomainSocketSecurity();
if (ret) {
return newRuntimeError(env, "failed to disable hdfs domain "
"socket security: error %d", ret);
}
jthr = hadoopConfSetStr(env, cobj, "dfs.client.read.shortcircuit", "true");
if (jthr) {
return jthr;
}
tmpDir = getenv("TMPDIR");
if (!tmpDir) {
tmpDir = "/tmp";
}
snprintf(cl->domainSocketPath, PATH_MAX, "%s/native_mini_dfs.sock.%d.%d",
tmpDir, getpid(), rand());
jthr = hadoopConfSetStr(env, cobj, "dfs.domain.socket.path",
cl->domainSocketPath);
if (jthr) {
return jthr;
}
return NULL;
}
struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
{
struct NativeMiniDfsCluster* cl = NULL;
@ -81,6 +122,28 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
goto error;
}
}
if (jthr) {
printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"nmdCreate: Configuration::setBoolean");
goto error;
}
// Disable 'minimum block size' -- it's annoying in tests.
(*env)->DeleteLocalRef(env, jconfStr);
jconfStr = NULL;
jthr = newJavaStr(env, "dfs.namenode.fs-limits.min-block-size", &jconfStr);
if (jthr) {
printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"nmdCreate: new String");
goto error;
}
jthr = invokeMethod(env, NULL, INSTANCE, cobj, HADOOP_CONF,
"setLong", "(Ljava/lang/String;J)V", jconfStr, 0LL);
if (jthr) {
printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"nmdCreate: Configuration::setLong");
goto error;
}
    // Create MiniDFSCluster object
jthr = constructNewObjectOfClass(env, &bld, MINIDFS_CLUSTER_BUILDER,
"(L"HADOOP_CONF";)V", cobj);
if (jthr) {
@ -88,6 +151,14 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf)
"nmdCreate: NativeMiniDfsCluster#Builder#Builder");
goto error;
}
if (conf->configureShortCircuit) {
jthr = nmdConfigureShortCircuit(env, cl, cobj);
if (jthr) {
printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
"nmdCreate: nmdConfigureShortCircuit error");
goto error;
}
}
jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER,
"format", "(Z)L" MINIDFS_CLUSTER_BUILDER ";", conf->doFormat);
if (jthr) {
@ -272,3 +343,29 @@ error_dlr_nn:
return ret;
}
int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl,
struct hdfsBuilder *bld)
{
int port, ret;
hdfsBuilderSetNameNode(bld, "localhost");
port = nmdGetNameNodePort(cl);
if (port < 0) {
fprintf(stderr, "nmdGetNameNodePort failed with error %d\n", -port);
return EIO;
}
hdfsBuilderSetNameNodePort(bld, port);
if (cl->domainSocketPath[0]) {
ret = hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit", "true");
if (ret) {
return ret;
}
ret = hdfsBuilderConfSetStr(bld, "dfs.domain.socket.path",
cl->domainSocketPath);
if (ret) {
return ret;
}
}
return 0;
}

View File

@ -21,6 +21,7 @@
#include <jni.h> /* for jboolean */
struct hdfsBuilder;
struct NativeMiniDfsCluster;
/**
@ -28,17 +29,24 @@ struct NativeMiniDfsCluster;
*/
struct NativeMiniDfsConf {
/**
* Nonzero if the cluster should be formatted prior to startup
* Nonzero if the cluster should be formatted prior to startup.
*/
jboolean doFormat;
/**
* Whether or not to enable webhdfs in MiniDfsCluster
*/
jboolean webhdfsEnabled;
/**
* The http port of the namenode in MiniDfsCluster
*/
jint namenodeHttpPort;
/**
* Nonzero if we should configure short circuit.
*/
jboolean configureShortCircuit;
};
/**
@ -84,7 +92,7 @@ void nmdFree(struct NativeMiniDfsCluster* cl);
*
* @return the port, or a negative error code
*/
int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);
int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);
/**
* Get the http address that's in use by the given (non-HA) nativeMiniDfs
@ -101,4 +109,14 @@ int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl);
int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl,
int *port, const char **hostName);
/**
* Configure the HDFS builder appropriately to connect to this cluster.
*
 * @param cl  The cluster
 * @param bld The hdfs builder
 *
 * @return 0 on success; a positive error code on failure
*/
int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl,
struct hdfsBuilder *bld);
#endif

View File

@ -0,0 +1,233 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "expect.h"
#include "hdfs.h"
#include "native_mini_dfs.h"
#include <errno.h>
#include <inttypes.h>
#include <semaphore.h>
#include <pthread.h>
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#define TO_STR_HELPER(X) #X
#define TO_STR(X) TO_STR_HELPER(X)
#define TEST_FILE_NAME_LENGTH 128
#define TEST_ZEROCOPY_FULL_BLOCK_SIZE 4096
#define TEST_ZEROCOPY_LAST_BLOCK_SIZE 3215
#define TEST_ZEROCOPY_NUM_BLOCKS 6
#define SMALL_READ_LEN 16
#define ZC_BUF_LEN 32768
static uint8_t *getZeroCopyBlockData(int blockIdx)
{
uint8_t *buf = malloc(TEST_ZEROCOPY_FULL_BLOCK_SIZE);
int i;
if (!buf) {
fprintf(stderr, "malloc(%d) failed\n", TEST_ZEROCOPY_FULL_BLOCK_SIZE);
exit(1);
}
for (i = 0; i < TEST_ZEROCOPY_FULL_BLOCK_SIZE; i++) {
buf[i] = blockIdx + (i % 17);
}
return buf;
}
static int getZeroCopyBlockLen(int blockIdx)
{
if (blockIdx >= TEST_ZEROCOPY_NUM_BLOCKS) {
return 0;
} else if (blockIdx == (TEST_ZEROCOPY_NUM_BLOCKS - 1)) {
return TEST_ZEROCOPY_LAST_BLOCK_SIZE;
} else {
return TEST_ZEROCOPY_FULL_BLOCK_SIZE;
}
}
static void printBuf(const uint8_t *buf, size_t len) __attribute__((unused));
static void printBuf(const uint8_t *buf, size_t len)
{
size_t i;
for (i = 0; i < len; i++) {
fprintf(stderr, "%02x", buf[i]);
}
fprintf(stderr, "\n");
}
static int doTestZeroCopyReads(hdfsFS fs, const char *fileName)
{
hdfsFile file = NULL;
struct hadoopRzOptions *opts = NULL;
struct hadoopRzBuffer *buffer = NULL;
uint8_t *block;
file = hdfsOpenFile(fs, fileName, O_RDONLY, 0, 0, 0);
EXPECT_NONNULL(file);
opts = hadoopRzOptionsAlloc();
EXPECT_NONNULL(opts);
EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 1));
/* haven't read anything yet */
EXPECT_ZERO(expectFileStats(file, 0LL, 0LL, 0LL, 0LL));
block = getZeroCopyBlockData(0);
EXPECT_NONNULL(block);
/* first read is half of a block. */
buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2);
EXPECT_NONNULL(buffer);
EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
hadoopRzBufferLength(buffer));
EXPECT_ZERO(memcmp(hadoopRzBufferGet(buffer), block,
TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
hadoopRzBufferFree(file, buffer);
/* read the next half of the block */
buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2);
EXPECT_NONNULL(buffer);
EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2,
hadoopRzBufferLength(buffer));
EXPECT_ZERO(memcmp(hadoopRzBufferGet(buffer),
block + (TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2),
TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2));
hadoopRzBufferFree(file, buffer);
free(block);
EXPECT_ZERO(expectFileStats(file, TEST_ZEROCOPY_FULL_BLOCK_SIZE,
TEST_ZEROCOPY_FULL_BLOCK_SIZE,
TEST_ZEROCOPY_FULL_BLOCK_SIZE,
TEST_ZEROCOPY_FULL_BLOCK_SIZE));
/* Now let's read just a few bytes. */
buffer = hadoopReadZero(file, opts, SMALL_READ_LEN);
EXPECT_NONNULL(buffer);
EXPECT_INT_EQ(SMALL_READ_LEN, hadoopRzBufferLength(buffer));
block = getZeroCopyBlockData(1);
EXPECT_NONNULL(block);
EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer), SMALL_READ_LEN));
hadoopRzBufferFree(file, buffer);
EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
hdfsTell(fs, file));
EXPECT_ZERO(expectFileStats(file,
TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN,
TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
/* Clear 'skip checksums' and test that we can't do zero-copy reads any
* more. Since there is no ByteBufferPool set, we should fail with
* EPROTONOSUPPORT.
*/
EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 0));
EXPECT_NULL(hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE));
EXPECT_INT_EQ(EPROTONOSUPPORT, errno);
/* Now set a ByteBufferPool and try again. It should succeed this time. */
EXPECT_ZERO(hadoopRzOptionsSetByteBufferPool(opts,
ELASTIC_BYTE_BUFFER_POOL_CLASS));
buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE);
EXPECT_NONNULL(buffer);
EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE, hadoopRzBufferLength(buffer));
EXPECT_ZERO(expectFileStats(file,
(2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
(2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
(2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN,
TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN));
EXPECT_ZERO(memcmp(block + SMALL_READ_LEN, hadoopRzBufferGet(buffer),
TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN));
free(block);
block = getZeroCopyBlockData(2);
EXPECT_NONNULL(block);
EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer) +
(TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN), SMALL_READ_LEN));
hadoopRzBufferFree(file, buffer);
free(block);
hadoopRzOptionsFree(opts);
EXPECT_ZERO(hdfsCloseFile(fs, file));
return 0;
}
static int createZeroCopyTestFile(hdfsFS fs, char *testFileName,
size_t testFileNameLen)
{
int blockIdx, blockLen;
hdfsFile file;
uint8_t *data;
snprintf(testFileName, testFileNameLen, "/zeroCopyTestFile.%d.%d",
getpid(), rand());
file = hdfsOpenFile(fs, testFileName, O_WRONLY, 0, 1,
TEST_ZEROCOPY_FULL_BLOCK_SIZE);
EXPECT_NONNULL(file);
for (blockIdx = 0; blockIdx < TEST_ZEROCOPY_NUM_BLOCKS; blockIdx++) {
blockLen = getZeroCopyBlockLen(blockIdx);
data = getZeroCopyBlockData(blockIdx);
EXPECT_NONNULL(data);
EXPECT_INT_EQ(blockLen, hdfsWrite(fs, file, data, blockLen));
}
EXPECT_ZERO(hdfsCloseFile(fs, file));
return 0;
}
/**
* Test that we can write a file with libhdfs and then read it back
*/
int main(void)
{
int port;
struct NativeMiniDfsConf conf = {
.doFormat = 1,
.configureShortCircuit = 1,
};
char testFileName[TEST_FILE_NAME_LENGTH];
hdfsFS fs;
struct NativeMiniDfsCluster* cl;
struct hdfsBuilder *bld;
cl = nmdCreate(&conf);
EXPECT_NONNULL(cl);
EXPECT_ZERO(nmdWaitClusterUp(cl));
port = nmdGetNameNodePort(cl);
if (port < 0) {
fprintf(stderr, "TEST_ERROR: test_zerocopy: "
"nmdGetNameNodePort returned error %d\n", port);
return EXIT_FAILURE;
}
bld = hdfsNewBuilder();
EXPECT_NONNULL(bld);
EXPECT_ZERO(nmdConfigureHdfsBuilder(cl, bld));
hdfsBuilderSetForceNewInstance(bld);
hdfsBuilderConfSetStr(bld, "dfs.block.size",
TO_STR(TEST_ZEROCOPY_FULL_BLOCK_SIZE));
/* ensure that we'll always get our mmaps */
hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit.skip.checksum",
"true");
fs = hdfsBuilderConnect(bld);
EXPECT_NONNULL(fs);
EXPECT_ZERO(createZeroCopyTestFile(fs, testFileName,
TEST_FILE_NAME_LENGTH));
EXPECT_ZERO(doTestZeroCopyReads(fs, testFileName));
EXPECT_ZERO(hdfsDisconnect(fs));
EXPECT_ZERO(nmdShutdown(cl));
nmdFree(cl);
fprintf(stderr, "TEST_SUCCESS\n");
return EXIT_SUCCESS;
}

View File

@ -1415,4 +1415,32 @@
linearly increases.
</description>
</property>
<property>
<name>dfs.client.mmap.cache.size</name>
<value>1024</value>
<description>
When zero-copy reads are used, the DFSClient keeps a cache of recently used
memory mapped regions. This parameter controls the maximum number of
entries that we will keep in that cache.
  If this is set to 0, mmap will not be used at all.
  The larger this number is, the more file descriptors we will potentially
  use for memory-mapped files. Memory-mapped files also consume virtual
  address space.
You may need to increase your ulimit virtual address space limits before
increasing the client mmap cache size.
</description>
</property>
<property>
<name>dfs.client.mmap.cache.timeout.ms</name>
<value>900000</value>
<description>
The minimum length of time that we will keep an mmap entry in the cache
between uses. If an entry is in the cache longer than this, and nobody
uses it, it will be removed by a background thread.
</description>
</property>
</configuration>
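For reference only (not part of this change): a libhdfs client could override
these two cache settings for a single connection through the builder
configuration API before connecting. The values shown are arbitrary examples.

    struct hdfsBuilder *bld = hdfsNewBuilder();
    hdfsBuilderSetNameNode(bld, "default");
    /* Keep up to 256 cached mmap regions; evict unused entries after 60s. */
    hdfsBuilderConfSetStr(bld, "dfs.client.mmap.cache.size", "256");
    hdfsBuilderConfSetStr(bld, "dfs.client.mmap.cache.timeout.ms", "60000");
    hdfsFS fs = hdfsBuilderConnect(bld);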

View File

@ -0,0 +1,530 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeoutException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Random;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
/**
* This class tests if EnhancedByteBufferAccess works correctly.
*/
public class TestEnhancedByteBufferAccess {
private static final Log LOG =
LogFactory.getLog(TestEnhancedByteBufferAccess.class.getName());
static TemporarySocketDirectory sockDir;
@BeforeClass
public static void init() {
sockDir = new TemporarySocketDirectory();
DomainSocket.disableBindPathValidation();
}
private static byte[] byteBufferToArray(ByteBuffer buf) {
byte resultArray[] = new byte[buf.remaining()];
buf.get(resultArray);
buf.flip();
return resultArray;
}
public static HdfsConfiguration initZeroCopyTest() {
Assume.assumeTrue(NativeIO.isAvailable());
Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
HdfsConfiguration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3);
conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100);
conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
new File(sockDir.getDir(),
"TestRequestMmapAccess._PORT.sock").getAbsolutePath());
conf.setBoolean(DFSConfigKeys.
DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
return conf;
}
@Test
public void testZeroCopyReads() throws Exception {
HdfsConfiguration conf = initZeroCopyTest();
MiniDFSCluster cluster = null;
final Path TEST_PATH = new Path("/a");
FSDataInputStream fsIn = null;
final int TEST_FILE_LENGTH = 12345;
FileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
TEST_FILE_LENGTH, (short)1, 7567L);
try {
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
} catch (InterruptedException e) {
Assert.fail("unexpected InterruptedException during " +
"waitReplication: " + e);
} catch (TimeoutException e) {
Assert.fail("unexpected TimeoutException during " +
"waitReplication: " + e);
}
fsIn = fs.open(TEST_PATH);
byte original[] = new byte[TEST_FILE_LENGTH];
IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
fsIn.close();
fsIn = fs.open(TEST_PATH);
ByteBuffer result = fsIn.read(null, 4096,
EnumSet.of(ReadOption.SKIP_CHECKSUMS));
Assert.assertEquals(4096, result.remaining());
HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
Assert.assertEquals(4096,
dfsIn.getReadStatistics().getTotalBytesRead());
Assert.assertEquals(4096,
dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
byteBufferToArray(result));
fsIn.releaseBuffer(result);
} finally {
if (fsIn != null) fsIn.close();
if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
}
}
@Test
public void testShortZeroCopyReads() throws Exception {
HdfsConfiguration conf = initZeroCopyTest();
MiniDFSCluster cluster = null;
final Path TEST_PATH = new Path("/a");
FSDataInputStream fsIn = null;
final int TEST_FILE_LENGTH = 12345;
FileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LENGTH, (short)1, 7567L);
try {
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
} catch (InterruptedException e) {
Assert.fail("unexpected InterruptedException during " +
"waitReplication: " + e);
} catch (TimeoutException e) {
Assert.fail("unexpected TimeoutException during " +
"waitReplication: " + e);
}
fsIn = fs.open(TEST_PATH);
byte original[] = new byte[TEST_FILE_LENGTH];
IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
fsIn.close();
fsIn = fs.open(TEST_PATH);
// Try to read 8192, but only get 4096 because of the block size.
HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
ByteBuffer result =
dfsIn.read(null, 8192, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
Assert.assertEquals(4096, result.remaining());
Assert.assertEquals(4096,
dfsIn.getReadStatistics().getTotalBytesRead());
Assert.assertEquals(4096,
dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
byteBufferToArray(result));
dfsIn.releaseBuffer(result);
// Try to read 4097, but only get 4096 because of the block size.
result =
dfsIn.read(null, 4097, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
Assert.assertEquals(4096, result.remaining());
Assert.assertArrayEquals(Arrays.copyOfRange(original, 4096, 8192),
byteBufferToArray(result));
dfsIn.releaseBuffer(result);
} finally {
if (fsIn != null) fsIn.close();
if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
}
}
@Test
public void testZeroCopyReadsNoFallback() throws Exception {
HdfsConfiguration conf = initZeroCopyTest();
MiniDFSCluster cluster = null;
final Path TEST_PATH = new Path("/a");
FSDataInputStream fsIn = null;
final int TEST_FILE_LENGTH = 12345;
FileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
TEST_FILE_LENGTH, (short)1, 7567L);
try {
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
} catch (InterruptedException e) {
Assert.fail("unexpected InterruptedException during " +
"waitReplication: " + e);
} catch (TimeoutException e) {
Assert.fail("unexpected TimeoutException during " +
"waitReplication: " + e);
}
fsIn = fs.open(TEST_PATH);
byte original[] = new byte[TEST_FILE_LENGTH];
IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
fsIn.close();
fsIn = fs.open(TEST_PATH);
HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
ByteBuffer result;
try {
result = dfsIn.read(null, 4097, EnumSet.noneOf(ReadOption.class));
Assert.fail("expected UnsupportedOperationException");
} catch (UnsupportedOperationException e) {
// expected
}
result = dfsIn.read(null, 4096, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
Assert.assertEquals(4096, result.remaining());
Assert.assertEquals(4096,
dfsIn.getReadStatistics().getTotalBytesRead());
Assert.assertEquals(4096,
dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
byteBufferToArray(result));
} finally {
if (fsIn != null) fsIn.close();
if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
}
}
private static class CountingVisitor
implements ClientMmapManager.ClientMmapVisitor {
int count = 0;
@Override
public void accept(ClientMmap mmap) {
count++;
}
public void reset() {
count = 0;
}
}
@Test
public void testZeroCopyMmapCache() throws Exception {
HdfsConfiguration conf = initZeroCopyTest();
MiniDFSCluster cluster = null;
final Path TEST_PATH = new Path("/a");
final int TEST_FILE_LENGTH = 16385;
final int RANDOM_SEED = 23453;
FSDataInputStream fsIn = null;
ByteBuffer results[] = { null, null, null, null, null };
DistributedFileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
try {
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
} catch (InterruptedException e) {
Assert.fail("unexpected InterruptedException during " +
"waitReplication: " + e);
} catch (TimeoutException e) {
Assert.fail("unexpected TimeoutException during " +
"waitReplication: " + e);
}
fsIn = fs.open(TEST_PATH);
byte original[] = new byte[TEST_FILE_LENGTH];
IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
fsIn.close();
fsIn = fs.open(TEST_PATH);
final ClientMmapManager mmapManager = fs.getClient().getMmapManager();
final CountingVisitor countingVisitor = new CountingVisitor();
mmapManager.visitMmaps(countingVisitor);
Assert.assertEquals(0, countingVisitor.count);
mmapManager.visitEvictable(countingVisitor);
Assert.assertEquals(0, countingVisitor.count);
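      // Two zero-copy reads over the same 4096-byte region (note the seek
      // back to 0) should share a single mmap of the first block.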
results[0] = fsIn.read(null, 4096,
EnumSet.of(ReadOption.SKIP_CHECKSUMS));
fsIn.seek(0);
results[1] = fsIn.read(null, 4096,
EnumSet.of(ReadOption.SKIP_CHECKSUMS));
mmapManager.visitMmaps(countingVisitor);
Assert.assertEquals(1, countingVisitor.count);
countingVisitor.reset();
mmapManager.visitEvictable(countingVisitor);
Assert.assertEquals(0, countingVisitor.count);
countingVisitor.reset();
// The mmaps should be of the first block of the file.
final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
mmapManager.visitMmaps(new ClientMmapManager.ClientMmapVisitor() {
@Override
public void accept(ClientMmap mmap) {
Assert.assertEquals(firstBlock, mmap.getBlock());
}
});
// Read more blocks.
results[2] = fsIn.read(null, 4096,
EnumSet.of(ReadOption.SKIP_CHECKSUMS));
results[3] = fsIn.read(null, 4096,
EnumSet.of(ReadOption.SKIP_CHECKSUMS));
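      // Reads 2 and 3 map the second and third blocks. The next read would
      // need a fourth mmap; with no fallback pool provided it is expected to
      // fail here (presumably because the mmap cache configured by
      // initZeroCopyTest is already full).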
try {
results[4] = fsIn.read(null, 4096,
EnumSet.of(ReadOption.SKIP_CHECKSUMS));
Assert.fail("expected UnsupportedOperationException");
} catch (UnsupportedOperationException e) {
// expected
}
      // We should now have 3 mmaps and 0 evictable.
mmapManager.visitMmaps(countingVisitor);
Assert.assertEquals(3, countingVisitor.count);
countingVisitor.reset();
mmapManager.visitEvictable(countingVisitor);
Assert.assertEquals(0, countingVisitor.count);
// After we close the cursors, the mmaps should be evictable for
// a brief period of time. Then, they should be closed (we're
      // using a very quick timeout).
for (ByteBuffer buffer : results) {
if (buffer != null) {
fsIn.releaseBuffer(buffer);
}
}
GenericTestUtils.waitFor(new Supplier<Boolean>() {
public Boolean get() {
countingVisitor.reset();
try {
mmapManager.visitEvictable(countingVisitor);
} catch (InterruptedException e) {
e.printStackTrace();
return false;
}
return (0 == countingVisitor.count);
}
}, 10, 10000);
countingVisitor.reset();
mmapManager.visitMmaps(countingVisitor);
Assert.assertEquals(0, countingVisitor.count);
} finally {
if (fsIn != null) fsIn.close();
if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
}
}
/**
* Test HDFS fallback reads. HDFS streams support the ByteBufferReadable
* interface.
*/
@Test
public void testHdfsFallbackReads() throws Exception {
HdfsConfiguration conf = initZeroCopyTest();
MiniDFSCluster cluster = null;
final Path TEST_PATH = new Path("/a");
final int TEST_FILE_LENGTH = 16385;
final int RANDOM_SEED = 23453;
FSDataInputStream fsIn = null;
DistributedFileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
try {
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
} catch (InterruptedException e) {
Assert.fail("unexpected InterruptedException during " +
"waitReplication: " + e);
} catch (TimeoutException e) {
Assert.fail("unexpected TimeoutException during " +
"waitReplication: " + e);
}
fsIn = fs.open(TEST_PATH);
byte original[] = new byte[TEST_FILE_LENGTH];
IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
fsIn.close();
fsIn = fs.open(TEST_PATH);
testFallbackImpl(fsIn, original);
} finally {
if (fsIn != null) fsIn.close();
if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
}
}
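  /**
   * A ByteBufferPool that only hands out buffers of the directness it was
   * constructed with, so the test can verify which kind of buffer
   * fallbackRead asks for.
   */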
private static class RestrictedAllocatingByteBufferPool
implements ByteBufferPool {
private final boolean direct;
RestrictedAllocatingByteBufferPool(boolean direct) {
this.direct = direct;
}
@Override
public ByteBuffer getBuffer(boolean direct, int length) {
Preconditions.checkArgument(this.direct == direct);
return direct ? ByteBuffer.allocateDirect(length) :
ByteBuffer.allocate(length);
}
@Override
public void putBuffer(ByteBuffer buffer) {
}
}
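  /**
   * Reads the entire 16385-byte test file through ByteBufferUtil#fallbackRead
   * in several chunks, verifying the contents of each chunk, and then checks
   * that a read at EOF returns null.
   */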
private static void testFallbackImpl(InputStream stream,
byte original[]) throws Exception {
RestrictedAllocatingByteBufferPool bufferPool =
new RestrictedAllocatingByteBufferPool(
stream instanceof ByteBufferReadable);
ByteBuffer result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10);
Assert.assertEquals(10, result.remaining());
Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 10),
byteBufferToArray(result));
result = ByteBufferUtil.fallbackRead(stream, bufferPool, 5000);
Assert.assertEquals(5000, result.remaining());
Assert.assertArrayEquals(Arrays.copyOfRange(original, 10, 5010),
byteBufferToArray(result));
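    // Requesting far more than remains only returns what is left of the
    // file: 16385 - 5010 = 11375 bytes.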
result = ByteBufferUtil.fallbackRead(stream, bufferPool, 9999999);
Assert.assertEquals(11375, result.remaining());
Assert.assertArrayEquals(Arrays.copyOfRange(original, 5010, 16385),
byteBufferToArray(result));
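    // A further read at EOF returns null rather than an empty buffer.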
result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10);
Assert.assertNull(result);
}
/**
* Test the {@link ByteBufferUtil#fallbackRead} function directly.
*/
@Test
public void testFallbackRead() throws Exception {
HdfsConfiguration conf = initZeroCopyTest();
MiniDFSCluster cluster = null;
final Path TEST_PATH = new Path("/a");
final int TEST_FILE_LENGTH = 16385;
final int RANDOM_SEED = 23453;
FSDataInputStream fsIn = null;
DistributedFileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
try {
DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
} catch (InterruptedException e) {
Assert.fail("unexpected InterruptedException during " +
"waitReplication: " + e);
} catch (TimeoutException e) {
Assert.fail("unexpected TimeoutException during " +
"waitReplication: " + e);
}
fsIn = fs.open(TEST_PATH);
byte original[] = new byte[TEST_FILE_LENGTH];
IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
fsIn.close();
fsIn = fs.open(TEST_PATH);
testFallbackImpl(fsIn, original);
} finally {
if (fsIn != null) fsIn.close();
if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
}
}
/**
* Test fallback reads on a stream which does not support the
 * ByteBufferReadable interface.
*/
@Test
public void testIndirectFallbackReads() throws Exception {
final File TEST_DIR = new File(
System.getProperty("test.build.data","build/test/data"));
final String TEST_PATH = TEST_DIR + File.separator +
"indirectFallbackTestFile";
final int TEST_FILE_LENGTH = 16385;
final int RANDOM_SEED = 23453;
FileOutputStream fos = null;
FileInputStream fis = null;
try {
fos = new FileOutputStream(TEST_PATH);
Random random = new Random(RANDOM_SEED);
byte original[] = new byte[TEST_FILE_LENGTH];
random.nextBytes(original);
fos.write(original);
fos.close();
fos = null;
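      // FileInputStream does not implement ByteBufferReadable, so
      // fallbackRead must go through the indirect (heap buffer) path.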
fis = new FileInputStream(TEST_PATH);
testFallbackImpl(fis, original);
} finally {
IOUtils.cleanup(LOG, fos, fis);
new File(TEST_PATH).delete();
}
}
}

View File

@ -25,7 +25,6 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@ -36,11 +35,26 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
public class TestBlockReaderLocal {
private static TemporarySocketDirectory sockDir;
@BeforeClass
public static void init() {
sockDir = new TemporarySocketDirectory();
DomainSocket.disableBindPathValidation();
}
@AfterClass
public static void shutdown() throws IOException {
sockDir.close();
}
public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2,
int off2, int len) {
for (int i = 0; i < len; i++) {
@ -100,10 +114,11 @@ public class TestBlockReaderLocal {
FSDataInputStream fsIn = null;
byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
FileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
try {
@ -138,6 +153,7 @@ public class TestBlockReaderLocal {
test.doTest(blockReaderLocal, original);
} finally {
if (fsIn != null) fsIn.close();
if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
if (dataIn != null) dataIn.close();
if (checkIn != null) checkIn.close();
@ -382,10 +398,11 @@ public class TestBlockReaderLocal {
final long RANDOM_SEED = 4567L;
FSDataInputStream fsIn = null;
byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH];
FileSystem fs = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
FileSystem fs = cluster.getFileSystem();
fs = cluster.getFileSystem();
DFSTestUtil.createFile(fs, TEST_PATH,
BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED);
try {
@ -417,6 +434,7 @@ public class TestBlockReaderLocal {
} finally {
DFSInputStream.tcpReadsDisabledForTesting = false;
if (fsIn != null) fsIn.close();
if (fs != null) fs.close();
if (cluster != null) cluster.shutdown();
if (sockDir != null) sockDir.close();
}