HDFS-2161. Move createNamenode(..), createClientDatanodeProtocolProxy(..) and Random object creation to DFSUtil; move DFSClient.stringifyToken(..) to DelegationTokenIdentifier.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1148348 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6c0cb4d151
commit
710e5a960e
|
@ -575,6 +575,10 @@ Trunk (unreleased changes)
|
||||||
HDFS-2141. Remove NameNode roles Active and Standby (they become
|
HDFS-2141. Remove NameNode roles Active and Standby (they become
|
||||||
states of the namenode). (suresh)
|
states of the namenode). (suresh)
|
||||||
|
|
||||||
|
HDFS-2161. Move createNamenode(..), createClientDatanodeProtocolProxy(..)
|
||||||
|
and Random object creation to DFSUtil; move DFSClient.stringifyToken(..)
|
||||||
|
to DelegationTokenIdentifier. (szetszwo)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
|
HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
|
||||||
|
|
|
@ -19,7 +19,6 @@
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import java.io.BufferedOutputStream;
|
import java.io.BufferedOutputStream;
|
||||||
import java.io.ByteArrayInputStream;
|
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
|
@ -31,8 +30,6 @@ import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
|
|
||||||
import javax.net.SocketFactory;
|
import javax.net.SocketFactory;
|
||||||
|
|
||||||
|
@ -56,12 +53,9 @@ import org.apache.hadoop.fs.ParentNotDirectoryException;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
|
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
|
||||||
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
@ -89,9 +83,6 @@ import org.apache.hadoop.io.EnumSetWritable;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.MD5Hash;
|
import org.apache.hadoop.io.MD5Hash;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
|
||||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
|
||||||
import org.apache.hadoop.io.retry.RetryProxy;
|
|
||||||
import org.apache.hadoop.ipc.Client;
|
import org.apache.hadoop.ipc.Client;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
@ -124,7 +115,6 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
||||||
volatile boolean clientRunning = true;
|
volatile boolean clientRunning = true;
|
||||||
private volatile FsServerDefaults serverDefaults;
|
private volatile FsServerDefaults serverDefaults;
|
||||||
private volatile long serverDefaultsLastUpdate;
|
private volatile long serverDefaultsLastUpdate;
|
||||||
static Random r = new Random();
|
|
||||||
final String clientName;
|
final String clientName;
|
||||||
Configuration conf;
|
Configuration conf;
|
||||||
SocketFactory socketFactory;
|
SocketFactory socketFactory;
|
||||||
|
@ -217,79 +207,6 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
||||||
private final Map<String, DFSOutputStream> filesBeingWritten
|
private final Map<String, DFSOutputStream> filesBeingWritten
|
||||||
= new HashMap<String, DFSOutputStream>();
|
= new HashMap<String, DFSOutputStream>();
|
||||||
|
|
||||||
/** Create a {@link NameNode} proxy */
|
|
||||||
public static ClientProtocol createNamenode(Configuration conf) throws IOException {
|
|
||||||
return createNamenode(NameNode.getAddress(conf), conf);
|
|
||||||
}
|
|
||||||
|
|
||||||
public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
|
|
||||||
Configuration conf) throws IOException {
|
|
||||||
return createNamenode(createRPCNamenode(nameNodeAddr, conf,
|
|
||||||
UserGroupInformation.getCurrentUser()));
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
private static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr,
|
|
||||||
Configuration conf, UserGroupInformation ugi)
|
|
||||||
throws IOException {
|
|
||||||
return (ClientProtocol)RPC.getProxy(ClientProtocol.class,
|
|
||||||
ClientProtocol.versionID, nameNodeAddr, ugi, conf,
|
|
||||||
NetUtils.getSocketFactory(conf, ClientProtocol.class));
|
|
||||||
}
|
|
||||||
|
|
||||||
private static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
|
|
||||||
throws IOException {
|
|
||||||
RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
|
|
||||||
5, LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
|
|
||||||
|
|
||||||
Map<Class<? extends Exception>,RetryPolicy> remoteExceptionToPolicyMap =
|
|
||||||
new HashMap<Class<? extends Exception>, RetryPolicy>();
|
|
||||||
remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy);
|
|
||||||
|
|
||||||
Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
|
|
||||||
new HashMap<Class<? extends Exception>, RetryPolicy>();
|
|
||||||
exceptionToPolicyMap.put(RemoteException.class,
|
|
||||||
RetryPolicies.retryByRemoteException(
|
|
||||||
RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
|
|
||||||
RetryPolicy methodPolicy = RetryPolicies.retryByException(
|
|
||||||
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
|
|
||||||
Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
|
|
||||||
|
|
||||||
methodNameToPolicyMap.put("create", methodPolicy);
|
|
||||||
|
|
||||||
return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
|
|
||||||
rpcNamenode, methodNameToPolicyMap);
|
|
||||||
}
|
|
||||||
|
|
||||||
static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
|
|
||||||
DatanodeID datanodeid, Configuration conf, int socketTimeout,
|
|
||||||
LocatedBlock locatedBlock)
|
|
||||||
throws IOException {
|
|
||||||
InetSocketAddress addr = NetUtils.createSocketAddr(
|
|
||||||
datanodeid.getHost() + ":" + datanodeid.getIpcPort());
|
|
||||||
if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
|
|
||||||
ClientDatanodeProtocol.LOG.debug("ClientDatanodeProtocol addr=" + addr);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Since we're creating a new UserGroupInformation here, we know that no
|
|
||||||
// future RPC proxies will be able to re-use the same connection. And
|
|
||||||
// usages of this proxy tend to be one-off calls.
|
|
||||||
//
|
|
||||||
// This is a temporary fix: callers should really achieve this by using
|
|
||||||
// RPC.stopProxy() on the resulting object, but this is currently not
|
|
||||||
// working in trunk. See the discussion on HDFS-1965.
|
|
||||||
Configuration confWithNoIpcIdle = new Configuration(conf);
|
|
||||||
confWithNoIpcIdle.setInt(CommonConfigurationKeysPublic
|
|
||||||
.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0);
|
|
||||||
|
|
||||||
UserGroupInformation ticket = UserGroupInformation
|
|
||||||
.createRemoteUser(locatedBlock.getBlock().getLocalBlock().toString());
|
|
||||||
ticket.addToken(locatedBlock.getBlockToken());
|
|
||||||
return (ClientDatanodeProtocol)RPC.getProxy(ClientDatanodeProtocol.class,
|
|
||||||
ClientDatanodeProtocol.versionID, addr, ticket, confWithNoIpcIdle,
|
|
||||||
NetUtils.getDefaultSocketFactory(conf), socketTimeout);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Same as this(NameNode.getAddress(conf), conf);
|
* Same as this(NameNode.getAddress(conf), conf);
|
||||||
* @see #DFSClient(InetSocketAddress, Configuration)
|
* @see #DFSClient(InetSocketAddress, Configuration)
|
||||||
|
@ -342,8 +259,8 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
||||||
this.clientName = leaserenewer.getClientName(dfsClientConf.taskId);
|
this.clientName = leaserenewer.getClientName(dfsClientConf.taskId);
|
||||||
this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
|
this.socketCache = new SocketCache(dfsClientConf.socketCacheCapacity);
|
||||||
if (nameNodeAddr != null && rpcNamenode == null) {
|
if (nameNodeAddr != null && rpcNamenode == null) {
|
||||||
this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
|
this.rpcNamenode = DFSUtil.createRPCNamenode(nameNodeAddr, conf, ugi);
|
||||||
this.namenode = createNamenode(this.rpcNamenode);
|
this.namenode = DFSUtil.createNamenode(this.rpcNamenode);
|
||||||
} else if (nameNodeAddr == null && rpcNamenode != null) {
|
} else if (nameNodeAddr == null && rpcNamenode != null) {
|
||||||
//This case is used for testing.
|
//This case is used for testing.
|
||||||
this.namenode = this.rpcNamenode = rpcNamenode;
|
this.namenode = this.rpcNamenode = rpcNamenode;
|
||||||
|
@ -506,27 +423,6 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
||||||
return serverDefaults;
|
return serverDefaults;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* A test method for printing out tokens
|
|
||||||
* @param token
|
|
||||||
* @return Stringify version of the token
|
|
||||||
*/
|
|
||||||
public static String stringifyToken(Token<DelegationTokenIdentifier> token)
|
|
||||||
throws IOException {
|
|
||||||
DelegationTokenIdentifier ident = new DelegationTokenIdentifier();
|
|
||||||
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
|
|
||||||
DataInputStream in = new DataInputStream(buf);
|
|
||||||
ident.readFields(in);
|
|
||||||
String str = ident.getKind() + " token " + ident.getSequenceNumber() +
|
|
||||||
" for " + ident.getUser().getShortUserName();
|
|
||||||
if (token.getService().getLength() > 0) {
|
|
||||||
return (str + " on " + token.getService());
|
|
||||||
} else {
|
|
||||||
return str;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @see ClientProtocol#getDelegationToken(Text)
|
* @see ClientProtocol#getDelegationToken(Text)
|
||||||
*/
|
*/
|
||||||
|
@ -534,7 +430,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Token<DelegationTokenIdentifier> result =
|
Token<DelegationTokenIdentifier> result =
|
||||||
namenode.getDelegationToken(renewer);
|
namenode.getDelegationToken(renewer);
|
||||||
LOG.info("Created " + stringifyToken(result));
|
LOG.info("Created " + DelegationTokenIdentifier.stringifyToken(result));
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -543,7 +439,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
||||||
*/
|
*/
|
||||||
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
|
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
|
||||||
throws InvalidToken, IOException {
|
throws InvalidToken, IOException {
|
||||||
LOG.info("Renewing " + stringifyToken(token));
|
LOG.info("Renewing " + DelegationTokenIdentifier.stringifyToken(token));
|
||||||
try {
|
try {
|
||||||
return namenode.renewDelegationToken(token);
|
return namenode.renewDelegationToken(token);
|
||||||
} catch (RemoteException re) {
|
} catch (RemoteException re) {
|
||||||
|
@ -557,7 +453,7 @@ public class DFSClient implements FSConstants, java.io.Closeable {
|
||||||
*/
|
*/
|
||||||
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
|
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
|
||||||
throws InvalidToken, IOException {
|
throws InvalidToken, IOException {
|
||||||
LOG.info("Cancelling " + stringifyToken(token));
|
LOG.info("Cancelling " + DelegationTokenIdentifier.stringifyToken(token));
|
||||||
try {
|
try {
|
||||||
namenode.cancelDelegationToken(token);
|
namenode.cancelDelegationToken(token);
|
||||||
} catch (RemoteException re) {
|
} catch (RemoteException re) {
|
||||||
|
|
|
@ -35,13 +35,13 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.ChecksumException;
|
import org.apache.hadoop.fs.ChecksumException;
|
||||||
import org.apache.hadoop.fs.FSInputStream;
|
import org.apache.hadoop.fs.FSInputStream;
|
||||||
import org.apache.hadoop.fs.UnresolvedLinkException;
|
import org.apache.hadoop.fs.UnresolvedLinkException;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
|
||||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||||
|
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
|
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
@ -157,7 +157,7 @@ public class DFSInputStream extends FSInputStream {
|
||||||
ClientDatanodeProtocol cdp = null;
|
ClientDatanodeProtocol cdp = null;
|
||||||
|
|
||||||
try {
|
try {
|
||||||
cdp = DFSClient.createClientDatanodeProtocolProxy(
|
cdp = DFSUtil.createClientDatanodeProtocolProxy(
|
||||||
datanode, dfsClient.conf, dfsClient.getConf().socketTimeout, locatedblock);
|
datanode, dfsClient.conf, dfsClient.getConf().socketTimeout, locatedblock);
|
||||||
|
|
||||||
final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
|
final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
|
||||||
|
@ -625,7 +625,7 @@ public class DFSInputStream extends FSInputStream {
|
||||||
// will wait 6000ms grace period before retry and the waiting window is
|
// will wait 6000ms grace period before retry and the waiting window is
|
||||||
// expanded to 9000ms.
|
// expanded to 9000ms.
|
||||||
double waitTime = timeWindow * failures + // grace period for the last round of attempt
|
double waitTime = timeWindow * failures + // grace period for the last round of attempt
|
||||||
timeWindow * (failures + 1) * dfsClient.r.nextDouble(); // expanding time window for each failure
|
timeWindow * (failures + 1) * DFSUtil.getRandom().nextDouble(); // expanding time window for each failure
|
||||||
DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
|
DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
|
||||||
Thread.sleep((long)waitTime);
|
Thread.sleep((long)waitTime);
|
||||||
} catch (InterruptedException iex) {
|
} catch (InterruptedException iex) {
|
||||||
|
|
|
@ -18,30 +18,62 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICES;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_FEDERATION_NAMESERVICE_ID;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.UnsupportedEncodingException;
|
import java.io.UnsupportedEncodingException;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Random;
|
||||||
import java.util.StringTokenizer;
|
import java.util.StringTokenizer;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.BlockLocation;
|
import org.apache.hadoop.fs.BlockLocation;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
|
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||||
|
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||||
|
import org.apache.hadoop.io.retry.RetryProxy;
|
||||||
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.net.NodeBase;
|
import org.apache.hadoop.net.NodeBase;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class DFSUtil {
|
public class DFSUtil {
|
||||||
|
private static final ThreadLocal<Random> RANDOM = new ThreadLocal<Random>() {
|
||||||
|
@Override
|
||||||
|
protected Random initialValue() {
|
||||||
|
return new Random();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
/** @return a pseudorandom number generator. */
|
||||||
|
public static Random getRandom() {
|
||||||
|
return RANDOM.get();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Compartor for sorting DataNodeInfo[] based on decommissioned states.
|
* Compartor for sorting DataNodeInfo[] based on decommissioned states.
|
||||||
|
@ -586,4 +618,82 @@ public class DFSUtil {
|
||||||
public static int roundBytesToGB(long bytes) {
|
public static int roundBytesToGB(long bytes) {
|
||||||
return Math.round((float)bytes/ 1024 / 1024 / 1024);
|
return Math.round((float)bytes/ 1024 / 1024 / 1024);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/** Create a {@link NameNode} proxy */
|
||||||
|
public static ClientProtocol createNamenode(Configuration conf) throws IOException {
|
||||||
|
return createNamenode(NameNode.getAddress(conf), conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Create a {@link NameNode} proxy */
|
||||||
|
public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
|
||||||
|
Configuration conf) throws IOException {
|
||||||
|
return createNamenode(createRPCNamenode(nameNodeAddr, conf,
|
||||||
|
UserGroupInformation.getCurrentUser()));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Create a {@link NameNode} proxy */
|
||||||
|
static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr,
|
||||||
|
Configuration conf, UserGroupInformation ugi)
|
||||||
|
throws IOException {
|
||||||
|
return (ClientProtocol)RPC.getProxy(ClientProtocol.class,
|
||||||
|
ClientProtocol.versionID, nameNodeAddr, ugi, conf,
|
||||||
|
NetUtils.getSocketFactory(conf, ClientProtocol.class));
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Create a {@link NameNode} proxy */
|
||||||
|
static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
|
||||||
|
throws IOException {
|
||||||
|
RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
|
||||||
|
5, FSConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
Map<Class<? extends Exception>,RetryPolicy> remoteExceptionToPolicyMap =
|
||||||
|
new HashMap<Class<? extends Exception>, RetryPolicy>();
|
||||||
|
remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy);
|
||||||
|
|
||||||
|
Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
|
||||||
|
new HashMap<Class<? extends Exception>, RetryPolicy>();
|
||||||
|
exceptionToPolicyMap.put(RemoteException.class,
|
||||||
|
RetryPolicies.retryByRemoteException(
|
||||||
|
RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
|
||||||
|
RetryPolicy methodPolicy = RetryPolicies.retryByException(
|
||||||
|
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
|
||||||
|
Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
|
||||||
|
|
||||||
|
methodNameToPolicyMap.put("create", methodPolicy);
|
||||||
|
|
||||||
|
return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
|
||||||
|
rpcNamenode, methodNameToPolicyMap);
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Create a {@link ClientDatanodeProtocol} proxy */
|
||||||
|
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
|
||||||
|
DatanodeID datanodeid, Configuration conf, int socketTimeout,
|
||||||
|
LocatedBlock locatedBlock)
|
||||||
|
throws IOException {
|
||||||
|
InetSocketAddress addr = NetUtils.createSocketAddr(
|
||||||
|
datanodeid.getHost() + ":" + datanodeid.getIpcPort());
|
||||||
|
if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
|
||||||
|
ClientDatanodeProtocol.LOG.debug("ClientDatanodeProtocol addr=" + addr);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Since we're creating a new UserGroupInformation here, we know that no
|
||||||
|
// future RPC proxies will be able to re-use the same connection. And
|
||||||
|
// usages of this proxy tend to be one-off calls.
|
||||||
|
//
|
||||||
|
// This is a temporary fix: callers should really achieve this by using
|
||||||
|
// RPC.stopProxy() on the resulting object, but this is currently not
|
||||||
|
// working in trunk. See the discussion on HDFS-1965.
|
||||||
|
Configuration confWithNoIpcIdle = new Configuration(conf);
|
||||||
|
confWithNoIpcIdle.setInt(CommonConfigurationKeysPublic
|
||||||
|
.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0);
|
||||||
|
|
||||||
|
UserGroupInformation ticket = UserGroupInformation
|
||||||
|
.createRemoteUser(locatedBlock.getBlock().getLocalBlock().toString());
|
||||||
|
ticket.addToken(locatedBlock.getBlockToken());
|
||||||
|
return (ClientDatanodeProtocol)RPC.getProxy(ClientDatanodeProtocol.class,
|
||||||
|
ClientDatanodeProtocol.versionID, addr, ticket, confWithNoIpcIdle,
|
||||||
|
NetUtils.getDefaultSocketFactory(conf), socketTimeout);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,6 @@ import java.security.PrivilegedExceptionAction;
|
||||||
import java.text.ParseException;
|
import java.text.ParseException;
|
||||||
import java.text.SimpleDateFormat;
|
import java.text.SimpleDateFormat;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.TimeZone;
|
import java.util.TimeZone;
|
||||||
import java.util.concurrent.DelayQueue;
|
import java.util.concurrent.DelayQueue;
|
||||||
import java.util.concurrent.Delayed;
|
import java.util.concurrent.Delayed;
|
||||||
|
@ -49,7 +48,6 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
||||||
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
|
@ -88,7 +86,6 @@ public class HftpFileSystem extends FileSystem {
|
||||||
private URI hdfsURI;
|
private URI hdfsURI;
|
||||||
protected InetSocketAddress nnAddr;
|
protected InetSocketAddress nnAddr;
|
||||||
protected UserGroupInformation ugi;
|
protected UserGroupInformation ugi;
|
||||||
protected final Random ran = new Random();
|
|
||||||
|
|
||||||
public static final String HFTP_TIMEZONE = "UTC";
|
public static final String HFTP_TIMEZONE = "UTC";
|
||||||
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
|
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
|
||||||
|
|
|
@ -155,7 +155,7 @@ class LeaseRenewer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private final String clienNamePostfix = DFSClient.r.nextInt()
|
private final String clienNamePostfix = DFSUtil.getRandom().nextInt()
|
||||||
+ "_" + Thread.currentThread().getId();
|
+ "_" + Thread.currentThread().getId();
|
||||||
|
|
||||||
/** The time in milliseconds that the map became empty. */
|
/** The time in milliseconds that the map became empty. */
|
||||||
|
|
|
@ -18,8 +18,13 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.security.token.delegation;
|
package org.apache.hadoop.hdfs.security.token.delegation;
|
||||||
|
|
||||||
|
import java.io.ByteArrayInputStream;
|
||||||
|
import java.io.DataInputStream;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -51,4 +56,23 @@ public class DelegationTokenIdentifier
|
||||||
return HDFS_DELEGATION_KIND;
|
return HDFS_DELEGATION_KIND;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return getKind() + " token " + getSequenceNumber()
|
||||||
|
+ " for " + getUser().getShortUserName();
|
||||||
|
}
|
||||||
|
|
||||||
|
/** @return a string representation of the token */
|
||||||
|
public static String stringifyToken(final Token<?> token) throws IOException {
|
||||||
|
DelegationTokenIdentifier ident = new DelegationTokenIdentifier();
|
||||||
|
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
|
||||||
|
DataInputStream in = new DataInputStream(buf);
|
||||||
|
ident.readFields(in);
|
||||||
|
|
||||||
|
if (token.getService().getLength() > 0) {
|
||||||
|
return ident + " on " + token.getService();
|
||||||
|
} else {
|
||||||
|
return ident.toString();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -186,7 +186,6 @@ public class Balancer {
|
||||||
private final NameNodeConnector nnc;
|
private final NameNodeConnector nnc;
|
||||||
private final BalancingPolicy policy;
|
private final BalancingPolicy policy;
|
||||||
private final double threshold;
|
private final double threshold;
|
||||||
private final static Random rnd = new Random();
|
|
||||||
|
|
||||||
// all data node lists
|
// all data node lists
|
||||||
private Collection<Source> overUtilizedDatanodes
|
private Collection<Source> overUtilizedDatanodes
|
||||||
|
@ -780,7 +779,7 @@ public class Balancer {
|
||||||
/* Shuffle datanode array */
|
/* Shuffle datanode array */
|
||||||
static private void shuffleArray(DatanodeInfo[] datanodes) {
|
static private void shuffleArray(DatanodeInfo[] datanodes) {
|
||||||
for (int i=datanodes.length; i>1; i--) {
|
for (int i=datanodes.length; i>1; i--) {
|
||||||
int randomIndex = rnd.nextInt(i);
|
int randomIndex = DFSUtil.getRandom().nextInt(i);
|
||||||
DatanodeInfo tmp = datanodes[randomIndex];
|
DatanodeInfo tmp = datanodes[randomIndex];
|
||||||
datanodes[randomIndex] = datanodes[i-1];
|
datanodes[randomIndex] = datanodes[i-1];
|
||||||
datanodes[i-1] = tmp;
|
datanodes[i-1] = tmp;
|
||||||
|
|
|
@ -32,7 +32,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
@ -79,7 +79,7 @@ class NameNodeConnector {
|
||||||
) throws IOException {
|
) throws IOException {
|
||||||
this.namenodeAddress = namenodeAddress;
|
this.namenodeAddress = namenodeAddress;
|
||||||
this.namenode = createNamenode(namenodeAddress, conf);
|
this.namenode = createNamenode(namenodeAddress, conf);
|
||||||
this.client = DFSClient.createNamenode(conf);
|
this.client = DFSUtil.createNamenode(conf);
|
||||||
this.fs = FileSystem.get(NameNode.getUri(namenodeAddress), conf);
|
this.fs = FileSystem.get(NameNode.getUri(namenodeAddress), conf);
|
||||||
|
|
||||||
final NamespaceInfo namespaceinfo = namenode.versionRequest();
|
final NamespaceInfo namespaceinfo = namenode.versionRequest();
|
||||||
|
|
|
@ -28,7 +28,6 @@ import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
|
||||||
|
@ -37,6 +36,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
|
||||||
|
@ -154,7 +154,6 @@ public class BlockManager {
|
||||||
* Last block index used for replication work.
|
* Last block index used for replication work.
|
||||||
*/
|
*/
|
||||||
private int replIndex = 0;
|
private int replIndex = 0;
|
||||||
Random r = new Random();
|
|
||||||
|
|
||||||
// for block replicas placement
|
// for block replicas placement
|
||||||
public final BlockPlacementPolicy replicator;
|
public final BlockPlacementPolicy replicator;
|
||||||
|
@ -752,12 +751,12 @@ public class BlockManager {
|
||||||
int remainingNodes = numOfNodes - nodesToProcess;
|
int remainingNodes = numOfNodes - nodesToProcess;
|
||||||
if (nodesToProcess < remainingNodes) {
|
if (nodesToProcess < remainingNodes) {
|
||||||
for(int i=0; i<nodesToProcess; i++) {
|
for(int i=0; i<nodesToProcess; i++) {
|
||||||
int keyIndex = r.nextInt(numOfNodes-i)+i;
|
int keyIndex = DFSUtil.getRandom().nextInt(numOfNodes-i)+i;
|
||||||
Collections.swap(keyArray, keyIndex, i); // swap to front
|
Collections.swap(keyArray, keyIndex, i); // swap to front
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for(int i=0; i<remainingNodes; i++) {
|
for(int i=0; i<remainingNodes; i++) {
|
||||||
int keyIndex = r.nextInt(numOfNodes-i);
|
int keyIndex = DFSUtil.getRandom().nextInt(numOfNodes-i);
|
||||||
Collections.swap(keyArray, keyIndex, numOfNodes-i-1); // swap to end
|
Collections.swap(keyArray, keyIndex, numOfNodes-i-1); // swap to end
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1096,7 +1095,7 @@ public class BlockManager {
|
||||||
// switch to a different node randomly
|
// switch to a different node randomly
|
||||||
// this to prevent from deterministically selecting the same node even
|
// this to prevent from deterministically selecting the same node even
|
||||||
// if the node failed to replicate the block on previous iterations
|
// if the node failed to replicate the block on previous iterations
|
||||||
if(r.nextBoolean())
|
if(DFSUtil.getRandom().nextBoolean())
|
||||||
srcNode = node;
|
srcNode = node;
|
||||||
}
|
}
|
||||||
if(numReplicas != null)
|
if(numReplicas != null)
|
||||||
|
|
|
@ -18,12 +18,12 @@
|
||||||
package org.apache.hadoop.hdfs.server.blockmanagement;
|
package org.apache.hadoop.hdfs.server.blockmanagement;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.concurrent.locks.ReadWriteLock;
|
import java.util.concurrent.locks.ReadWriteLock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
|
||||||
/** A map from host names to datanode descriptors. */
|
/** A map from host names to datanode descriptors. */
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
|
@ -31,7 +31,6 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||||
class Host2NodesMap {
|
class Host2NodesMap {
|
||||||
private HashMap<String, DatanodeDescriptor[]> map
|
private HashMap<String, DatanodeDescriptor[]> map
|
||||||
= new HashMap<String, DatanodeDescriptor[]>();
|
= new HashMap<String, DatanodeDescriptor[]>();
|
||||||
private Random r = new Random();
|
|
||||||
private ReadWriteLock hostmapLock = new ReentrantReadWriteLock();
|
private ReadWriteLock hostmapLock = new ReentrantReadWriteLock();
|
||||||
|
|
||||||
/** Check if node is already in the map. */
|
/** Check if node is already in the map. */
|
||||||
|
@ -151,7 +150,7 @@ class Host2NodesMap {
|
||||||
return nodes[0];
|
return nodes[0];
|
||||||
}
|
}
|
||||||
// more than one node
|
// more than one node
|
||||||
return nodes[r.nextInt(nodes.length)];
|
return nodes[DFSUtil.getRandom().nextInt(nodes.length)];
|
||||||
} finally {
|
} finally {
|
||||||
hostmapLock.readLock().unlock();
|
hostmapLock.readLock().unlock();
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,7 +31,6 @@ import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
|
||||||
import javax.servlet.ServletContext;
|
import javax.servlet.ServletContext;
|
||||||
|
@ -72,8 +71,6 @@ public class JspHelper {
|
||||||
"=";
|
"=";
|
||||||
private static final Log LOG = LogFactory.getLog(JspHelper.class);
|
private static final Log LOG = LogFactory.getLog(JspHelper.class);
|
||||||
|
|
||||||
static final Random rand = new Random();
|
|
||||||
|
|
||||||
/** Private constructor for preventing creating JspHelper object. */
|
/** Private constructor for preventing creating JspHelper object. */
|
||||||
private JspHelper() {}
|
private JspHelper() {}
|
||||||
|
|
||||||
|
@ -152,7 +149,7 @@ public class JspHelper {
|
||||||
if (chosenNode == null) {
|
if (chosenNode == null) {
|
||||||
do {
|
do {
|
||||||
if (doRandom) {
|
if (doRandom) {
|
||||||
index = rand.nextInt(nodes.length);
|
index = DFSUtil.getRandom().nextInt(nodes.length);
|
||||||
} else {
|
} else {
|
||||||
index++;
|
index++;
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,7 @@ import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
||||||
|
@ -99,8 +100,6 @@ class BlockPoolSliceScanner {
|
||||||
|
|
||||||
private LogFileHandler verificationLog;
|
private LogFileHandler verificationLog;
|
||||||
|
|
||||||
private Random random = new Random();
|
|
||||||
|
|
||||||
private DataTransferThrottler throttler = null;
|
private DataTransferThrottler throttler = null;
|
||||||
|
|
||||||
private static enum ScanType {
|
private static enum ScanType {
|
||||||
|
@ -254,7 +253,7 @@ class BlockPoolSliceScanner {
|
||||||
long period = Math.min(scanPeriod,
|
long period = Math.min(scanPeriod,
|
||||||
Math.max(blockMap.size(),1) * 600 * 1000L);
|
Math.max(blockMap.size(),1) * 600 * 1000L);
|
||||||
return System.currentTimeMillis() - scanPeriod +
|
return System.currentTimeMillis() - scanPeriod +
|
||||||
random.nextInt((int)period);
|
DFSUtil.getRandom().nextInt((int)period);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Adds block to list of blocks */
|
/** Adds block to list of blocks */
|
||||||
|
|
|
@ -71,7 +71,6 @@ import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
@ -398,8 +397,6 @@ public class DataNode extends Configured
|
||||||
/** Activated plug-ins. */
|
/** Activated plug-ins. */
|
||||||
private List<ServicePlugin> plugins;
|
private List<ServicePlugin> plugins;
|
||||||
|
|
||||||
private static final Random R = new Random();
|
|
||||||
|
|
||||||
// For InterDataNodeProtocol
|
// For InterDataNodeProtocol
|
||||||
public Server ipcServer;
|
public Server ipcServer;
|
||||||
|
|
||||||
|
@ -844,7 +841,7 @@ public class DataNode extends Configured
|
||||||
void scheduleBlockReport(long delay) {
|
void scheduleBlockReport(long delay) {
|
||||||
if (delay > 0) { // send BR after random delay
|
if (delay > 0) { // send BR after random delay
|
||||||
lastBlockReport = System.currentTimeMillis()
|
lastBlockReport = System.currentTimeMillis()
|
||||||
- ( blockReportInterval - R.nextInt((int)(delay)));
|
- ( blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
|
||||||
} else { // send at next heartbeat
|
} else { // send at next heartbeat
|
||||||
lastBlockReport = lastHeartbeat - blockReportInterval;
|
lastBlockReport = lastHeartbeat - blockReportInterval;
|
||||||
}
|
}
|
||||||
|
@ -965,7 +962,7 @@ public class DataNode extends Configured
|
||||||
// If we have sent the first block report, then wait a random
|
// If we have sent the first block report, then wait a random
|
||||||
// time before we start the periodic block reports.
|
// time before we start the periodic block reports.
|
||||||
if (resetBlockReportTime) {
|
if (resetBlockReportTime) {
|
||||||
lastBlockReport = startTime - R.nextInt((int)(blockReportInterval));
|
lastBlockReport = startTime - DFSUtil.getRandom().nextInt((int)(blockReportInterval));
|
||||||
resetBlockReportTime = false;
|
resetBlockReportTime = false;
|
||||||
} else {
|
} else {
|
||||||
/* say the last block report was at 8:20:14. The current report
|
/* say the last block report was at 8:20:14. The current report
|
||||||
|
|
|
@ -26,7 +26,6 @@ import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
@ -41,11 +40,11 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
|
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
|
||||||
import org.apache.hadoop.util.Daemon;
|
import org.apache.hadoop.util.Daemon;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Periodically scans the data directories for block and block metadata files.
|
* Periodically scans the data directories for block and block metadata files.
|
||||||
|
@ -240,8 +239,7 @@ public class DirectoryScanner implements Runnable {
|
||||||
|
|
||||||
void start() {
|
void start() {
|
||||||
shouldRun = true;
|
shouldRun = true;
|
||||||
Random rand = new Random();
|
long offset = DFSUtil.getRandom().nextInt((int) (scanPeriodMsecs/1000L)) * 1000L; //msec
|
||||||
long offset = rand.nextInt((int) (scanPeriodMsecs/1000L)) * 1000L; //msec
|
|
||||||
long firstScanTime = System.currentTimeMillis() + offset;
|
long firstScanTime = System.currentTimeMillis() + offset;
|
||||||
LOG.info("Periodic Directory Tree Verification scan starting at "
|
LOG.info("Periodic Directory Tree Verification scan starting at "
|
||||||
+ firstScanTime + " with interval " + scanPeriodMsecs);
|
+ firstScanTime + " with interval " + scanPeriodMsecs);
|
||||||
|
|
|
@ -36,9 +36,8 @@ import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.Set;
|
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
import javax.management.NotCompliantMBeanException;
|
import javax.management.NotCompliantMBeanException;
|
||||||
import javax.management.ObjectName;
|
import javax.management.ObjectName;
|
||||||
|
@ -50,24 +49,25 @@ import org.apache.hadoop.fs.DF;
|
||||||
import org.apache.hadoop.fs.DU;
|
import org.apache.hadoop.fs.DU;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.metrics2.util.MBeans;
|
import org.apache.hadoop.metrics2.util.MBeans;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.apache.hadoop.util.DiskChecker;
|
import org.apache.hadoop.util.DiskChecker;
|
||||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||||
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||||
import org.apache.hadoop.util.ReflectionUtils;
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
|
|
||||||
import org.apache.hadoop.io.IOUtils;
|
|
||||||
|
|
||||||
/**************************************************
|
/**************************************************
|
||||||
* FSDataset manages a set of data blocks. Each block
|
* FSDataset manages a set of data blocks. Each block
|
||||||
|
@ -136,7 +136,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
||||||
|
|
||||||
if (lastChildIdx < 0 && resetIdx) {
|
if (lastChildIdx < 0 && resetIdx) {
|
||||||
//reset so that all children will be checked
|
//reset so that all children will be checked
|
||||||
lastChildIdx = random.nextInt(children.length);
|
lastChildIdx = DFSUtil.getRandom().nextInt(children.length);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (lastChildIdx >= 0 && children != null) {
|
if (lastChildIdx >= 0 && children != null) {
|
||||||
|
@ -164,7 +164,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
||||||
}
|
}
|
||||||
|
|
||||||
//now pick a child randomly for creating a new set of subdirs.
|
//now pick a child randomly for creating a new set of subdirs.
|
||||||
lastChildIdx = random.nextInt(children.length);
|
lastChildIdx = DFSUtil.getRandom().nextInt(children.length);
|
||||||
return children[ lastChildIdx ].addBlock(b, src, true, false);
|
return children[ lastChildIdx ].addBlock(b, src, true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1122,7 +1122,6 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
||||||
final FSVolumeSet volumes;
|
final FSVolumeSet volumes;
|
||||||
private final int maxBlocksPerDir;
|
private final int maxBlocksPerDir;
|
||||||
final ReplicasMap volumeMap;
|
final ReplicasMap volumeMap;
|
||||||
static final Random random = new Random();
|
|
||||||
final FSDatasetAsyncDiskService asyncDiskService;
|
final FSDatasetAsyncDiskService asyncDiskService;
|
||||||
private final int validVolsRequired;
|
private final int validVolsRequired;
|
||||||
|
|
||||||
|
@ -2178,7 +2177,6 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
||||||
}
|
}
|
||||||
|
|
||||||
private ObjectName mbeanName;
|
private ObjectName mbeanName;
|
||||||
private Random rand = new Random();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register the FSDataset MBean using the name
|
* Register the FSDataset MBean using the name
|
||||||
|
@ -2191,7 +2189,7 @@ public class FSDataset implements FSConstants, FSDatasetInterface {
|
||||||
StandardMBean bean;
|
StandardMBean bean;
|
||||||
String storageName;
|
String storageName;
|
||||||
if (storageId == null || storageId.equals("")) {// Temp fix for the uninitialized storage
|
if (storageId == null || storageId.equals("")) {// Temp fix for the uninitialized storage
|
||||||
storageName = "UndefinedStorageId" + rand.nextInt();
|
storageName = "UndefinedStorageId" + DFSUtil.getRandom().nextInt();
|
||||||
} else {
|
} else {
|
||||||
storageName = storageId;
|
storageName = storageId;
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,10 +17,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode.metrics;
|
package org.apache.hadoop.hdfs.server.datanode.metrics;
|
||||||
|
|
||||||
import java.util.Random;
|
import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||||
|
@ -29,7 +31,6 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableRate;
|
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||||
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
||||||
import static org.apache.hadoop.metrics2.impl.MsInfo.*;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -72,7 +73,6 @@ public class DataNodeMetrics {
|
||||||
|
|
||||||
final MetricsRegistry registry = new MetricsRegistry("datanode");
|
final MetricsRegistry registry = new MetricsRegistry("datanode");
|
||||||
final String name;
|
final String name;
|
||||||
static final Random rng = new Random();
|
|
||||||
|
|
||||||
public DataNodeMetrics(String name, String sessionId) {
|
public DataNodeMetrics(String name, String sessionId) {
|
||||||
this.name = name;
|
this.name = name;
|
||||||
|
@ -84,7 +84,7 @@ public class DataNodeMetrics {
|
||||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||||
JvmMetrics.create("DataNode", sessionId, ms);
|
JvmMetrics.create("DataNode", sessionId, ms);
|
||||||
String name = "DataNodeActivity-"+ (dnName.isEmpty()
|
String name = "DataNodeActivity-"+ (dnName.isEmpty()
|
||||||
? "UndefinedDataNodeName"+ rng.nextInt() : dnName.replace(':', '-'));
|
? "UndefinedDataNodeName"+ DFSUtil.getRandom().nextInt() : dnName.replace(':', '-'));
|
||||||
return ms.register(name, null, new DataNodeMetrics(name, sessionId));
|
return ms.register(name, null, new DataNodeMetrics(name, sessionId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,12 +30,12 @@ import javax.servlet.http.HttpServletResponse;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.znerd.xmlenc.XMLOutputter;
|
import org.znerd.xmlenc.XMLOutputter;
|
||||||
|
@ -82,7 +82,7 @@ abstract class DfsServlet extends HttpServlet {
|
||||||
InetSocketAddress nnAddr = (InetSocketAddress)context.getAttribute("name.node.address");
|
InetSocketAddress nnAddr = (InetSocketAddress)context.getAttribute("name.node.address");
|
||||||
Configuration conf = new HdfsConfiguration(
|
Configuration conf = new HdfsConfiguration(
|
||||||
(Configuration)context.getAttribute(JspHelper.CURRENT_CONF));
|
(Configuration)context.getAttribute(JspHelper.CURRENT_CONF));
|
||||||
return DFSClient.createNamenode(nnAddr, conf);
|
return DFSUtil.createNamenode(nnAddr, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Create a URI for redirecting request to a datanode */
|
/** Create a URI for redirecting request to a datanode */
|
||||||
|
|
|
@ -45,7 +45,6 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.NavigableMap;
|
import java.util.NavigableMap;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -268,8 +267,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
||||||
public final NavigableMap<String, DatanodeDescriptor> datanodeMap =
|
public final NavigableMap<String, DatanodeDescriptor> datanodeMap =
|
||||||
new TreeMap<String, DatanodeDescriptor>();
|
new TreeMap<String, DatanodeDescriptor>();
|
||||||
|
|
||||||
Random r = new Random();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stores a set of DatanodeDescriptor objects.
|
* Stores a set of DatanodeDescriptor objects.
|
||||||
* This is a subset of {@link #datanodeMap}, containing nodes that are
|
* This is a subset of {@link #datanodeMap}, containing nodes that are
|
||||||
|
@ -737,7 +734,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
||||||
return new BlocksWithLocations(new BlockWithLocations[0]);
|
return new BlocksWithLocations(new BlockWithLocations[0]);
|
||||||
}
|
}
|
||||||
Iterator<BlockInfo> iter = node.getBlockIterator();
|
Iterator<BlockInfo> iter = node.getBlockIterator();
|
||||||
int startBlock = r.nextInt(numBlocks); // starting from a random block
|
int startBlock = DFSUtil.getRandom().nextInt(numBlocks); // starting from a random block
|
||||||
// skip blocks
|
// skip blocks
|
||||||
for(int i=0; i<startBlock; i++) {
|
for(int i=0; i<startBlock; i++) {
|
||||||
iter.next();
|
iter.next();
|
||||||
|
@ -1977,8 +1974,6 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static Random randBlockId = new Random();
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Allocate a block at the given pending filename
|
* Allocate a block at the given pending filename
|
||||||
*
|
*
|
||||||
|
@ -1991,9 +1986,9 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
||||||
private Block allocateBlock(String src, INode[] inodes,
|
private Block allocateBlock(String src, INode[] inodes,
|
||||||
DatanodeDescriptor targets[]) throws QuotaExceededException {
|
DatanodeDescriptor targets[]) throws QuotaExceededException {
|
||||||
assert hasWriteLock();
|
assert hasWriteLock();
|
||||||
Block b = new Block(FSNamesystem.randBlockId.nextLong(), 0, 0);
|
Block b = new Block(DFSUtil.getRandom().nextLong(), 0, 0);
|
||||||
while(isValidBlock(b)) {
|
while(isValidBlock(b)) {
|
||||||
b.setBlockId(FSNamesystem.randBlockId.nextLong());
|
b.setBlockId(DFSUtil.getRandom().nextLong());
|
||||||
}
|
}
|
||||||
b.setGenerationStamp(getGenerationStamp());
|
b.setGenerationStamp(getGenerationStamp());
|
||||||
b = dir.addBlock(src, inodes, b, targets);
|
b = dir.addBlock(src, inodes, b, targets);
|
||||||
|
@ -2963,7 +2958,7 @@ public class FSNamesystem implements FSConstants, FSNamesystemMBean,
|
||||||
private String newStorageID() {
|
private String newStorageID() {
|
||||||
String newID = null;
|
String newID = null;
|
||||||
while(newID == null) {
|
while(newID == null) {
|
||||||
newID = "DS" + Integer.toString(r.nextInt());
|
newID = "DS" + Integer.toString(DFSUtil.getRandom().nextInt());
|
||||||
if (datanodeMap.get(newID) != null)
|
if (datanodeMap.get(newID) != null)
|
||||||
newID = null;
|
newID = null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,13 +19,14 @@ package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
import static org.apache.hadoop.hdfs.server.common.Util.now;
|
import static org.apache.hadoop.hdfs.server.common.Util.now;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
|
import java.io.DataInputStream;
|
||||||
import java.io.DataOutputStream;
|
import java.io.DataOutputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileInputStream;
|
||||||
import java.io.FileOutputStream;
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.DataInputStream;
|
import java.io.RandomAccessFile;
|
||||||
import java.io.FileInputStream;
|
|
||||||
import java.io.Closeable;
|
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.security.NoSuchAlgorithmException;
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
@ -34,29 +35,26 @@ import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
import java.io.RandomAccessFile;
|
|
||||||
import java.util.concurrent.CopyOnWriteArrayList;
|
import java.util.concurrent.CopyOnWriteArrayList;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
import org.apache.hadoop.hdfs.protocol.FSConstants;
|
||||||
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
||||||
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
|
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
|
||||||
|
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||||
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
|
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
|
||||||
import org.apache.hadoop.hdfs.server.common.Util;
|
import org.apache.hadoop.hdfs.server.common.Util;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsConstants.NodeType;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
|
import org.apache.hadoop.hdfs.server.namenode.JournalStream.JournalType;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
|
|
||||||
import org.apache.hadoop.io.MD5Hash;
|
import org.apache.hadoop.io.MD5Hash;
|
||||||
import org.apache.hadoop.net.DNS;
|
import org.apache.hadoop.net.DNS;
|
||||||
|
|
||||||
|
@ -620,11 +618,9 @@ public class NNStorage extends Storage implements Closeable {
|
||||||
* @return new namespaceID
|
* @return new namespaceID
|
||||||
*/
|
*/
|
||||||
private int newNamespaceID() {
|
private int newNamespaceID() {
|
||||||
Random r = new Random();
|
|
||||||
r.setSeed(now());
|
|
||||||
int newID = 0;
|
int newID = 0;
|
||||||
while(newID == 0)
|
while(newID == 0)
|
||||||
newID = r.nextInt(0x7FFFFFFF); // use 31 bits only
|
newID = DFSUtil.getRandom().nextInt(0x7FFFFFFF); // use 31 bits only
|
||||||
return newID;
|
return newID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1040,9 +1036,8 @@ public class NNStorage extends Storage implements Closeable {
|
||||||
try {
|
try {
|
||||||
rand = SecureRandom.getInstance("SHA1PRNG").nextInt(Integer.MAX_VALUE);
|
rand = SecureRandom.getInstance("SHA1PRNG").nextInt(Integer.MAX_VALUE);
|
||||||
} catch (NoSuchAlgorithmException e) {
|
} catch (NoSuchAlgorithmException e) {
|
||||||
final Random R = new Random();
|
|
||||||
LOG.warn("Could not use SecureRandom");
|
LOG.warn("Could not use SecureRandom");
|
||||||
rand = R.nextInt(Integer.MAX_VALUE);
|
rand = DFSUtil.getRandom().nextInt(Integer.MAX_VALUE);
|
||||||
}
|
}
|
||||||
String bpid = "BP-" + rand + "-"+ ip + "-" + System.currentTimeMillis();
|
String bpid = "BP-" + rand + "-"+ ip + "-" + System.currentTimeMillis();
|
||||||
return bpid;
|
return bpid;
|
||||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hdfs.BlockReader;
|
import org.apache.hadoop.hdfs.BlockReader;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
import org.apache.hadoop.hdfs.DFSClient;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
@ -553,7 +554,6 @@ public class NamenodeFsck {
|
||||||
* Pick the best node from which to stream the data.
|
* Pick the best node from which to stream the data.
|
||||||
* That's the local one, if available.
|
* That's the local one, if available.
|
||||||
*/
|
*/
|
||||||
Random r = new Random();
|
|
||||||
private DatanodeInfo bestNode(DFSClient dfs, DatanodeInfo[] nodes,
|
private DatanodeInfo bestNode(DFSClient dfs, DatanodeInfo[] nodes,
|
||||||
TreeSet<DatanodeInfo> deadNodes) throws IOException {
|
TreeSet<DatanodeInfo> deadNodes) throws IOException {
|
||||||
if ((nodes == null) ||
|
if ((nodes == null) ||
|
||||||
|
@ -562,7 +562,7 @@ public class NamenodeFsck {
|
||||||
}
|
}
|
||||||
DatanodeInfo chosenNode;
|
DatanodeInfo chosenNode;
|
||||||
do {
|
do {
|
||||||
chosenNode = nodes[r.nextInt(nodes.length)];
|
chosenNode = nodes[DFSUtil.getRandom().nextInt(nodes.length)];
|
||||||
} while (deadNodes.contains(chosenNode));
|
} while (deadNodes.contains(chosenNode));
|
||||||
return chosenNode;
|
return chosenNode;
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,8 +52,6 @@ import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.FileSystem.Statistics;
|
import org.apache.hadoop.fs.FileSystem.Statistics;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
|
import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
@ -523,14 +521,6 @@ public class DFSTestUtil {
|
||||||
return ((DFSOutputStream) out.getWrappedStream()).getBlockToken();
|
return ((DFSOutputStream) out.getWrappedStream()).getBlockToken();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
|
|
||||||
DatanodeID datanodeid, Configuration conf, int socketTimeout,
|
|
||||||
LocatedBlock locatedBlock)
|
|
||||||
throws IOException {
|
|
||||||
return DFSClient.createClientDatanodeProtocolProxy(
|
|
||||||
datanodeid, conf, socketTimeout, locatedBlock);
|
|
||||||
}
|
|
||||||
|
|
||||||
static void setLogLevel2All(org.apache.commons.logging.Log log) {
|
static void setLogLevel2All(org.apache.commons.logging.Log log) {
|
||||||
((org.apache.commons.logging.impl.Log4JLogger)log
|
((org.apache.commons.logging.impl.Log4JLogger)log
|
||||||
).getLogger().setLevel(org.apache.log4j.Level.ALL);
|
).getLogger().setLevel(org.apache.log4j.Level.ALL);
|
||||||
|
|
|
@ -74,7 +74,7 @@ public class TestReplication extends TestCase {
|
||||||
private void checkFile(FileSystem fileSys, Path name, int repl)
|
private void checkFile(FileSystem fileSys, Path name, int repl)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Configuration conf = fileSys.getConf();
|
Configuration conf = fileSys.getConf();
|
||||||
ClientProtocol namenode = DFSClient.createNamenode(conf);
|
ClientProtocol namenode = DFSUtil.createNamenode(conf);
|
||||||
|
|
||||||
waitForBlockReplication(name.toString(), namenode,
|
waitForBlockReplication(name.toString(), namenode,
|
||||||
Math.min(numDatanodes, repl), -1);
|
Math.min(numDatanodes, repl), -1);
|
||||||
|
@ -255,7 +255,6 @@ public class TestReplication extends TestCase {
|
||||||
|
|
||||||
//wait for all the blocks to be replicated;
|
//wait for all the blocks to be replicated;
|
||||||
LOG.info("Checking for block replication for " + filename);
|
LOG.info("Checking for block replication for " + filename);
|
||||||
int iters = 0;
|
|
||||||
while (true) {
|
while (true) {
|
||||||
boolean replOk = true;
|
boolean replOk = true;
|
||||||
LocatedBlocks blocks = namenode.getBlockLocations(filename, 0,
|
LocatedBlocks blocks = namenode.getBlockLocations(filename, 0,
|
||||||
|
@ -266,11 +265,8 @@ public class TestReplication extends TestCase {
|
||||||
LocatedBlock block = iter.next();
|
LocatedBlock block = iter.next();
|
||||||
int actual = block.getLocations().length;
|
int actual = block.getLocations().length;
|
||||||
if ( actual < expected ) {
|
if ( actual < expected ) {
|
||||||
if (true || iters > 0) {
|
LOG.info("Not enough replicas for " + block.getBlock()
|
||||||
LOG.info("Not enough replicas for " + block.getBlock() +
|
+ " yet. Expecting " + expected + ", got " + actual + ".");
|
||||||
" yet. Expecting " + expected + ", got " +
|
|
||||||
actual + ".");
|
|
||||||
}
|
|
||||||
replOk = false;
|
replOk = false;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -280,8 +276,6 @@ public class TestReplication extends TestCase {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
iters++;
|
|
||||||
|
|
||||||
if (maxWaitSec > 0 &&
|
if (maxWaitSec > 0 &&
|
||||||
(System.currentTimeMillis() - start) > (maxWaitSec * 1000)) {
|
(System.currentTimeMillis() - start) > (maxWaitSec * 1000)) {
|
||||||
throw new IOException("Timedout while waiting for all blocks to " +
|
throw new IOException("Timedout while waiting for all blocks to " +
|
||||||
|
|
|
@ -18,6 +18,19 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.security.token.block;
|
package org.apache.hadoop.hdfs.security.token.block;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.anyInt;
|
||||||
|
import static org.mockito.Matchers.anyLong;
|
||||||
|
import static org.mockito.Matchers.anyString;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.ByteArrayInputStream;
|
import java.io.ByteArrayInputStream;
|
||||||
import java.io.DataInputStream;
|
import java.io.DataInputStream;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
@ -33,13 +46,12 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||||
|
@ -58,21 +70,9 @@ import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Assume;
|
import org.junit.Assume;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
|
|
||||||
import static org.junit.Assert.*;
|
|
||||||
import static org.mockito.Matchers.any;
|
|
||||||
import static org.mockito.Matchers.anyInt;
|
|
||||||
import static org.mockito.Matchers.anyLong;
|
|
||||||
import static org.mockito.Matchers.anyString;
|
|
||||||
import static org.mockito.Mockito.doAnswer;
|
|
||||||
import static org.mockito.Mockito.doReturn;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
@ -293,7 +293,7 @@ public class TestBlockToken {
|
||||||
try {
|
try {
|
||||||
long endTime = System.currentTimeMillis() + 3000;
|
long endTime = System.currentTimeMillis() + 3000;
|
||||||
while (System.currentTimeMillis() < endTime) {
|
while (System.currentTimeMillis() < endTime) {
|
||||||
proxy = DFSTestUtil.createClientDatanodeProtocolProxy(
|
proxy = DFSUtil.createClientDatanodeProtocolProxy(
|
||||||
fakeDnId, conf, 1000, fakeBlock);
|
fakeDnId, conf, 1000, fakeBlock);
|
||||||
assertEquals(block3.getBlockId(), proxy.getReplicaVisibleLength(block3));
|
assertEquals(block3.getBlockId(), proxy.getReplicaVisibleLength(block3));
|
||||||
if (proxy != null) {
|
if (proxy != null) {
|
||||||
|
|
|
@ -32,9 +32,9 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DFSClient;
|
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
|
@ -99,7 +99,7 @@ public class TestBalancer extends TestCase {
|
||||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numNodes).build();
|
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numNodes).build();
|
||||||
try {
|
try {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
client = DFSClient.createNamenode(conf);
|
client = DFSUtil.createNamenode(conf);
|
||||||
|
|
||||||
short replicationFactor = (short)(numNodes-1);
|
short replicationFactor = (short)(numNodes-1);
|
||||||
long fileLen = size/replicationFactor;
|
long fileLen = size/replicationFactor;
|
||||||
|
@ -193,7 +193,7 @@ public class TestBalancer extends TestCase {
|
||||||
.simulatedCapacities(capacities)
|
.simulatedCapacities(capacities)
|
||||||
.build();
|
.build();
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
client = DFSClient.createNamenode(conf);
|
client = DFSUtil.createNamenode(conf);
|
||||||
|
|
||||||
for(int i = 0; i < blocksDN.length; i++)
|
for(int i = 0; i < blocksDN.length; i++)
|
||||||
cluster.injectBlocks(i, Arrays.asList(blocksDN[i]));
|
cluster.injectBlocks(i, Arrays.asList(blocksDN[i]));
|
||||||
|
@ -305,7 +305,7 @@ public class TestBalancer extends TestCase {
|
||||||
.build();
|
.build();
|
||||||
try {
|
try {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
client = DFSClient.createNamenode(conf);
|
client = DFSUtil.createNamenode(conf);
|
||||||
|
|
||||||
long totalCapacity = sum(capacities);
|
long totalCapacity = sum(capacities);
|
||||||
|
|
||||||
|
@ -396,7 +396,7 @@ public class TestBalancer extends TestCase {
|
||||||
.build();
|
.build();
|
||||||
try {
|
try {
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
client = DFSClient.createNamenode(conf);
|
client = DFSUtil.createNamenode(conf);
|
||||||
|
|
||||||
long totalCapacity = sum(capacities);
|
long totalCapacity = sum(capacities);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue