HDFS-8925. Move BlockReaderLocal to hdfs-client. Contributed by Mingliang Liu.

This commit is contained in:
Haohui Mai 2015-08-28 14:20:55 -07:00
parent 159969f658
commit a3d0534558
38 changed files with 200 additions and 182 deletions

View File

@ -22,11 +22,10 @@ import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.util.EnumSet; import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.ReadOption;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf; import org.apache.hadoop.hdfs.client.impl.DfsClientConf.ShortCircuitConf;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
@ -42,6 +41,9 @@ import org.apache.htrace.TraceScope;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* BlockReaderLocal enables local short circuited reads. If the DFS client is on * BlockReaderLocal enables local short circuited reads. If the DFS client is on
* the same machine as the datanode, then the client can read files directly * the same machine as the datanode, then the client can read files directly
@ -60,7 +62,7 @@ import com.google.common.base.Preconditions;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class BlockReaderLocal implements BlockReader { class BlockReaderLocal implements BlockReader {
static final Log LOG = LogFactory.getLog(BlockReaderLocal.class); static final Logger LOG = LoggerFactory.getLogger(BlockReaderLocal.class);
private static final DirectBufferPool bufferPool = new DirectBufferPool(); private static final DirectBufferPool bufferPool = new DirectBufferPool();
@ -88,7 +90,7 @@ class BlockReaderLocal implements BlockReader {
public Builder setCachingStrategy(CachingStrategy cachingStrategy) { public Builder setCachingStrategy(CachingStrategy cachingStrategy) {
long readahead = cachingStrategy.getReadahead() != null ? long readahead = cachingStrategy.getReadahead() != null ?
cachingStrategy.getReadahead() : cachingStrategy.getReadahead() :
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT; HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead); this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead);
return this; return this;
} }

View File

@ -29,8 +29,6 @@ import java.util.HashMap;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.ReadOption;
@ -45,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; import org.apache.hadoop.hdfs.shortcircuit.ClientMmap;
import org.apache.hadoop.hdfs.util.IOUtilsClient;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
@ -55,6 +54,9 @@ import org.apache.htrace.Sampler;
import org.apache.htrace.Trace; import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope; import org.apache.htrace.TraceScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* BlockReaderLocalLegacy enables local short circuited reads. If the DFS client is on * BlockReaderLocalLegacy enables local short circuited reads. If the DFS client is on
* the same machine as the datanode, then the client can read files directly * the same machine as the datanode, then the client can read files directly
@ -79,7 +81,8 @@ import org.apache.htrace.TraceScope;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class BlockReaderLocalLegacy implements BlockReader { class BlockReaderLocalLegacy implements BlockReader {
private static final Log LOG = LogFactory.getLog(BlockReaderLocalLegacy.class); private static final Logger LOG = LoggerFactory.getLogger(
BlockReaderLocalLegacy.class);
//Stores the cache and proxy for a local datanode. //Stores the cache and proxy for a local datanode.
private static class LocalDatanodeInfo { private static class LocalDatanodeInfo {
@ -112,7 +115,7 @@ class BlockReaderLocalLegacy implements BlockReader {
proxy = ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() { proxy = ugi.doAs(new PrivilegedExceptionAction<ClientDatanodeProtocol>() {
@Override @Override
public ClientDatanodeProtocol run() throws Exception { public ClientDatanodeProtocol run() throws Exception {
return DFSUtil.createClientDatanodeProtocolProxy(node, conf, return DFSUtilClient.createClientDatanodeProtocolProxy(node, conf,
socketTimeout, connectToDnViaHostname); socketTimeout, connectToDnViaHostname);
} }
}); });
@ -244,7 +247,7 @@ class BlockReaderLocalLegacy implements BlockReader {
} catch (IOException e) { } catch (IOException e) {
// remove from cache // remove from cache
localDatanodeInfo.removeBlockLocalPathInfo(blk); localDatanodeInfo.removeBlockLocalPathInfo(blk);
DFSClient.LOG.warn("BlockReaderLocalLegacy: Removing " + blk LOG.warn("BlockReaderLocalLegacy: Removing " + blk
+ " from cache because local file " + pathinfo.getBlockPath() + " from cache because local file " + pathinfo.getBlockPath()
+ " could not be opened."); + " could not be opened.");
throw e; throw e;
@ -689,7 +692,7 @@ class BlockReaderLocalLegacy implements BlockReader {
@Override @Override
public synchronized void close() throws IOException { public synchronized void close() throws IOException {
IOUtils.cleanup(LOG, dataIn, checksumIn); IOUtilsClient.cleanup(LOG, dataIn, checksumIn);
if (slowReadBuff != null) { if (slowReadBuff != null) {
bufferPool.returnBuffer(slowReadBuff); bufferPool.returnBuffer(slowReadBuff);
slowReadBuff = null; slowReadBuff = null;

View File

@ -19,8 +19,6 @@ package org.apache.hadoop.hdfs;
import java.util.HashMap; import java.util.HashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@ -32,6 +30,9 @@ import org.apache.hadoop.hdfs.util.ByteArrayManager;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* ClientContext contains context information for a client. * ClientContext contains context information for a client.
* *
@ -40,7 +41,7 @@ import com.google.common.annotations.VisibleForTesting;
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ClientContext { public class ClientContext {
private static final Log LOG = LogFactory.getLog(ClientContext.class); private static final Logger LOG = LoggerFactory.getLogger(ClientContext.class);
/** /**
* Global map of context names to caches contexts. * Global map of context names to caches contexts.

View File

@ -22,22 +22,32 @@ import com.google.common.collect.Maps;
import com.google.common.primitives.SignedBytes; import com.google.common.primitives.SignedBytes;
import org.apache.commons.io.Charsets; import org.apache.commons.io.Charsets;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderFactory;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.web.WebHdfsConstants; import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NodeBase; import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import javax.net.SocketFactory;
import java.io.IOException;
import java.io.UnsupportedEncodingException; import java.io.UnsupportedEncodingException;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.SimpleDateFormat; import java.text.SimpleDateFormat;
import java.util.Collection; import java.util.Collection;
import java.util.Collections; import java.util.Collections;
@ -455,4 +465,62 @@ public class DFSUtilClient {
localAddrMap.put(addr.getHostAddress(), local); localAddrMap.put(addr.getHostAddress(), local);
return local; return local;
} }
/** Create a {@link ClientDatanodeProtocol} proxy */
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
DatanodeID datanodeid, Configuration conf, int socketTimeout,
boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
return new ClientDatanodeProtocolTranslatorPB(datanodeid, conf, socketTimeout,
connectToDnViaHostname, locatedBlock);
}
/** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
DatanodeID datanodeid, Configuration conf, int socketTimeout,
boolean connectToDnViaHostname) throws IOException {
return new ClientDatanodeProtocolTranslatorPB(
datanodeid, conf, socketTimeout, connectToDnViaHostname);
}
/** Create a {@link ClientDatanodeProtocol} proxy */
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory) throws IOException {
return new ClientDatanodeProtocolTranslatorPB(addr, ticket, conf, factory);
}
/**
* Creates a new KeyProvider from the given Configuration.
*
* @param conf Configuration
* @return new KeyProvider, or null if no provider was found.
* @throws IOException if the KeyProvider is improperly specified in
* the Configuration
*/
public static KeyProvider createKeyProvider(
final Configuration conf) throws IOException {
final String providerUriStr =
conf.getTrimmed(HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "");
// No provider set in conf
if (providerUriStr.isEmpty()) {
return null;
}
final URI providerUri;
try {
providerUri = new URI(providerUriStr);
} catch (URISyntaxException e) {
throw new IOException(e);
}
KeyProvider keyProvider = KeyProviderFactory.get(providerUri, conf);
if (keyProvider == null) {
throw new IOException("Could not instantiate KeyProvider from " +
HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI + " setting of '"
+ providerUriStr + "'");
}
if (keyProvider.isTransient()) {
throw new IOException("KeyProvider " + keyProvider.toString()
+ " was found but it is a transient provider.");
}
return keyProvider;
}
} }

View File

@ -21,14 +21,12 @@ import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider; import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache; import com.google.common.cache.Cache;
@ -36,10 +34,13 @@ import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener; import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification; import com.google.common.cache.RemovalNotification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@InterfaceAudience.Private @InterfaceAudience.Private
public class KeyProviderCache { public class KeyProviderCache {
public static final Log LOG = LogFactory.getLog(KeyProviderCache.class); public static final Logger LOG = LoggerFactory.getLogger(KeyProviderCache.class);
private final Cache<URI, KeyProvider> cache; private final Cache<URI, KeyProvider> cache;
@ -72,7 +73,7 @@ public class KeyProviderCache {
return cache.get(kpURI, new Callable<KeyProvider>() { return cache.get(kpURI, new Callable<KeyProvider>() {
@Override @Override
public KeyProvider call() throws Exception { public KeyProvider call() throws Exception {
return DFSUtil.createKeyProvider(conf); return DFSUtilClient.createKeyProvider(conf);
} }
}); });
} catch (Exception e) { } catch (Exception e) {
@ -83,11 +84,11 @@ public class KeyProviderCache {
private URI createKeyProviderURI(Configuration conf) { private URI createKeyProviderURI(Configuration conf) {
final String providerUriStr = final String providerUriStr =
conf.getTrimmed(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, ""); conf.getTrimmed(HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "");
// No provider set in conf // No provider set in conf
if (providerUriStr.isEmpty()) { if (providerUriStr.isEmpty()) {
LOG.error("Could not find uri with key [" LOG.error("Could not find uri with key ["
+ DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI + HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI
+ "] to create a keyProvider !!"); + "] to create a keyProvider !!");
return null; return null;
} }

View File

@ -27,15 +27,16 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.LinkedListMultimap; import com.google.common.collect.LinkedListMultimap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.net.Peer;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.util.IOUtilsClient;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* A cache of input stream sockets to Data Node. * A cache of input stream sockets to Data Node.
@ -44,7 +45,7 @@ import org.apache.hadoop.util.Time;
@InterfaceAudience.Private @InterfaceAudience.Private
@VisibleForTesting @VisibleForTesting
public class PeerCache { public class PeerCache {
private static final Log LOG = LogFactory.getLog(PeerCache.class); private static final Logger LOG = LoggerFactory.getLogger(PeerCache.class);
private static class Key { private static class Key {
final DatanodeID dnID; final DatanodeID dnID;
@ -188,7 +189,7 @@ public class PeerCache {
if (peer.isClosed()) return; if (peer.isClosed()) return;
if (capacity <= 0) { if (capacity <= 0) {
// Cache disabled. // Cache disabled.
IOUtils.cleanup(LOG, peer); IOUtilsClient.cleanup(LOG, peer);
return; return;
} }
putInternal(dnId, peer); putInternal(dnId, peer);
@ -222,7 +223,7 @@ public class PeerCache {
expiryPeriod) { expiryPeriod) {
break; break;
} }
IOUtils.cleanup(LOG, entry.getValue().getPeer()); IOUtilsClient.cleanup(LOG, entry.getValue().getPeer());
iter.remove(); iter.remove();
} }
} }
@ -241,7 +242,7 @@ public class PeerCache {
"capacity: " + capacity); "capacity: " + capacity);
} }
Entry<Key, Value> entry = iter.next(); Entry<Key, Value> entry = iter.next();
IOUtils.cleanup(LOG, entry.getValue().getPeer()); IOUtilsClient.cleanup(LOG, entry.getValue().getPeer());
iter.remove(); iter.remove();
} }
@ -269,7 +270,7 @@ public class PeerCache {
@VisibleForTesting @VisibleForTesting
synchronized void clear() { synchronized void clear() {
for (Value value : multimap.values()) { for (Value value : multimap.values()) {
IOUtils.cleanup(LOG, value.getPeer()); IOUtilsClient.cleanup(LOG, value.getPeer());
} }
multimap.clear(); multimap.clear();
} }

View File

@ -117,6 +117,11 @@ public interface HdfsClientConfigKeys {
"dfs.datanode.hdfs-blocks-metadata.enabled"; "dfs.datanode.hdfs-blocks-metadata.enabled";
boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false; boolean DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT = false;
String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY = "dfs.datanode.kerberos.principal";
String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
long DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB
String DFS_ENCRYPTION_KEY_PROVIDER_URI = "dfs.encryption.key.provider.uri";
String REPLICA_ACCESSOR_BUILDER_CLASSES_KEY = String REPLICA_ACCESSOR_BUILDER_CLASSES_KEY =
PREFIX + "replica.accessor.builder.classes"; PREFIX + "replica.accessor.builder.classes";

View File

@ -20,13 +20,11 @@ package org.apache.hadoop.hdfs.protocol;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.ReconfigurationTaskStatus; import org.apache.hadoop.conf.ReconfigurationTaskStatus;
import org.apache.hadoop.hdfs.client.BlockReportOptions; import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.KerberosInfo;
@ -38,7 +36,7 @@ import org.apache.hadoop.security.token.TokenInfo;
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
@KerberosInfo( @KerberosInfo(
serverPrincipal = DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY) serverPrincipal = HdfsClientConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY)
@TokenInfo(BlockTokenSelector.class) @TokenInfo(BlockTokenSelector.class)
public interface ClientDatanodeProtocol { public interface ClientDatanodeProtocol {
/** /**

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.hdfs.protocolPB; package org.apache.hadoop.hdfs.protocolPB;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector; import org.apache.hadoop.hdfs.security.token.block.BlockTokenSelector;
import org.apache.hadoop.ipc.ProtocolInfo; import org.apache.hadoop.ipc.ProtocolInfo;
@ -26,7 +26,7 @@ import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.security.token.TokenInfo; import org.apache.hadoop.security.token.TokenInfo;
@KerberosInfo( @KerberosInfo(
serverPrincipal = DFSConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY) serverPrincipal = HdfsClientConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY)
@TokenInfo(BlockTokenSelector.class) @TokenInfo(BlockTokenSelector.class)
@ProtocolInfo(protocolName = @ProtocolInfo(protocolName =
"org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol", "org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol",

View File

@ -28,8 +28,6 @@ import javax.net.SocketFactory;
import com.google.common.base.Optional; import com.google.common.base.Optional;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -77,6 +75,8 @@ import com.google.common.primitives.Longs;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* This class is the client side translator to translate the requests made on * This class is the client side translator to translate the requests made on
@ -88,8 +88,8 @@ import com.google.protobuf.ServiceException;
public class ClientDatanodeProtocolTranslatorPB implements public class ClientDatanodeProtocolTranslatorPB implements
ProtocolMetaInterface, ClientDatanodeProtocol, ProtocolMetaInterface, ClientDatanodeProtocol,
ProtocolTranslator, Closeable { ProtocolTranslator, Closeable {
public static final Log LOG = LogFactory public static final Logger LOG = LoggerFactory
.getLog(ClientDatanodeProtocolTranslatorPB.class); .getLogger(ClientDatanodeProtocolTranslatorPB.class);
/** RpcController is not used and hence is set to null */ /** RpcController is not used and hence is set to null */
private final static RpcController NULL_CONTROLLER = null; private final static RpcController NULL_CONTROLLER = null;
@ -226,7 +226,7 @@ public class ClientDatanodeProtocolTranslatorPB implements
} catch (ServiceException e) { } catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);
} }
return new BlockLocalPathInfo(PBHelper.convert(resp.getBlock()), return new BlockLocalPathInfo(PBHelperClient.convert(resp.getBlock()),
resp.getLocalPath(), resp.getLocalMetaPath()); resp.getLocalPath(), resp.getLocalMetaPath());
} }
@ -294,7 +294,7 @@ public class ClientDatanodeProtocolTranslatorPB implements
GetDatanodeInfoResponseProto response; GetDatanodeInfoResponseProto response;
try { try {
response = rpcProxy.getDatanodeInfo(NULL_CONTROLLER, VOID_GET_DATANODE_INFO); response = rpcProxy.getDatanodeInfo(NULL_CONTROLLER, VOID_GET_DATANODE_INFO);
return PBHelper.convert(response.getLocalInfo()); return PBHelperClient.convert(response.getLocalInfo());
} catch (ServiceException e) { } catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);
} }

View File

@ -23,12 +23,14 @@ import com.google.protobuf.CodedInputStream;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeLocalInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeLocalInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
@ -185,6 +187,17 @@ public class PBHelperClient {
return pinnings; return pinnings;
} }
public static ExtendedBlock convert(ExtendedBlockProto eb) {
if (eb == null) return null;
return new ExtendedBlock( eb.getPoolId(), eb.getBlockId(), eb.getNumBytes(),
eb.getGenerationStamp());
}
public static DatanodeLocalInfo convert(DatanodeLocalInfoProto proto) {
return new DatanodeLocalInfo(proto.getSoftwareVersion(),
proto.getConfigVersion(), proto.getUptime());
}
static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) { static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) {
if (di == null) return null; if (di == null) return null;
return convert(di); return convert(di);

View File

@ -513,6 +513,9 @@ Release 2.8.0 - UNRELEASED
HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone HDFS-8938. Extract BlockToMarkCorrupt and ReplicationWork as standalone
classes from BlockManager. (Mingliang Liu via wheat9) classes from BlockManager. (Mingliang Liu via wheat9)
HDFS-8925. Move BlockReaderLocal to hdfs-client.
(Mingliang Liu via wheat9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

View File

@ -350,7 +350,8 @@ class BlockStorageLocationUtil {
TraceScope scope = TraceScope scope =
Trace.startSpan("getHdfsBlocksMetadata", parentSpan); Trace.startSpan("getHdfsBlocksMetadata", parentSpan);
try { try {
cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration, cdp = DFSUtilClient.createClientDatanodeProtocolProxy(
datanode, configuration,
timeout, connectToDnViaHostname); timeout, connectToDnViaHostname);
metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens); metadata = cdp.getHdfsBlocksMetadata(poolId, blockIds, dnTokens);
} catch (IOException e) { } catch (IOException e) {

View File

@ -80,8 +80,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024; public static final long DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024;
public static final String DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY = "dfs.datanode.balance.max.concurrent.moves"; public static final String DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY = "dfs.datanode.balance.max.concurrent.moves";
public static final int DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT = 5; public static final int DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT = 5;
public static final String DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes"; @Deprecated
public static final long DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 4 * 1024 * 1024; // 4MB public static final String DFS_DATANODE_READAHEAD_BYTES_KEY =
HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY;
@Deprecated
public static final long DFS_DATANODE_READAHEAD_BYTES_DEFAULT =
HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
public static final String DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes"; public static final String DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes";
public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT = false; public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT = false;
public static final String DFS_DATANODE_SYNC_BEHIND_WRITES_KEY = "dfs.datanode.sync.behind.writes"; public static final String DFS_DATANODE_SYNC_BEHIND_WRITES_KEY = "dfs.datanode.sync.behind.writes";
@ -505,7 +509,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_WEB_UGI_KEY = "dfs.web.ugi"; public static final String DFS_WEB_UGI_KEY = "dfs.web.ugi";
public static final String DFS_NAMENODE_STARTUP_KEY = "dfs.namenode.startup"; public static final String DFS_NAMENODE_STARTUP_KEY = "dfs.namenode.startup";
public static final String DFS_DATANODE_KEYTAB_FILE_KEY = "dfs.datanode.keytab.file"; public static final String DFS_DATANODE_KEYTAB_FILE_KEY = "dfs.datanode.keytab.file";
public static final String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY = "dfs.datanode.kerberos.principal"; public static final String DFS_DATANODE_KERBEROS_PRINCIPAL_KEY =
HdfsClientConfigKeys.DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
@Deprecated @Deprecated
public static final String DFS_DATANODE_USER_NAME_KEY = DFS_DATANODE_KERBEROS_PRINCIPAL_KEY; public static final String DFS_DATANODE_USER_NAME_KEY = DFS_DATANODE_KERBEROS_PRINCIPAL_KEY;
public static final String DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS = "dfs.datanode.shared.file.descriptor.paths"; public static final String DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS = "dfs.datanode.shared.file.descriptor.paths";
@ -602,7 +607,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class"; public static final String DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY = "dfs.data.transfer.saslproperties.resolver.class";
public static final int DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100; public static final int DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES_DEFAULT = 100;
public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses"; public static final String DFS_NAMENODE_LIST_ENCRYPTION_ZONES_NUM_RESPONSES = "dfs.namenode.list.encryption.zones.num.responses";
public static final String DFS_ENCRYPTION_KEY_PROVIDER_URI = "dfs.encryption.key.provider.uri"; public static final String DFS_ENCRYPTION_KEY_PROVIDER_URI =
HdfsClientConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI;
// Journal-node related configs. These are read on the JN side. // Journal-node related configs. These are read on the JN side.
public static final String DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir"; public static final String DFS_JOURNALNODE_EDITS_DIR_KEY = "dfs.journalnode.edits.dir";

View File

@ -364,7 +364,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
ClientDatanodeProtocol cdp = null; ClientDatanodeProtocol cdp = null;
try { try {
cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, cdp = DFSUtilClient.createClientDatanodeProtocolProxy(datanode,
dfsClient.getConfiguration(), conf.getSocketTimeout(), dfsClient.getConfiguration(), conf.getSocketTimeout(),
conf.isConnectToDnViaHostname(), locatedblock); conf.isConnectToDnViaHostname(), locatedblock);

View File

@ -53,8 +53,6 @@ import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import javax.net.SocketFactory;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser; import org.apache.commons.cli.CommandLineParser;
@ -75,12 +73,8 @@ import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.http.HttpConfig;
@ -934,29 +928,6 @@ public class DFSUtil {
public static int roundBytesToGB(long bytes) { public static int roundBytesToGB(long bytes) {
return Math.round((float)bytes/ 1024 / 1024 / 1024); return Math.round((float)bytes/ 1024 / 1024 / 1024);
} }
/** Create a {@link ClientDatanodeProtocol} proxy */
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
DatanodeID datanodeid, Configuration conf, int socketTimeout,
boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
return new ClientDatanodeProtocolTranslatorPB(datanodeid, conf, socketTimeout,
connectToDnViaHostname, locatedBlock);
}
/** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
DatanodeID datanodeid, Configuration conf, int socketTimeout,
boolean connectToDnViaHostname) throws IOException {
return new ClientDatanodeProtocolTranslatorPB(
datanodeid, conf, socketTimeout, connectToDnViaHostname);
}
/** Create a {@link ClientDatanodeProtocol} proxy */
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
SocketFactory factory) throws IOException {
return new ClientDatanodeProtocolTranslatorPB(addr, ticket, conf, factory);
}
/** /**
* Get nameservice Id for the {@link NameNode} based on namenode RPC address * Get nameservice Id for the {@link NameNode} based on namenode RPC address
@ -1484,41 +1455,6 @@ public class DFSUtil {
} }
} }
/**
* Creates a new KeyProvider from the given Configuration.
*
* @param conf Configuration
* @return new KeyProvider, or null if no provider was found.
* @throws IOException if the KeyProvider is improperly specified in
* the Configuration
*/
public static KeyProvider createKeyProvider(
final Configuration conf) throws IOException {
final String providerUriStr =
conf.getTrimmed(DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI, "");
// No provider set in conf
if (providerUriStr.isEmpty()) {
return null;
}
final URI providerUri;
try {
providerUri = new URI(providerUriStr);
} catch (URISyntaxException e) {
throw new IOException(e);
}
KeyProvider keyProvider = KeyProviderFactory.get(providerUri, conf);
if (keyProvider == null) {
throw new IOException("Could not instantiate KeyProvider from " +
DFSConfigKeys.DFS_ENCRYPTION_KEY_PROVIDER_URI + " setting of '" +
providerUriStr +"'");
}
if (keyProvider.isTransient()) {
throw new IOException("KeyProvider " + keyProvider.toString()
+ " was found but it is a transient provider.");
}
return keyProvider;
}
/** /**
* Creates a new KeyProviderCryptoExtension by wrapping the * Creates a new KeyProviderCryptoExtension by wrapping the
* KeyProvider specified in the given Configuration. * KeyProvider specified in the given Configuration.
@ -1530,7 +1466,7 @@ public class DFSUtil {
*/ */
public static KeyProviderCryptoExtension createKeyProviderCryptoExtension( public static KeyProviderCryptoExtension createKeyProviderCryptoExtension(
final Configuration conf) throws IOException { final Configuration conf) throws IOException {
KeyProvider keyProvider = createKeyProvider(conf); KeyProvider keyProvider = DFSUtilClient.createKeyProvider(conf);
if (keyProvider == null) { if (keyProvider == null) {
return null; return null;
} }

View File

@ -30,7 +30,6 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCustomProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReplaceBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto;
@ -115,7 +114,7 @@ public abstract class Receiver implements DataTransferProtocol {
TraceScope traceScope = continueTraceSpan(proto.getHeader(), TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName()); proto.getClass().getSimpleName());
try { try {
readBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), readBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(), proto.getHeader().getClientName(),
proto.getOffset(), proto.getOffset(),
@ -136,7 +135,7 @@ public abstract class Receiver implements DataTransferProtocol {
TraceScope traceScope = continueTraceSpan(proto.getHeader(), TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName()); proto.getClass().getSimpleName());
try { try {
writeBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), writeBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelperClient.convertStorageType(proto.getStorageType()), PBHelperClient.convertStorageType(proto.getStorageType()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(), proto.getHeader().getClientName(),
@ -167,7 +166,7 @@ public abstract class Receiver implements DataTransferProtocol {
TraceScope traceScope = continueTraceSpan(proto.getHeader(), TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName()); proto.getClass().getSimpleName());
try { try {
transferBlock(PBHelper.convert(proto.getHeader().getBaseHeader().getBlock()), transferBlock(PBHelperClient.convert(proto.getHeader().getBaseHeader().getBlock()),
PBHelper.convert(proto.getHeader().getBaseHeader().getToken()), PBHelper.convert(proto.getHeader().getBaseHeader().getToken()),
proto.getHeader().getClientName(), proto.getHeader().getClientName(),
targets, targets,
@ -186,7 +185,7 @@ public abstract class Receiver implements DataTransferProtocol {
TraceScope traceScope = continueTraceSpan(proto.getHeader(), TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName()); proto.getClass().getSimpleName());
try { try {
requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()), requestShortCircuitFds(PBHelperClient.convert(proto.getHeader().getBlock()),
PBHelper.convert(proto.getHeader().getToken()), PBHelper.convert(proto.getHeader().getToken()),
slotId, proto.getMaxVersion(), slotId, proto.getMaxVersion(),
proto.getSupportsReceiptVerification()); proto.getSupportsReceiptVerification());
@ -228,7 +227,7 @@ public abstract class Receiver implements DataTransferProtocol {
TraceScope traceScope = continueTraceSpan(proto.getHeader(), TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName()); proto.getClass().getSimpleName());
try { try {
replaceBlock(PBHelper.convert(proto.getHeader().getBlock()), replaceBlock(PBHelperClient.convert(proto.getHeader().getBlock()),
PBHelperClient.convertStorageType(proto.getStorageType()), PBHelperClient.convertStorageType(proto.getStorageType()),
PBHelper.convert(proto.getHeader().getToken()), PBHelper.convert(proto.getHeader().getToken()),
proto.getDelHint(), proto.getDelHint(),
@ -244,7 +243,7 @@ public abstract class Receiver implements DataTransferProtocol {
TraceScope traceScope = continueTraceSpan(proto.getHeader(), TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName()); proto.getClass().getSimpleName());
try { try {
copyBlock(PBHelper.convert(proto.getHeader().getBlock()), copyBlock(PBHelperClient.convert(proto.getHeader().getBlock()),
PBHelper.convert(proto.getHeader().getToken())); PBHelper.convert(proto.getHeader().getToken()));
} finally { } finally {
if (traceScope != null) traceScope.close(); if (traceScope != null) traceScope.close();
@ -257,7 +256,7 @@ public abstract class Receiver implements DataTransferProtocol {
TraceScope traceScope = continueTraceSpan(proto.getHeader(), TraceScope traceScope = continueTraceSpan(proto.getHeader(),
proto.getClass().getSimpleName()); proto.getClass().getSimpleName());
try { try {
blockChecksum(PBHelper.convert(proto.getHeader().getBlock()), blockChecksum(PBHelperClient.convert(proto.getHeader().getBlock()),
PBHelper.convert(proto.getHeader().getToken())); PBHelper.convert(proto.getHeader().getToken()));
} finally { } finally {
if (traceScope != null) traceScope.close(); if (traceScope != null) traceScope.close();

View File

@ -95,7 +95,7 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
throws ServiceException { throws ServiceException {
long len; long len;
try { try {
len = impl.getReplicaVisibleLength(PBHelper.convert(request.getBlock())); len = impl.getReplicaVisibleLength(PBHelperClient.convert(request.getBlock()));
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }
@ -132,7 +132,7 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
throws ServiceException { throws ServiceException {
BlockLocalPathInfo resp; BlockLocalPathInfo resp;
try { try {
resp = impl.getBlockLocalPathInfo(PBHelper.convert(request.getBlock()), PBHelper.convert(request.getToken())); resp = impl.getBlockLocalPathInfo(PBHelperClient.convert(request.getBlock()), PBHelper.convert(request.getToken()));
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }
@ -150,7 +150,7 @@ public class ClientDatanodeProtocolServerSideTranslatorPB implements
try { try {
String poolId = request.getBlockPoolId(); String poolId = request.getBlockPoolId();
List<Token<BlockTokenIdentifier>> tokens = List<Token<BlockTokenIdentifier>> tokens =
new ArrayList<Token<BlockTokenIdentifier>>(request.getTokensCount()); new ArrayList<Token<BlockTokenIdentifier>>(request.getTokensCount());
for (TokenProto b : request.getTokensList()) { for (TokenProto b : request.getTokensList()) {
tokens.add(PBHelper.convert(b)); tokens.add(PBHelper.convert(b));

View File

@ -477,7 +477,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
public AbandonBlockResponseProto abandonBlock(RpcController controller, public AbandonBlockResponseProto abandonBlock(RpcController controller,
AbandonBlockRequestProto req) throws ServiceException { AbandonBlockRequestProto req) throws ServiceException {
try { try {
server.abandonBlock(PBHelper.convert(req.getB()), req.getFileId(), server.abandonBlock(PBHelperClient.convert(req.getB()), req.getFileId(),
req.getSrc(), req.getHolder()); req.getSrc(), req.getHolder());
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
@ -495,7 +495,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
LocatedBlock result = server.addBlock( LocatedBlock result = server.addBlock(
req.getSrc(), req.getSrc(),
req.getClientName(), req.getClientName(),
req.hasPrevious() ? PBHelper.convert(req.getPrevious()) : null, req.hasPrevious() ? PBHelperClient.convert(req.getPrevious()) : null,
(excl == null || excl.size() == 0) ? null : PBHelper.convert(excl (excl == null || excl.size() == 0) ? null : PBHelper.convert(excl
.toArray(new DatanodeInfoProto[excl.size()])), req.getFileId(), .toArray(new DatanodeInfoProto[excl.size()])), req.getFileId(),
(favor == null || favor.size() == 0) ? null : favor (favor == null || favor.size() == 0) ? null : favor
@ -516,7 +516,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
List<String> existingStorageIDsList = req.getExistingStorageUuidsList(); List<String> existingStorageIDsList = req.getExistingStorageUuidsList();
List<DatanodeInfoProto> excludesList = req.getExcludesList(); List<DatanodeInfoProto> excludesList = req.getExcludesList();
LocatedBlock result = server.getAdditionalDatanode(req.getSrc(), LocatedBlock result = server.getAdditionalDatanode(req.getSrc(),
req.getFileId(), PBHelper.convert(req.getBlk()), req.getFileId(), PBHelperClient.convert(req.getBlk()),
PBHelper.convert(existingList.toArray( PBHelper.convert(existingList.toArray(
new DatanodeInfoProto[existingList.size()])), new DatanodeInfoProto[existingList.size()])),
existingStorageIDsList.toArray( existingStorageIDsList.toArray(
@ -538,7 +538,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
try { try {
boolean result = boolean result =
server.complete(req.getSrc(), req.getClientName(), server.complete(req.getSrc(), req.getClientName(),
req.hasLast() ? PBHelper.convert(req.getLast()) : null, req.hasLast() ? PBHelperClient.convert(req.getLast()) : null,
req.hasFileId() ? req.getFileId() : HdfsConstants.GRANDFATHER_INODE_ID); req.hasFileId() ? req.getFileId() : HdfsConstants.GRANDFATHER_INODE_ID);
return CompleteResponseProto.newBuilder().setResult(result).build(); return CompleteResponseProto.newBuilder().setResult(result).build();
} catch (IOException e) { } catch (IOException e) {
@ -956,7 +956,7 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
throws ServiceException { throws ServiceException {
try { try {
LocatedBlockProto result = PBHelper.convert(server LocatedBlockProto result = PBHelper.convert(server
.updateBlockForPipeline(PBHelper.convert(req.getBlock()), .updateBlockForPipeline(PBHelperClient.convert(req.getBlock()),
req.getClientName())); req.getClientName()));
return UpdateBlockForPipelineResponseProto.newBuilder().setBlock(result) return UpdateBlockForPipelineResponseProto.newBuilder().setBlock(result)
.build(); .build();
@ -972,8 +972,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
List<DatanodeIDProto> newNodes = req.getNewNodesList(); List<DatanodeIDProto> newNodes = req.getNewNodesList();
List<String> newStorageIDs = req.getStorageIDsList(); List<String> newStorageIDs = req.getStorageIDsList();
server.updatePipeline(req.getClientName(), server.updatePipeline(req.getClientName(),
PBHelper.convert(req.getOldBlock()), PBHelperClient.convert(req.getOldBlock()),
PBHelper.convert(req.getNewBlock()), PBHelperClient.convert(req.getNewBlock()),
PBHelper.convert(newNodes.toArray(new DatanodeIDProto[newNodes.size()])), PBHelper.convert(newNodes.toArray(new DatanodeIDProto[newNodes.size()])),
newStorageIDs.toArray(new String[newStorageIDs.size()])); newStorageIDs.toArray(new String[newStorageIDs.size()]));
return VOID_UPDATEPIPELINE_RESPONSE; return VOID_UPDATEPIPELINE_RESPONSE;

View File

@ -281,7 +281,7 @@ public class DatanodeProtocolServerSideTranslatorPB implements
final List<String> sidprotos = request.getNewTargetStoragesList(); final List<String> sidprotos = request.getNewTargetStoragesList();
final String[] storageIDs = sidprotos.toArray(new String[sidprotos.size()]); final String[] storageIDs = sidprotos.toArray(new String[sidprotos.size()]);
try { try {
impl.commitBlockSynchronization(PBHelper.convert(request.getBlock()), impl.commitBlockSynchronization(PBHelperClient.convert(request.getBlock()),
request.getNewGenStamp(), request.getNewLength(), request.getNewGenStamp(), request.getNewLength(),
request.getCloseFile(), request.getDeleteBlock(), dns, storageIDs); request.getCloseFile(), request.getDeleteBlock(), dns, storageIDs);
} catch (IOException e) { } catch (IOException e) {

View File

@ -76,7 +76,7 @@ public class InterDatanodeProtocolServerSideTranslatorPB implements
final String storageID; final String storageID;
try { try {
storageID = impl.updateReplicaUnderRecovery( storageID = impl.updateReplicaUnderRecovery(
PBHelper.convert(request.getBlock()), request.getRecoveryId(), PBHelperClient.convert(request.getBlock()), request.getRecoveryId(),
request.getNewBlockId(), request.getNewLength()); request.getNewBlockId(), request.getNewLength());
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);

View File

@ -23,9 +23,7 @@ import static org.apache.hadoop.hdfs.protocol.proto.EncryptionZonesProtos
import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherSuiteProto; import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CipherSuiteProto;
import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CryptoProtocolVersionProto; import static org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.CryptoProtocolVersionProto;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.EnumSet; import java.util.EnumSet;
@ -110,7 +108,6 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsS
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeActionProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeActionProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
@ -146,7 +143,6 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeStorageProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeStorageProto.StorageState; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeStorageProto.StorageState;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DirectoryListingProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DirectoryListingProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExportedBlockKeysProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.FsPermissionProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.FsPermissionProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.FsServerDefaultsProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.FsServerDefaultsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.HdfsFileStatusProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.HdfsFileStatusProto;
@ -219,20 +215,16 @@ import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
import org.apache.hadoop.hdfs.util.ExactSizeInputStream;
import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.primitives.Shorts; import com.google.common.primitives.Shorts;
import com.google.protobuf.ByteString; import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
/** /**
* Utilities for converting protobuf classes to and from implementation classes * Utilities for converting protobuf classes to and from implementation classes
@ -575,13 +567,7 @@ public class PBHelper {
return new NamenodeCommand(cmd.getAction()); return new NamenodeCommand(cmd.getAction());
} }
} }
public static ExtendedBlock convert(ExtendedBlockProto eb) {
if (eb == null) return null;
return new ExtendedBlock( eb.getPoolId(), eb.getBlockId(), eb.getNumBytes(),
eb.getGenerationStamp());
}
public static RecoveringBlockProto convert(RecoveringBlock b) { public static RecoveringBlockProto convert(RecoveringBlock b) {
if (b == null) { if (b == null) {
return null; return null;
@ -595,7 +581,7 @@ public class PBHelper {
} }
public static RecoveringBlock convert(RecoveringBlockProto b) { public static RecoveringBlock convert(RecoveringBlockProto b) {
ExtendedBlock block = convert(b.getBlock().getB()); ExtendedBlock block = PBHelperClient.convert(b.getBlock().getB());
DatanodeInfo[] locs = convert(b.getBlock().getLocsList()); DatanodeInfo[] locs = convert(b.getBlock().getLocsList());
return (b.hasTruncateBlock()) ? return (b.hasTruncateBlock()) ?
new RecoveringBlock(block, locs, PBHelper.convert(b.getTruncateBlock())) : new RecoveringBlock(block, locs, PBHelper.convert(b.getTruncateBlock())) :
@ -741,7 +727,7 @@ public class PBHelper {
} }
} }
LocatedBlock lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets, LocatedBlock lb = new LocatedBlock(PBHelperClient.convert(proto.getB()), targets,
storageIDs, storageTypes, proto.getOffset(), proto.getCorrupt(), storageIDs, storageTypes, proto.getOffset(), proto.getCorrupt(),
cachedLocs.toArray(new DatanodeInfo[0])); cachedLocs.toArray(new DatanodeInfo[0]));
lb.setBlockToken(PBHelper.convert(proto.getBlockToken())); lb.setBlockToken(PBHelper.convert(proto.getBlockToken()));
@ -2120,12 +2106,6 @@ public class PBHelper {
return builder.build(); return builder.build();
} }
public static DatanodeLocalInfo convert(DatanodeLocalInfoProto proto) {
return new DatanodeLocalInfo(proto.getSoftwareVersion(),
proto.getConfigVersion(), proto.getUptime());
}
private static AclEntryScopeProto convert(AclEntryScope v) { private static AclEntryScopeProto convert(AclEntryScope v) {
return AclEntryScopeProto.valueOf(v.ordinal()); return AclEntryScopeProto.valueOf(v.ordinal());
} }

View File

@ -125,8 +125,8 @@ public class DNConf {
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT); DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
readaheadLength = conf.getLong( readaheadLength = conf.getLong(
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
dropCacheBehindWrites = conf.getBoolean( dropCacheBehindWrites = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT); DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);

View File

@ -52,6 +52,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.shell.Command; import org.apache.hadoop.fs.shell.Command;
import org.apache.hadoop.fs.shell.CommandFormat; import org.apache.hadoop.fs.shell.CommandFormat;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HAUtilClient; import org.apache.hadoop.hdfs.HAUtilClient;
import org.apache.hadoop.hdfs.client.BlockReportOptions; import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -1940,7 +1941,7 @@ public class DFSAdmin extends FsShell {
// Create the client // Create the client
ClientDatanodeProtocol dnProtocol = ClientDatanodeProtocol dnProtocol =
DFSUtil.createClientDatanodeProtocolProxy(datanodeAddr, getUGI(), conf, DFSUtilClient.createClientDatanodeProtocolProxy(datanodeAddr, getUGI(), conf,
NetUtils.getSocketFactory(conf, ClientDatanodeProtocol.class)); NetUtils.getSocketFactory(conf, ClientDatanodeProtocol.class));
return dnProtocol; return dnProtocol;
} }

View File

@ -247,7 +247,7 @@ public class TestBlockReaderLocal {
@Test @Test
public void testBlockReaderSimpleReads() throws IOException { public void testBlockReaderSimpleReads() throws IOException {
runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
} }
@Test @Test
@ -259,7 +259,7 @@ public class TestBlockReaderLocal {
@Test @Test
public void testBlockReaderSimpleReadsNoChecksum() throws IOException { public void testBlockReaderSimpleReadsNoChecksum() throws IOException {
runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false, runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false,
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
} }
@Test @Test
@ -297,14 +297,14 @@ public class TestBlockReaderLocal {
@Test @Test
public void testBlockReaderLocalArrayReads2() throws IOException { public void testBlockReaderLocalArrayReads2() throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
} }
@Test @Test
public void testBlockReaderLocalArrayReads2NoChecksum() public void testBlockReaderLocalArrayReads2NoChecksum()
throws IOException { throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
} }
@Test @Test
@ -341,7 +341,7 @@ public class TestBlockReaderLocal {
public void testBlockReaderLocalByteBufferReads() public void testBlockReaderLocalByteBufferReads()
throws IOException { throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(), runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
} }
@Test @Test
@ -349,7 +349,7 @@ public class TestBlockReaderLocal {
throws IOException { throws IOException {
runBlockReaderLocalTest( runBlockReaderLocalTest(
new TestBlockReaderLocalByteBufferReads(), new TestBlockReaderLocalByteBufferReads(),
false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
} }
@Test @Test
@ -473,7 +473,7 @@ public class TestBlockReaderLocal {
public void testBlockReaderLocalReadCorruptStart() public void testBlockReaderLocalReadCorruptStart()
throws IOException { throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true, runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true,
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
} }
private static class TestBlockReaderLocalReadCorrupt private static class TestBlockReaderLocalReadCorrupt
@ -524,14 +524,14 @@ public class TestBlockReaderLocal {
public void testBlockReaderLocalReadCorrupt() public void testBlockReaderLocalReadCorrupt()
throws IOException { throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true, runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true,
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
} }
@Test @Test
public void testBlockReaderLocalReadCorruptNoChecksum() public void testBlockReaderLocalReadCorruptNoChecksum()
throws IOException { throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false, runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false,
DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
} }
@Test @Test
@ -576,14 +576,14 @@ public class TestBlockReaderLocal {
public void testBlockReaderLocalWithMlockChanges() public void testBlockReaderLocalWithMlockChanges()
throws IOException { throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(), runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
} }
@Test @Test
public void testBlockReaderLocalWithMlockChangesNoChecksum() public void testBlockReaderLocalWithMlockChangesNoChecksum()
throws IOException { throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(), runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
} }
@Test @Test
@ -649,14 +649,14 @@ public class TestBlockReaderLocal {
public void testBlockReaderLocalOnFileWithoutChecksum() public void testBlockReaderLocalOnFileWithoutChecksum()
throws IOException { throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(), runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
} }
@Test @Test
public void testBlockReaderLocalOnFileWithoutChecksumNoChecksum() public void testBlockReaderLocalOnFileWithoutChecksumNoChecksum()
throws IOException { throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(), runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
} }
@Test @Test
@ -677,14 +677,14 @@ public class TestBlockReaderLocal {
public void testBlockReaderLocalReadZeroBytes() public void testBlockReaderLocalReadZeroBytes()
throws IOException { throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(), runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
true, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
} }
@Test @Test
public void testBlockReaderLocalReadZeroBytesNoChecksum() public void testBlockReaderLocalReadZeroBytesNoChecksum()
throws IOException { throws IOException {
runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(), runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
false, DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT); false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
} }
@Test @Test

View File

@ -182,7 +182,7 @@ public class TestBlockReaderLocalLegacy {
{ {
final LocatedBlock lb = cluster.getNameNode().getRpcServer() final LocatedBlock lb = cluster.getNameNode().getRpcServer()
.getBlockLocations(path.toString(), 0, 1).get(0); .getBlockLocations(path.toString(), 0, 1).get(0);
proxy = DFSUtil.createClientDatanodeProtocolProxy( proxy = DFSUtilClient.createClientDatanodeProtocolProxy(
lb.getLocations()[0], conf, 60000, false); lb.getLocations()[0], conf, 60000, false);
token = lb.getBlockToken(); token = lb.getBlockToken();

View File

@ -856,7 +856,7 @@ public class TestDFSClientRetries {
ClientDatanodeProtocol proxy = null; ClientDatanodeProtocol proxy = null;
try { try {
proxy = DFSUtil.createClientDatanodeProtocolProxy( proxy = DFSUtilClient.createClientDatanodeProtocolProxy(
fakeDnId, conf, 500, false, fakeBlock); fakeDnId, conf, 500, false, fakeBlock);
proxy.getReplicaVisibleLength(new ExtendedBlock("bpid", 1)); proxy.getReplicaVisibleLength(new ExtendedBlock("bpid", 1));

View File

@ -333,12 +333,12 @@ public class TestPBHelper {
public void testConvertExtendedBlock() { public void testConvertExtendedBlock() {
ExtendedBlock b = getExtendedBlock(); ExtendedBlock b = getExtendedBlock();
ExtendedBlockProto bProto = PBHelperClient.convert(b); ExtendedBlockProto bProto = PBHelperClient.convert(b);
ExtendedBlock b1 = PBHelper.convert(bProto); ExtendedBlock b1 = PBHelperClient.convert(bProto);
assertEquals(b, b1); assertEquals(b, b1);
b.setBlockId(-1); b.setBlockId(-1);
bProto = PBHelperClient.convert(b); bProto = PBHelperClient.convert(b);
b1 = PBHelper.convert(bProto); b1 = PBHelperClient.convert(bProto);
assertEquals(b, b1); assertEquals(b, b1);
} }

View File

@ -43,7 +43,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
@ -57,7 +57,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.Client
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.GetReplicaVisibleLengthResponseProto;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.io.TestWritable; import org.apache.hadoop.io.TestWritable;
import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.ProtobufRpcEngine;
@ -138,7 +138,7 @@ public class TestBlockToken {
BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId; BlockTokenIdentifier id = (BlockTokenIdentifier) tokenId;
LOG.info("Got: " + id.toString()); LOG.info("Got: " + id.toString());
assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id)); assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id));
sm.checkAccess(id, null, PBHelper.convert(req.getBlock()), sm.checkAccess(id, null, PBHelperClient.convert(req.getBlock()),
BlockTokenIdentifier.AccessMode.WRITE); BlockTokenIdentifier.AccessMode.WRITE);
result = id.getBlockId(); result = id.getBlockId();
} }
@ -259,7 +259,7 @@ public class TestBlockToken {
ClientDatanodeProtocol proxy = null; ClientDatanodeProtocol proxy = null;
try { try {
proxy = DFSUtil.createClientDatanodeProtocolProxy(addr, ticket, conf, proxy = DFSUtilClient.createClientDatanodeProtocolProxy(addr, ticket, conf,
NetUtils.getDefaultSocketFactory(conf)); NetUtils.getDefaultSocketFactory(conf));
assertEquals(block3.getBlockId(), proxy.getReplicaVisibleLength(block3)); assertEquals(block3.getBlockId(), proxy.getReplicaVisibleLength(block3));
} finally { } finally {
@ -313,7 +313,7 @@ public class TestBlockToken {
try { try {
long endTime = Time.now() + 3000; long endTime = Time.now() + 3000;
while (Time.now() < endTime) { while (Time.now() < endTime) {
proxy = DFSUtil.createClientDatanodeProtocolProxy(fakeDnId, conf, 1000, proxy = DFSUtilClient.createClientDatanodeProtocolProxy(fakeDnId, conf, 1000,
false, fakeBlock); false, fakeBlock);
assertEquals(block3.getBlockId(), proxy.getReplicaVisibleLength(block3)); assertEquals(block3.getBlockId(), proxy.getReplicaVisibleLength(block3));
if (proxy != null) { if (proxy != null) {

View File

@ -41,7 +41,7 @@ import org.apache.hadoop.hdfs.ClientContext;
import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
@ -367,7 +367,7 @@ public class TestShortCircuitLocalRead {
Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken(); Token<BlockTokenIdentifier> token = lb.get(0).getBlockToken();
final DatanodeInfo dnInfo = lb.get(0).getLocations()[0]; final DatanodeInfo dnInfo = lb.get(0).getLocations()[0];
ClientDatanodeProtocol proxy = ClientDatanodeProtocol proxy =
DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf, 60000, false); DFSUtilClient.createClientDatanodeProtocolProxy(dnInfo, conf, 60000, false);
try { try {
proxy.getBlockLocalPathInfo(blk, token); proxy.getBlockLocalPathInfo(blk, token);
Assert.fail("The call should have failed as this user " Assert.fail("The call should have failed as this user "