HDFS-8924. Add pluggable interface for reading replicas in DFSClient. (Colin Patrick McCabe via Lei Xu)
This commit is contained in:
parent
caa636bf10
commit
7087e700e0
|
@ -0,0 +1,88 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
/**
|
||||
* The public API for ReplicaAccessor objects.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Stable
|
||||
public abstract class ReplicaAccessor {
|
||||
/**
|
||||
* Read bytes from the replica.
|
||||
*
|
||||
* @param pos The position in the replica to start reading at.
|
||||
* Must not be negative.
|
||||
* @param buf The byte array to read into.
|
||||
* @param off The offset within buf to start reading into.
|
||||
* @param len The maximum length to read.
|
||||
*
|
||||
* @return The number of bytes read. If the read extends past the end
|
||||
* of the replica, a short read count will be returned. We
|
||||
* will never return a negative number. We will never
|
||||
* return a short read count unless EOF is reached.
|
||||
*/
|
||||
public abstract int read(long pos, byte[] buf, int off, int len)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Read bytes from the replica.
|
||||
*
|
||||
* @param pos The position in the replica to start reading at.
|
||||
* Must not be negative.
|
||||
* @param buf The byte buffer to read into. The amount to read will be
|
||||
* dictated by the remaining bytes between the current
|
||||
* position and the limit. The ByteBuffer may or may not be
|
||||
* direct.
|
||||
*
|
||||
* @return The number of bytes read. If the read extends past the end
|
||||
* of the replica, a short read count will be returned. We
|
||||
* will never return a negative number. We will never return
|
||||
* a short read count unless EOF is reached.
|
||||
*/
|
||||
public abstract int read(long pos, ByteBuffer buf) throws IOException;
|
||||
|
||||
/**
|
||||
* Release the resources associated with the ReplicaAccessor.
|
||||
*
|
||||
* It is recommended that implementations never throw an IOException. The
|
||||
* method is declared as throwing IOException in order to remain compatible
|
||||
* with java.io.Closeable. If an exception is thrown, the ReplicaAccessor
|
||||
* must still be closed when the function returns in order to prevent a
|
||||
* resource leak.
|
||||
*/
|
||||
public abstract void close() throws IOException;
|
||||
|
||||
/**
|
||||
* Return true if bytes read via this accessor should count towards the
|
||||
* local byte count statistics.
|
||||
*/
|
||||
public abstract boolean isLocal();
|
||||
|
||||
/**
|
||||
* Return true if bytes read via this accessor should count towards the
|
||||
* short-circuit byte count statistics.
|
||||
*/
|
||||
public abstract boolean isShortCircuit();
|
||||
}
|
|
@ -0,0 +1,101 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
|
||||
/**
|
||||
* The public API for creating a new ReplicaAccessor.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Stable
|
||||
public abstract class ReplicaAccessorBuilder {
|
||||
/**
|
||||
* Set the file name which is being opened. Provided for debugging purposes.
|
||||
*/
|
||||
public abstract ReplicaAccessorBuilder setFileName(String fileName);
|
||||
|
||||
/** Set the block ID and block pool ID which are being opened. */
|
||||
public abstract ReplicaAccessorBuilder
|
||||
setBlock(long blockId, String blockPoolId);
|
||||
|
||||
/**
|
||||
* Set whether checksums must be verified. Checksums should be skipped if
|
||||
* the user has disabled checksum verification in the configuration. Users
|
||||
* may wish to do this if their software does checksum verification at a
|
||||
* higher level than HDFS.
|
||||
*/
|
||||
public abstract ReplicaAccessorBuilder
|
||||
setVerifyChecksum(boolean verifyChecksum);
|
||||
|
||||
/** Set the name of the HDFS client. Provided for debugging purposes. */
|
||||
public abstract ReplicaAccessorBuilder setClientName(String clientName);
|
||||
|
||||
/**
|
||||
* Set whether short-circuit is enabled. Short-circuit may be disabled if
|
||||
* the user has set dfs.client.read.shortcircuit to false, or if the block
|
||||
* being read is under construction. The fact that this bit is enabled does
|
||||
* not mean that the user has permission to do short-circuit reads or to
|
||||
* access the replica-- that must be checked separately by the
|
||||
* ReplicaAccessorBuilder implementation.
|
||||
*/
|
||||
public abstract ReplicaAccessorBuilder
|
||||
setAllowShortCircuitReads(boolean allowShortCircuit);
|
||||
|
||||
/**
|
||||
* Set the length of the replica which is visible to this client. If bytes
|
||||
* are added later, they will not be visible to the ReplicaAccessor we are
|
||||
* building. In order to see more of the replica, the client must re-open
|
||||
* this HDFS file. The visible length provides an upper bound, but not a
|
||||
* lower one. If the replica is deleted or truncated, fewer bytes may be
|
||||
* visible than specified here.
|
||||
*/
|
||||
public abstract ReplicaAccessorBuilder setVisibleLength(long visibleLength);
|
||||
|
||||
/**
|
||||
* Set the configuration to use. ReplicaAccessorBuilder subclasses should
|
||||
* define their own configuration prefix. For example, the foobar plugin
|
||||
* could look for configuration keys like foo.bar.parameter1,
|
||||
* foo.bar.parameter2.
|
||||
*/
|
||||
public abstract ReplicaAccessorBuilder setConfiguration(Configuration conf);
|
||||
|
||||
/**
|
||||
* Set the block access token to use.
|
||||
*/
|
||||
public abstract ReplicaAccessorBuilder setBlockAccessToken(byte[] token);
|
||||
|
||||
/**
|
||||
* Build a new ReplicaAccessor.
|
||||
*
|
||||
* The implementation must perform any necessary access checks before
|
||||
* constructing the ReplicaAccessor. If there is a hardware-level or
|
||||
* network-level setup operation that could fail, it should be done here. If
|
||||
* the implementation returns a ReplicaAccessor, we will assume that it works
|
||||
* and not attempt to construct a normal BlockReader.
|
||||
*
|
||||
* If the ReplicaAccessor could not be built, implementations may wish to log
|
||||
* a message at TRACE level indicating why.
|
||||
*
|
||||
* @return null if the ReplicaAccessor could not be built; the
|
||||
* ReplicaAccessor otherwise.
|
||||
*/
|
||||
public abstract ReplicaAccessor build();
|
||||
}
|
|
@ -113,6 +113,9 @@ public interface HdfsClientConfigKeys {
|
|||
"dfs.datanode.hdfs-blocks-metadata.enabled";
|
||||
boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
|
||||
|
||||
static final String REPLICA_ACCESSOR_BUILDER_CLASSES_KEY =
|
||||
PREFIX + "replica.accessor.builder.classes";
|
||||
|
||||
/** dfs.client.retry configuration properties */
|
||||
interface Retry {
|
||||
String PREFIX = HdfsClientConfigKeys.PREFIX + "retry.";
|
||||
|
|
|
@ -23,7 +23,9 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.Options.ChecksumOpt;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.ReplicaAccessorBuilder;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.util.ByteArrayManager;
|
||||
import org.apache.hadoop.ipc.Client;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
@ -83,6 +85,11 @@ import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Retry;
|
|||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.ShortCircuit;
|
||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write;
|
||||
|
||||
import java.lang.Class;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* DFSClient configuration.
|
||||
*/
|
||||
|
@ -126,6 +133,8 @@ public class DfsClientConf {
|
|||
|
||||
private final long hedgedReadThresholdMillis;
|
||||
private final int hedgedReadThreadpoolSize;
|
||||
private final List<Class<? extends ReplicaAccessorBuilder>>
|
||||
replicaAccessorBuilderClasses;
|
||||
|
||||
public DfsClientConf(Configuration conf) {
|
||||
// The hdfsTimeout is currently the same as the ipc timeout
|
||||
|
@ -231,8 +240,35 @@ public class DfsClientConf {
|
|||
HedgedRead.THRESHOLD_MILLIS_KEY,
|
||||
HedgedRead.THRESHOLD_MILLIS_DEFAULT);
|
||||
hedgedReadThreadpoolSize = conf.getInt(
|
||||
HedgedRead.THREADPOOL_SIZE_KEY,
|
||||
HedgedRead.THREADPOOL_SIZE_DEFAULT);
|
||||
HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
|
||||
HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT);
|
||||
|
||||
replicaAccessorBuilderClasses = loadReplicaAccessorBuilderClasses(conf);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private List<Class<? extends ReplicaAccessorBuilder>>
|
||||
loadReplicaAccessorBuilderClasses(Configuration conf)
|
||||
{
|
||||
String classNames[] = conf.getTrimmedStrings(
|
||||
HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY);
|
||||
if (classNames.length == 0) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
ArrayList<Class<? extends ReplicaAccessorBuilder>> classes =
|
||||
new ArrayList<Class<? extends ReplicaAccessorBuilder>>();
|
||||
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
|
||||
for (String className: classNames) {
|
||||
try {
|
||||
Class<? extends ReplicaAccessorBuilder> cls =
|
||||
(Class<? extends ReplicaAccessorBuilder>)
|
||||
classLoader.loadClass(className);
|
||||
classes.add(cls);
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("Unable to load " + className, t);
|
||||
}
|
||||
}
|
||||
return classes;
|
||||
}
|
||||
|
||||
private DataChecksum.Type getChecksumType(Configuration conf) {
|
||||
|
@ -488,6 +524,14 @@ public class DfsClientConf {
|
|||
return hedgedReadThreadpoolSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the replicaAccessorBuilderClasses
|
||||
*/
|
||||
public List<Class<? extends ReplicaAccessorBuilder>>
|
||||
getReplicaAccessorBuilderClasses() {
|
||||
return replicaAccessorBuilderClasses;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the shortCircuitConf
|
||||
*/
|
||||
|
|
|
@ -302,3 +302,7 @@ message OpBlockChecksumResponseProto {
|
|||
required bytes md5 = 3;
|
||||
optional ChecksumTypeProto crcType = 4;
|
||||
}
|
||||
|
||||
message OpCustomProto {
|
||||
required string customId = 1;
|
||||
}
|
||||
|
|
|
@ -24,8 +24,12 @@ import java.io.DataInputStream;
|
|||
import java.io.DataOutputStream;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.List;
|
||||
|
||||
import com.google.common.io.ByteArrayDataOutput;
|
||||
import com.google.common.io.ByteStreams;
|
||||
import org.apache.commons.lang.mutable.MutableBoolean;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -52,6 +56,7 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplica;
|
|||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitReplicaInfo;
|
||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
|
||||
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
|
||||
import org.apache.hadoop.io.ByteWritable;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.net.unix.DomainSocket;
|
||||
|
@ -326,6 +331,10 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|||
BlockReader reader = null;
|
||||
|
||||
Preconditions.checkNotNull(configuration);
|
||||
reader = tryToCreateExternalBlockReader();
|
||||
if (reader != null) {
|
||||
return reader;
|
||||
}
|
||||
final ShortCircuitConf scConf = conf.getShortCircuitConf();
|
||||
if (scConf.isShortCircuitLocalReads() && allowShortCircuitLocalReads) {
|
||||
if (clientContext.getUseLegacyBlockReaderLocal()) {
|
||||
|
@ -362,6 +371,45 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
|||
return getRemoteBlockReaderFromTcp();
|
||||
}
|
||||
|
||||
private BlockReader tryToCreateExternalBlockReader() {
|
||||
List<Class<? extends ReplicaAccessorBuilder>> clses =
|
||||
conf.getReplicaAccessorBuilderClasses();
|
||||
for (Class<? extends ReplicaAccessorBuilder> cls : clses) {
|
||||
try {
|
||||
ByteArrayDataOutput bado = ByteStreams.newDataOutput();
|
||||
token.write(bado);
|
||||
byte tokenBytes[] = bado.toByteArray();
|
||||
|
||||
Constructor<? extends ReplicaAccessorBuilder> ctor =
|
||||
cls.getConstructor();
|
||||
ReplicaAccessorBuilder builder = ctor.newInstance();
|
||||
ReplicaAccessor accessor = builder.
|
||||
setAllowShortCircuitReads(allowShortCircuitLocalReads).
|
||||
setBlock(block.getBlockId(), block.getBlockPoolId()).
|
||||
setBlockAccessToken(tokenBytes).
|
||||
setClientName(clientName).
|
||||
setConfiguration(configuration).
|
||||
setFileName(fileName).
|
||||
setVerifyChecksum(verifyChecksum).
|
||||
setVisibleLength(length).
|
||||
build();
|
||||
if (accessor == null) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(this + ": No ReplicaAccessor created by " +
|
||||
cls.getName());
|
||||
}
|
||||
} else {
|
||||
return new ExternalBlockReader(accessor, length, startOffset);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("Failed to construct new object of type " +
|
||||
cls.getName(), t);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get {@link BlockReaderLocalLegacy} for short circuited local reads.
|
||||
* This block reader implements the path-based style of local reads
|
||||
|
|
|
@ -0,0 +1,120 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.EnumSet;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.fs.ReadOption;
|
||||
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
|
||||
|
||||
/**
|
||||
* An ExternalBlockReader uses pluggable ReplicaAccessor objects to read from
|
||||
* replicas.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public final class ExternalBlockReader implements BlockReader {
|
||||
private final ReplicaAccessor accessor;
|
||||
private final long visibleLength;
|
||||
private long pos;
|
||||
|
||||
ExternalBlockReader(ReplicaAccessor accessor, long visibleLength,
|
||||
long startOffset) {
|
||||
this.accessor = accessor;
|
||||
this.visibleLength = visibleLength;
|
||||
this.pos = startOffset;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(byte[] buf, int off, int len) throws IOException {
|
||||
int nread = accessor.read(pos, buf, off, len);
|
||||
pos += nread;
|
||||
return nread;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int read(ByteBuffer buf) throws IOException {
|
||||
int nread = accessor.read(pos, buf);
|
||||
pos += nread;
|
||||
return nread;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long skip(long n) throws IOException {
|
||||
// You cannot skip backwards
|
||||
if (n <= 0) {
|
||||
return 0;
|
||||
}
|
||||
// You can't skip past the end of the replica.
|
||||
long oldPos = pos;
|
||||
pos += n;
|
||||
if (pos > visibleLength) {
|
||||
pos = visibleLength;
|
||||
}
|
||||
return pos - oldPos;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int available() throws IOException {
|
||||
// We return the amount of bytes that we haven't read yet from the
|
||||
// replica, based on our current position. Some of the other block
|
||||
// readers return a shorter length than that. The only advantage to
|
||||
// returning a shorter length is that the DFSInputStream will
|
||||
// trash your block reader and create a new one if someone tries to
|
||||
// seek() beyond the available() region.
|
||||
long diff = visibleLength - pos;
|
||||
if (diff > Integer.MAX_VALUE) {
|
||||
return Integer.MAX_VALUE;
|
||||
} else {
|
||||
return (int)diff;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
accessor.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFully(byte[] buf, int offset, int len) throws IOException {
|
||||
BlockReaderUtil.readFully(this, buf, offset, len);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int readAll(byte[] buf, int offset, int len) throws IOException {
|
||||
return BlockReaderUtil.readAll(this, buf, offset, len);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isLocal() {
|
||||
return accessor.isLocal();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isShortCircuit() {
|
||||
return accessor.isShortCircuit();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
|
||||
// For now, pluggable ReplicaAccessors do not support zero-copy.
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -37,7 +37,8 @@ public enum Op {
|
|||
TRANSFER_BLOCK((byte)86),
|
||||
REQUEST_SHORT_CIRCUIT_FDS((byte)87),
|
||||
RELEASE_SHORT_CIRCUIT_FDS((byte)88),
|
||||
REQUEST_SHORT_CIRCUIT_SHM((byte)89);
|
||||
REQUEST_SHORT_CIRCUIT_SHM((byte)89),
|
||||
CUSTOM((byte)127);
|
||||
|
||||
/** The code for this operation. */
|
||||
public final byte code;
|
||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
|||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCustomProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
|
||||
|
|
|
@ -0,0 +1,298 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs;
|
||||
|
||||
import com.google.common.primitives.Ints;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.DFSInputStream.ReadStatistics;
|
||||
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.BufferOverflowException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.HashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.UUID;
|
||||
|
||||
public class TestExternalBlockReader {
|
||||
private static final Log LOG = LogFactory.getLog(TestExternalBlockReader.class);
|
||||
|
||||
private static long SEED = 1234;
|
||||
|
||||
@Test
|
||||
public void testMisconfiguredExternalBlockReader() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY,
|
||||
"org.apache.hadoop.hdfs.NonExistentReplicaAccessorBuilderClass");
|
||||
conf.setLong(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
|
||||
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1)
|
||||
.build();
|
||||
final int TEST_LENGTH = 2048;
|
||||
DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
try {
|
||||
DFSTestUtil.createFile(dfs, new Path("/a"), TEST_LENGTH, (short)1, SEED);
|
||||
FSDataInputStream stream = dfs.open(new Path("/a"));
|
||||
byte buf[] = new byte[TEST_LENGTH];
|
||||
IOUtils.readFully(stream, buf, 0, TEST_LENGTH);
|
||||
byte expected[] = DFSTestUtil.
|
||||
calculateFileContentsFromSeed(SEED, TEST_LENGTH);
|
||||
Assert.assertArrayEquals(expected, buf);
|
||||
stream.close();
|
||||
} finally {
|
||||
dfs.close();
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private static final String SYNTHETIC_BLOCK_READER_TEST_UUID_KEY =
|
||||
"synthetic.block.reader.test.uuid.key";
|
||||
|
||||
private static final HashMap<String, LinkedList<SyntheticReplicaAccessor>>
|
||||
accessors = new HashMap<String, LinkedList<SyntheticReplicaAccessor>>(1);
|
||||
|
||||
public static class SyntheticReplicaAccessorBuilder
|
||||
extends ReplicaAccessorBuilder {
|
||||
String fileName;
|
||||
long blockId;
|
||||
String blockPoolId;
|
||||
boolean verifyChecksum;
|
||||
String clientName;
|
||||
boolean allowShortCircuit;
|
||||
long visibleLength;
|
||||
Configuration conf;
|
||||
|
||||
@Override
|
||||
public ReplicaAccessorBuilder setFileName(String fileName) {
|
||||
this.fileName = fileName;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicaAccessorBuilder setBlock(long blockId, String blockPoolId) {
|
||||
this.blockId = blockId;
|
||||
this.blockPoolId = blockPoolId;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicaAccessorBuilder setVerifyChecksum(boolean verifyChecksum) {
|
||||
this.verifyChecksum = verifyChecksum;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicaAccessorBuilder setClientName(String clientName) {
|
||||
this.clientName = clientName;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicaAccessorBuilder setAllowShortCircuitReads(boolean allowShortCircuit) {
|
||||
this.allowShortCircuit = allowShortCircuit;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicaAccessorBuilder setVisibleLength(long visibleLength) {
|
||||
this.visibleLength = visibleLength;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicaAccessorBuilder setConfiguration(Configuration conf) {
|
||||
this.conf = conf;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicaAccessorBuilder setBlockAccessToken(byte[] token) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReplicaAccessor build() {
|
||||
if (visibleLength < 1024) {
|
||||
LOG.info("SyntheticReplicaAccessorFactory returning null for a " +
|
||||
"smaller replica with size " + visibleLength); //trace
|
||||
return null;
|
||||
}
|
||||
return new SyntheticReplicaAccessor(this);
|
||||
}
|
||||
}
|
||||
|
||||
public static class SyntheticReplicaAccessor extends ReplicaAccessor {
|
||||
final long length;
|
||||
final byte contents[];
|
||||
final SyntheticReplicaAccessorBuilder builder;
|
||||
long totalRead = 0;
|
||||
int numCloses = 0;
|
||||
String error = "";
|
||||
String prefix = "";
|
||||
|
||||
SyntheticReplicaAccessor(SyntheticReplicaAccessorBuilder builder) {
|
||||
this.length = builder.visibleLength;
|
||||
this.contents = DFSTestUtil.
|
||||
calculateFileContentsFromSeed(SEED, Ints.checkedCast(length));
|
||||
this.builder = builder;
|
||||
String uuid = this.builder.conf.
|
||||
get(SYNTHETIC_BLOCK_READER_TEST_UUID_KEY);
|
||||
LinkedList<SyntheticReplicaAccessor> accessorsList =
|
||||
accessors.get(uuid);
|
||||
if (accessorsList == null) {
|
||||
accessorsList = new LinkedList<SyntheticReplicaAccessor>();
|
||||
}
|
||||
accessorsList.add(this);
|
||||
accessors.put(uuid, accessorsList);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int read(long pos, byte[] buf, int off, int len)
|
||||
throws IOException {
|
||||
if (pos > Integer.MAX_VALUE) {
|
||||
return 0;
|
||||
} else if (pos < 0) {
|
||||
addError("Attempted to read from a location that was less " +
|
||||
"than 0 at " + pos);
|
||||
return 0;
|
||||
}
|
||||
int i = 0, nread = 0;
|
||||
for (int ipos = (int)pos;
|
||||
(ipos < contents.length) && (nread < len);
|
||||
ipos++) {
|
||||
buf[i++] = contents[ipos];
|
||||
nread++;
|
||||
totalRead++;
|
||||
LOG.info("ipos = " + ipos + ", contents.length = " + contents.length + ", nread = " + nread + ", len = " + len);
|
||||
}
|
||||
return nread;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized int read(long pos, ByteBuffer buf) throws IOException {
|
||||
if (pos > Integer.MAX_VALUE) {
|
||||
return 0;
|
||||
} else if (pos < 0) {
|
||||
addError("Attempted to read from a location that was less " +
|
||||
"than 0 at " + pos);
|
||||
return 0;
|
||||
}
|
||||
int i = 0, nread = 0;
|
||||
for (int ipos = (int)pos;
|
||||
ipos < contents.length; ipos++) {
|
||||
try {
|
||||
buf.put(contents[ipos]);
|
||||
} catch (BufferOverflowException bos) {
|
||||
break;
|
||||
}
|
||||
nread++;
|
||||
totalRead++;
|
||||
}
|
||||
return nread;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void close() throws IOException {
|
||||
numCloses++;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isLocal() {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isShortCircuit() {
|
||||
return true;
|
||||
}
|
||||
|
||||
synchronized String getError() {
|
||||
return error;
|
||||
}
|
||||
|
||||
synchronized void addError(String text) {
|
||||
LOG.error("SyntheticReplicaAccessor error: " + text);
|
||||
error = error + prefix + text;
|
||||
prefix = "; ";
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testExternalBlockReader() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(HdfsClientConfigKeys.REPLICA_ACCESSOR_BUILDER_CLASSES_KEY,
|
||||
SyntheticReplicaAccessorBuilder.class.getName());
|
||||
conf.setLong(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
|
||||
conf.setLong(DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY, 0);
|
||||
String uuid = UUID.randomUUID().toString();
|
||||
conf.set(SYNTHETIC_BLOCK_READER_TEST_UUID_KEY, uuid);
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1)
|
||||
.build();
|
||||
final int TEST_LENGTH = 2047;
|
||||
DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
try {
|
||||
DFSTestUtil.createFile(dfs, new Path("/a"), TEST_LENGTH, (short)1, SEED);
|
||||
HdfsDataInputStream stream =
|
||||
(HdfsDataInputStream)dfs.open(new Path("/a"));
|
||||
byte buf[] = new byte[TEST_LENGTH];
|
||||
IOUtils.readFully(stream, buf, 0, TEST_LENGTH);
|
||||
byte expected[] = DFSTestUtil.
|
||||
calculateFileContentsFromSeed(SEED, TEST_LENGTH);
|
||||
ReadStatistics stats = stream.getReadStatistics();
|
||||
Assert.assertEquals(1024, stats.getTotalShortCircuitBytesRead());
|
||||
Assert.assertEquals(2047, stats.getTotalLocalBytesRead());
|
||||
Assert.assertEquals(2047, stats.getTotalBytesRead());
|
||||
Assert.assertArrayEquals(expected, buf);
|
||||
stream.close();
|
||||
ExtendedBlock block = DFSTestUtil.getFirstBlock(dfs, new Path("/a"));
|
||||
Assert.assertNotNull(block);
|
||||
LinkedList<SyntheticReplicaAccessor> accessorList = accessors.get(uuid);
|
||||
Assert.assertNotNull(accessorList);
|
||||
Assert.assertEquals(2, accessorList.size());
|
||||
SyntheticReplicaAccessor accessor = accessorList.get(0);
|
||||
Assert.assertTrue(accessor.builder.allowShortCircuit);
|
||||
Assert.assertEquals(block.getBlockPoolId(),
|
||||
accessor.builder.blockPoolId);
|
||||
Assert.assertEquals(block.getBlockId(),
|
||||
accessor.builder.blockId);
|
||||
Assert.assertEquals(dfs.getClient().clientName,
|
||||
accessor.builder.clientName);
|
||||
Assert.assertEquals("/a", accessor.builder.fileName);
|
||||
Assert.assertTrue(accessor.builder.verifyChecksum);
|
||||
Assert.assertEquals(1024L, accessor.builder.visibleLength);
|
||||
Assert.assertEquals(1024L, accessor.totalRead);
|
||||
Assert.assertEquals("", accessor.getError());
|
||||
Assert.assertEquals(1, accessor.numCloses);
|
||||
accessors.remove(uuid);
|
||||
} finally {
|
||||
dfs.close();
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue