diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java new file mode 100644 index 00000000000..720e6a14158 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessor.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** + * The public API for ReplicaAccessor objects. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public abstract class ReplicaAccessor { + /** + * Read bytes from the replica. + * + * @param pos The position in the replica to start reading at. + * Must not be negative. + * @param buf The byte array to read into. + * @param off The offset within buf to start reading into. + * @param len The maximum length to read. + * + * @return The number of bytes read. If the read extends past the end + * of the replica, a short read count will be returned. We + * will never return a negative number. We will never + * return a short read count unless EOF is reached. + */ + public abstract int read(long pos, byte[] buf, int off, int len) + throws IOException; + + /** + * Read bytes from the replica. + * + * @param pos The position in the replica to start reading at. + * Must not be negative. + * @param buf The byte buffer to read into. The amount to read will be + * dictated by the remaining bytes between the current + * position and the limit. The ByteBuffer may or may not be + * direct. + * + * @return The number of bytes read. If the read extends past the end + * of the replica, a short read count will be returned. We + * will never return a negative number. We will never return + * a short read count unless EOF is reached. + */ + public abstract int read(long pos, ByteBuffer buf) throws IOException; + + /** + * Release the resources associated with the ReplicaAccessor. + * + * It is recommended that implementations never throw an IOException. The + * method is declared as throwing IOException in order to remain compatible + * with java.io.Closeable. If an exception is thrown, the ReplicaAccessor + * must still be closed when the function returns in order to prevent a + * resource leak. + */ + public abstract void close() throws IOException; + + /** + * Return true if bytes read via this accessor should count towards the + * local byte count statistics. + */ + public abstract boolean isLocal(); + + /** + * Return true if bytes read via this accessor should count towards the + * short-circuit byte count statistics. + */ + public abstract boolean isShortCircuit(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessorBuilder.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessorBuilder.java new file mode 100644 index 00000000000..2905df12457 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReplicaAccessorBuilder.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; + +/** + * The public API for creating a new ReplicaAccessor. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public abstract class ReplicaAccessorBuilder { + /** + * Set the file name which is being opened. Provided for debugging purposes. + */ + public abstract ReplicaAccessorBuilder setFileName(String fileName); + + /** Set the block ID and block pool ID which are being opened. */ + public abstract ReplicaAccessorBuilder + setBlock(long blockId, String blockPoolId); + + /** + * Set whether checksums must be verified. Checksums should be skipped if + * the user has disabled checksum verification in the configuration. Users + * may wish to do this if their software does checksum verification at a + * higher level than HDFS. + */ + public abstract ReplicaAccessorBuilder + setVerifyChecksum(boolean verifyChecksum); + + /** Set the name of the HDFS client. Provided for debugging purposes. */ + public abstract ReplicaAccessorBuilder setClientName(String clientName); + + /** + * Set whether short-circuit is enabled. Short-circuit may be disabled if + * the user has set dfs.client.read.shortcircuit to false, or if the block + * being read is under construction. The fact that this bit is enabled does + * not mean that the user has permission to do short-circuit reads or to + * access the replica-- that must be checked separately by the + * ReplicaAccessorBuilder implementation. + */ + public abstract ReplicaAccessorBuilder + setAllowShortCircuitReads(boolean allowShortCircuit); + + /** + * Set the length of the replica which is visible to this client. If bytes + * are added later, they will not be visible to the ReplicaAccessor we are + * building. In order to see more of the replica, the client must re-open + * this HDFS file. The visible length provides an upper bound, but not a + * lower one. If the replica is deleted or truncated, fewer bytes may be + * visible than specified here. + */ + public abstract ReplicaAccessorBuilder setVisibleLength(long visibleLength); + + /** + * Set the configuration to use. ReplicaAccessorBuilder subclasses should + * define their own configuration prefix. For example, the foobar plugin + * could look for configuration keys like foo.bar.parameter1, + * foo.bar.parameter2. + */ + public abstract ReplicaAccessorBuilder setConfiguration(Configuration conf); + + /** + * Set the block access token to use. + */ + public abstract ReplicaAccessorBuilder setBlockAccessToken(byte[] token); + + /** + * Build a new ReplicaAccessor. + * + * The implementation must perform any necessary access checks before + * constructing the ReplicaAccessor. If there is a hardware-level or + * network-level setup operation that could fail, it should be done here. If + * the implementation returns a ReplicaAccessor, we will assume that it works + * and not attempt to construct a normal BlockReader. + * + * If the ReplicaAccessor could not be built, implementations may wish to log + * a message at TRACE level indicating why. + * + * @return null if the ReplicaAccessor could not be built; the + * ReplicaAccessor otherwise. + */ + public abstract ReplicaAccessor build(); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index 557ec5a64ca..34b964e179b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -117,6 +117,9 @@ public interface HdfsClientConfigKeys { "dfs.datanode.hdfs-blocks-metadata.enabled"; boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false; + static final String REPLICA_ACCESSOR_BUILDER_CLASSES_KEY = + PREFIX + "replica.accessor.builder.classes"; + /** dfs.client.retry configuration properties */ interface Retry { String PREFIX = HdfsClientConfigKeys.PREFIX + "retry."; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index d67af0eef3e..792d858226f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.Options.ChecksumOpt; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.ReplicaAccessorBuilder; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.util.ByteArrayManager; import org.apache.hadoop.ipc.Client; @@ -78,6 +79,11 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ShortCircuit; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write; +import java.lang.Class; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + /** * DFSClient configuration. */ @@ -124,6 +130,8 @@ public class DfsClientConf { private final long hedgedReadThresholdMillis; private final int hedgedReadThreadpoolSize; + private final List> + replicaAccessorBuilderClasses; public DfsClientConf(Configuration conf) { // The hdfsTimeout is currently the same as the ipc timeout @@ -238,8 +246,35 @@ public DfsClientConf(Configuration conf) { HedgedRead.THRESHOLD_MILLIS_KEY, HedgedRead.THRESHOLD_MILLIS_DEFAULT); hedgedReadThreadpoolSize = conf.getInt( - HedgedRead.THREADPOOL_SIZE_KEY, - HedgedRead.THREADPOOL_SIZE_DEFAULT); + HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, + HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT); + + replicaAccessorBuilderClasses = loadReplicaAccessorBuilderClasses(conf); + } + + @SuppressWarnings("unchecked") + private List> + loadReplicaAccessorBuilderClasses(Configuration conf) + { + String classNames[] = conf.getTrimmedStrings( + HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY); + if (classNames.length == 0) { + return Collections.emptyList(); + } + ArrayList> classes = + new ArrayList>(); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + for (String className: classNames) { + try { + Class cls = + (Class) + classLoader.loadClass(className); + classes.add(cls); + } catch (Throwable t) { + LOG.warn("Unable to load " + className, t); + } + } + return classes; } private DataChecksum.Type getChecksumType(Configuration conf) { @@ -516,6 +551,14 @@ public int getHedgedReadThreadpoolSize() { return hedgedReadThreadpoolSize; } + /** + * @return the replicaAccessorBuilderClasses + */ + public List> + getReplicaAccessorBuilderClasses() { + return replicaAccessorBuilderClasses; + } + /** * @return the shortCircuitConf */ @@ -766,4 +809,4 @@ public String confAsString() { return builder.toString(); } } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto index 5071d15a8df..a091d417d7c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/datatransfer.proto @@ -302,3 +302,7 @@ message OpBlockChecksumResponseProto { required bytes md5 = 3; optional ChecksumTypeProto crcType = 4; } + +message OpCustomProto { + required string customId = 1; +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index 96044e9e1ef..8517173ded0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -24,8 +24,12 @@ import java.io.DataOutputStream; import java.io.FileInputStream; import java.io.IOException; +import java.lang.reflect.Constructor; import java.net.InetSocketAddress; +import java.util.List; +import com.google.common.io.ByteArrayDataOutput; +import com.google.common.io.ByteStreams; import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -52,6 +56,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; +import org.apache.hadoop.io.ByteWritable; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.net.unix.DomainSocket; @@ -326,6 +331,10 @@ public BlockReader build() throws IOException { BlockReader reader = null; Preconditions.checkNotNull(configuration); + reader = tryToCreateExternalBlockReader(); + if (reader != null) { + return reader; + } final ShortCircuitConf scConf = conf.getShortCircuitConf(); if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) { if (clientContext.getUseLegacyBlockReaderLocal()) { @@ -362,6 +371,45 @@ public BlockReader build() throws IOException { return getRemoteBlockReaderFromTcp(); } + private BlockReader tryToCreateExternalBlockReader() { + List> clses = + conf.getReplicaAccessorBuilderClasses(); + for (Class cls : clses) { + try { + ByteArrayDataOutput bado = ByteStreams.newDataOutput(); + token.write(bado); + byte tokenBytes[] = bado.toByteArray(); + + Constructor ctor = + cls.getConstructor(); + ReplicaAccessorBuilder builder = ctor.newInstance(); + ReplicaAccessor accessor = builder. + setAllowShortCircuitReads(allowShortCircuitLocalReads). + setBlock(block.getBlockId(), block.getBlockPoolId()). + setBlockAccessToken(tokenBytes). + setClientName(clientName). + setConfiguration(configuration). + setFileName(fileName). + setVerifyChecksum(verifyChecksum). + setVisibleLength(length). + build(); + if (accessor == null) { + if (LOG.isTraceEnabled()) { + LOG.trace(this + ": No ReplicaAccessor created by " + + cls.getName()); + } + } else { + return new ExternalBlockReader(accessor, length, startOffset); + } + } catch (Throwable t) { + LOG.warn("Failed to construct new object of type " + + cls.getName(), t); + } + } + return null; + } + + /** * Get {@link BlockReaderLocalLegacy} for short circuited local reads. * This block reader implements the path-based style of local reads diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java new file mode 100644 index 00000000000..e135d8e41a1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExternalBlockReader.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.EnumSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.ReadOption; +import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; + +/** + * An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from + * replicas. + */ +@InterfaceAudience.Private +public final class ExternalBlockReader implements BlockReader { + private final ReplicaAccessor accessor; + private final long visibleLength; + private long pos; + + ExternalBlockReader(ReplicaAccessor accessor, long visibleLength, + long startOffset) { + this.accessor = accessor; + this.visibleLength = visibleLength; + this.pos = startOffset; + } + + @Override + public int read(byte[] buf, int off, int len) throws IOException { + int nread = accessor.read(pos, buf, off, len); + pos += nread; + return nread; + } + + @Override + public int read(ByteBuffer buf) throws IOException { + int nread = accessor.read(pos, buf); + pos += nread; + return nread; + } + + @Override + public long skip(long n) throws IOException { + // You cannot skip backwards + if (n <= 0) { + return 0; + } + // You can't skip past the end of the replica. + long oldPos = pos; + pos += n; + if (pos > visibleLength) { + pos = visibleLength; + } + return pos - oldPos; + } + + @Override + public int available() throws IOException { + // We return the amount of bytes that we haven't read yet from the + // replica, based on our current position. Some of the other block + // readers return a shorter length than that. The only advantage to + // returning a shorter length is that the DFSInputStream will + // trash your block reader and create a new one if someone tries to + // seek() beyond the available() region. + long diff = visibleLength - pos; + if (diff > Integer.MAX_VALUE) { + return Integer.MAX_VALUE; + } else { + return (int)diff; + } + } + + @Override + public void close() throws IOException { + accessor.close(); + } + + @Override + public void readFully(byte[] buf, int offset, int len) throws IOException { + BlockReaderUtil.readFully(this, buf, offset, len); + } + + @Override + public int readAll(byte[] buf, int offset, int len) throws IOException { + return BlockReaderUtil.readAll(this, buf, offset, len); + } + + @Override + public boolean isLocal() { + return accessor.isLocal(); + } + + @Override + public boolean isShortCircuit() { + return accessor.isShortCircuit(); + } + + @Override + public ClientMmap getClientMmap(EnumSet opts) { + // For now, pluggable ReplicaAccessors do not support zero-copy. + return null; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java index cf8addf1727..30774980225 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java @@ -37,7 +37,8 @@ public enum Op { TRANSFER_BLOCK((byte)86), REQUEST_SHORT_CIRCUIT_FDS((byte)87), RELEASE_SHORT_CIRCUIT_FDS((byte)88), - REQUEST_SHORT_CIRCUIT_SHM((byte)89); + REQUEST_SHORT_CIRCUIT_SHM((byte)89), + CUSTOM((byte)127); /** The code for this operation. */ public final byte code; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index a6fbb29ece2..d435543f650 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCustomProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java new file mode 100644 index 00000000000..48d337b6f2b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestExternalBlockReader.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import com.google.common.primitives.Ints; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.client.HdfsDataInputStream; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.io.IOUtils; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.BufferOverflowException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.UUID; + +public class TestExternalBlockReader { + private static final Log LOG = LogFactory.getLog(TestExternalBlockReader.class); + + private static long SEED = 1234; + + @Test + public void testMisconfiguredExternalBlockReader() throws Exception { + Configuration conf = new Configuration(); + conf.set(HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY, + "org.apache.hadoop.hdfs.NonExistentReplicaAccessorBuilderClass"); + conf.setLong(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, 1024); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1) + .build(); + final int TEST_LENGTH = 2048; + DistributedFileSystem dfs = cluster.getFileSystem(); + try { + DFSTestUtil.createFile(dfs, new Path("/a"), TEST_LENGTH, (short)1, SEED); + FSDataInputStream stream = dfs.open(new Path("/a")); + byte buf[] = new byte[TEST_LENGTH]; + IOUtils.readFully(stream, buf, 0, TEST_LENGTH); + byte expected[] = DFSTestUtil. + calculateFileContentsFromSeed(SEED, TEST_LENGTH); + Assert.assertArrayEquals(expected, buf); + stream.close(); + } finally { + dfs.close(); + cluster.shutdown(); + } + } + + private static final String SYNTHETIC_BLOCK_READER_TEST_UUID_KEY = + "synthetic.block.reader.test.uuid.key"; + + private static final HashMap> + accessors = new HashMap>(1); + + public static class SyntheticReplicaAccessorBuilder + extends ReplicaAccessorBuilder { + String fileName; + long blockId; + String blockPoolId; + boolean verifyChecksum; + String clientName; + boolean allowShortCircuit; + long visibleLength; + Configuration conf; + + @Override + public ReplicaAccessorBuilder setFileName(String fileName) { + this.fileName = fileName; + return this; + } + + @Override + public ReplicaAccessorBuilder setBlock(long blockId, String blockPoolId) { + this.blockId = blockId; + this.blockPoolId = blockPoolId; + return this; + } + + @Override + public ReplicaAccessorBuilder setVerifyChecksum(boolean verifyChecksum) { + this.verifyChecksum = verifyChecksum; + return this; + } + + @Override + public ReplicaAccessorBuilder setClientName(String clientName) { + this.clientName = clientName; + return this; + } + + @Override + public ReplicaAccessorBuilder setAllowShortCircuitReads(boolean allowShortCircuit) { + this.allowShortCircuit = allowShortCircuit; + return this; + } + + @Override + public ReplicaAccessorBuilder setVisibleLength(long visibleLength) { + this.visibleLength = visibleLength; + return this; + } + + @Override + public ReplicaAccessorBuilder setConfiguration(Configuration conf) { + this.conf = conf; + return this; + } + + @Override + public ReplicaAccessorBuilder setBlockAccessToken(byte[] token) { + return this; + } + + @Override + public ReplicaAccessor build() { + if (visibleLength < 1024) { + LOG.info("SyntheticReplicaAccessorFactory returning null for a " + + "smaller replica with size " + visibleLength); //trace + return null; + } + return new SyntheticReplicaAccessor(this); + } + } + + public static class SyntheticReplicaAccessor extends ReplicaAccessor { + final long length; + final byte contents[]; + final SyntheticReplicaAccessorBuilder builder; + long totalRead = 0; + int numCloses = 0; + String error = ""; + String prefix = ""; + + SyntheticReplicaAccessor(SyntheticReplicaAccessorBuilder builder) { + this.length = builder.visibleLength; + this.contents = DFSTestUtil. + calculateFileContentsFromSeed(SEED, Ints.checkedCast(length)); + this.builder = builder; + String uuid = this.builder.conf. + get(SYNTHETIC_BLOCK_READER_TEST_UUID_KEY); + LinkedList accessorsList = + accessors.get(uuid); + if (accessorsList == null) { + accessorsList = new LinkedList(); + } + accessorsList.add(this); + accessors.put(uuid, accessorsList); + } + + @Override + public synchronized int read(long pos, byte[] buf, int off, int len) + throws IOException { + if (pos > Integer.MAX_VALUE) { + return 0; + } else if (pos < 0) { + addError("Attempted to read from a location that was less " + + "than 0 at " + pos); + return 0; + } + int i = 0, nread = 0; + for (int ipos = (int)pos; + (ipos < contents.length) && (nread < len); + ipos++) { + buf[i++] = contents[ipos]; + nread++; + totalRead++; + LOG.info("ipos = " + ipos + ", contents.length = " + contents.length + ", nread = " + nread + ", len = " + len); + } + return nread; + } + + @Override + public synchronized int read(long pos, ByteBuffer buf) throws IOException { + if (pos > Integer.MAX_VALUE) { + return 0; + } else if (pos < 0) { + addError("Attempted to read from a location that was less " + + "than 0 at " + pos); + return 0; + } + int i = 0, nread = 0; + for (int ipos = (int)pos; + ipos < contents.length; ipos++) { + try { + buf.put(contents[ipos]); + } catch (BufferOverflowException bos) { + break; + } + nread++; + totalRead++; + } + return nread; + } + + @Override + public synchronized void close() throws IOException { + numCloses++; + } + + @Override + public boolean isLocal() { + return true; + } + + @Override + public boolean isShortCircuit() { + return true; + } + + synchronized String getError() { + return error; + } + + synchronized void addError(String text) { + LOG.error("SyntheticReplicaAccessor error: " + text); + error = error + prefix + text; + prefix = "; "; + } + } + + @Test + public void testExternalBlockReader() throws Exception { + Configuration conf = new Configuration(); + conf.set(HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY, + SyntheticReplicaAccessorBuilder.class.getName()); + conf.setLong(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, 1024); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0); + String uuid = UUID.randomUUID().toString(); + conf.set(SYNTHETIC_BLOCK_READER_TEST_UUID_KEY, uuid); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1) + .build(); + final int TEST_LENGTH = 2047; + DistributedFileSystem dfs = cluster.getFileSystem(); + try { + DFSTestUtil.createFile(dfs, new Path("/a"), TEST_LENGTH, (short)1, SEED); + HdfsDataInputStream stream = + (HdfsDataInputStream)dfs.open(new Path("/a")); + byte buf[] = new byte[TEST_LENGTH]; + IOUtils.readFully(stream, buf, 0, TEST_LENGTH); + byte expected[] = DFSTestUtil. + calculateFileContentsFromSeed(SEED, TEST_LENGTH); + ReadStatistics stats = stream.getReadStatistics(); + Assert.assertEquals(1024, stats.getTotalShortCircuitBytesRead()); + Assert.assertEquals(2047, stats.getTotalLocalBytesRead()); + Assert.assertEquals(2047, stats.getTotalBytesRead()); + Assert.assertArrayEquals(expected, buf); + stream.close(); + ExtendedBlock block = DFSTestUtil.getFirstBlock(dfs, new Path("/a")); + Assert.assertNotNull(block); + LinkedList accessorList = accessors.get(uuid); + Assert.assertNotNull(accessorList); + Assert.assertEquals(2, accessorList.size()); + SyntheticReplicaAccessor accessor = accessorList.get(0); + Assert.assertTrue(accessor.builder.allowShortCircuit); + Assert.assertEquals(block.getBlockPoolId(), + accessor.builder.blockPoolId); + Assert.assertEquals(block.getBlockId(), + accessor.builder.blockId); + Assert.assertEquals(dfs.getClient().clientName, + accessor.builder.clientName); + Assert.assertEquals("/a", accessor.builder.fileName); + Assert.assertTrue(accessor.builder.verifyChecksum); + Assert.assertEquals(1024L, accessor.builder.visibleLength); + Assert.assertEquals(1024L, accessor.totalRead); + Assert.assertEquals("", accessor.getError()); + Assert.assertEquals(1, accessor.numCloses); + accessors.remove(uuid); + } finally { + dfs.close(); + cluster.shutdown(); + } + } +}