HDFS-8924. Add pluggable interface for reading replicas in DFSClient. (Colin Patrick McCabe via Lei Xu)

(cherry picked from 7087e700e0)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java
Lei Xu 2015-08-21 17:02:00 -07:00
parent a7e5a88b59
commit 3a4d614f68
10 changed files with 711 additions and 4 deletions

ReplicaAccessor.java

@@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.io.IOException;
import java.nio.ByteBuffer;
/**
* The public API for ReplicaAccessor objects.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class ReplicaAccessor {
/**
* Read bytes from the replica.
*
* @param pos The position in the replica to start reading at.
* Must not be negative.
* @param buf The byte array to read into.
* @param off The offset within buf to start reading into.
* @param len The maximum length to read.
*
* @return The number of bytes read. If the read extends past the end
* of the replica, a short read count will be returned. We
* will never return a negative number. We will never
* return a short read count unless EOF is reached.
*/
public abstract int read(long pos, byte[] buf, int off, int len)
throws IOException;
/**
* Read bytes from the replica.
*
* @param pos The position in the replica to start reading at.
* Must not be negative.
* @param buf The byte buffer to read into. The amount to read will be
* dictated by the remaining bytes between the current
* position and the limit. The ByteBuffer may or may not be
* direct.
*
* @return The number of bytes read. If the read extends past the end
* of the replica, a short read count will be returned. We
* will never return a negative number. We will never return
* a short read count unless EOF is reached.
*/
public abstract int read(long pos, ByteBuffer buf) throws IOException;
/**
* Release the resources associated with the ReplicaAccessor.
*
* It is recommended that implementations never throw an IOException. The
* method is declared as throwing IOException in order to remain compatible
* with java.io.Closeable. If an exception is thrown, the ReplicaAccessor
* must still be closed when the function returns in order to prevent a
* resource leak.
*/
public abstract void close() throws IOException;
/**
* Return true if bytes read via this accessor should count towards the
* local byte count statistics.
*/
public abstract boolean isLocal();
/**
* Return true if bytes read via this accessor should count towards the
* short-circuit byte count statistics.
*/
public abstract boolean isShortCircuit();
}
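
For illustration, a minimal ReplicaAccessor could look like the sketch below. It serves reads out of an in-memory byte array; the class name and backing store are invented for the example, and a real plugin would read from its own replica storage. Note how both read() overloads follow the contract above: a short count only when the read crosses EOF, and never a negative return.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.hdfs.ReplicaAccessor;

/** Sketch only: a ReplicaAccessor backed by an in-memory byte array. */
public class InMemoryReplicaAccessor extends ReplicaAccessor {
  private final byte[] data;

  public InMemoryReplicaAccessor(byte[] data) {
    this.data = data;
  }

  @Override
  public int read(long pos, byte[] buf, int off, int len) throws IOException {
    if (pos >= data.length) {
      return 0; // at or past EOF; never negative
    }
    int nread = (int) Math.min(len, data.length - pos);
    System.arraycopy(data, (int) pos, buf, off, nread);
    return nread; // short only because the read crossed EOF
  }

  @Override
  public int read(long pos, ByteBuffer buf) throws IOException {
    if (pos >= data.length) {
      return 0;
    }
    int nread = (int) Math.min(buf.remaining(), data.length - pos);
    buf.put(data, (int) pos, nread);
    return nread;
  }

  @Override
  public void close() throws IOException {
    // No resources to release; implementations should avoid throwing here.
  }

  @Override
  public boolean isLocal() {
    return true; // count reads toward the local byte statistics
  }

  @Override
  public boolean isShortCircuit() {
    return true; // count reads toward the short-circuit byte statistics
  }
}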

ReplicaAccessorBuilder.java

@@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
/**
* The public API for creating a new ReplicaAccessor.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class ReplicaAccessorBuilder {
/**
* Set the file name which is being opened. Provided for debugging purposes.
*/
public abstract ReplicaAccessorBuilder setFileName(String fileName);
/** Set the block ID and block pool ID which are being opened. */
public abstract ReplicaAccessorBuilder
setBlock(long blockId, String blockPoolId);
/**
* Set whether checksums must be verified. Checksums should be skipped if
* the user has disabled checksum verification in the configuration. Users
* may wish to do this if their software does checksum verification at a
* higher level than HDFS.
*/
public abstract ReplicaAccessorBuilder
setVerifyChecksum(boolean verifyChecksum);
/** Set the name of the HDFS client. Provided for debugging purposes. */
public abstract ReplicaAccessorBuilder setClientName(String clientName);
/**
* Set whether short-circuit is enabled. Short-circuit may be disabled if
* the user has set dfs.client.read.shortcircuit to false, or if the block
* being read is under construction. The fact that this bit is enabled does
* not mean that the user has permission to do short-circuit reads or to
access the replica; that must be checked separately by the
* ReplicaAccessorBuilder implementation.
*/
public abstract ReplicaAccessorBuilder
setAllowShortCircuitReads(boolean allowShortCircuit);
/**
* Set the length of the replica which is visible to this client. If bytes
* are added later, they will not be visible to the ReplicaAccessor we are
* building. In order to see more of the replica, the client must re-open
* this HDFS file. The visible length provides an upper bound, but not a
* lower one. If the replica is deleted or truncated, fewer bytes may be
* visible than specified here.
*/
public abstract ReplicaAccessorBuilder setVisibleLength(long visibleLength);
/**
* Set the configuration to use. ReplicaAccessorBuilder subclasses should
* define their own configuration prefix. For example, the foobar plugin
* could look for configuration keys like foo.bar.parameter1,
* foo.bar.parameter2.
*/
public abstract ReplicaAccessorBuilder setConfiguration(Configuration conf);
/**
* Set the block access token to use.
*/
public abstract ReplicaAccessorBuilder setBlockAccessToken(byte[] token);
/**
* Build a new ReplicaAccessor.
*
* The implementation must perform any necessary access checks before
* constructing the ReplicaAccessor. If there is a hardware-level or
* network-level setup operation that could fail, it should be done here. If
* the implementation returns a ReplicaAccessor, we will assume that it works
* and not attempt to construct a normal BlockReader.
*
* If the ReplicaAccessor could not be built, implementations may wish to log
* a message at TRACE level indicating why.
*
* @return null if the ReplicaAccessor could not be built; the
* ReplicaAccessor otherwise.
*/
public abstract ReplicaAccessor build();
}
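
A matching builder for the sketch above could be as simple as the following; lookUpReplica is a hypothetical stand-in for whatever replica store a real plugin consults. Because BlockReaderFactory instantiates builders reflectively via cls.getConstructor() (see the hunk further down), the class must be public and keep a public no-argument constructor.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.ReplicaAccessor;
import org.apache.hadoop.hdfs.ReplicaAccessorBuilder;

/** Sketch only: builds the hypothetical in-memory accessor above. */
public class InMemoryReplicaAccessorBuilder extends ReplicaAccessorBuilder {
  private long blockId;
  private String blockPoolId;

  @Override
  public ReplicaAccessorBuilder setFileName(String fileName) {
    return this; // ignored; provided for debugging only
  }

  @Override
  public ReplicaAccessorBuilder setBlock(long blockId, String blockPoolId) {
    this.blockId = blockId;
    this.blockPoolId = blockPoolId;
    return this;
  }

  @Override
  public ReplicaAccessorBuilder setVerifyChecksum(boolean verifyChecksum) {
    return this;
  }

  @Override
  public ReplicaAccessorBuilder setClientName(String clientName) {
    return this;
  }

  @Override
  public ReplicaAccessorBuilder setAllowShortCircuitReads(boolean allow) {
    return this;
  }

  @Override
  public ReplicaAccessorBuilder setVisibleLength(long visibleLength) {
    return this;
  }

  @Override
  public ReplicaAccessorBuilder setConfiguration(Configuration conf) {
    // A real plugin would read its own keys here, e.g. the Javadoc's
    // example prefix: conf.get("foo.bar.parameter1").
    return this;
  }

  @Override
  public ReplicaAccessorBuilder setBlockAccessToken(byte[] token) {
    return this; // access checks, if any, would use this token in build()
  }

  @Override
  public ReplicaAccessor build() {
    byte[] data = lookUpReplica(blockPoolId, blockId);
    // Returning null makes the client fall back to a normal BlockReader.
    return data == null ? null : new InMemoryReplicaAccessor(data);
  }

  private byte[] lookUpReplica(String bpid, long blockId) {
    return null; // placeholder for a real replica lookup
  }
}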

HdfsClientConfigKeys.java

@@ -117,6 +117,9 @@ public interface HdfsClientConfigKeys {
"dfs.datanode.hdfs-blocks-metadata.enabled";
boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
static final String REPLICA_ACCESSOR_BUILDER_CLASSES_KEY =
PREFIX + "replica.accessor.builder.classes";
/** dfs.client.retry configuration properties */
interface Retry {
String PREFIX = HdfsClientConfigKeys.PREFIX + "retry.";
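
Since HdfsClientConfigKeys.PREFIX is "dfs.client.", the new key resolves to dfs.client.replica.accessor.builder.classes. It takes a comma-separated list of class names, each naming a ReplicaAccessorBuilder subclass visible to the context class loader. Enabling a plugin such as the hypothetical builder above is a one-line configuration change:

Configuration conf = new Configuration();
conf.set(HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY,
    InMemoryReplicaAccessorBuilder.class.getName());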

DfsClientConf.java

@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.ReplicaAccessorBuilder;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.util.ByteArrayManager;
import org.apache.hadoop.ipc.Client;
@@ -78,6 +79,11 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Retry;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ShortCircuit;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write;
import java.lang.Class;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* DFSClient configuration.
*/
@@ -124,6 +130,8 @@ public class DfsClientConf {
private final long hedgedReadThresholdMillis;
private final int hedgedReadThreadpoolSize;
private final List<Class<? extends ReplicaAccessorBuilder>>
replicaAccessorBuilderClasses;
public DfsClientConf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout
@@ -238,8 +246,35 @@
HedgedRead.THRESHOLD_MILLIS_KEY,
HedgedRead.THRESHOLD_MILLIS_DEFAULT);
hedgedReadThreadpoolSize = conf.getInt(
- HedgedRead.THREADPOOL_SIZE_KEY,
- HedgedRead.THREADPOOL_SIZE_DEFAULT);
+ HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
+ HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT);
replicaAccessorBuilderClasses = loadReplicaAccessorBuilderClasses(conf);
}
@SuppressWarnings("unchecked")
private List<Class<? extends ReplicaAccessorBuilder>>
loadReplicaAccessorBuilderClasses(Configuration conf)
{
String classNames[] = conf.getTrimmedStrings(
HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY);
if (classNames.length == 0) {
return Collections.emptyList();
}
ArrayList<Class<? extends ReplicaAccessorBuilder>> classes =
new ArrayList<Class<? extends ReplicaAccessorBuilder>>();
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
for (String className: classNames) {
try {
Class<? extends ReplicaAccessorBuilder> cls =
(Class<? extends ReplicaAccessorBuilder>)
classLoader.loadClass(className);
classes.add(cls);
} catch (Throwable t) {
LOG.warn("Unable to load " + className, t);
}
}
return classes;
}
private DataChecksum.Type getChecksumType(Configuration conf) {
@@ -516,6 +551,14 @@ public class DfsClientConf {
return hedgedReadThreadpoolSize;
}
/**
* @return the replicaAccessorBuilderClasses
*/
public List<Class<? extends ReplicaAccessorBuilder>>
getReplicaAccessorBuilderClasses() {
return replicaAccessorBuilderClasses;
}
/**
* @return the shortCircuitConf
*/
@@ -766,4 +809,4 @@ public class DfsClientConf {
return builder.toString();
}
}
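
Once the key is set, an ordinary client read path picks the plugin up transparently through DfsClientConf; nothing else in user code changes. A sketch, assuming fs.defaultFS points at an HDFS cluster and a file /a exists (mirroring the test at the bottom of this commit):

Configuration conf = new Configuration();
conf.set(HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY,
    InMemoryReplicaAccessorBuilder.class.getName());
FileSystem fs = FileSystem.get(conf);
try (FSDataInputStream in = fs.open(new Path("/a"))) {
  byte[] buf = new byte[4096];
  int nread = in.read(buf); // served by the plugin when build() != null
}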

datatransfer.proto

@@ -302,3 +302,7 @@ message OpBlockChecksumResponseProto {
required bytes md5 = 3;
optional ChecksumTypeProto crcType = 4;
}
message OpCustomProto {
required string customId = 1;
}

BlockReaderFactory.java

@@ -24,8 +24,12 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.util.List;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;
import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -52,6 +56,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
import org.apache.hadoop.io.ByteWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.unix.DomainSocket;
@@ -326,6 +331,10 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
BlockReader reader = null;
Preconditions.checkNotNull(configuration);
reader = tryToCreateExternalBlockReader();
if (reader != null) {
return reader;
}
final ShortCircuitConf scConf = conf.getShortCircuitConf();
if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
if (clientContext.getUseLegacyBlockReaderLocal()) {
@@ -362,6 +371,45 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
return getRemoteBlockReaderFromTcp();
}
private BlockReader tryToCreateExternalBlockReader() {
List<Class<? extends ReplicaAccessorBuilder>> clses =
conf.getReplicaAccessorBuilderClasses();
for (Class<? extends ReplicaAccessorBuilder> cls : clses) {
try {
ByteArrayDataOutput bado = ByteStreams.newDataOutput();
token.write(bado);
byte tokenBytes[] = bado.toByteArray();
Constructor<? extends ReplicaAccessorBuilder> ctor =
cls.getConstructor();
ReplicaAccessorBuilder builder = ctor.newInstance();
ReplicaAccessor accessor = builder.
setAllowShortCircuitReads(allowShortCircuitLocalReads).
setBlock(block.getBlockId(), block.getBlockPoolId()).
setBlockAccessToken(tokenBytes).
setClientName(clientName).
setConfiguration(configuration).
setFileName(fileName).
setVerifyChecksum(verifyChecksum).
setVisibleLength(length).
build();
if (accessor == null) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": No ReplicaAccessor created by " +
cls.getName());
}
} else {
return new ExternalBlockReader(accessor, length, startOffset);
}
} catch (Throwable t) {
LOG.warn("Failed to construct new object of type " +
cls.getName(), t);
}
}
return null;
}
/**
* Get {@link BlockReaderLocalLegacy} for short circuited local reads.
* This block reader implements the path-based style of local reads

ExternalBlockReader.java

@@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.EnumSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
/**
* An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from
* replicas.
*/
@InterfaceAudience.Private
public final class ExternalBlockReader implements BlockReader {
private final ReplicaAccessor accessor;
private final long visibleLength;
private long pos;
ExternalBlockReader(ReplicaAccessor accessor, long visibleLength,
long startOffset) {
this.accessor = accessor;
this.visibleLength = visibleLength;
this.pos = startOffset;
}
@Override
public int read(byte[] buf, int off, int len) throws IOException {
int nread = accessor.read(pos, buf, off, len);
pos += nread;
return nread;
}
@Override
public int read(ByteBuffer buf) throws IOException {
int nread = accessor.read(pos, buf);
pos += nread;
return nread;
}
@Override
public long skip(long n) throws IOException {
// You cannot skip backwards
if (n <= 0) {
return 0;
}
// You can't skip past the end of the replica.
long oldPos = pos;
pos += n;
if (pos > visibleLength) {
pos = visibleLength;
}
return pos - oldPos;
}
@Override
public int available() throws IOException {
// We return the amount of bytes that we haven't read yet from the
// replica, based on our current position. Some of the other block
// readers return a shorter length than that. The only advantage to
// returning a shorter length is that the DFSInputStream will
// trash your block reader and create a new one if someone tries to
// seek() beyond the available() region.
long diff = visibleLength - pos;
if (diff > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
} else {
return (int)diff;
}
}
@Override
public void close() throws IOException {
accessor.close();
}
@Override
public void readFully(byte[] buf, int offset, int len) throws IOException {
BlockReaderUtil.readFully(this, buf, offset, len);
}
@Override
public int readAll(byte[] buf, int offset, int len) throws IOException {
return BlockReaderUtil.readAll(this, buf, offset, len);
}
@Override
public boolean isLocal() {
return accessor.isLocal();
}
@Override
public boolean isShortCircuit() {
return accessor.isShortCircuit();
}
@Override
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
// For now, pluggable ReplicaAccessors do not support zero-copy.
return null;
}
}

Op.java

@@ -37,7 +37,8 @@ public enum Op {
TRANSFER_BLOCK((byte)86),
REQUEST_SHORT_CIRCUIT_FDS((byte)87),
RELEASE_SHORT_CIRCUIT_FDS((byte)88),
- REQUEST_SHORT_CIRCUIT_SHM((byte)89);
+ REQUEST_SHORT_CIRCUIT_SHM((byte)89),
+ CUSTOM((byte)127);
/** The code for this operation. */
public final byte code;

Receiver.java

@@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCustomProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;

TestExternalBlockReader.java

@@ -0,0 +1,298 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import com.google.common.primitives.Ints;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.IOUtils;
import org.junit.Assert;
import org.junit.Test;
import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.UUID;
public class TestExternalBlockReader {
private static final Log LOG = LogFactory.getLog(TestExternalBlockReader.class);
private static final long SEED = 1234;
@Test
public void testMisconfiguredExternalBlockReader() throws Exception {
Configuration conf = new Configuration();
conf.set(HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY,
"org.apache.hadoop.hdfs.NonExistentReplicaAccessorBuilderClass");
conf.setLong(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1)
.build();
final int TEST_LENGTH = 2048;
DistributedFileSystem dfs = cluster.getFileSystem();
try {
DFSTestUtil.createFile(dfs, new Path("/a"), TEST_LENGTH, (short)1, SEED);
FSDataInputStream stream = dfs.open(new Path("/a"));
byte buf[] = new byte[TEST_LENGTH];
IOUtils.readFully(stream, buf, 0, TEST_LENGTH);
byte expected[] = DFSTestUtil.
calculateFileContentsFromSeed(SEED, TEST_LENGTH);
Assert.assertArrayEquals(expected, buf);
stream.close();
} finally {
dfs.close();
cluster.shutdown();
}
}
private static final String SYNTHETIC_BLOCK_READER_TEST_UUID_KEY =
"synthetic.block.reader.test.uuid.key";
private static final HashMap<String, LinkedList<SyntheticReplicaAccessor>>
accessors = new HashMap<String, LinkedList<SyntheticReplicaAccessor>>(1);
public static class SyntheticReplicaAccessorBuilder
extends ReplicaAccessorBuilder {
String fileName;
long blockId;
String blockPoolId;
boolean verifyChecksum;
String clientName;
boolean allowShortCircuit;
long visibleLength;
Configuration conf;
@Override
public ReplicaAccessorBuilder setFileName(String fileName) {
this.fileName = fileName;
return this;
}
@Override
public ReplicaAccessorBuilder setBlock(long blockId, String blockPoolId) {
this.blockId = blockId;
this.blockPoolId = blockPoolId;
return this;
}
@Override
public ReplicaAccessorBuilder setVerifyChecksum(boolean verifyChecksum) {
this.verifyChecksum = verifyChecksum;
return this;
}
@Override
public ReplicaAccessorBuilder setClientName(String clientName) {
this.clientName = clientName;
return this;
}
@Override
public ReplicaAccessorBuilder setAllowShortCircuitReads(boolean allowShortCircuit) {
this.allowShortCircuit = allowShortCircuit;
return this;
}
@Override
public ReplicaAccessorBuilder setVisibleLength(long visibleLength) {
this.visibleLength = visibleLength;
return this;
}
@Override
public ReplicaAccessorBuilder setConfiguration(Configuration conf) {
this.conf = conf;
return this;
}
@Override
public ReplicaAccessorBuilder setBlockAccessToken(byte[] token) {
return this;
}
@Override
public ReplicaAccessor build() {
if (visibleLength < 1024) {
LOG.info("SyntheticReplicaAccessorFactory returning null for a " +
"smaller replica with size " + visibleLength); //trace
return null;
}
return new SyntheticReplicaAccessor(this);
}
}
public static class SyntheticReplicaAccessor extends ReplicaAccessor {
final long length;
final byte contents[];
final SyntheticReplicaAccessorBuilder builder;
long totalRead = 0;
int numCloses = 0;
String error = "";
String prefix = "";
SyntheticReplicaAccessor(SyntheticReplicaAccessorBuilder builder) {
this.length = builder.visibleLength;
this.contents = DFSTestUtil.
calculateFileContentsFromSeed(SEED, Ints.checkedCast(length));
this.builder = builder;
String uuid = this.builder.conf.
get(SYNTHETIC_BLOCK_READER_TEST_UUID_KEY);
LinkedList<SyntheticReplicaAccessor> accessorsList =
accessors.get(uuid);
if (accessorsList == null) {
accessorsList = new LinkedList<SyntheticReplicaAccessor>();
}
accessorsList.add(this);
accessors.put(uuid, accessorsList);
}
@Override
public synchronized int read(long pos, byte[] buf, int off, int len)
throws IOException {
if (pos > Integer.MAX_VALUE) {
return 0;
} else if (pos < 0) {
addError("Attempted to read from a location that was less " +
"than 0 at " + pos);
return 0;
}
int i = 0, nread = 0;
for (int ipos = (int)pos;
(ipos < contents.length) && (nread < len);
ipos++) {
buf[i++] = contents[ipos];
nread++;
totalRead++;
LOG.info("ipos = " + ipos + ", contents.length = " + contents.length + ", nread = " + nread + ", len = " + len);
}
return nread;
}
@Override
public synchronized int read(long pos, ByteBuffer buf) throws IOException {
if (pos > Integer.MAX_VALUE) {
return 0;
} else if (pos < 0) {
addError("Attempted to read from a location that was less " +
"than 0 at " + pos);
return 0;
}
int i = 0, nread = 0;
for (int ipos = (int)pos;
ipos < contents.length; ipos++) {
try {
buf.put(contents[ipos]);
} catch (BufferOverflowException bos) {
break;
}
nread++;
totalRead++;
}
return nread;
}
@Override
public synchronized void close() throws IOException {
numCloses++;
}
@Override
public boolean isLocal() {
return true;
}
@Override
public boolean isShortCircuit() {
return true;
}
synchronized String getError() {
return error;
}
synchronized void addError(String text) {
LOG.error("SyntheticReplicaAccessor error: " + text);
error = error + prefix + text;
prefix = "; ";
}
}
@Test
public void testExternalBlockReader() throws Exception {
Configuration conf = new Configuration();
conf.set(HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY,
SyntheticReplicaAccessorBuilder.class.getName());
conf.setLong(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
String uuid = UUID.randomUUID().toString();
conf.set(SYNTHETIC_BLOCK_READER_TEST_UUID_KEY, uuid);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1)
.build();
final int TEST_LENGTH = 2047;
DistributedFileSystem dfs = cluster.getFileSystem();
try {
DFSTestUtil.createFile(dfs, new Path("/a"), TEST_LENGTH, (short)1, SEED);
HdfsDataInputStream stream =
(HdfsDataInputStream)dfs.open(new Path("/a"));
byte buf[] = new byte[TEST_LENGTH];
IOUtils.readFully(stream, buf, 0, TEST_LENGTH);
byte expected[] = DFSTestUtil.
calculateFileContentsFromSeed(SEED, TEST_LENGTH);
ReadStatistics stats = stream.getReadStatistics();
Assert.assertEquals(1024, stats.getTotalShortCircuitBytesRead());
Assert.assertEquals(2047, stats.getTotalLocalBytesRead());
Assert.assertEquals(2047, stats.getTotalBytesRead());
Assert.assertArrayEquals(expected, buf);
stream.close();
ExtendedBlock block = DFSTestUtil.getFirstBlock(dfs, new Path("/a"));
Assert.assertNotNull(block);
LinkedList<SyntheticReplicaAccessor> accessorList = accessors.get(uuid);
Assert.assertNotNull(accessorList);
Assert.assertEquals(2, accessorList.size());
SyntheticReplicaAccessor accessor = accessorList.get(0);
Assert.assertTrue(accessor.builder.allowShortCircuit);
Assert.assertEquals(block.getBlockPoolId(),
accessor.builder.blockPoolId);
Assert.assertEquals(block.getBlockId(),
accessor.builder.blockId);
Assert.assertEquals(dfs.getClient().clientName,
accessor.builder.clientName);
Assert.assertEquals("/a", accessor.builder.fileName);
Assert.assertTrue(accessor.builder.verifyChecksum);
Assert.assertEquals(1024L, accessor.builder.visibleLength);
Assert.assertEquals(1024L, accessor.totalRead);
Assert.assertEquals("", accessor.getError());
Assert.assertEquals(1, accessor.numCloses);
accessors.remove(uuid);
} finally {
dfs.close();
cluster.shutdown();
}
}
}