diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferUtil.java new file mode 100644 index 00000000000..c31c29b5b6d --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferUtil.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.ByteBufferPool; + +import com.google.common.base.Preconditions; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class ByteBufferUtil { + + /** + * Determine if a stream can do a byte buffer read via read(ByteBuffer buf) + */ + private static boolean streamHasByteBufferRead(InputStream stream) { + if (!(stream instanceof ByteBufferReadable)) { + return false; + } + if (!(stream instanceof FSDataInputStream)) { + return true; + } + return ((FSDataInputStream)stream).getWrappedStream() + instanceof ByteBufferReadable; + } + + /** + * Perform a fallback read. + */ + public static ByteBuffer fallbackRead( + InputStream stream, ByteBufferPool bufferPool, int maxLength) + throws IOException { + if (bufferPool == null) { + throw new UnsupportedOperationException("zero-copy reads " + + "were not available, and you did not provide a fallback " + + "ByteBufferPool."); + } + boolean useDirect = streamHasByteBufferRead(stream); + ByteBuffer buffer = bufferPool.getBuffer(useDirect, maxLength); + if (buffer == null) { + throw new UnsupportedOperationException("zero-copy reads " + + "were not available, and the ByteBufferPool did not provide " + + "us with " + (useDirect ? "a direct" : "an indirect") + + "buffer."); + } + Preconditions.checkState(buffer.capacity() > 0); + Preconditions.checkState(buffer.isDirect() == useDirect); + maxLength = Math.min(maxLength, buffer.capacity()); + boolean success = false; + try { + if (useDirect) { + buffer.clear(); + buffer.limit(maxLength); + ByteBufferReadable readable = (ByteBufferReadable)stream; + int totalRead = 0; + while (true) { + if (totalRead >= maxLength) { + success = true; + break; + } + int nRead = readable.read(buffer); + if (nRead < 0) { + if (totalRead > 0) { + success = true; + } + break; + } + totalRead += nRead; + } + buffer.flip(); + } else { + buffer.clear(); + int nRead = stream.read(buffer.array(), + buffer.arrayOffset(), maxLength); + if (nRead >= 0) { + buffer.limit(nRead); + success = true; + } + } + } finally { + if (!success) { + // If we got an error while reading, or if we are at EOF, we + // don't need the buffer any more. We can give it back to the + // bufferPool. + bufferPool.putBuffer(buffer); + buffer = null; + } + } + return buffer; + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java index 5c032c3a6a5..a77ca437729 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java @@ -1,4 +1,5 @@ /** + * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -19,17 +20,29 @@ package org.apache.hadoop.fs; import java.io.*; import java.nio.ByteBuffer; +import java.util.EnumSet; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.ByteBufferPool; +import org.apache.hadoop.fs.ByteBufferUtil; +import org.apache.hadoop.util.IdentityHashStore; /** Utility that wraps a {@link FSInputStream} in a {@link DataInputStream} * and buffers input through a {@link BufferedInputStream}. */ @InterfaceAudience.Public @InterfaceStability.Stable public class FSDataInputStream extends DataInputStream - implements Seekable, PositionedReadable, Closeable, - ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead { + implements Seekable, PositionedReadable, Closeable, + ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead, + HasEnhancedByteBufferAccess { + /** + * Map ByteBuffers that we have handed out to readers to ByteBufferPool + * objects + */ + private final IdentityHashStore + extendedReadBuffers + = new IdentityHashStore(0); public FSDataInputStream(InputStream in) throws IOException { @@ -167,4 +180,45 @@ public class FSDataInputStream extends DataInputStream "support setting the drop-behind caching setting."); } } + + @Override + public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, + EnumSet opts) + throws IOException, UnsupportedOperationException { + try { + return ((HasEnhancedByteBufferAccess)in).read(bufferPool, + maxLength, opts); + } + catch (ClassCastException e) { + ByteBuffer buffer = ByteBufferUtil. + fallbackRead(this, bufferPool, maxLength); + if (buffer != null) { + extendedReadBuffers.put(buffer, bufferPool); + } + return buffer; + } + } + + private static final EnumSet EMPTY_READ_OPTIONS_SET = + EnumSet.noneOf(ReadOption.class); + + final public ByteBuffer read(ByteBufferPool bufferPool, int maxLength) + throws IOException, UnsupportedOperationException { + return read(bufferPool, maxLength, EMPTY_READ_OPTIONS_SET); + } + + @Override + public void releaseBuffer(ByteBuffer buffer) { + try { + ((HasEnhancedByteBufferAccess)in).releaseBuffer(buffer); + } + catch (ClassCastException e) { + ByteBufferPool bufferPool = extendedReadBuffers.remove( buffer); + if (bufferPool == null) { + throw new IllegalArgumentException("tried to release a buffer " + + "that was not created by this stream."); + } + bufferPool.putBuffer(buffer); + } + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java index 8d668feeaba..148e6745f60 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java @@ -18,9 +18,11 @@ package org.apache.hadoop.fs; import java.io.*; +import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.ZeroCopyUnavailableException; /**************************************************************** * FSInputStream is a generic old InputStream with a little bit diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java new file mode 100644 index 00000000000..31178185020 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java @@ -0,0 +1,79 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.EnumSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.io.ByteBufferPool; + +/** + * FSDataInputStreams implement this interface to provide enhanced + * byte buffer access. Usually this takes the form of mmap support. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public interface HasEnhancedByteBufferAccess { + /** + * Get a ByteBuffer containing file data. + * + * This ByteBuffer may come from the stream itself, via a call like mmap, + * or it may come from the ByteBufferFactory which is passed in as an + * argument. + * + * @param factory + * If this is non-null, it will be used to create a fallback + * ByteBuffer when the stream itself cannot create one. + * @param maxLength + * The maximum length of buffer to return. We may return a buffer + * which is shorter than this. + * @param opts + * Options to use when reading. + * + * @return + * We will return null on EOF (and only on EOF). + * Otherwise, we will return a direct ByteBuffer containing at + * least one byte. You must free this ByteBuffer when you are + * done with it by calling releaseBuffer on it. + * The buffer will continue to be readable until it is released + * in this manner. However, the input stream's close method may + * warn about unclosed buffers. + * @throws + * IOException: if there was an error reading. + * UnsupportedOperationException: if factory was null, and we + * needed an external byte buffer. UnsupportedOperationException + * will never be thrown unless the factory argument is null. + */ + public ByteBuffer read(ByteBufferPool factory, int maxLength, + EnumSet opts) + throws IOException, UnsupportedOperationException; + + /** + * Release a ByteBuffer which was created by the enhanced ByteBuffer read + * function. You must not continue using the ByteBuffer after calling this + * function. + * + * @param buffer + * The ByteBuffer to release. + */ + public void releaseBuffer(ByteBuffer buffer); +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ReadOption.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ReadOption.java new file mode 100644 index 00000000000..fa7aca1c497 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ReadOption.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Options that can be used when reading from a FileSystem. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public enum ReadOption { + /** + * Skip checksums when reading. This option may be useful when reading a file + * format that has built-in checksums, or for testing purposes. + */ + SKIP_CHECKSUMS, +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java new file mode 100644 index 00000000000..9cb68277e56 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ZeroCopyUnavailableException.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.IOException; + +public class ZeroCopyUnavailableException extends IOException { + private static final long serialVersionUID = 0L; + + public ZeroCopyUnavailableException(String message) { + super(message); + } + + public ZeroCopyUnavailableException(String message, Exception e) { + super(message, e); + } + + public ZeroCopyUnavailableException(Exception e) { + super(e); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java new file mode 100644 index 00000000000..aa5f8731c54 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ByteBufferPool.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io; + +import java.nio.ByteBuffer; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +@InterfaceAudience.Public +@InterfaceStability.Stable +public interface ByteBufferPool { + /** + * Get a new direct ByteBuffer. The pool can provide this from + * removing a buffer from its internal cache, or by allocating a + * new buffer. + * + * @param direct Whether the buffer should be direct. + * @param length The minimum length the buffer will have. + * @return A new ByteBuffer. This ByteBuffer must be direct. + * Its capacity can be less than what was requested, but + * must be at least 1 byte. + */ + ByteBuffer getBuffer(boolean direct, int length); + + /** + * Release a buffer back to the pool. + * The pool may choose to put this buffer into its cache. + * + * @param buffer a direct bytebuffer + */ + void putBuffer(ByteBuffer buffer); +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java new file mode 100644 index 00000000000..694fcbebcf6 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.io; + +import com.google.common.collect.ComparisonChain; +import org.apache.commons.lang.builder.HashCodeBuilder; + +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.TreeMap; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * This is a simple ByteBufferPool which just creates ByteBuffers as needed. + * It also caches ByteBuffers after they're released. It will always return + * the smallest cached buffer with at least the capacity you request. + * We don't try to do anything clever here like try to limit the maximum cache + * size. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public final class ElasticByteBufferPool implements ByteBufferPool { + private static final class Key implements Comparable { + private final int capacity; + private final long insertionTime; + + Key(int capacity, long insertionTime) { + this.capacity = capacity; + this.insertionTime = insertionTime; + } + + @Override + public int compareTo(Key other) { + return ComparisonChain.start(). + compare(capacity, other.capacity). + compare(insertionTime, other.insertionTime). + result(); + } + + @Override + public boolean equals(Object rhs) { + if (rhs == null) { + return false; + } + try { + Key o = (Key)rhs; + return (compareTo(o) == 0); + } catch (ClassCastException e) { + return false; + } + } + + @Override + public int hashCode() { + return new HashCodeBuilder(). + append(capacity). + append(insertionTime). + toHashCode(); + } + } + + private final TreeMap buffers = + new TreeMap(); + + private final TreeMap directBuffers = + new TreeMap(); + + private final TreeMap getBufferTree(boolean direct) { + return direct ? directBuffers : buffers; + } + + @Override + public synchronized ByteBuffer getBuffer(boolean direct, int length) { + TreeMap tree = getBufferTree(direct); + Map.Entry entry = + tree.ceilingEntry(new Key(length, 0)); + if (entry == null) { + return direct ? ByteBuffer.allocateDirect(length) : + ByteBuffer.allocate(length); + } + tree.remove(entry.getKey()); + return entry.getValue(); + } + + @Override + public synchronized void putBuffer(ByteBuffer buffer) { + TreeMap tree = getBufferTree(buffer.isDirect()); + while (true) { + Key key = new Key(buffer.capacity(), System.nanoTime()); + if (!tree.containsKey(key)) { + tree.put(key, buffer); + return; + } + // Buffers are indexed by (capacity, time). + // If our key is not unique on the first try, we try again, since the + // time will be different. Since we use nanoseconds, it's pretty + // unlikely that we'll loop even once, unless the system clock has a + // poor granularity. + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IdentityHashStore.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IdentityHashStore.java new file mode 100644 index 00000000000..4209488d39a --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/IdentityHashStore.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.util; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import com.google.common.base.Preconditions; + +/** + * The IdentityHashStore stores (key, value) mappings in an array. + * It is similar to java.util.HashTable, but much more lightweight. + * Neither inserting nor removing an element ever leads to any garbage + * getting created (assuming the array doesn't need to be enlarged). + * + * Unlike HashTable, it compares keys using + * {@link System#identityHashCode(Object)} and the identity operator. + * This is useful for types like ByteBuffer which have expensive hashCode + * and equals operators. + * + * We use linear probing to resolve collisions. This avoids the need for + * the overhead of linked list data structures. It also means that it is + * expensive to attempt to remove an element that isn't there, since we + * have to look at the entire array to be sure that it doesn't exist. + * + * @param The key type to use. + * @param THe value type to use. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +@SuppressWarnings("unchecked") +public final class IdentityHashStore { + /** + * Even elements are keys; odd elements are values. + * The array has size 1 + Math.pow(2, capacity). + */ + private Object buffer[]; + + private int numInserted = 0; + + private int capacity; + + /** + * The default maxCapacity value to use. + */ + private static final int DEFAULT_MAX_CAPACITY = 2; + + public IdentityHashStore(int capacity) { + Preconditions.checkArgument(capacity >= 0); + if (capacity == 0) { + this.capacity = 0; + this.buffer = null; + } else { + // Round the capacity we need up to a power of 2. + realloc((int)Math.pow(2, + Math.ceil(Math.log(capacity) / Math.log(2)))); + } + } + + private void realloc(int newCapacity) { + Preconditions.checkArgument(newCapacity > 0); + Object prevBuffer[] = buffer; + this.capacity = newCapacity; + // Each element takes two array slots -- one for the key, + // and another for the value. We also want a load factor + // of 0.50. Combine those together and you get 4 * newCapacity. + this.buffer = new Object[4 * newCapacity]; + this.numInserted = 0; + if (prevBuffer != null) { + for (int i = 0; i < prevBuffer.length; i += 2) { + if (prevBuffer[i] != null) { + putInternal(prevBuffer[i], prevBuffer[i + 1]); + } + } + } + } + + private void putInternal(Object k, Object v) { + int hash = System.identityHashCode(k); + final int numEntries = buffer.length / 2; + int index = hash % numEntries; + while (true) { + if (buffer[2 * index] == null) { + buffer[2 * index] = k; + buffer[1 + (2 * index)] = v; + numInserted++; + return; + } + index = (index + 1) % numEntries; + } + } + + /** + * Add a new (key, value) mapping. + * + * Inserting a new (key, value) never overwrites a previous one. + * In other words, you can insert the same key multiple times and it will + * lead to multiple entries. + */ + public void put(K k, V v) { + Preconditions.checkNotNull(k); + if (buffer == null) { + realloc(DEFAULT_MAX_CAPACITY); + } else if (numInserted + 1 > capacity) { + realloc(capacity * 2); + } + putInternal(k, v); + } + + private int getElementIndex(K k) { + if (buffer == null) { + return -1; + } + final int numEntries = buffer.length / 2; + int hash = System.identityHashCode(k); + int index = hash % numEntries; + int firstIndex = index; + do { + if (buffer[2 * index] == k) { + return index; + } + index = (index + 1) % numEntries; + } while (index != firstIndex); + return -1; + } + + /** + * Retrieve a value associated with a given key. + */ + public V get(K k) { + int index = getElementIndex(k); + if (index < 0) { + return null; + } + return (V)buffer[1 + (2 * index)]; + } + + /** + * Retrieve a value associated with a given key, and delete the + * relevant entry. + */ + public V remove(K k) { + int index = getElementIndex(k); + if (index < 0) { + return null; + } + V val = (V)buffer[1 + (2 * index)]; + buffer[2 * index] = null; + buffer[1 + (2 * index)] = null; + numInserted--; + return val; + } + + public boolean isEmpty() { + return numInserted == 0; + } + + public int numElements() { + return numInserted; + } + + public int capacity() { + return capacity; + } + + public interface Visitor { + void accept(K k, V v); + } + + /** + * Visit all key, value pairs in the IdentityHashStore. + */ + public void visitAll(Visitor visitor) { + int length = buffer == null ? 0 : buffer.length; + for (int i = 0; i < length; i += 2) { + if (buffer[i] != null) { + visitor.accept((K)buffer[i], (V)buffer[i + 1]); + } + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestIdentityHashStore.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestIdentityHashStore.java new file mode 100644 index 00000000000..795c8e32ccf --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestIdentityHashStore.java @@ -0,0 +1,159 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.util; + +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.List; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.util.IdentityHashStore; +import org.apache.hadoop.util.IdentityHashStore.Visitor; +import org.junit.Test; + +public class TestIdentityHashStore { + private static final Log LOG = LogFactory.getLog(TestIdentityHashStore.class.getName()); + + private static class Key { + private final String name; + + Key(String name) { + this.name = name; + } + + @Override + public int hashCode() { + throw new RuntimeException("should not be used!"); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof Key)) { + return false; + } + Key other = (Key)o; + return name.equals(other.name); + } + } + + @Test(timeout=60000) + public void testStartingWithZeroCapacity() { + IdentityHashStore store = + new IdentityHashStore(0); + store.visitAll(new Visitor() { + @Override + public void accept(Key k, Integer v) { + Assert.fail("found key " + k + " in empty IdentityHashStore."); + } + }); + Assert.assertTrue(store.isEmpty()); + final Key key1 = new Key("key1"); + Integer value1 = new Integer(100); + store.put(key1, value1); + Assert.assertTrue(!store.isEmpty()); + Assert.assertEquals(value1, store.get(key1)); + store.visitAll(new Visitor() { + @Override + public void accept(Key k, Integer v) { + Assert.assertEquals(key1, k); + } + }); + Assert.assertEquals(value1, store.remove(key1)); + Assert.assertTrue(store.isEmpty()); + } + + @Test(timeout=60000) + public void testDuplicateInserts() { + IdentityHashStore store = + new IdentityHashStore(4); + store.visitAll(new Visitor() { + @Override + public void accept(Key k, Integer v) { + Assert.fail("found key " + k + " in empty IdentityHashStore."); + } + }); + Assert.assertTrue(store.isEmpty()); + Key key1 = new Key("key1"); + Integer value1 = new Integer(100); + Integer value2 = new Integer(200); + Integer value3 = new Integer(300); + store.put(key1, value1); + Key equalToKey1 = new Key("key1"); + + // IdentityHashStore compares by object equality, not equals() + Assert.assertNull(store.get(equalToKey1)); + + Assert.assertTrue(!store.isEmpty()); + Assert.assertEquals(value1, store.get(key1)); + store.put(key1, value2); + store.put(key1, value3); + final List allValues = new LinkedList(); + store.visitAll(new Visitor() { + @Override + public void accept(Key k, Integer v) { + allValues.add(v); + } + }); + Assert.assertEquals(3, allValues.size()); + for (int i = 0; i < 3; i++) { + Integer value = store.remove(key1); + Assert.assertTrue(allValues.remove(value)); + } + Assert.assertNull(store.remove(key1)); + Assert.assertTrue(store.isEmpty()); + } + + @Test(timeout=60000) + public void testAdditionsAndRemovals() { + IdentityHashStore store = + new IdentityHashStore(0); + final int NUM_KEYS = 1000; + LOG.debug("generating " + NUM_KEYS + " keys"); + final List keys = new ArrayList(NUM_KEYS); + for (int i = 0; i < NUM_KEYS; i++) { + keys.add(new Key("key " + i)); + } + for (int i = 0; i < NUM_KEYS; i++) { + store.put(keys.get(i), i); + } + store.visitAll(new Visitor() { + @Override + public void accept(Key k, Integer v) { + Assert.assertTrue(keys.contains(k)); + } + }); + for (int i = 0; i < NUM_KEYS; i++) { + Assert.assertEquals(Integer.valueOf(i), + store.remove(keys.get(i))); + } + store.visitAll(new Visitor() { + @Override + public void accept(Key k, Integer v) { + Assert.fail("expected all entries to be removed"); + } + }); + Assert.assertTrue("expected the store to be " + + "empty, but found " + store.numElements() + " elements.", + store.isEmpty()); + Assert.assertEquals(1024, store.capacity()); + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index c6885c55290..dfb6bf601b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -9,6 +9,9 @@ Release 2.3.0 - UNRELEASED HDFS-5122. Support failover and retry in WebHdfsFileSystem for NN HA. (Haohui Mai via jing9) + HDFS-4953. Enable HDFS local reads via mmap. + (Colin Patrick McCabe via wang). + IMPROVEMENTS HDFS-4657. Limit the number of blocks logged by the NN after a block @@ -50,6 +53,12 @@ Release 2.3.0 - UNRELEASED HDFS-5240. Separate formatting from logging in the audit logger API (daryn) + HDFS-5191. Revisit zero-copy API in FSDataInputStream to make it more + intuitive. (Contributed by Colin Patrick McCabe) + + HDFS-5260. Merge zero-copy memory-mapped HDFS client reads to trunk and + branch-2. (cnauroth) + OPTIMIZATIONS HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn) @@ -72,6 +81,8 @@ Release 2.3.0 - UNRELEASED HDFS-5170. BlockPlacementPolicyDefault uses the wrong classname when alerting to enable debug logging. (Andrew Wang) + HDFS-5266. ElasticByteBufferPool#Key does not implement equals. (cnauroth) + Release 2.2.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml index acfbea0c8a5..fb05e3ab1aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml @@ -330,4 +330,14 @@ + + + + + + + + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt b/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt index 56528927987..be9c53edf77 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/src/CMakeLists.txt @@ -142,6 +142,7 @@ target_link_libraries(test_native_mini_dfs ) add_executable(test_libhdfs_threaded + main/native/libhdfs/expect.c main/native/libhdfs/test_libhdfs_threaded.c ) target_link_libraries(test_libhdfs_threaded @@ -150,6 +151,16 @@ target_link_libraries(test_libhdfs_threaded pthread ) +add_executable(test_libhdfs_zerocopy + main/native/libhdfs/expect.c + main/native/libhdfs/test/test_libhdfs_zerocopy.c +) +target_link_libraries(test_libhdfs_zerocopy + hdfs + native_mini_dfs + pthread +) + IF(REQUIRE_LIBWEBHDFS) add_subdirectory(contrib/libwebhdfs) ENDIF(REQUIRE_LIBWEBHDFS) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java index e1e40c0191f..2f0686a9beb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java @@ -20,12 +20,16 @@ package org.apache.hadoop.hdfs; import java.io.IOException; import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.hdfs.client.ClientMmap; +import org.apache.hadoop.hdfs.client.ClientMmapManager; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; /** * A BlockReader is responsible for reading a single block * from a single datanode. */ public interface BlockReader extends ByteBufferReadable { + /* same interface as inputStream java.io.InputStream#read() * used by DFSInputStream#read() @@ -81,4 +85,14 @@ public interface BlockReader extends ByteBufferReadable { * All short-circuit reads are also local. */ boolean isShortCircuit(); + + /** + * Get a ClientMmap object for this BlockReader. + * + * @param curBlock The current block. + * @return The ClientMmap object, or null if mmap is not + * supported. + */ + ClientMmap getClientMmap(LocatedBlock curBlock, + ClientMmapManager mmapManager); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java index c1cb0b3db3f..aeac1757976 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java @@ -22,11 +22,15 @@ import java.io.DataInputStream; import java.io.FileInputStream; import java.io.IOException; import java.nio.ByteBuffer; +import org.apache.hadoop.conf.Configuration; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.client.ClientMmap; +import org.apache.hadoop.hdfs.client.ClientMmapManager; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.util.DirectBufferPool; import org.apache.hadoop.io.IOUtils; @@ -87,6 +91,8 @@ class BlockReaderLocal implements BlockReader { private final ExtendedBlock block; private final FileInputStreamCache fisCache; + private ClientMmap clientMmap; + private boolean mmapDisabled; private static int getSlowReadBufferNumChunks(int bufSize, int bytesPerChecksum) { @@ -113,6 +119,8 @@ class BlockReaderLocal implements BlockReader { this.datanodeID = datanodeID; this.block = block; this.fisCache = fisCache; + this.clientMmap = null; + this.mmapDisabled = false; // read and handle the common header here. For now just a version checksumIn.getChannel().position(0); @@ -487,6 +495,10 @@ class BlockReaderLocal implements BlockReader { @Override public synchronized void close() throws IOException { + if (clientMmap != null) { + clientMmap.unref(); + clientMmap = null; + } if (fisCache != null) { if (LOG.isDebugEnabled()) { LOG.debug("putting FileInputStream for " + filename + @@ -534,4 +546,30 @@ class BlockReaderLocal implements BlockReader { public boolean isShortCircuit() { return true; } + + @Override + public ClientMmap getClientMmap(LocatedBlock curBlock, + ClientMmapManager mmapManager) { + if (clientMmap == null) { + if (mmapDisabled) { + return null; + } + try { + clientMmap = mmapManager.fetch(datanodeID, block, dataIn); + if (clientMmap == null) { + mmapDisabled = true; + return null; + } + } catch (InterruptedException e) { + LOG.error("Interrupted while setting up mmap for " + filename, e); + Thread.currentThread().interrupt(); + return null; + } catch (IOException e) { + LOG.error("unable to set up mmap for " + filename, e); + mmapDisabled = true; + return null; + } + } + return clientMmap; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java index aeb6279bead..85ee41b6305 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java @@ -28,6 +28,8 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import org.apache.hadoop.hdfs.client.ClientMmap; +import org.apache.hadoop.hdfs.client.ClientMmapManager; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -35,6 +37,7 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo; import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.util.DirectBufferPool; @@ -701,4 +704,10 @@ class BlockReaderLocalLegacy implements BlockReader { public boolean isShortCircuit() { return true; } + + @Override + public ClientMmap getClientMmap(LocatedBlock curBlock, + ClientMmapManager mmapManager) { + return null; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index bb33497ac65..0cec1ac546f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -106,6 +106,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.VolumeId; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.client.ClientMmapManager; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.protocol.ClientProtocol; @@ -208,7 +209,43 @@ public class DFSClient implements java.io.Closeable { private boolean shouldUseLegacyBlockReaderLocal; private final CachingStrategy defaultReadCachingStrategy; private final CachingStrategy defaultWriteCachingStrategy; + private ClientMmapManager mmapManager; + private static final ClientMmapManagerFactory MMAP_MANAGER_FACTORY = + new ClientMmapManagerFactory(); + + private static final class ClientMmapManagerFactory { + private ClientMmapManager mmapManager = null; + /** + * Tracks the number of users of mmapManager. + */ + private int refcnt = 0; + + synchronized ClientMmapManager get(Configuration conf) { + if (refcnt++ == 0) { + mmapManager = ClientMmapManager.fromConf(conf); + } else { + String mismatches = mmapManager.verifyConfigurationMatches(conf); + if (!mismatches.isEmpty()) { + LOG.warn("The ClientMmapManager settings you specified " + + "have been ignored because another thread created the " + + "ClientMmapManager first. " + mismatches); + } + } + return mmapManager; + } + + synchronized void unref(ClientMmapManager mmapManager) { + if (this.mmapManager != mmapManager) { + throw new IllegalArgumentException(); + } + if (--refcnt == 0) { + IOUtils.cleanup(LOG, mmapManager); + mmapManager = null; + } + } + } + /** * DFSClient configuration */ @@ -536,6 +573,7 @@ public class DFSClient implements java.io.Closeable { new CachingStrategy(readDropBehind, readahead); this.defaultWriteCachingStrategy = new CachingStrategy(writeDropBehind, readahead); + this.mmapManager = MMAP_MANAGER_FACTORY.get(conf); } /** @@ -740,9 +778,12 @@ public class DFSClient implements java.io.Closeable { /** Abort and release resources held. Ignore all errors. */ void abort() { + if (mmapManager != null) { + MMAP_MANAGER_FACTORY.unref(mmapManager); + mmapManager = null; + } clientRunning = false; closeAllFilesBeingWritten(true); - try { // remove reference to this client and stop the renewer, // if there is no more clients under the renewer. @@ -786,6 +827,10 @@ public class DFSClient implements java.io.Closeable { */ @Override public synchronized void close() throws IOException { + if (mmapManager != null) { + MMAP_MANAGER_FACTORY.unref(mmapManager); + mmapManager = null; + } if(clientRunning) { closeAllFilesBeingWritten(false); clientRunning = false; @@ -2514,4 +2559,9 @@ public class DFSClient implements java.io.Closeable { public CachingStrategy getDefaultWriteCachingStrategy() { return defaultWriteCachingStrategy; } + + @VisibleForTesting + public ClientMmapManager getMmapManager() { + return mmapManager; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index e2a6c4c0185..9d6ca70ebbb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -376,6 +376,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_DEFAULT = 1024 * 1024; public static final String DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC = "dfs.client.domain.socket.data.traffic"; public static final boolean DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT = false; + public static final String DFS_CLIENT_MMAP_CACHE_SIZE = "dfs.client.mmap.cache.size"; + public static final int DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT = 1024; + public static final String DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS = "dfs.client.mmap.cache.timeout.ms"; + public static final long DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT = 15 * 60 * 1000; + public static final String DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT = "dfs.client.mmap.cache.timeout.ms"; + public static final int DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT = 4; // property for fsimage compression public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 4131ffa4426..74fcc6f14ea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -24,6 +24,7 @@ import java.net.Socket; import java.nio.ByteBuffer; import java.util.AbstractMap; import java.util.ArrayList; +import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -36,11 +37,15 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.io.IOUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.ByteBufferReadable; +import org.apache.hadoop.fs.ByteBufferUtil; import org.apache.hadoop.fs.CanSetDropBehind; import org.apache.hadoop.fs.CanSetReadahead; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; +import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.hdfs.client.ClientMmap; import org.apache.hadoop.hdfs.net.DomainPeer; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.TcpPeerServer; @@ -54,12 +59,14 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException; +import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.IdentityHashStore; import com.google.common.annotations.VisibleForTesting; @@ -69,7 +76,8 @@ import com.google.common.annotations.VisibleForTesting; ****************************************************************/ @InterfaceAudience.Private public class DFSInputStream extends FSInputStream -implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead { +implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, + HasEnhancedByteBufferAccess { @VisibleForTesting static boolean tcpReadsDisabledForTesting = false; private final PeerCache peerCache; @@ -87,17 +95,28 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead { private CachingStrategy cachingStrategy; private final ReadStatistics readStatistics = new ReadStatistics(); + /** + * Track the ByteBuffers that we have handed out to readers. + * + * The value type can be either ByteBufferPool or ClientMmap, depending on + * whether we this is a memory-mapped buffer or not. + */ + private final IdentityHashStore + extendedReadBuffers = new IdentityHashStore(0); + public static class ReadStatistics { public ReadStatistics() { this.totalBytesRead = 0; this.totalLocalBytesRead = 0; this.totalShortCircuitBytesRead = 0; + this.totalZeroCopyBytesRead = 0; } public ReadStatistics(ReadStatistics rhs) { this.totalBytesRead = rhs.getTotalBytesRead(); this.totalLocalBytesRead = rhs.getTotalLocalBytesRead(); this.totalShortCircuitBytesRead = rhs.getTotalShortCircuitBytesRead(); + this.totalZeroCopyBytesRead = rhs.getTotalZeroCopyBytesRead(); } /** @@ -123,6 +142,13 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead { public long getTotalShortCircuitBytesRead() { return totalShortCircuitBytesRead; } + + /** + * @return The total number of zero-copy bytes read. + */ + public long getTotalZeroCopyBytesRead() { + return totalZeroCopyBytesRead; + } /** * @return The total number of bytes read which were not local. @@ -145,12 +171,21 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead { this.totalLocalBytesRead += amt; this.totalShortCircuitBytesRead += amt; } + + void addZeroCopyBytes(long amt) { + this.totalBytesRead += amt; + this.totalLocalBytesRead += amt; + this.totalShortCircuitBytesRead += amt; + this.totalZeroCopyBytesRead += amt; + } private long totalBytesRead; private long totalLocalBytesRead; private long totalShortCircuitBytesRead; + + private long totalZeroCopyBytesRead; } private final FileInputStreamCache fileInputStreamCache; @@ -587,6 +622,20 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead { } dfsClient.checkOpen(); + if (!extendedReadBuffers.isEmpty()) { + final StringBuilder builder = new StringBuilder(); + extendedReadBuffers.visitAll(new IdentityHashStore.Visitor() { + private String prefix = ""; + @Override + public void accept(ByteBuffer k, Object v) { + builder.append(prefix).append(k); + prefix = ", "; + } + }); + DFSClient.LOG.warn("closing file " + src + ", but there are still " + + "unreleased ByteBuffers allocated by read(). " + + "Please release " + builder.toString() + "."); + } if (blockReader != null) { blockReader.close(); blockReader = null; @@ -1393,4 +1442,100 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead { this.cachingStrategy.setDropBehind(dropBehind); closeCurrentBlockReader(); } + + @Override + public synchronized ByteBuffer read(ByteBufferPool bufferPool, + int maxLength, EnumSet opts) + throws IOException, UnsupportedOperationException { + assert(maxLength > 0); + if (((blockReader == null) || (blockEnd == -1)) && + (pos < getFileLength())) { + /* + * If we don't have a blockReader, or the one we have has no more bytes + * left to read, we call seekToBlockSource to get a new blockReader and + * recalculate blockEnd. Note that we assume we're not at EOF here + * (we check this above). + */ + if ((!seekToBlockSource(pos)) || (blockReader == null)) { + throw new IOException("failed to allocate new BlockReader " + + "at position " + pos); + } + } + boolean canSkipChecksums = opts.contains(ReadOption.SKIP_CHECKSUMS); + if (canSkipChecksums) { + ByteBuffer buffer = tryReadZeroCopy(maxLength); + if (buffer != null) { + return buffer; + } + } + ByteBuffer buffer = ByteBufferUtil. + fallbackRead(this, bufferPool, maxLength); + if (buffer != null) { + extendedReadBuffers.put(buffer, bufferPool); + } + return buffer; + } + + private synchronized ByteBuffer tryReadZeroCopy(int maxLength) + throws IOException { + // Java ByteBuffers can't be longer than 2 GB, because they use + // 4-byte signed integers to represent capacity, etc. + // So we can't mmap the parts of the block higher than the 2 GB offset. + // FIXME: we could work around this with multiple memory maps. + // See HDFS-5101. + long blockEnd32 = Math.min(Integer.MAX_VALUE, blockEnd); + long curPos = pos; + long blockLeft = blockEnd32 - curPos + 1; + if (blockLeft <= 0) { + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("unable to perform a zero-copy read from offset " + + curPos + " of " + src + "; blockLeft = " + blockLeft + + "; blockEnd32 = " + blockEnd32 + ", blockEnd = " + blockEnd + + "; maxLength = " + maxLength); + } + return null; + } + int length = Math.min((int)blockLeft, maxLength); + long blockStartInFile = currentLocatedBlock.getStartOffset(); + long blockPos = curPos - blockStartInFile; + long limit = blockPos + length; + ClientMmap clientMmap = + blockReader.getClientMmap(currentLocatedBlock, + dfsClient.getMmapManager()); + if (clientMmap == null) { + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("unable to perform a zero-copy read from offset " + + curPos + " of " + src + "; BlockReader#getClientMmap returned " + + "null."); + } + return null; + } + seek(pos + length); + ByteBuffer buffer = clientMmap.getMappedByteBuffer().asReadOnlyBuffer(); + buffer.position((int)blockPos); + buffer.limit((int)limit); + clientMmap.ref(); + extendedReadBuffers.put(buffer, clientMmap); + readStatistics.addZeroCopyBytes(length); + if (DFSClient.LOG.isDebugEnabled()) { + DFSClient.LOG.debug("readZeroCopy read " + maxLength + " bytes from " + + "offset " + curPos + " via the zero-copy read path. " + + "blockEnd = " + blockEnd); + } + return buffer; + } + + @Override + public synchronized void releaseBuffer(ByteBuffer buffer) { + Object val = extendedReadBuffers.remove(buffer); + if (val == null) { + throw new IllegalArgumentException("tried to release a buffer " + + "that was not created by this stream, " + buffer); + } + if (val instanceof ClientMmap) { + ((ClientMmap)val).unref(); + } else if (val instanceof ByteBufferPool) { + ((ByteBufferPool)val).putBuffer(buffer); + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index 9d69dc18182..f587c3b5d58 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -27,9 +27,12 @@ import java.nio.ByteBuffer; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.fs.FSInputChecker; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.ClientMmap; +import org.apache.hadoop.hdfs.client.ClientMmapManager; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; @@ -485,4 +488,10 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { public boolean isShortCircuit() { return false; } + + @Override + public ClientMmap getClientMmap(LocatedBlock curBlock, + ClientMmapManager mmapManager) { + return null; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index 27726ff9fd1..521fb70aa38 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -29,9 +29,12 @@ import java.nio.channels.ReadableByteChannel; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.client.ClientMmap; +import org.apache.hadoop.hdfs.client.ClientMmapManager; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver; @@ -40,7 +43,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; @@ -451,4 +453,10 @@ public class RemoteBlockReader2 implements BlockReader { public boolean isShortCircuit() { return false; } + + @Override + public ClientMmap getClientMmap(LocatedBlock curBlock, + ClientMmapManager manager) { + return null; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java new file mode 100644 index 00000000000..566c2b5457c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client; + +import java.io.FileInputStream; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; + +import java.io.IOException; +import java.lang.ref.WeakReference; +import java.nio.MappedByteBuffer; +import java.nio.channels.FileChannel.MapMode; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** + * A memory-mapped region used by an HDFS client. + * + * This class includes a reference count and some other information used by + * ClientMmapManager to track and cache mmaps. + */ +@InterfaceAudience.Private +public class ClientMmap { + static final Log LOG = LogFactory.getLog(ClientMmap.class); + + /** + * A reference to the manager of this mmap. + * + * This is only a weak reference to help minimize the damange done by + * code which leaks references accidentally. + */ + private final WeakReference manager; + + /** + * The actual mapped memory region. + */ + private final MappedByteBuffer map; + + /** + * A reference count tracking how many threads are using this object. + */ + private final AtomicInteger refCount = new AtomicInteger(1); + + /** + * Block pertaining to this mmap + */ + private final ExtendedBlock block; + + /** + * The DataNode where this mmap came from. + */ + private final DatanodeID datanodeID; + + /** + * The monotonic time when this mmap was last evictable. + */ + private long lastEvictableTimeNs; + + public static ClientMmap load(ClientMmapManager manager, FileInputStream in, + ExtendedBlock block, DatanodeID datanodeID) + throws IOException { + MappedByteBuffer map = + in.getChannel().map(MapMode.READ_ONLY, 0, + in.getChannel().size()); + return new ClientMmap(manager, map, block, datanodeID); + } + + private ClientMmap(ClientMmapManager manager, MappedByteBuffer map, + ExtendedBlock block, DatanodeID datanodeID) + throws IOException { + this.manager = new WeakReference(manager); + this.map = map; + this.block = block; + this.datanodeID = datanodeID; + this.lastEvictableTimeNs = 0; + } + + /** + * Decrement the reference count on this object. + * Should be called with the ClientMmapManager lock held. + */ + public void unref() { + int count = refCount.decrementAndGet(); + if (count < 0) { + throw new IllegalArgumentException("can't decrement the " + + "reference count on this ClientMmap lower than 0."); + } else if (count == 0) { + ClientMmapManager man = manager.get(); + if (man == null) { + unmap(); + } else { + man.makeEvictable(this); + } + } + } + + /** + * Increment the reference count on this object. + * + * @return The new reference count. + */ + public int ref() { + return refCount.getAndIncrement(); + } + + @VisibleForTesting + public ExtendedBlock getBlock() { + return block; + } + + DatanodeID getDatanodeID() { + return datanodeID; + } + + public MappedByteBuffer getMappedByteBuffer() { + return map; + } + + public void setLastEvictableTimeNs(long lastEvictableTimeNs) { + this.lastEvictableTimeNs = lastEvictableTimeNs; + } + + public long getLastEvictableTimeNs() { + return this.lastEvictableTimeNs; + } + + /** + * Unmap the memory region. + * + * There isn't any portable way to unmap a memory region in Java. + * So we use the sun.nio method here. + * Note that unmapping a memory region could cause crashes if code + * continues to reference the unmapped code. However, if we don't + * manually unmap the memory, we are dependent on the finalizer to + * do it, and we have no idea when the finalizer will run. + */ + void unmap() { + assert(refCount.get() == 0); + if (map instanceof sun.nio.ch.DirectBuffer) { + final sun.misc.Cleaner cleaner = + ((sun.nio.ch.DirectBuffer) map).cleaner(); + cleaner.clean(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java new file mode 100644 index 00000000000..856e586e8e6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmapManager.java @@ -0,0 +1,482 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.client; + +import java.io.Closeable; + +import org.apache.hadoop.classification.InterfaceAudience; + +import java.io.FileInputStream; +import java.io.IOException; +import java.lang.ref.WeakReference; +import java.util.Iterator; +import java.util.TreeMap; +import java.util.Map.Entry; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.io.IOUtils; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ComparisonChain; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Tracks mmap instances used on an HDFS client. + * + * mmaps can be used concurrently by multiple threads at once. + * mmaps cannot be closed while they are in use. + * + * The cache is important for performance, because the first time an mmap is + * created, the page table entries (PTEs) are not yet set up. + * Even when reading data that is entirely resident in memory, reading an + * mmap the second time is faster. + */ +@InterfaceAudience.Private +public class ClientMmapManager implements Closeable { + public static final Log LOG = LogFactory.getLog(ClientMmapManager.class); + + private boolean closed = false; + + private final int cacheSize; + + private final long timeoutNs; + + private final int runsPerTimeout; + + private final Lock lock = new ReentrantLock(); + + /** + * Maps block, datanode_id to the client mmap object. + * If the ClientMmap is in the process of being loaded, + * {@link Waitable#await()} will block. + * + * Protected by the ClientMmapManager lock. + */ + private final TreeMap> mmaps = + new TreeMap>(); + + /** + * Maps the last use time to the client mmap object. + * We ensure that each last use time is unique by inserting a jitter of a + * nanosecond or two if necessary. + * + * Protected by the ClientMmapManager lock. + * ClientMmap objects that are in use are never evictable. + */ + private final TreeMap evictable = + new TreeMap(); + + private final ScheduledThreadPoolExecutor executor = + new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder(). + setDaemon(true).setNameFormat("ClientMmapManager"). + build()); + + /** + * The CacheCleaner for this ClientMmapManager. We don't create this + * and schedule it until it becomes necessary. + */ + private CacheCleaner cacheCleaner; + + /** + * Factory method to create a ClientMmapManager from a Hadoop + * configuration. + */ + public static ClientMmapManager fromConf(Configuration conf) { + return new ClientMmapManager(conf.getInt(DFS_CLIENT_MMAP_CACHE_SIZE, + DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT), + conf.getLong(DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, + DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT), + conf.getInt(DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT, + DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT)); + } + + public ClientMmapManager(int cacheSize, long timeoutMs, int runsPerTimeout) { + this.cacheSize = cacheSize; + this.timeoutNs = timeoutMs * 1000000; + this.runsPerTimeout = runsPerTimeout; + } + + long getTimeoutMs() { + return this.timeoutNs / 1000000; + } + + int getRunsPerTimeout() { + return this.runsPerTimeout; + } + + public String verifyConfigurationMatches(Configuration conf) { + StringBuilder bld = new StringBuilder(); + int cacheSize = conf.getInt(DFS_CLIENT_MMAP_CACHE_SIZE, + DFS_CLIENT_MMAP_CACHE_SIZE_DEFAULT); + if (this.cacheSize != cacheSize) { + bld.append("You specified a cache size of ").append(cacheSize). + append(", but the existing cache size is ").append(this.cacheSize). + append(". "); + } + long timeoutMs = conf.getLong(DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, + DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS_DEFAULT); + if (getTimeoutMs() != timeoutMs) { + bld.append("You specified a cache timeout of ").append(timeoutMs). + append(" ms, but the existing cache timeout is "). + append(getTimeoutMs()).append("ms").append(". "); + } + int runsPerTimeout = conf.getInt( + DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT, + DFS_CLIENT_MMAP_CACHE_THREAD_RUNS_PER_TIMEOUT_DEFAULT); + if (getRunsPerTimeout() != runsPerTimeout) { + bld.append("You specified ").append(runsPerTimeout). + append(" runs per timeout, but the existing runs per timeout is "). + append(getTimeoutMs()).append(". "); + } + return bld.toString(); + } + + private static class Waitable { + private T val; + private final Condition cond; + + public Waitable(Condition cond) { + this.val = null; + this.cond = cond; + } + + public T await() throws InterruptedException { + while (this.val == null) { + this.cond.await(); + } + return this.val; + } + + public void provide(T val) { + this.val = val; + this.cond.signalAll(); + } + } + + private static class Key implements Comparable { + private final ExtendedBlock block; + private final DatanodeID datanode; + + Key(ExtendedBlock block, DatanodeID datanode) { + this.block = block; + this.datanode = datanode; + } + + /** + * Compare two ClientMmap regions that we're storing. + * + * When we append to a block, we bump the genstamp. It is important to + * compare the genStamp here. That way, we will not return a shorter + * mmap than required. + */ + @Override + public int compareTo(Key o) { + return ComparisonChain.start(). + compare(block.getBlockId(), o.block.getBlockId()). + compare(block.getGenerationStamp(), o.block.getGenerationStamp()). + compare(block.getBlockPoolId(), o.block.getBlockPoolId()). + compare(datanode, o.datanode). + result(); + } + + @Override + public boolean equals(Object rhs) { + if (rhs == null) { + return false; + } + try { + Key o = (Key)rhs; + return (compareTo(o) == 0); + } catch (ClassCastException e) { + return false; + } + } + + @Override + public int hashCode() { + return block.hashCode() ^ datanode.hashCode(); + } + } + + /** + * Thread which handles expiring mmaps from the cache. + */ + private static class CacheCleaner implements Runnable, Closeable { + private WeakReference managerRef; + private ScheduledFuture future; + + CacheCleaner(ClientMmapManager manager) { + this.managerRef= new WeakReference(manager); + } + + @Override + public void run() { + ClientMmapManager manager = managerRef.get(); + if (manager == null) return; + long curTime = System.nanoTime(); + try { + manager.lock.lock(); + manager.evictStaleEntries(curTime); + } finally { + manager.lock.unlock(); + } + } + + void setFuture(ScheduledFuture future) { + this.future = future; + } + + @Override + public void close() throws IOException { + future.cancel(false); + } + } + + /** + * Evict entries which are older than curTime + timeoutNs from the cache. + * + * NOTE: you must call this function with the lock held. + */ + private void evictStaleEntries(long curTime) { + if (closed) { + return; + } + Iterator> iter = + evictable.entrySet().iterator(); + while (iter.hasNext()) { + Entry entry = iter.next(); + if (entry.getKey() + timeoutNs >= curTime) { + return; + } + ClientMmap mmap = entry.getValue(); + Key key = new Key(mmap.getBlock(), mmap.getDatanodeID()); + mmaps.remove(key); + iter.remove(); + mmap.unmap(); + } + } + + /** + * Evict one mmap object from the cache. + * + * NOTE: you must call this function with the lock held. + * + * @return True if an object was evicted; false if none + * could be evicted. + */ + private boolean evictOne() { + Entry entry = evictable.pollFirstEntry(); + if (entry == null) { + // We don't want to try creating another mmap region, because the + // cache is full. + return false; + } + ClientMmap evictedMmap = entry.getValue(); + Key evictedKey = new Key(evictedMmap.getBlock(), + evictedMmap.getDatanodeID()); + mmaps.remove(evictedKey); + evictedMmap.unmap(); + return true; + } + + /** + * Create a new mmap object. + * + * NOTE: you must call this function with the lock held. + * + * @param key The key which describes this mmap. + * @param in The input stream to use to create the mmap. + * @return The new mmap object, or null if there were + * insufficient resources. + * @throws IOException If there was an I/O error creating the mmap. + */ + private ClientMmap create(Key key, FileInputStream in) throws IOException { + if (mmaps.size() + 1 > cacheSize) { + if (!evictOne()) { + LOG.warn("mmap cache is full (with " + cacheSize + " elements) and " + + "nothing is evictable. Ignoring request for mmap with " + + "datanodeID=" + key.datanode + ", " + "block=" + key.block); + return null; + } + } + // Create the condition variable that other threads may wait on. + Waitable waitable = + new Waitable(lock.newCondition()); + mmaps.put(key, waitable); + // Load the entry + boolean success = false; + ClientMmap mmap = null; + try { + try { + lock.unlock(); + mmap = ClientMmap.load(this, in, key.block, key.datanode); + } finally { + lock.lock(); + } + if (cacheCleaner == null) { + cacheCleaner = new CacheCleaner(this); + ScheduledFuture future = + executor.scheduleAtFixedRate(cacheCleaner, + timeoutNs, timeoutNs / runsPerTimeout, TimeUnit.NANOSECONDS); + cacheCleaner.setFuture(future); + } + success = true; + } finally { + if (!success) { + LOG.warn("failed to create mmap for datanodeID=" + key.datanode + + ", " + "block=" + key.block); + mmaps.remove(key); + } + waitable.provide(mmap); + } + if (LOG.isDebugEnabled()) { + LOG.info("created a new ClientMmap for block " + key.block + + " on datanode " + key.datanode); + } + return mmap; + } + + /** + * Get or create an mmap region. + * + * @param node The DataNode that owns the block for this mmap region. + * @param block The block ID, block pool ID, and generation stamp of + * the block we want to read. + * @param in An open file for this block. This stream is only used + * if we have to create a new mmap; if we use an + * existing one, it is ignored. + * + * @return The client mmap region. + */ + public ClientMmap fetch(DatanodeID datanodeID, ExtendedBlock block, + FileInputStream in) throws IOException, InterruptedException { + LOG.debug("fetching mmap with datanodeID=" + datanodeID + ", " + + "block=" + block); + Key key = new Key(block, datanodeID); + ClientMmap mmap = null; + try { + lock.lock(); + if (closed) { + throw new IOException("ClientMmapManager is closed."); + } + while (mmap == null) { + Waitable entry = mmaps.get(key); + if (entry == null) { + return create(key, in); + } + mmap = entry.await(); + } + if (mmap.ref() == 1) { + // When going from nobody using the mmap (ref = 0) to somebody + // using the mmap (ref = 1), we must make the mmap un-evictable. + evictable.remove(mmap.getLastEvictableTimeNs()); + } + } + finally { + lock.unlock(); + } + if (LOG.isDebugEnabled()) { + LOG.debug("reusing existing mmap with datanodeID=" + datanodeID + + ", " + "block=" + block); + } + return mmap; + } + + /** + * Make an mmap evictable. + * + * When an mmap is evictable, it may be removed from the cache if necessary. + * mmaps can only be evictable if nobody is using them. + * + * @param mmap The mmap to make evictable. + */ + void makeEvictable(ClientMmap mmap) { + try { + lock.lock(); + if (closed) { + // If this ClientMmapManager is closed, then don't bother with the + // cache; just close the mmap. + mmap.unmap(); + return; + } + long now = System.nanoTime(); + while (evictable.containsKey(now)) { + now++; + } + mmap.setLastEvictableTimeNs(now); + evictable.put(now, mmap); + } finally { + lock.unlock(); + } + } + + @Override + public void close() throws IOException { + try { + lock.lock(); + closed = true; + IOUtils.cleanup(LOG, cacheCleaner); + + // Unmap all the mmaps that nobody is using. + // The ones which are in use will be unmapped just as soon as people stop + // using them. + evictStaleEntries(Long.MAX_VALUE); + + executor.shutdown(); + } finally { + lock.unlock(); + } + } + + @VisibleForTesting + public interface ClientMmapVisitor { + void accept(ClientMmap mmap); + } + + @VisibleForTesting + public synchronized void visitMmaps(ClientMmapVisitor visitor) + throws InterruptedException { + for (Waitable entry : mmaps.values()) { + visitor.accept(entry.await()); + } + } + + public void visitEvictable(ClientMmapVisitor visitor) + throws InterruptedException { + for (ClientMmap mmap : evictable.values()) { + visitor.accept(mmap); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c new file mode 100644 index 00000000000..6b80ea90a20 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.c @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "expect.h" +#include "hdfs.h" + +#include +#include +#include +#include + +int expectFileStats(hdfsFile file, + uint64_t expectedTotalBytesRead, + uint64_t expectedTotalLocalBytesRead, + uint64_t expectedTotalShortCircuitBytesRead, + uint64_t expectedTotalZeroCopyBytesRead) +{ + struct hdfsReadStatistics *stats = NULL; + EXPECT_ZERO(hdfsFileGetReadStatistics(file, &stats)); + fprintf(stderr, "expectFileStats(expectedTotalBytesRead=%"PRId64", " + "expectedTotalLocalBytesRead=%"PRId64", " + "expectedTotalShortCircuitBytesRead=%"PRId64", " + "expectedTotalZeroCopyBytesRead=%"PRId64", " + "totalBytesRead=%"PRId64", " + "totalLocalBytesRead=%"PRId64", " + "totalShortCircuitBytesRead=%"PRId64", " + "totalZeroCopyBytesRead=%"PRId64")\n", + expectedTotalBytesRead, + expectedTotalLocalBytesRead, + expectedTotalShortCircuitBytesRead, + expectedTotalZeroCopyBytesRead, + stats->totalBytesRead, + stats->totalLocalBytesRead, + stats->totalShortCircuitBytesRead, + stats->totalZeroCopyBytesRead); + if (expectedTotalBytesRead != UINT64_MAX) { + EXPECT_INT64_EQ(expectedTotalBytesRead, stats->totalBytesRead); + } + if (expectedTotalLocalBytesRead != UINT64_MAX) { + EXPECT_INT64_EQ(expectedTotalLocalBytesRead, + stats->totalLocalBytesRead); + } + if (expectedTotalShortCircuitBytesRead != UINT64_MAX) { + EXPECT_INT64_EQ(expectedTotalShortCircuitBytesRead, + stats->totalShortCircuitBytesRead); + } + if (expectedTotalZeroCopyBytesRead != UINT64_MAX) { + EXPECT_INT64_EQ(expectedTotalZeroCopyBytesRead, + stats->totalZeroCopyBytesRead); + } + hdfsFileFreeReadStatistics(stats); + return 0; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h index 9d5d863881b..3dc777127dc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h @@ -19,16 +19,19 @@ #ifndef LIBHDFS_NATIVE_TESTS_EXPECT_H #define LIBHDFS_NATIVE_TESTS_EXPECT_H +#include #include +struct hdfsFile_internal; + #define EXPECT_ZERO(x) \ do { \ int __my_ret__ = x; \ if (__my_ret__) { \ int __my_errno__ = errno; \ - fprintf(stderr, "TEST_ERROR: failed on line %d with return " \ + fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \ "code %d (errno: %d): got nonzero from %s\n", \ - __LINE__, __my_ret__, __my_errno__, #x); \ + __FILE__, __LINE__, __my_ret__, __my_errno__, #x); \ return __my_ret__; \ } \ } while (0); @@ -38,9 +41,9 @@ void* __my_ret__ = x; \ int __my_errno__ = errno; \ if (__my_ret__ != NULL) { \ - fprintf(stderr, "TEST_ERROR: failed on line %d (errno: %d): " \ + fprintf(stderr, "TEST_ERROR: failed on %s:%d (errno: %d): " \ "got non-NULL value %p from %s\n", \ - __LINE__, __my_errno__, __my_ret__, #x); \ + __FILE__, __LINE__, __my_errno__, __my_ret__, #x); \ return -1; \ } \ } while (0); @@ -50,8 +53,8 @@ void* __my_ret__ = x; \ int __my_errno__ = errno; \ if (__my_ret__ == NULL) { \ - fprintf(stderr, "TEST_ERROR: failed on line %d (errno: %d): " \ - "got NULL from %s\n", __LINE__, __my_errno__, #x); \ + fprintf(stderr, "TEST_ERROR: failed on %s:%d (errno: %d): " \ + "got NULL from %s\n", __FILE__, __LINE__, __my_errno__, #x); \ return -1; \ } \ } while (0); @@ -61,15 +64,16 @@ int __my_ret__ = x; \ int __my_errno__ = errno; \ if (__my_ret__ != -1) { \ - fprintf(stderr, "TEST_ERROR: failed on line %d with return " \ - "code %d (errno: %d): expected -1 from %s\n", __LINE__, \ + fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \ + "code %d (errno: %d): expected -1 from %s\n", \ + __FILE__, __LINE__, \ __my_ret__, __my_errno__, #x); \ return -1; \ } \ if (__my_errno__ != e) { \ - fprintf(stderr, "TEST_ERROR: failed on line %d with return " \ + fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \ "code %d (errno: %d): expected errno = %d from %s\n", \ - __LINE__, __my_ret__, __my_errno__, e, #x); \ + __FILE__, __LINE__, __my_ret__, __my_errno__, e, #x); \ return -1; \ } \ } while (0); @@ -79,9 +83,9 @@ int __my_ret__ = x; \ int __my_errno__ = errno; \ if (!__my_ret__) { \ - fprintf(stderr, "TEST_ERROR: failed on line %d with return " \ - "code %d (errno: %d): got zero from %s\n", __LINE__, \ - __my_ret__, __my_errno__, #x); \ + fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \ + "code %d (errno: %d): got zero from %s\n", __FILE__, __LINE__, \ + __my_ret__, __my_errno__, #x); \ return -1; \ } \ } while (0); @@ -91,9 +95,9 @@ int __my_ret__ = x; \ int __my_errno__ = errno; \ if (__my_ret__ < 0) { \ - fprintf(stderr, "TEST_ERROR: failed on line %d with return " \ + fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \ "code %d (errno: %d): got negative return from %s\n", \ - __LINE__, __my_ret__, __my_errno__, #x); \ + __FILE__, __LINE__, __my_ret__, __my_errno__, #x); \ return __my_ret__; \ } \ } while (0); @@ -103,9 +107,21 @@ int __my_ret__ = y; \ int __my_errno__ = errno; \ if (__my_ret__ != (x)) { \ - fprintf(stderr, "TEST_ERROR: failed on line %d with return " \ + fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \ "code %d (errno: %d): expected %d\n", \ - __LINE__, __my_ret__, __my_errno__, (x)); \ + __FILE__, __LINE__, __my_ret__, __my_errno__, (x)); \ + return -1; \ + } \ + } while (0); + +#define EXPECT_INT64_EQ(x, y) \ + do { \ + int64_t __my_ret__ = y; \ + int __my_errno__ = errno; \ + if (__my_ret__ != (x)) { \ + fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \ + "value %"PRId64" (errno: %d): expected %"PRId64"\n", \ + __FILE__, __LINE__, __my_ret__, __my_errno__, (x)); \ return -1; \ } \ } while (0); @@ -117,4 +133,17 @@ ret = -errno; \ } while (ret == -EINTR); +/** + * Test that an HDFS file has the given statistics. + * + * Any parameter can be set to UINT64_MAX to avoid checking it. + * + * @return 0 on success; error code otherwise + */ +int expectFileStats(struct hdfsFile_internal *file, + uint64_t expectedTotalBytesRead, + uint64_t expectedTotalLocalBytesRead, + uint64_t expectedTotalShortCircuitBytesRead, + uint64_t expectedTotalZeroCopyBytesRead); + #endif diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c index 27824347692..bd1ae5d40ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c @@ -39,6 +39,7 @@ #define JAVA_NET_ISA "java/net/InetSocketAddress" #define JAVA_NET_URI "java/net/URI" #define JAVA_STRING "java/lang/String" +#define READ_OPTION "org/apache/hadoop/fs/ReadOption" #define JAVA_VOID "V" @@ -143,6 +144,15 @@ int hdfsFileGetReadStatistics(hdfsFile file, goto done; } s->totalShortCircuitBytesRead = jVal.j; + jthr = invokeMethod(env, &jVal, INSTANCE, readStats, + "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics", + "getTotalZeroCopyBytesRead", "()J"); + if (jthr) { + ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hdfsFileGetReadStatistics: getTotalZeroCopyBytesRead failed"); + goto done; + } + s->totalZeroCopyBytesRead = jVal.j; *stats = s; s = NULL; ret = 0; @@ -183,6 +193,25 @@ void hdfsFileDisableDirectRead(hdfsFile file) file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ; } +int hdfsDisableDomainSocketSecurity(void) +{ + jthrowable jthr; + JNIEnv* env = getJNIEnv(); + if (env == NULL) { + errno = EINTERNAL; + return -1; + } + jthr = invokeMethod(env, NULL, STATIC, NULL, + "org/apache/hadoop/net/unix/DomainSocket", + "disableBindPathValidation", "()V"); + if (jthr) { + errno = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "DomainSocket#disableBindPathValidation"); + return -1; + } + return 0; +} + /** * hdfsJniEnv: A wrapper struct to be used as 'value' * while saving thread -> JNIEnv* mappings @@ -220,40 +249,6 @@ static jthrowable constructNewObjectOfPath(JNIEnv *env, const char *path, return NULL; } -/** - * Set a configuration value. - * - * @param env The JNI environment - * @param jConfiguration The configuration object to modify - * @param key The key to modify - * @param value The value to set the key to - * - * @return NULL on success; exception otherwise - */ -static jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration, - const char *key, const char *value) -{ - jthrowable jthr; - jstring jkey = NULL, jvalue = NULL; - - jthr = newJavaStr(env, key, &jkey); - if (jthr) - goto done; - jthr = newJavaStr(env, value, &jvalue); - if (jthr) - goto done; - jthr = invokeMethod(env, NULL, INSTANCE, jConfiguration, - HADOOP_CONF, "set", JMETHOD2(JPARAM(JAVA_STRING), - JPARAM(JAVA_STRING), JAVA_VOID), - jkey, jvalue); - if (jthr) - goto done; -done: - destroyLocalReference(env, jkey); - destroyLocalReference(env, jvalue); - return jthr; -} - static jthrowable hadoopConfGetStr(JNIEnv *env, jobject jConfiguration, const char *key, char **val) { @@ -2108,6 +2103,395 @@ int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime) return 0; } +/** + * Zero-copy options. + * + * We cache the EnumSet of ReadOptions which has to be passed into every + * readZero call, to avoid reconstructing it each time. This cache is cleared + * whenever an element changes. + */ +struct hadoopRzOptions +{ + JNIEnv *env; + int skipChecksums; + jobject byteBufferPool; + jobject cachedEnumSet; +}; + +struct hadoopRzOptions *hadoopRzOptionsAlloc(void) +{ + struct hadoopRzOptions *opts; + JNIEnv *env; + + env = getJNIEnv(); + if (!env) { + // Check to make sure the JNI environment is set up properly. + errno = EINTERNAL; + return NULL; + } + opts = calloc(1, sizeof(struct hadoopRzOptions)); + if (!opts) { + errno = ENOMEM; + return NULL; + } + return opts; +} + +static void hadoopRzOptionsClearCached(JNIEnv *env, + struct hadoopRzOptions *opts) +{ + if (!opts->cachedEnumSet) { + return; + } + (*env)->DeleteGlobalRef(env, opts->cachedEnumSet); + opts->cachedEnumSet = NULL; +} + +int hadoopRzOptionsSetSkipChecksum( + struct hadoopRzOptions *opts, int skip) +{ + JNIEnv *env; + env = getJNIEnv(); + if (!env) { + errno = EINTERNAL; + return -1; + } + hadoopRzOptionsClearCached(env, opts); + opts->skipChecksums = !!skip; + return 0; +} + +int hadoopRzOptionsSetByteBufferPool( + struct hadoopRzOptions *opts, const char *className) +{ + JNIEnv *env; + jthrowable jthr; + jobject byteBufferPool = NULL; + + env = getJNIEnv(); + if (!env) { + errno = EINTERNAL; + return -1; + } + + // Note: we don't have to call hadoopRzOptionsClearCached in this + // function, since the ByteBufferPool is passed separately from the + // EnumSet of ReadOptions. + + jthr = constructNewObjectOfClass(env, &byteBufferPool, className, "()V"); + if (jthr) { + printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hadoopRzOptionsSetByteBufferPool(className=%s): ", className); + errno = EINVAL; + return -1; + } + if (opts->byteBufferPool) { + // Delete any previous ByteBufferPool we had. + (*env)->DeleteGlobalRef(env, opts->byteBufferPool); + } + opts->byteBufferPool = byteBufferPool; + return 0; +} + +void hadoopRzOptionsFree(struct hadoopRzOptions *opts) +{ + JNIEnv *env; + env = getJNIEnv(); + if (!env) { + return; + } + hadoopRzOptionsClearCached(env, opts); + if (opts->byteBufferPool) { + (*env)->DeleteGlobalRef(env, opts->byteBufferPool); + opts->byteBufferPool = NULL; + } + free(opts); +} + +struct hadoopRzBuffer +{ + jobject byteBuffer; + uint8_t *ptr; + int32_t length; + int direct; +}; + +static jthrowable hadoopRzOptionsGetEnumSet(JNIEnv *env, + struct hadoopRzOptions *opts, jobject *enumSet) +{ + jthrowable jthr = NULL; + jobject enumInst = NULL, enumSetObj = NULL; + jvalue jVal; + + if (opts->cachedEnumSet) { + // If we cached the value, return it now. + *enumSet = opts->cachedEnumSet; + goto done; + } + if (opts->skipChecksums) { + jthr = fetchEnumInstance(env, READ_OPTION, + "SKIP_CHECKSUMS", &enumInst); + if (jthr) { + goto done; + } + jthr = invokeMethod(env, &jVal, STATIC, NULL, + "java/util/EnumSet", "of", + "(Ljava/lang/Enum;)Ljava/util/EnumSet;", enumInst); + if (jthr) { + goto done; + } + enumSetObj = jVal.l; + } else { + jclass clazz = (*env)->FindClass(env, READ_OPTION); + if (!clazz) { + jthr = newRuntimeError(env, "failed " + "to find class for %s", READ_OPTION); + goto done; + } + jthr = invokeMethod(env, &jVal, STATIC, NULL, + "java/util/EnumSet", "noneOf", + "(Ljava/lang/Class;)Ljava/util/EnumSet;", clazz); + enumSetObj = jVal.l; + } + // create global ref + opts->cachedEnumSet = (*env)->NewGlobalRef(env, enumSetObj); + if (!opts->cachedEnumSet) { + jthr = getPendingExceptionAndClear(env); + goto done; + } + *enumSet = opts->cachedEnumSet; + jthr = NULL; +done: + (*env)->DeleteLocalRef(env, enumInst); + (*env)->DeleteLocalRef(env, enumSetObj); + return jthr; +} + +static int hadoopReadZeroExtractBuffer(JNIEnv *env, + const struct hadoopRzOptions *opts, struct hadoopRzBuffer *buffer) +{ + int ret; + jthrowable jthr; + jvalue jVal; + uint8_t *directStart; + void *mallocBuf = NULL; + jint position; + jarray array = NULL; + + jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer, + "java/nio/ByteBuffer", "remaining", "()I"); + if (jthr) { + ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hadoopReadZeroExtractBuffer: ByteBuffer#remaining failed: "); + goto done; + } + buffer->length = jVal.i; + jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer, + "java/nio/ByteBuffer", "position", "()I"); + if (jthr) { + ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hadoopReadZeroExtractBuffer: ByteBuffer#position failed: "); + goto done; + } + position = jVal.i; + directStart = (*env)->GetDirectBufferAddress(env, buffer->byteBuffer); + if (directStart) { + // Handle direct buffers. + buffer->ptr = directStart + position; + buffer->direct = 1; + ret = 0; + goto done; + } + // Handle indirect buffers. + // The JNI docs don't say that GetDirectBufferAddress throws any exceptions + // when it fails. However, they also don't clearly say that it doesn't. It + // seems safest to clear any pending exceptions here, to prevent problems on + // various JVMs. + (*env)->ExceptionClear(env); + if (!opts->byteBufferPool) { + fputs("hadoopReadZeroExtractBuffer: we read through the " + "zero-copy path, but failed to get the address of the buffer via " + "GetDirectBufferAddress. Please make sure your JVM supports " + "GetDirectBufferAddress.\n", stderr); + ret = ENOTSUP; + goto done; + } + // Get the backing array object of this buffer. + jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer, + "java/nio/ByteBuffer", "array", "()[B"); + if (jthr) { + ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hadoopReadZeroExtractBuffer: ByteBuffer#array failed: "); + goto done; + } + array = jVal.l; + if (!array) { + fputs("hadoopReadZeroExtractBuffer: ByteBuffer#array returned NULL.", + stderr); + ret = EIO; + goto done; + } + mallocBuf = malloc(buffer->length); + if (!mallocBuf) { + fprintf(stderr, "hadoopReadZeroExtractBuffer: failed to allocate %d bytes of memory\n", + buffer->length); + ret = ENOMEM; + goto done; + } + (*env)->GetByteArrayRegion(env, array, position, buffer->length, mallocBuf); + jthr = (*env)->ExceptionOccurred(env); + if (jthr) { + ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hadoopReadZeroExtractBuffer: GetByteArrayRegion failed: "); + goto done; + } + buffer->ptr = mallocBuf; + buffer->direct = 0; + ret = 0; + +done: + free(mallocBuf); + (*env)->DeleteLocalRef(env, array); + return ret; +} + +static int translateZCRException(JNIEnv *env, jthrowable exc) +{ + int ret; + char *className = NULL; + jthrowable jthr = classNameOfObject(exc, env, &className); + + if (jthr) { + fputs("hadoopReadZero: failed to get class name of " + "exception from read().\n", stderr); + destroyLocalReference(env, exc); + destroyLocalReference(env, jthr); + ret = EIO; + goto done; + } + if (!strcmp(className, "java.lang.UnsupportedOperationException")) { + ret = EPROTONOSUPPORT; + goto done; + } + ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hadoopZeroCopyRead: ZeroCopyCursor#read failed"); +done: + free(className); + return ret; +} + +struct hadoopRzBuffer* hadoopReadZero(hdfsFile file, + struct hadoopRzOptions *opts, int32_t maxLength) +{ + JNIEnv *env; + jthrowable jthr = NULL; + jvalue jVal; + jobject enumSet = NULL, byteBuffer = NULL; + struct hadoopRzBuffer* buffer = NULL; + int ret; + + env = getJNIEnv(); + if (!env) { + errno = EINTERNAL; + return NULL; + } + if (file->type != INPUT) { + fputs("Cannot read from a non-InputStream object!\n", stderr); + ret = EINVAL; + goto done; + } + buffer = calloc(1, sizeof(struct hadoopRzBuffer)); + if (!buffer) { + ret = ENOMEM; + goto done; + } + jthr = hadoopRzOptionsGetEnumSet(env, opts, &enumSet); + if (jthr) { + ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hadoopReadZero: hadoopRzOptionsGetEnumSet failed: "); + goto done; + } + jthr = invokeMethod(env, &jVal, INSTANCE, file->file, HADOOP_ISTRM, "read", + "(Lorg/apache/hadoop/io/ByteBufferPool;ILjava/util/EnumSet;)" + "Ljava/nio/ByteBuffer;", opts->byteBufferPool, maxLength, enumSet); + if (jthr) { + ret = translateZCRException(env, jthr); + goto done; + } + byteBuffer = jVal.l; + if (!byteBuffer) { + buffer->byteBuffer = NULL; + buffer->length = 0; + buffer->ptr = NULL; + } else { + buffer->byteBuffer = (*env)->NewGlobalRef(env, byteBuffer); + if (!buffer->byteBuffer) { + ret = printPendingExceptionAndFree(env, PRINT_EXC_ALL, + "hadoopReadZero: failed to create global ref to ByteBuffer"); + goto done; + } + ret = hadoopReadZeroExtractBuffer(env, opts, buffer); + if (ret) { + goto done; + } + } + ret = 0; +done: + (*env)->DeleteLocalRef(env, byteBuffer); + if (ret) { + if (buffer) { + if (buffer->byteBuffer) { + (*env)->DeleteGlobalRef(env, buffer->byteBuffer); + } + free(buffer); + } + errno = ret; + return NULL; + } else { + errno = 0; + } + return buffer; +} + +int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer) +{ + return buffer->length; +} + +const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer) +{ + return buffer->ptr; +} + +void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer) +{ + jvalue jVal; + jthrowable jthr; + JNIEnv* env; + + env = getJNIEnv(); + if (env == NULL) { + errno = EINTERNAL; + return; + } + if (buffer->byteBuffer) { + jthr = invokeMethod(env, &jVal, INSTANCE, file->file, + HADOOP_ISTRM, "releaseBuffer", + "(Ljava/nio/ByteBuffer;)V", buffer->byteBuffer); + if (jthr) { + printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "hadoopRzBufferFree: releaseBuffer failed: "); + // even on error, we have to delete the reference. + } + (*env)->DeleteGlobalRef(env, buffer->byteBuffer); + } + if (!buffer->direct) { + free(buffer->ptr); + } + memset(buffer, 0, sizeof(*buffer)); + free(buffer); +} + char*** hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h index 1871665955c..f9f3840d471 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h @@ -36,6 +36,8 @@ #define EINTERNAL 255 #endif +#define ELASTIC_BYTE_BUFFER_POOL_CLASS \ + "org/apache/hadoop/io/ElasticByteBufferPool" /** All APIs set errno to meaningful values */ @@ -65,6 +67,10 @@ extern "C" { struct hdfsFile_internal; typedef struct hdfsFile_internal* hdfsFile; + struct hadoopRzOptions; + + struct hadoopRzBuffer; + /** * Determine if a file is open for read. * @@ -85,6 +91,7 @@ extern "C" { uint64_t totalBytesRead; uint64_t totalLocalBytesRead; uint64_t totalShortCircuitBytesRead; + uint64_t totalZeroCopyBytesRead; }; /** @@ -680,7 +687,107 @@ extern "C" { * @return 0 on success else -1 */ int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime); - + + /** + * Allocate a zero-copy options structure. + * + * You must free all options structures allocated with this function using + * hadoopRzOptionsFree. + * + * @return A zero-copy options structure, or NULL if one could + * not be allocated. If NULL is returned, errno will + * contain the error number. + */ + struct hadoopRzOptions *hadoopRzOptionsAlloc(void); + + /** + * Determine whether we should skip checksums in read0. + * + * @param opts The options structure. + * @param skip Nonzero to skip checksums sometimes; zero to always + * check them. + * + * @return 0 on success; -1 plus errno on failure. + */ + int hadoopRzOptionsSetSkipChecksum( + struct hadoopRzOptions *opts, int skip); + + /** + * Set the ByteBufferPool to use with read0. + * + * @param opts The options structure. + * @param className If this is NULL, we will not use any + * ByteBufferPool. If this is non-NULL, it will be + * treated as the name of the pool class to use. + * For example, you can use + * ELASTIC_BYTE_BUFFER_POOL_CLASS. + * + * @return 0 if the ByteBufferPool class was found and + * instantiated; + * -1 plus errno otherwise. + */ + int hadoopRzOptionsSetByteBufferPool( + struct hadoopRzOptions *opts, const char *className); + + /** + * Free a hadoopRzOptionsFree structure. + * + * @param opts The options structure to free. + * Any associated ByteBufferPool will also be freed. + */ + void hadoopRzOptionsFree(struct hadoopRzOptions *opts); + + /** + * Perform a byte buffer read. + * If possible, this will be a zero-copy (mmap) read. + * + * @param file The file to read from. + * @param opts An options structure created by hadoopRzOptionsAlloc. + * @param maxLength The maximum length to read. We may read fewer bytes + * than this length. + * + * @return On success, returns a new hadoopRzBuffer. + * This buffer will continue to be valid and readable + * until it is released by readZeroBufferFree. Failure to + * release a buffer will lead to a memory leak. + * + * NULL plus an errno code on an error. + * errno = EOPNOTSUPP indicates that we could not do a + * zero-copy read, and there was no ByteBufferPool + * supplied. + */ + struct hadoopRzBuffer* hadoopReadZero(hdfsFile file, + struct hadoopRzOptions *opts, int32_t maxLength); + + /** + * Determine the length of the buffer returned from readZero. + * + * @param buffer a buffer returned from readZero. + * @return the length of the buffer. + */ + int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer); + + /** + * Get a pointer to the raw buffer returned from readZero. + * + * To find out how many bytes this buffer contains, call + * hadoopRzBufferLength. + * + * @param buffer a buffer returned from readZero. + * @return a pointer to the start of the buffer. This will be + * NULL when end-of-file has been reached. + */ + const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer); + + /** + * Release a buffer obtained through readZero. + * + * @param file The hdfs stream that created this buffer. This must be + * the same stream you called hadoopReadZero on. + * @param buffer The buffer to release. + */ + void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer); + #ifdef __cplusplus } #endif diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h index b3ff4f2a637..0eab9a68aea 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h @@ -48,6 +48,15 @@ extern "C" { * @param file The HDFS file */ void hdfsFileDisableDirectRead(struct hdfsFile_internal *file); + + /** + * Disable domain socket security checks. + * + * @param 0 if domain socket security was disabled; + * -1 if not. + */ + int hdfsDisableDomainSocketSecurity(void); + #ifdef __cplusplus } #endif diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c index c768c9c1d04..878289f96d1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c @@ -608,3 +608,73 @@ JNIEnv* getJNIEnv(void) return env; } +int javaObjectIsOfClass(JNIEnv *env, jobject obj, const char *name) +{ + jclass clazz; + int ret; + + clazz = (*env)->FindClass(env, name); + if (!clazz) { + printPendingExceptionAndFree(env, PRINT_EXC_ALL, + "javaObjectIsOfClass(%s)", name); + return -1; + } + ret = (*env)->IsInstanceOf(env, obj, clazz); + (*env)->DeleteLocalRef(env, clazz); + return ret == JNI_TRUE ? 1 : 0; +} + +jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration, + const char *key, const char *value) +{ + jthrowable jthr; + jstring jkey = NULL, jvalue = NULL; + + jthr = newJavaStr(env, key, &jkey); + if (jthr) + goto done; + jthr = newJavaStr(env, value, &jvalue); + if (jthr) + goto done; + jthr = invokeMethod(env, NULL, INSTANCE, jConfiguration, + "org/apache/hadoop/conf/Configuration", "set", + "(Ljava/lang/String;Ljava/lang/String;)V", + jkey, jvalue); + if (jthr) + goto done; +done: + (*env)->DeleteLocalRef(env, jkey); + (*env)->DeleteLocalRef(env, jvalue); + return jthr; +} + +jthrowable fetchEnumInstance(JNIEnv *env, const char *className, + const char *valueName, jobject *out) +{ + jclass clazz; + jfieldID fieldId; + jobject jEnum; + char prettyClass[256]; + + clazz = (*env)->FindClass(env, className); + if (!clazz) { + return newRuntimeError(env, "fetchEnum(%s, %s): failed to find class.", + className, valueName); + } + if (snprintf(prettyClass, sizeof(prettyClass), "L%s;", className) + >= sizeof(prettyClass)) { + return newRuntimeError(env, "fetchEnum(%s, %s): class name too long.", + className, valueName); + } + fieldId = (*env)->GetStaticFieldID(env, clazz, valueName, prettyClass); + if (!fieldId) { + return getPendingExceptionAndClear(env); + } + jEnum = (*env)->GetStaticObjectField(env, clazz, fieldId); + if (!jEnum) { + return getPendingExceptionAndClear(env); + } + *out = jEnum; + return NULL; +} + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h index f37dea739fc..c09f6a38cbb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.h @@ -114,6 +114,47 @@ jthrowable classNameOfObject(jobject jobj, JNIEnv *env, char **name); * */ JNIEnv* getJNIEnv(void); +/** + * Figure out if a Java object is an instance of a particular class. + * + * @param env The Java environment. + * @param obj The object to check. + * @param name The class name to check. + * + * @return -1 if we failed to find the referenced class name. + * 0 if the object is not of the given class. + * 1 if the object is of the given class. + */ +int javaObjectIsOfClass(JNIEnv *env, jobject obj, const char *name); + +/** + * Set a value in a configuration object. + * + * @param env The JNI environment + * @param jConfiguration The configuration object to modify + * @param key The key to modify + * @param value The value to set the key to + * + * @return NULL on success; exception otherwise + */ +jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration, + const char *key, const char *value); + +/** + * Fetch an instance of an Enum. + * + * @param env The JNI environment. + * @param className The enum class name. + * @param valueName The name of the enum value + * @param out (out param) on success, a local reference to an + * instance of the enum object. (Since Java enums are + * singletones, this is also the only instance.) + * + * @return NULL on success; exception otherwise + */ +jthrowable fetchEnumInstance(JNIEnv *env, const char *className, + const char *valueName, jobject *out); + #endif /*LIBHDFS_JNI_HELPER_H*/ /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c index a1476ca18f0..77e2f0766d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.c @@ -17,14 +17,19 @@ */ #include "exception.h" +#include "hdfs.h" +#include "hdfs_test.h" #include "jni_helper.h" #include "native_mini_dfs.h" #include #include +#include #include #include #include +#include +#include #define MINIDFS_CLUSTER_BUILDER "org/apache/hadoop/hdfs/MiniDFSCluster$Builder" #define MINIDFS_CLUSTER "org/apache/hadoop/hdfs/MiniDFSCluster" @@ -39,8 +44,44 @@ struct NativeMiniDfsCluster { * The NativeMiniDfsCluster object */ jobject obj; + + /** + * Path to the domain socket, or the empty string if there is none. + */ + char domainSocketPath[PATH_MAX]; }; +static jthrowable nmdConfigureShortCircuit(JNIEnv *env, + struct NativeMiniDfsCluster *cl, jobject cobj) +{ + jthrowable jthr; + char *tmpDir; + + int ret = hdfsDisableDomainSocketSecurity(); + if (ret) { + return newRuntimeError(env, "failed to disable hdfs domain " + "socket security: error %d", ret); + } + jthr = hadoopConfSetStr(env, cobj, "dfs.client.read.shortcircuit", "true"); + if (jthr) { + return jthr; + } + tmpDir = getenv("TMPDIR"); + if (!tmpDir) { + tmpDir = "/tmp"; + } + snprintf(cl->domainSocketPath, PATH_MAX, "%s/native_mini_dfs.sock.%d.%d", + tmpDir, getpid(), rand()); + snprintf(cl->domainSocketPath, PATH_MAX, "%s/native_mini_dfs.sock.%d.%d", + tmpDir, getpid(), rand()); + jthr = hadoopConfSetStr(env, cobj, "dfs.domain.socket.path", + cl->domainSocketPath); + if (jthr) { + return jthr; + } + return NULL; +} + struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf) { struct NativeMiniDfsCluster* cl = NULL; @@ -81,6 +122,28 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf) goto error; } } + if (jthr) { + printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "nmdCreate: Configuration::setBoolean"); + goto error; + } + // Disable 'minimum block size' -- it's annoying in tests. + (*env)->DeleteLocalRef(env, jconfStr); + jconfStr = NULL; + jthr = newJavaStr(env, "dfs.namenode.fs-limits.min-block-size", &jconfStr); + if (jthr) { + printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "nmdCreate: new String"); + goto error; + } + jthr = invokeMethod(env, NULL, INSTANCE, cobj, HADOOP_CONF, + "setLong", "(Ljava/lang/String;J)V", jconfStr, 0LL); + if (jthr) { + printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "nmdCreate: Configuration::setLong"); + goto error; + } + // Creae MiniDFSCluster object jthr = constructNewObjectOfClass(env, &bld, MINIDFS_CLUSTER_BUILDER, "(L"HADOOP_CONF";)V", cobj); if (jthr) { @@ -88,6 +151,14 @@ struct NativeMiniDfsCluster* nmdCreate(struct NativeMiniDfsConf *conf) "nmdCreate: NativeMiniDfsCluster#Builder#Builder"); goto error; } + if (conf->configureShortCircuit) { + jthr = nmdConfigureShortCircuit(env, cl, cobj); + if (jthr) { + printExceptionAndFree(env, jthr, PRINT_EXC_ALL, + "nmdCreate: nmdConfigureShortCircuit error"); + goto error; + } + } jthr = invokeMethod(env, &val, INSTANCE, bld, MINIDFS_CLUSTER_BUILDER, "format", "(Z)L" MINIDFS_CLUSTER_BUILDER ";", conf->doFormat); if (jthr) { @@ -272,3 +343,29 @@ error_dlr_nn: return ret; } + +int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl, + struct hdfsBuilder *bld) +{ + int port, ret; + + hdfsBuilderSetNameNode(bld, "localhost"); + port = nmdGetNameNodePort(cl); + if (port < 0) { + fprintf(stderr, "nmdGetNameNodePort failed with error %d\n", -port); + return EIO; + } + hdfsBuilderSetNameNodePort(bld, port); + if (cl->domainSocketPath[0]) { + ret = hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit", "true"); + if (ret) { + return ret; + } + ret = hdfsBuilderConfSetStr(bld, "dfs.domain.socket.path", + cl->domainSocketPath); + if (ret) { + return ret; + } + } + return 0; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h index 6bf29905ad9..41d69c2966a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/native_mini_dfs.h @@ -21,6 +21,7 @@ #include /* for jboolean */ +struct hdfsBuilder; struct NativeMiniDfsCluster; /** @@ -28,17 +29,24 @@ struct NativeMiniDfsCluster; */ struct NativeMiniDfsConf { /** - * Nonzero if the cluster should be formatted prior to startup + * Nonzero if the cluster should be formatted prior to startup. */ jboolean doFormat; + /** * Whether or not to enable webhdfs in MiniDfsCluster */ jboolean webhdfsEnabled; + /** * The http port of the namenode in MiniDfsCluster */ jint namenodeHttpPort; + + /** + * Nonzero if we should configure short circuit. + */ + jboolean configureShortCircuit; }; /** @@ -84,7 +92,7 @@ void nmdFree(struct NativeMiniDfsCluster* cl); * * @return the port, or a negative error code */ -int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl); +int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl); /** * Get the http address that's in use by the given (non-HA) nativeMiniDfs @@ -101,4 +109,14 @@ int nmdGetNameNodePort(const struct NativeMiniDfsCluster *cl); int nmdGetNameNodeHttpAddress(const struct NativeMiniDfsCluster *cl, int *port, const char **hostName); +/** + * Configure the HDFS builder appropriately to connect to this cluster. + * + * @param bld The hdfs builder + * + * @return the port, or a negative error code + */ +int nmdConfigureHdfsBuilder(struct NativeMiniDfsCluster *cl, + struct hdfsBuilder *bld); + #endif diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c new file mode 100644 index 00000000000..b22fee12964 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/test/test_libhdfs_zerocopy.c @@ -0,0 +1,233 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "expect.h" +#include "hdfs.h" +#include "native_mini_dfs.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define TO_STR_HELPER(X) #X +#define TO_STR(X) TO_STR_HELPER(X) + +#define TEST_FILE_NAME_LENGTH 128 +#define TEST_ZEROCOPY_FULL_BLOCK_SIZE 4096 +#define TEST_ZEROCOPY_LAST_BLOCK_SIZE 3215 +#define TEST_ZEROCOPY_NUM_BLOCKS 6 +#define SMALL_READ_LEN 16 + +#define ZC_BUF_LEN 32768 + +static uint8_t *getZeroCopyBlockData(int blockIdx) +{ + uint8_t *buf = malloc(TEST_ZEROCOPY_FULL_BLOCK_SIZE); + int i; + if (!buf) { + fprintf(stderr, "malloc(%d) failed\n", TEST_ZEROCOPY_FULL_BLOCK_SIZE); + exit(1); + } + for (i = 0; i < TEST_ZEROCOPY_FULL_BLOCK_SIZE; i++) { + buf[i] = blockIdx + (i % 17); + } + return buf; +} + +static int getZeroCopyBlockLen(int blockIdx) +{ + if (blockIdx >= TEST_ZEROCOPY_NUM_BLOCKS) { + return 0; + } else if (blockIdx == (TEST_ZEROCOPY_NUM_BLOCKS - 1)) { + return TEST_ZEROCOPY_LAST_BLOCK_SIZE; + } else { + return TEST_ZEROCOPY_FULL_BLOCK_SIZE; + } +} + +static void printBuf(const uint8_t *buf, size_t len) __attribute__((unused)); + +static void printBuf(const uint8_t *buf, size_t len) +{ + size_t i; + + for (i = 0; i < len; i++) { + fprintf(stderr, "%02x", buf[i]); + } + fprintf(stderr, "\n"); +} + +static int doTestZeroCopyReads(hdfsFS fs, const char *fileName) +{ + hdfsFile file = NULL; + struct hadoopRzOptions *opts = NULL; + struct hadoopRzBuffer *buffer = NULL; + uint8_t *block; + + file = hdfsOpenFile(fs, fileName, O_RDONLY, 0, 0, 0); + EXPECT_NONNULL(file); + opts = hadoopRzOptionsAlloc(); + EXPECT_NONNULL(opts); + EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 1)); + /* haven't read anything yet */ + EXPECT_ZERO(expectFileStats(file, 0LL, 0LL, 0LL, 0LL)); + block = getZeroCopyBlockData(0); + EXPECT_NONNULL(block); + /* first read is half of a block. */ + buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2); + EXPECT_NONNULL(buffer); + EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2, + hadoopRzBufferLength(buffer)); + EXPECT_ZERO(memcmp(hadoopRzBufferGet(buffer), block, + TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2)); + hadoopRzBufferFree(file, buffer); + /* read the next half of the block */ + buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2); + EXPECT_NONNULL(buffer); + EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2, + hadoopRzBufferLength(buffer)); + EXPECT_ZERO(memcmp(hadoopRzBufferGet(buffer), + block + (TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2), + TEST_ZEROCOPY_FULL_BLOCK_SIZE / 2)); + hadoopRzBufferFree(file, buffer); + free(block); + EXPECT_ZERO(expectFileStats(file, TEST_ZEROCOPY_FULL_BLOCK_SIZE, + TEST_ZEROCOPY_FULL_BLOCK_SIZE, + TEST_ZEROCOPY_FULL_BLOCK_SIZE, + TEST_ZEROCOPY_FULL_BLOCK_SIZE)); + /* Now let's read just a few bytes. */ + buffer = hadoopReadZero(file, opts, SMALL_READ_LEN); + EXPECT_NONNULL(buffer); + EXPECT_INT_EQ(SMALL_READ_LEN, hadoopRzBufferLength(buffer)); + block = getZeroCopyBlockData(1); + EXPECT_NONNULL(block); + EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer), SMALL_READ_LEN)); + hadoopRzBufferFree(file, buffer); + EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN, + hdfsTell(fs, file)); + EXPECT_ZERO(expectFileStats(file, + TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN, + TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN, + TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN, + TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN)); + + /* Clear 'skip checksums' and test that we can't do zero-copy reads any + * more. Since there is no ByteBufferPool set, we should fail with + * EPROTONOSUPPORT. + */ + EXPECT_ZERO(hadoopRzOptionsSetSkipChecksum(opts, 0)); + EXPECT_NULL(hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE)); + EXPECT_INT_EQ(EPROTONOSUPPORT, errno); + + /* Now set a ByteBufferPool and try again. It should succeed this time. */ + EXPECT_ZERO(hadoopRzOptionsSetByteBufferPool(opts, + ELASTIC_BYTE_BUFFER_POOL_CLASS)); + buffer = hadoopReadZero(file, opts, TEST_ZEROCOPY_FULL_BLOCK_SIZE); + EXPECT_NONNULL(buffer); + EXPECT_INT_EQ(TEST_ZEROCOPY_FULL_BLOCK_SIZE, hadoopRzBufferLength(buffer)); + EXPECT_ZERO(expectFileStats(file, + (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN, + (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN, + (2 * TEST_ZEROCOPY_FULL_BLOCK_SIZE) + SMALL_READ_LEN, + TEST_ZEROCOPY_FULL_BLOCK_SIZE + SMALL_READ_LEN)); + EXPECT_ZERO(memcmp(block + SMALL_READ_LEN, hadoopRzBufferGet(buffer), + TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN)); + free(block); + block = getZeroCopyBlockData(2); + EXPECT_NONNULL(block); + EXPECT_ZERO(memcmp(block, hadoopRzBufferGet(buffer) + + (TEST_ZEROCOPY_FULL_BLOCK_SIZE - SMALL_READ_LEN), SMALL_READ_LEN)); + hadoopRzBufferFree(file, buffer); + free(block); + hadoopRzOptionsFree(opts); + EXPECT_ZERO(hdfsCloseFile(fs, file)); + return 0; +} + +static int createZeroCopyTestFile(hdfsFS fs, char *testFileName, + size_t testFileNameLen) +{ + int blockIdx, blockLen; + hdfsFile file; + uint8_t *data; + + snprintf(testFileName, testFileNameLen, "/zeroCopyTestFile.%d.%d", + getpid(), rand()); + file = hdfsOpenFile(fs, testFileName, O_WRONLY, 0, 1, + TEST_ZEROCOPY_FULL_BLOCK_SIZE); + EXPECT_NONNULL(file); + for (blockIdx = 0; blockIdx < TEST_ZEROCOPY_NUM_BLOCKS; blockIdx++) { + blockLen = getZeroCopyBlockLen(blockIdx); + data = getZeroCopyBlockData(blockIdx); + EXPECT_NONNULL(data); + EXPECT_INT_EQ(blockLen, hdfsWrite(fs, file, data, blockLen)); + } + EXPECT_ZERO(hdfsCloseFile(fs, file)); + return 0; +} + +/** + * Test that we can write a file with libhdfs and then read it back + */ +int main(void) +{ + int port; + struct NativeMiniDfsConf conf = { + .doFormat = 1, + .configureShortCircuit = 1, + }; + char testFileName[TEST_FILE_NAME_LENGTH]; + hdfsFS fs; + struct NativeMiniDfsCluster* cl; + struct hdfsBuilder *bld; + + cl = nmdCreate(&conf); + EXPECT_NONNULL(cl); + EXPECT_ZERO(nmdWaitClusterUp(cl)); + port = nmdGetNameNodePort(cl); + if (port < 0) { + fprintf(stderr, "TEST_ERROR: test_zerocopy: " + "nmdGetNameNodePort returned error %d\n", port); + return EXIT_FAILURE; + } + bld = hdfsNewBuilder(); + EXPECT_NONNULL(bld); + EXPECT_ZERO(nmdConfigureHdfsBuilder(cl, bld)); + hdfsBuilderSetForceNewInstance(bld); + hdfsBuilderConfSetStr(bld, "dfs.block.size", + TO_STR(TEST_ZEROCOPY_FULL_BLOCK_SIZE)); + /* ensure that we'll always get our mmaps */ + hdfsBuilderConfSetStr(bld, "dfs.client.read.shortcircuit.skip.checksum", + "true"); + fs = hdfsBuilderConnect(bld); + EXPECT_NONNULL(fs); + EXPECT_ZERO(createZeroCopyTestFile(fs, testFileName, + TEST_FILE_NAME_LENGTH)); + EXPECT_ZERO(doTestZeroCopyReads(fs, testFileName)); + EXPECT_ZERO(hdfsDisconnect(fs)); + EXPECT_ZERO(nmdShutdown(cl)); + nmdFree(cl); + fprintf(stderr, "TEST_SUCCESS\n"); + return EXIT_SUCCESS; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index f25245e36f7..451e9d6cf7a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1415,4 +1415,32 @@ linearly increases. + + + dfs.client.mmap.cache.size + 1024 + + When zero-copy reads are used, the DFSClient keeps a cache of recently used + memory mapped regions. This parameter controls the maximum number of + entries that we will keep in that cache. + + If this is set to 0, we will not allow mmap. + + The larger this number is, the more file descriptors we will potentially + use for memory-mapped files. mmaped files also use virtual address space. + You may need to increase your ulimit virtual address space limits before + increasing the client mmap cache size. + + + + + dfs.client.mmap.cache.timeout.ms + 900000 + + The minimum length of time that we will keep an mmap entry in the cache + between uses. If an entry is in the cache longer than this, and nobody + uses it, it will be removed by a background thread. + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java new file mode 100644 index 00000000000..c4045c35fb7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java @@ -0,0 +1,530 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.fs; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.nio.ByteBuffer; +import java.util.concurrent.TimeoutException; +import java.util.Arrays; +import java.util.EnumSet; +import java.util.Random; + +import org.apache.commons.lang.SystemUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.ClientMmap; +import org.apache.hadoop.hdfs.client.ClientMmapManager; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.io.ByteBufferPool; +import org.apache.hadoop.io.ElasticByteBufferPool; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.net.unix.DomainSocket; +import org.apache.hadoop.net.unix.TemporarySocketDirectory; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.Assume; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.base.Preconditions; +import com.google.common.base.Supplier; + +/** + * This class tests if EnhancedByteBufferAccess works correctly. + */ +public class TestEnhancedByteBufferAccess { + private static final Log LOG = + LogFactory.getLog(TestEnhancedByteBufferAccess.class.getName()); + + static TemporarySocketDirectory sockDir; + + @BeforeClass + public static void init() { + sockDir = new TemporarySocketDirectory(); + DomainSocket.disableBindPathValidation(); + } + + private static byte[] byteBufferToArray(ByteBuffer buf) { + byte resultArray[] = new byte[buf.remaining()]; + buf.get(resultArray); + buf.flip(); + return resultArray; + } + + public static HdfsConfiguration initZeroCopyTest() { + Assume.assumeTrue(NativeIO.isAvailable()); + Assume.assumeTrue(SystemUtils.IS_OS_UNIX); + HdfsConfiguration conf = new HdfsConfiguration(); + conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096); + conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3); + conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100); + conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, + new File(sockDir.getDir(), + "TestRequestMmapAccess._PORT.sock").getAbsolutePath()); + conf.setBoolean(DFSConfigKeys. + DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true); + return conf; + } + + @Test + public void testZeroCopyReads() throws Exception { + HdfsConfiguration conf = initZeroCopyTest(); + MiniDFSCluster cluster = null; + final Path TEST_PATH = new Path("/a"); + FSDataInputStream fsIn = null; + final int TEST_FILE_LENGTH = 12345; + + FileSystem fs = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, + TEST_FILE_LENGTH, (short)1, 7567L); + try { + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + } catch (InterruptedException e) { + Assert.fail("unexpected InterruptedException during " + + "waitReplication: " + e); + } catch (TimeoutException e) { + Assert.fail("unexpected TimeoutException during " + + "waitReplication: " + e); + } + fsIn = fs.open(TEST_PATH); + byte original[] = new byte[TEST_FILE_LENGTH]; + IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH); + fsIn.close(); + fsIn = fs.open(TEST_PATH); + ByteBuffer result = fsIn.read(null, 4096, + EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + Assert.assertEquals(4096, result.remaining()); + HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn; + Assert.assertEquals(4096, + dfsIn.getReadStatistics().getTotalBytesRead()); + Assert.assertEquals(4096, + dfsIn.getReadStatistics().getTotalZeroCopyBytesRead()); + Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096), + byteBufferToArray(result)); + fsIn.releaseBuffer(result); + } finally { + if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); + if (cluster != null) cluster.shutdown(); + } + } + + @Test + public void testShortZeroCopyReads() throws Exception { + HdfsConfiguration conf = initZeroCopyTest(); + MiniDFSCluster cluster = null; + final Path TEST_PATH = new Path("/a"); + FSDataInputStream fsIn = null; + final int TEST_FILE_LENGTH = 12345; + + FileSystem fs = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LENGTH, (short)1, 7567L); + try { + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + } catch (InterruptedException e) { + Assert.fail("unexpected InterruptedException during " + + "waitReplication: " + e); + } catch (TimeoutException e) { + Assert.fail("unexpected TimeoutException during " + + "waitReplication: " + e); + } + fsIn = fs.open(TEST_PATH); + byte original[] = new byte[TEST_FILE_LENGTH]; + IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH); + fsIn.close(); + fsIn = fs.open(TEST_PATH); + + // Try to read 8192, but only get 4096 because of the block size. + HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn; + ByteBuffer result = + dfsIn.read(null, 8192, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + Assert.assertEquals(4096, result.remaining()); + Assert.assertEquals(4096, + dfsIn.getReadStatistics().getTotalBytesRead()); + Assert.assertEquals(4096, + dfsIn.getReadStatistics().getTotalZeroCopyBytesRead()); + Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096), + byteBufferToArray(result)); + dfsIn.releaseBuffer(result); + + // Try to read 4097, but only get 4096 because of the block size. + result = + dfsIn.read(null, 4097, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + Assert.assertEquals(4096, result.remaining()); + Assert.assertArrayEquals(Arrays.copyOfRange(original, 4096, 8192), + byteBufferToArray(result)); + dfsIn.releaseBuffer(result); + } finally { + if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); + if (cluster != null) cluster.shutdown(); + } + } + + @Test + public void testZeroCopyReadsNoFallback() throws Exception { + HdfsConfiguration conf = initZeroCopyTest(); + MiniDFSCluster cluster = null; + final Path TEST_PATH = new Path("/a"); + FSDataInputStream fsIn = null; + final int TEST_FILE_LENGTH = 12345; + + FileSystem fs = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, + TEST_FILE_LENGTH, (short)1, 7567L); + try { + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + } catch (InterruptedException e) { + Assert.fail("unexpected InterruptedException during " + + "waitReplication: " + e); + } catch (TimeoutException e) { + Assert.fail("unexpected TimeoutException during " + + "waitReplication: " + e); + } + fsIn = fs.open(TEST_PATH); + byte original[] = new byte[TEST_FILE_LENGTH]; + IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH); + fsIn.close(); + fsIn = fs.open(TEST_PATH); + HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn; + ByteBuffer result; + try { + result = dfsIn.read(null, 4097, EnumSet.noneOf(ReadOption.class)); + Assert.fail("expected UnsupportedOperationException"); + } catch (UnsupportedOperationException e) { + // expected + } + result = dfsIn.read(null, 4096, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + Assert.assertEquals(4096, result.remaining()); + Assert.assertEquals(4096, + dfsIn.getReadStatistics().getTotalBytesRead()); + Assert.assertEquals(4096, + dfsIn.getReadStatistics().getTotalZeroCopyBytesRead()); + Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096), + byteBufferToArray(result)); + } finally { + if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); + if (cluster != null) cluster.shutdown(); + } + } + + private static class CountingVisitor + implements ClientMmapManager.ClientMmapVisitor { + int count = 0; + + @Override + public void accept(ClientMmap mmap) { + count++; + } + + public void reset() { + count = 0; + } + } + + @Test + public void testZeroCopyMmapCache() throws Exception { + HdfsConfiguration conf = initZeroCopyTest(); + MiniDFSCluster cluster = null; + final Path TEST_PATH = new Path("/a"); + final int TEST_FILE_LENGTH = 16385; + final int RANDOM_SEED = 23453; + FSDataInputStream fsIn = null; + ByteBuffer results[] = { null, null, null, null, null }; + + DistributedFileSystem fs = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, + TEST_FILE_LENGTH, (short)1, RANDOM_SEED); + try { + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + } catch (InterruptedException e) { + Assert.fail("unexpected InterruptedException during " + + "waitReplication: " + e); + } catch (TimeoutException e) { + Assert.fail("unexpected TimeoutException during " + + "waitReplication: " + e); + } + fsIn = fs.open(TEST_PATH); + byte original[] = new byte[TEST_FILE_LENGTH]; + IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH); + fsIn.close(); + fsIn = fs.open(TEST_PATH); + final ClientMmapManager mmapManager = fs.getClient().getMmapManager(); + final CountingVisitor countingVisitor = new CountingVisitor(); + mmapManager.visitMmaps(countingVisitor); + Assert.assertEquals(0, countingVisitor.count); + mmapManager.visitEvictable(countingVisitor); + Assert.assertEquals(0, countingVisitor.count); + results[0] = fsIn.read(null, 4096, + EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + fsIn.seek(0); + results[1] = fsIn.read(null, 4096, + EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + mmapManager.visitMmaps(countingVisitor); + Assert.assertEquals(1, countingVisitor.count); + countingVisitor.reset(); + mmapManager.visitEvictable(countingVisitor); + Assert.assertEquals(0, countingVisitor.count); + countingVisitor.reset(); + + // The mmaps should be of the first block of the file. + final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH); + mmapManager.visitMmaps(new ClientMmapManager.ClientMmapVisitor() { + @Override + public void accept(ClientMmap mmap) { + Assert.assertEquals(firstBlock, mmap.getBlock()); + } + }); + + // Read more blocks. + results[2] = fsIn.read(null, 4096, + EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + results[3] = fsIn.read(null, 4096, + EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + try { + results[4] = fsIn.read(null, 4096, + EnumSet.of(ReadOption.SKIP_CHECKSUMS)); + Assert.fail("expected UnsupportedOperationException"); + } catch (UnsupportedOperationException e) { + // expected + } + + // we should have 3 mmaps, 0 evictable + mmapManager.visitMmaps(countingVisitor); + Assert.assertEquals(3, countingVisitor.count); + countingVisitor.reset(); + mmapManager.visitEvictable(countingVisitor); + Assert.assertEquals(0, countingVisitor.count); + + // After we close the cursors, the mmaps should be evictable for + // a brief period of time. Then, they should be closed (we're + // using a very quick timeout) + for (ByteBuffer buffer : results) { + if (buffer != null) { + fsIn.releaseBuffer(buffer); + } + } + GenericTestUtils.waitFor(new Supplier() { + public Boolean get() { + countingVisitor.reset(); + try { + mmapManager.visitEvictable(countingVisitor); + } catch (InterruptedException e) { + e.printStackTrace(); + return false; + } + return (0 == countingVisitor.count); + } + }, 10, 10000); + countingVisitor.reset(); + mmapManager.visitMmaps(countingVisitor); + Assert.assertEquals(0, countingVisitor.count); + } finally { + if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); + if (cluster != null) cluster.shutdown(); + } + } + + /** + * Test HDFS fallback reads. HDFS streams support the ByteBufferReadable + * interface. + */ + @Test + public void testHdfsFallbackReads() throws Exception { + HdfsConfiguration conf = initZeroCopyTest(); + MiniDFSCluster cluster = null; + final Path TEST_PATH = new Path("/a"); + final int TEST_FILE_LENGTH = 16385; + final int RANDOM_SEED = 23453; + FSDataInputStream fsIn = null; + + DistributedFileSystem fs = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, + TEST_FILE_LENGTH, (short)1, RANDOM_SEED); + try { + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + } catch (InterruptedException e) { + Assert.fail("unexpected InterruptedException during " + + "waitReplication: " + e); + } catch (TimeoutException e) { + Assert.fail("unexpected TimeoutException during " + + "waitReplication: " + e); + } + fsIn = fs.open(TEST_PATH); + byte original[] = new byte[TEST_FILE_LENGTH]; + IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH); + fsIn.close(); + fsIn = fs.open(TEST_PATH); + testFallbackImpl(fsIn, original); + } finally { + if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); + if (cluster != null) cluster.shutdown(); + } + } + + private static class RestrictedAllocatingByteBufferPool + implements ByteBufferPool { + private final boolean direct; + + RestrictedAllocatingByteBufferPool(boolean direct) { + this.direct = direct; + } + @Override + public ByteBuffer getBuffer(boolean direct, int length) { + Preconditions.checkArgument(this.direct == direct); + return direct ? ByteBuffer.allocateDirect(length) : + ByteBuffer.allocate(length); + } + @Override + public void putBuffer(ByteBuffer buffer) { + } + } + + private static void testFallbackImpl(InputStream stream, + byte original[]) throws Exception { + RestrictedAllocatingByteBufferPool bufferPool = + new RestrictedAllocatingByteBufferPool( + stream instanceof ByteBufferReadable); + + ByteBuffer result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10); + Assert.assertEquals(10, result.remaining()); + Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 10), + byteBufferToArray(result)); + + result = ByteBufferUtil.fallbackRead(stream, bufferPool, 5000); + Assert.assertEquals(5000, result.remaining()); + Assert.assertArrayEquals(Arrays.copyOfRange(original, 10, 5010), + byteBufferToArray(result)); + + result = ByteBufferUtil.fallbackRead(stream, bufferPool, 9999999); + Assert.assertEquals(11375, result.remaining()); + Assert.assertArrayEquals(Arrays.copyOfRange(original, 5010, 16385), + byteBufferToArray(result)); + + result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10); + Assert.assertNull(result); + } + + /** + * Test the {@link ByteBufferUtil#fallbackRead} function directly. + */ + @Test + public void testFallbackRead() throws Exception { + HdfsConfiguration conf = initZeroCopyTest(); + MiniDFSCluster cluster = null; + final Path TEST_PATH = new Path("/a"); + final int TEST_FILE_LENGTH = 16385; + final int RANDOM_SEED = 23453; + FSDataInputStream fsIn = null; + + DistributedFileSystem fs = null; + try { + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, + TEST_FILE_LENGTH, (short)1, RANDOM_SEED); + try { + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + } catch (InterruptedException e) { + Assert.fail("unexpected InterruptedException during " + + "waitReplication: " + e); + } catch (TimeoutException e) { + Assert.fail("unexpected TimeoutException during " + + "waitReplication: " + e); + } + fsIn = fs.open(TEST_PATH); + byte original[] = new byte[TEST_FILE_LENGTH]; + IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH); + fsIn.close(); + fsIn = fs.open(TEST_PATH); + testFallbackImpl(fsIn, original); + } finally { + if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); + if (cluster != null) cluster.shutdown(); + } + } + + /** + * Test fallback reads on a stream which does not support the + * ByteBufferReadable * interface. + */ + @Test + public void testIndirectFallbackReads() throws Exception { + final File TEST_DIR = new File( + System.getProperty("test.build.data","build/test/data")); + final String TEST_PATH = TEST_DIR + File.separator + + "indirectFallbackTestFile"; + final int TEST_FILE_LENGTH = 16385; + final int RANDOM_SEED = 23453; + FileOutputStream fos = null; + FileInputStream fis = null; + try { + fos = new FileOutputStream(TEST_PATH); + Random random = new Random(RANDOM_SEED); + byte original[] = new byte[TEST_FILE_LENGTH]; + random.nextBytes(original); + fos.write(original); + fos.close(); + fos = null; + fis = new FileInputStream(TEST_PATH); + testFallbackImpl(fis, original); + } finally { + IOUtils.cleanup(LOG, fos, fis); + new File(TEST_PATH).delete(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java index 057b79fd114..57f5ce979ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java @@ -25,7 +25,6 @@ import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.util.concurrent.TimeoutException; -import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; @@ -36,11 +35,26 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Assume; +import org.junit.BeforeClass; import org.junit.Test; public class TestBlockReaderLocal { + private static TemporarySocketDirectory sockDir; + + @BeforeClass + public static void init() { + sockDir = new TemporarySocketDirectory(); + DomainSocket.disableBindPathValidation(); + } + + @AfterClass + public static void shutdown() throws IOException { + sockDir.close(); + } + public static void assertArrayRegionsEqual(byte []buf1, int off1, byte []buf2, int off2, int len) { for (int i = 0; i < len; i++) { @@ -100,10 +114,11 @@ public class TestBlockReaderLocal { FSDataInputStream fsIn = null; byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; + FileSystem fs = null; try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); - FileSystem fs = cluster.getFileSystem(); + fs = cluster.getFileSystem(); DFSTestUtil.createFile(fs, TEST_PATH, BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED); try { @@ -138,6 +153,7 @@ public class TestBlockReaderLocal { test.doTest(blockReaderLocal, original); } finally { if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); if (cluster != null) cluster.shutdown(); if (dataIn != null) dataIn.close(); if (checkIn != null) checkIn.close(); @@ -382,10 +398,11 @@ public class TestBlockReaderLocal { final long RANDOM_SEED = 4567L; FSDataInputStream fsIn = null; byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; + FileSystem fs = null; try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); - FileSystem fs = cluster.getFileSystem(); + fs = cluster.getFileSystem(); DFSTestUtil.createFile(fs, TEST_PATH, BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED); try { @@ -417,6 +434,7 @@ public class TestBlockReaderLocal { } finally { DFSInputStream.tcpReadsDisabledForTesting = false; if (fsIn != null) fsIn.close(); + if (fs != null) fs.close(); if (cluster != null) cluster.shutdown(); if (sockDir != null) sockDir.close(); }