diff --git a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/client/KerberosAuthenticator.java b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/client/KerberosAuthenticator.java
index c9b21d1c3a1..006cb35b1ea 100644
--- a/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/client/KerberosAuthenticator.java
+++ b/hadoop-common-project/hadoop-auth/src/main/java/org/apache/hadoop/security/authentication/client/KerberosAuthenticator.java
@@ -185,7 +185,7 @@ public void authenticate(URL url, AuthenticatedURL.Token token)
conn.setRequestMethod(AUTH_HTTP_METHOD);
conn.connect();
- if (conn.getRequestProperty(AUTHORIZATION) != null && conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
+ if (conn.getResponseCode() == HttpURLConnection.HTTP_OK) {
LOG.debug("JDK performed authentication on our behalf.");
// If the JDK already did the SPNEGO back-and-forth for
// us, just pull out the token.
diff --git a/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/client/AuthenticatorTestCase.java b/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/client/AuthenticatorTestCase.java
index ba7b43343d6..4e4ecc483eb 100644
--- a/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/client/AuthenticatorTestCase.java
+++ b/hadoop-common-project/hadoop-auth/src/test/java/org/apache/hadoop/security/authentication/client/AuthenticatorTestCase.java
@@ -136,7 +136,6 @@ protected void _testAuthentication(Authenticator authenticator, boolean doPost)
TestConnectionConfigurator connConf = new TestConnectionConfigurator();
AuthenticatedURL aUrl = new AuthenticatedURL(authenticator, connConf);
HttpURLConnection conn = aUrl.openConnection(url, token);
- Assert.assertTrue(token.isSet());
Assert.assertTrue(connConf.invoked);
String tokenStr = token.toString();
if (doPost) {
diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 6fd5c4be0f9..8de10273445 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -284,7 +284,7 @@ Trunk (Unreleased)
HADOOP-7761. Improve the performance of raw comparisons. (todd)
- HADOOP-8589 ViewFs tests fail when tests and home dirs are nested (sanjay Radia)
+ HADOOP-8589. ViewFs tests fail when tests and home dirs are nested (sanjay Radia)
Release 2.3.0 - UNRELEASED
@@ -375,6 +375,14 @@ Release 2.3.0 - UNRELEASED
HADOOP-10064. Upgrade to maven antrun plugin version 1.7 (Arpit Agarwal via
jeagles)
+ HADOOP-9594. Update apache commons math dependency (Timothy St. Clair via
+ stevel)
+
+ HADOOP-10095. In CodecPool, synchronize pool and codecList separately in
+ order to reduce lock contention. (Nicolas Liochon via szetszwo)
+
+ HADOOP-10067. Missing POM dependency on jsr305 (Robert Rati via stevel)
+
OPTIMIZATIONS
HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
@@ -426,6 +434,9 @@ Release 2.3.0 - UNRELEASED
HADOOP-10093. hadoop-env.cmd sets HADOOP_CLIENT_OPTS with a max heap size
that is too small. (Shanyu Zhao via cnauroth)
+ HADOOP-10094. NPE in GenericOptionsParser#preProcessForWindows().
+ (Enis Soztutar via cnauroth)
+
Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES
@@ -476,6 +487,8 @@ Release 2.2.1 - UNRELEASED
as [-Dkey, value] which breaks GenericsOptionParser.
(Enis Soztutar via cnauroth)
+ HADOOP-10078. KerberosAuthenticator always does SPNEGO. (rkanter via tucu)
+
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES
@@ -2298,6 +2311,10 @@ Release 0.23.10 - UNRELEASED
OPTIMIZATIONS
+ HADOOP-9956. RPC listener inefficiently assigns connections to readers (daryn)
+
+ HADOOP-9955. RPC idle connection closing is extremely inefficient (daryn)
+
BUG FIXES
Release 0.23.9 - 2013-07-08
diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml
index 939ce20094d..7ebf9b4cf08 100644
--- a/hadoop-common-project/hadoop-common/pom.xml
+++ b/hadoop-common-project/hadoop-common/pom.xml
@@ -55,7 +55,7 @@
<groupId>org.apache.commons</groupId>
- <artifactId>commons-math</artifactId>
+ <artifactId>commons-math3</artifactId>
<scope>compile</scope>
@@ -83,6 +83,11 @@
<artifactId>commons-net</artifactId>
<scope>compile</scope>
</dependency>
+ <dependency>
+ <groupId>commons-collections</groupId>
+ <artifactId>commons-collections</artifactId>
+ <scope>compile</scope>
+ </dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
@@ -213,6 +218,11 @@
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <scope>compile</scope>
+ </dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index dcab369e385..e52659dd6e7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -65,6 +65,13 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
/** Default value for IPC_SERVER_RPC_READ_THREADS_KEY */
public static final int IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1;
+ /** Number of pending connections that may be queued per socket reader */
+ public static final String IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY =
+ "ipc.server.read.connection-queue.size";
+ /** Default value for IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY */
+ public static final int IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT =
+ 100;
+
public static final String IPC_MAXIMUM_DATA_LENGTH =
"ipc.maximum.data.length";
@@ -219,4 +226,10 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final String IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY = "ipc.client.fallback-to-simple-auth-allowed";
public static final boolean IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT = false;
+ /** How often the server scans for idle connections */
+ public static final String IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY =
+ "ipc.client.connection.idle-scan-interval.ms";
+ /** Default value for IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY */
+ public static final int IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_DEFAULT =
+ 10000;
}
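The two new IPC keys above are ordinary integer settings. A minimal usage sketch (the values below are arbitrary, chosen only for illustration) showing how a server configuration might override them:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.CommonConfigurationKeys;

    public class IpcTuningSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Let each socket reader queue up to 50 pending connections.
        conf.setInt(
            CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
            50);
        // Scan for idle client connections every 5 seconds instead of 10.
        conf.setInt(
            CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY,
            5000);
        System.out.println(conf.getInt(
            CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
            CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT));
      }
    }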
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
index 63e3a09c307..11d88f13cd4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/CodecPool.java
@@ -85,16 +85,15 @@ private static <T> T borrow(Map<Class<T>, List<T>> pool,
T codec = null;
// Check if an appropriate codec is available
+ List<T> codecList;
synchronized (pool) {
- if (pool.containsKey(codecClass)) {
- List<T> codecList = pool.get(codecClass);
-
- if (codecList != null) {
- synchronized (codecList) {
- if (!codecList.isEmpty()) {
- codec = codecList.remove(codecList.size()-1);
- }
- }
+ codecList = pool.get(codecClass);
+ }
+
+ if (codecList != null) {
+ synchronized (codecList) {
+ if (!codecList.isEmpty()) {
+ codec = codecList.remove(codecList.size() - 1);
}
}
}
@@ -105,15 +104,17 @@ private static <T> T borrow(Map<Class<T>, List<T>> pool,
private static <T> void payback(Map<Class<T>, List<T>> pool, T codec) {
if (codec != null) {
Class<T> codecClass = ReflectionUtils.getClass(codec);
+ List<T> codecList;
synchronized (pool) {
- if (!pool.containsKey(codecClass)) {
- pool.put(codecClass, new ArrayList<T>());
+ codecList = pool.get(codecClass);
+ if (codecList == null) {
+ codecList = new ArrayList<T>();
+ pool.put(codecClass, codecList);
}
+ }
- List<T> codecList = pool.get(codecClass);
- synchronized (codecList) {
- codecList.add(codec);
- }
+ synchronized (codecList) {
+ codecList.add(codec);
}
}
}
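The rewrite above splits the single pool lock in two: the map is locked only long enough to look up (or lazily create) the per-codec list, and the list itself is locked for the actual add/remove. A self-contained sketch of the same two-level locking pattern, using hypothetical names rather than the real CodecPool types:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Illustrative only: callers borrowing items of different keys no longer
    // contend on one global lock, because the map lock is held just for the
    // lookup and each list has its own lock for mutation.
    class TwoLevelPool<K, V> {
      private final Map<K, List<V>> pool = new HashMap<K, List<V>>();

      V borrow(K key) {
        List<V> list;
        synchronized (pool) {          // short critical section: lookup only
          list = pool.get(key);
        }
        if (list == null) {
          return null;
        }
        synchronized (list) {          // mutate the list under its own lock
          return list.isEmpty() ? null : list.remove(list.size() - 1);
        }
      }

      void payback(K key, V value) {
        List<V> list;
        synchronized (pool) {          // create the per-key list lazily
          list = pool.get(key);
          if (list == null) {
            list = new ArrayList<V>();
            pool.put(key, list);
          }
        }
        synchronized (list) {
          list.add(value);
        }
      }
    }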
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
index 820f1068977..8a77bede4f2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/nativeio/NativeIO.java
@@ -114,7 +114,17 @@ public static class POSIX {
public static interface CacheTracker {
public void fadvise(String identifier, long offset, long len, int flags);
}
-
+
+ public static CacheManipulator cacheManipulator = new CacheManipulator();
+
+ @VisibleForTesting
+ public static class CacheManipulator {
+ public void mlock(String identifier, ByteBuffer buffer,
+ long len) throws IOException {
+ POSIX.mlock(buffer, len);
+ }
+ }
+
static {
if (NativeCodeLoader.isNativeCodeLoaded()) {
try {
@@ -249,7 +259,7 @@ static native void munlock_native(
*
* @throws NativeIOException
*/
- public static void mlock(ByteBuffer buffer, long len)
+ static void mlock(ByteBuffer buffer, long len)
throws IOException {
assertCodeLoaded();
if (!buffer.isDirect()) {
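Making mlock package-private and routing callers through the new static cacheManipulator field gives tests a seam to avoid real mlock calls. A hedged sketch of how a test might install a stub (the stub class name here is hypothetical):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.hadoop.io.nativeio.NativeIO;

    // Assumes production code calls NativeIO.POSIX.cacheManipulator.mlock(...)
    // rather than the native method directly; the override just records the
    // request instead of pinning memory.
    class NoMlockCacheManipulator extends NativeIO.POSIX.CacheManipulator {
      @Override
      public void mlock(String identifier, ByteBuffer buffer, long len)
          throws IOException {
        System.out.println("pretending to mlock " + len + " bytes of " + identifier);
      }
    }

    class CacheManipulatorStubExample {
      static void installStubForTest() {
        NativeIO.POSIX.cacheManipulator = new NoMlockCacheManipulator();
      }
    }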
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 89f9501d980..b8dc754c74f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -51,11 +51,13 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
@@ -345,17 +347,8 @@ public static boolean isRpcInvocation() {
private int port; // port we listen on
private int handlerCount; // number of handler threads
private int readThreads; // number of read threads
+ private int readerPendingConnectionQueue; // number of connections to queue per read thread
private Class<? extends Writable> rpcRequestClass; // class used for deserializing the rpc request
- private int maxIdleTime; // the maximum idle time after
- // which a client may be disconnected
- private int thresholdIdleConnections; // the number of idle connections
- // after which we will start
- // cleaning up idle
- // connections
- int maxConnectionsToNuke; // the max number of
- // connections to nuke
- //during a cleanup
-
protected RpcMetrics rpcMetrics;
protected RpcDetailedMetrics rpcDetailedMetrics;
@@ -373,13 +366,10 @@ public static boolean isRpcInvocation() {
volatile private boolean running = true; // true while server runs
private BlockingQueue<Call> callQueue; // queued calls
- private List<Connection> connectionList =
- Collections.synchronizedList(new LinkedList<Connection>());
- //maintain a list
- //of client connections
+ // maintains the set of client connections and handles idle timeouts
+ private ConnectionManager connectionManager;
private Listener listener = null;
private Responder responder = null;
- private int numConnections = 0;
private Handler[] handlers = null;
/**
@@ -449,8 +439,8 @@ Iterable extends Thread> getHandlers() {
}
@VisibleForTesting
- List<Connection> getConnections() {
- return connectionList;
+ Connection[] getConnections() {
+ return connectionManager.toArray();
}
/**
@@ -518,11 +508,6 @@ private class Listener extends Thread {
private Reader[] readers = null;
private int currentReader = 0;
private InetSocketAddress address; //the address we bind at
- private Random rand = new Random();
- private long lastCleanupRunTime = 0; //the last time when a cleanup connec-
- //-tion (for idle connections) ran
- private long cleanupInterval = 10000; //the minimum interval between
- //two cleanup runs
private int backlogLength = conf.getInt(
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,
CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT);
@@ -553,12 +538,14 @@ public Listener() throws IOException {
}
private class Reader extends Thread {
- private volatile boolean adding = false;
+ final private BlockingQueue<Connection> pendingConnections;
private final Selector readSelector;
Reader(String name) throws IOException {
super(name);
+ this.pendingConnections =
+ new LinkedBlockingQueue<Connection>(readerPendingConnectionQueue);
this.readSelector = Selector.open();
}
@@ -580,10 +567,14 @@ private synchronized void doRunLoop() {
while (running) {
SelectionKey key = null;
try {
+ // consume as many connections as currently queued to avoid
+ // unbridled acceptance of connections that starves the select
+ int size = pendingConnections.size();
+ for (int i=size; i>0; i--) {
+ Connection conn = pendingConnections.take();
+ conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
+ }
readSelector.select();
- while (adding) {
- this.wait(1000);
- }
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
while (iter.hasNext()) {
@@ -607,26 +598,14 @@ private synchronized void doRunLoop() {
}
/**
- * This gets reader into the state that waits for the new channel
- * to be registered with readSelector. If it was waiting in select()
- * the thread will be woken up, otherwise whenever select() is called
- * it will return even if there is nothing to read and wait
- * in while(adding) for finishAdd call
+ * Updating the readSelector while it's being used is not thread-safe,
+ * so the connection must be queued. The reader will drain the queue
+ * and update its readSelector before performing the next select
*/
- public void startAdd() {
- adding = true;
+ public void addConnection(Connection conn) throws InterruptedException {
+ pendingConnections.put(conn);
readSelector.wakeup();
}
-
- public synchronized SelectionKey registerChannel(SocketChannel channel)
- throws IOException {
- return channel.register(readSelector, SelectionKey.OP_READ);
- }
-
- public synchronized void finishAdd() {
- adding = false;
- this.notify();
- }
void shutdown() {
assert !running;
@@ -638,58 +617,12 @@ void shutdown() {
}
}
}
- /** cleanup connections from connectionList. Choose a random range
- * to scan and also have a limit on the number of the connections
- * that will be cleanedup per run. The criteria for cleanup is the time
- * for which the connection was idle. If 'force' is true then all
- * connections will be looked at for the cleanup.
- */
- private void cleanupConnections(boolean force) {
- if (force || numConnections > thresholdIdleConnections) {
- long currentTime = Time.now();
- if (!force && (currentTime - lastCleanupRunTime) < cleanupInterval) {
- return;
- }
- int start = 0;
- int end = numConnections - 1;
- if (!force) {
- start = rand.nextInt() % numConnections;
- end = rand.nextInt() % numConnections;
- int temp;
- if (end < start) {
- temp = start;
- start = end;
- end = temp;
- }
- }
- int i = start;
- int numNuked = 0;
- while (i <= end) {
- Connection c;
- synchronized (connectionList) {
- try {
- c = connectionList.get(i);
- } catch (Exception e) {return;}
- }
- if (c.timedOut(currentTime)) {
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
- closeConnection(c);
- numNuked++;
- end--;
- c = null;
- if (!force && numNuked == maxConnectionsToNuke) break;
- }
- else i++;
- }
- lastCleanupRunTime = Time.now();
- }
- }
@Override
public void run() {
LOG.info(getName() + ": starting");
SERVER.set(Server.this);
+ connectionManager.startIdleScan();
while (running) {
SelectionKey key = null;
try {
@@ -713,12 +646,11 @@ public void run() {
// some thread(s) a chance to finish
LOG.warn("Out of Memory in server select", e);
closeCurrentConnection(key, e);
- cleanupConnections(true);
+ connectionManager.closeIdle(true);
try { Thread.sleep(60000); } catch (Exception ie) {}
} catch (Exception e) {
closeCurrentConnection(key, e);
}
- cleanupConnections(false);
}
LOG.info("Stopping " + this.getName());
@@ -731,10 +663,9 @@ public void run() {
selector= null;
acceptChannel= null;
- // clean up all connections
- while (!connectionList.isEmpty()) {
- closeConnection(connectionList.remove(0));
- }
+ // close all connections
+ connectionManager.stopIdleScan();
+ connectionManager.closeAll();
}
}
@@ -742,8 +673,6 @@ private void closeCurrentConnection(SelectionKey key, Throwable e) {
if (key != null) {
Connection c = (Connection)key.attachment();
if (c != null) {
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": disconnecting client " + c.getHostAddress());
closeConnection(c);
c = null;
}
@@ -754,8 +683,7 @@ InetSocketAddress getAddress() {
return (InetSocketAddress)acceptChannel.socket().getLocalSocketAddress();
}
- void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
- Connection c = null;
+ void doAccept(SelectionKey key) throws InterruptedException, IOException, OutOfMemoryError {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel channel;
while ((channel = server.accept()) != null) {
@@ -765,22 +693,9 @@ void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
channel.socket().setKeepAlive(true);
Reader reader = getReader();
- try {
- reader.startAdd();
- SelectionKey readKey = reader.registerChannel(channel);
- c = new Connection(readKey, channel, Time.now());
- readKey.attach(c);
- synchronized (connectionList) {
- connectionList.add(numConnections, c);
- numConnections++;
- }
- if (LOG.isDebugEnabled())
- LOG.debug("Server connection from " + c.toString() +
- "; # active connections: " + numConnections +
- "; # queued calls: " + callQueue.size());
- } finally {
- reader.finishAdd();
- }
+ Connection c = connectionManager.register(channel);
+ key.attach(c); // so closeCurrentConnection can get the object
+ reader.addConnection(c);
}
}
@@ -808,10 +723,6 @@ void doRead(SelectionKey key) throws InterruptedException {
count = -1; //so that the (count < 0) block is executed
}
if (count < 0) {
- if (LOG.isDebugEnabled())
- LOG.debug(getName() + ": disconnecting client " +
- c + ". Number of active connections: "+
- numConnections);
closeConnection(c);
c = null;
}
@@ -1190,8 +1101,7 @@ public class Connection {
private boolean sentNegotiate = false;
private boolean useWrap = false;
- public Connection(SelectionKey key, SocketChannel channel,
- long lastContact) {
+ public Connection(SocketChannel channel, long lastContact) {
this.channel = channel;
this.lastContact = lastContact;
this.data = null;
@@ -1253,12 +1163,6 @@ private void incRpcCount() {
rpcCount++;
}
- private boolean timedOut(long currentTime) {
- if (isIdle() && currentTime - lastContact > maxIdleTime)
- return true;
- return false;
- }
-
private UserGroupInformation getAuthorizedUgi(String authorizedId)
throws InvalidToken, AccessControlException {
if (authMethod == AuthMethod.TOKEN) {
@@ -2189,16 +2093,10 @@ protected Server(String bindAddress, int port,
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
}
+ this.readerPendingConnectionQueue = conf.getInt(
+ CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
+ CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
- this.maxIdleTime = 2 * conf.getInt(
- CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
- CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT);
- this.maxConnectionsToNuke = conf.getInt(
- CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY,
- CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_DEFAULT);
- this.thresholdIdleConnections = conf.getInt(
- CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY,
- CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_DEFAULT);
this.secretManager = (SecretManager<TokenIdentifier>) secretManager;
this.authorize =
conf.getBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
@@ -2219,6 +2117,7 @@ protected Server(String bindAddress, int port,
// Create the responder here
responder = new Responder();
+ connectionManager = new ConnectionManager();
if (secretManager != null) {
SaslRpcServer.init(conf);
@@ -2277,11 +2176,7 @@ private List<AuthMethod> getAuthMethods(SecretManager<?> secretManager,
}
private void closeConnection(Connection connection) {
- synchronized (connectionList) {
- if (connectionList.remove(connection))
- numConnections--;
- }
- connection.close();
+ connectionManager.close(connection);
}
/**
@@ -2536,7 +2431,7 @@ public int getPort() {
* @return the number of open rpc connections
*/
public int getNumOpenConnections() {
- return numConnections;
+ return connectionManager.size();
}
/**
@@ -2646,4 +2541,151 @@ private static int channelIO(ReadableByteChannel readCh,
int nBytes = initialRemaining - buf.remaining();
return (nBytes > 0) ? nBytes : ret;
}
+
+ private class ConnectionManager {
+ final private AtomicInteger count = new AtomicInteger();
+ final private Set<Connection> connections;
+
+ final private Timer idleScanTimer;
+ final private int idleScanThreshold;
+ final private int idleScanInterval;
+ final private int maxIdleTime;
+ final private int maxIdleToClose;
+
+ ConnectionManager() {
+ this.idleScanTimer = new Timer(
+ "IPC Server idle connection scanner for port " + getPort(), true);
+ this.idleScanThreshold = conf.getInt(
+ CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_KEY,
+ CommonConfigurationKeysPublic.IPC_CLIENT_IDLETHRESHOLD_DEFAULT);
+ this.idleScanInterval = conf.getInt(
+ CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_KEY,
+ CommonConfigurationKeys.IPC_CLIENT_CONNECTION_IDLESCANINTERVAL_DEFAULT);
+ this.maxIdleTime = 2 * conf.getInt(
+ CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,
+ CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_DEFAULT);
+ this.maxIdleToClose = conf.getInt(
+ CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_KEY,
+ CommonConfigurationKeysPublic.IPC_CLIENT_KILL_MAX_DEFAULT);
+ // create a set with concurrency -and- a thread-safe iterator, add 2
+ // for listener and idle closer threads
+ this.connections = Collections.newSetFromMap(
+ new ConcurrentHashMap<Connection,Boolean>(
+ maxQueueSize, 0.75f, readThreads+2));
+ }
+
+ private boolean add(Connection connection) {
+ boolean added = connections.add(connection);
+ if (added) {
+ count.getAndIncrement();
+ }
+ return added;
+ }
+
+ private boolean remove(Connection connection) {
+ boolean removed = connections.remove(connection);
+ if (removed) {
+ count.getAndDecrement();
+ }
+ return removed;
+ }
+
+ int size() {
+ return count.get();
+ }
+
+ Connection[] toArray() {
+ return connections.toArray(new Connection[0]);
+ }
+
+ Connection register(SocketChannel channel) {
+ Connection connection = new Connection(channel, Time.now());
+ add(connection);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Server connection from " + connection +
+ "; # active connections: " + size() +
+ "; # queued calls: " + callQueue.size());
+ }
+ return connection;
+ }
+
+ boolean close(Connection connection) {
+ boolean exists = remove(connection);
+ if (exists) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(Thread.currentThread().getName() +
+ ": disconnecting client " + connection +
+ ". Number of active connections: "+ size());
+ }
+ // only close if actually removed to avoid double-closing due
+ // to possible races
+ connection.close();
+ }
+ return exists;
+ }
+
+ // synch'ed to avoid explicit invocation upon OOM from colliding with
+ // timer task firing
+ synchronized void closeIdle(boolean scanAll) {
+ long minLastContact = Time.now() - maxIdleTime;
+ // concurrent iterator might miss new connections added
+ // during the iteration, but that's ok because they won't
+ // be idle yet anyway and will be caught on next scan
+ int closed = 0;
+ for (Connection connection : connections) {
+ // stop if connections dropped below threshold unless scanning all
+ if (!scanAll && size() < idleScanThreshold) {
+ break;
+ }
+ // stop if not scanning all and max connections are closed
+ if (connection.isIdle() &&
+ connection.getLastContact() < minLastContact &&
+ close(connection) &&
+ !scanAll && (++closed == maxIdleToClose)) {
+ break;
+ }
+ }
+ }
+
+ void closeAll() {
+ // use a copy of the connections to be absolutely sure the concurrent
+ // iterator doesn't miss a connection
+ for (Connection connection : toArray()) {
+ close(connection);
+ }
+ }
+
+ void startIdleScan() {
+ scheduleIdleScanTask();
+ }
+
+ void stopIdleScan() {
+ idleScanTimer.cancel();
+ }
+
+ private void scheduleIdleScanTask() {
+ if (!running) {
+ return;
+ }
+ TimerTask idleScanTask = new TimerTask(){
+ @Override
+ public void run() {
+ if (!running) {
+ return;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(Thread.currentThread().getName()+": task running");
+ }
+ try {
+ closeIdle(false);
+ } finally {
+ // explicitly reschedule so next execution occurs relative
+ // to the end of this scan, not the beginning
+ scheduleIdleScanTask();
+ }
+ }
+ };
+ idleScanTimer.schedule(idleScanTask, idleScanInterval);
+ }
+ }
}
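At its core the listener/reader change is a bounded producer-consumer handoff: the listener puts each accepted connection on the reader's BlockingQueue (blocking when that queue is full, which throttles accepts), wakes the selector, and the reader drains whatever is queued before its next select. A stand-alone sketch of that pattern, independent of the Hadoop classes:

    import java.io.IOException;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;

    // Illustrative only: registering a channel with a selector that another
    // thread is selecting on is not safe, so accepted channels are queued and
    // registered by the reader thread itself.
    class ReaderSketch implements Runnable {
      private final BlockingQueue<SocketChannel> pending;
      private final Selector selector;
      private volatile boolean running = true;

      ReaderSketch(int queueSize) throws IOException {
        this.pending = new LinkedBlockingQueue<SocketChannel>(queueSize);
        this.selector = Selector.open();
      }

      // Called by the listener thread; blocks when the queue is full, which
      // applies back-pressure to accept.
      void addConnection(SocketChannel channel) throws InterruptedException {
        pending.put(channel);
        selector.wakeup();
      }

      @Override
      public void run() {
        try {
          while (running) {
            // Drain only what is queued right now so a flood of accepts
            // cannot starve the select loop.
            for (int i = pending.size(); i > 0; i--) {
              SocketChannel ch = pending.take();
              ch.configureBlocking(false);
              ch.register(selector, SelectionKey.OP_READ);
            }
            selector.select();
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
              SelectionKey key = it.next();
              it.remove();
              // ... read from (SocketChannel) key.channel() ...
            }
          }
        } catch (IOException e) {
          // selector failure: fall through and exit the loop
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
        }
      }
    }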
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
index 259d329c4be..e386ce608f0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSystemImpl.java
@@ -38,7 +38,7 @@
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.commons.math.util.MathUtils;
+import org.apache.commons.math3.util.ArithmeticUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsCollector;
@@ -457,7 +457,7 @@ private synchronized void configureSinks() {
MetricsConfig conf = entry.getValue();
int sinkPeriod = conf.getInt(PERIOD_KEY, PERIOD_DEFAULT);
confPeriod = confPeriod == 0 ? sinkPeriod
- : MathUtils.gcd(confPeriod, sinkPeriod);
+ : ArithmeticUtils.gcd(confPeriod, sinkPeriod);
String clsName = conf.getClassName("");
if (clsName == null) continue; // sink can be registered later on
String sinkName = entry.getKey();
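commons-math3 moved gcd from MathUtils to ArithmeticUtils; the behaviour relied on here is unchanged. A small illustration of how configureSinks() derives the shared timer period from the per-sink periods (the example values are made up):

    import org.apache.commons.math3.util.ArithmeticUtils;

    public class SinkPeriodSketch {
      public static void main(String[] args) {
        int[] sinkPeriods = {10, 15, 60};   // hypothetical per-sink periods, in seconds
        int confPeriod = 0;
        for (int sinkPeriod : sinkPeriods) {
          // Same reduction as configureSinks(): the shared timer fires at the
          // gcd of all configured sink periods.
          confPeriod = confPeriod == 0 ? sinkPeriod
              : ArithmeticUtils.gcd(confPeriod, sinkPeriod);
        }
        System.out.println("timer period = " + confPeriod);   // prints 5
      }
    }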
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/GenericOptionsParser.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/GenericOptionsParser.java
index 6048f8e8185..67818555393 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/GenericOptionsParser.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/GenericOptionsParser.java
@@ -431,6 +431,9 @@ private String[] preProcessForWindows(String[] args) {
if (!Shell.WINDOWS) {
return args;
}
+ if (args == null) {
+ return null;
+ }
List<String> newArgs = new ArrayList<String>(args.length);
for (int i=0; i < args.length; i++) {
String prop = null;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ZKUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ZKUtil.java
index bd08efb5cda..d23df79d27f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ZKUtil.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ZKUtil.java
@@ -71,6 +71,20 @@ private static int getPermFromString(String permString) {
return perm;
}
+ /**
+ * Helper method to remove a subset of permissions (remove) from a
+ * given set (perms).
+ * @param perms The permissions flag to remove from. Should be an OR of
+ * some combination of {@link ZooDefs.Perms}
+ * @param remove The permissions to be removed. Should be an OR of
+ * some combination of {@link ZooDefs.Perms}
+ * @return A permissions flag that is an OR of {@link ZooDefs.Perms}
+ * present in perms and not present in remove
+ */
+ public static int removeSpecificPerms(int perms, int remove) {
+ return perms ^ remove;
+ }
+
/**
* Parse comma separated list of ACL entries to secure generated nodes, e.g.
* sasl:hdfs/host1@MY.DOMAIN:cdrwa,sasl:hdfs/host2@MY.DOMAIN:cdrwa
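A short usage sketch for the new helper; note the javadoc's precondition that remove is a subset of perms, since XOR would re-add bits that were not already present:

    import org.apache.hadoop.util.ZKUtil;
    import org.apache.zookeeper.ZooDefs.Perms;

    public class RemovePermsSketch {
      public static void main(String[] args) {
        // Start from full permissions and strip CREATE and DELETE.
        int restricted = ZKUtil.removeSpecificPerms(Perms.ALL,
            Perms.CREATE | Perms.DELETE);
        System.out.println((restricted & Perms.CREATE) == 0);   // true
        System.out.println((restricted & Perms.READ) != 0);     // true
      }
    }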
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
index 33fb799c0c8..02516a183aa 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
@@ -44,12 +44,19 @@
import java.util.Collections;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
@@ -64,8 +71,10 @@
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.StringUtils;
+import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Assume;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@@ -79,7 +88,7 @@ public class TestIPC {
public static final Log LOG =
LogFactory.getLog(TestIPC.class);
- final private static Configuration conf = new Configuration();
+ private static Configuration conf;
final static private int PING_INTERVAL = 1000;
final static private int MIN_SLEEP_TIME = 1000;
/**
@@ -89,7 +98,9 @@ public class TestIPC {
static boolean WRITABLE_FAULTS_ENABLED = true;
static int WRITABLE_FAULTS_SLEEP = 0;
- static {
+ @Before
+ public void setupConf() {
+ conf = new Configuration();
Client.setPingInterval(conf, PING_INTERVAL);
}
@@ -613,6 +624,255 @@ public void testIpcWithServiceClass() throws IOException {
server.stop();
}
+ private static class TestServerQueue extends Server {
+ final CountDownLatch firstCallLatch = new CountDownLatch(1);
+ final CountDownLatch callBlockLatch = new CountDownLatch(1);
+
+ TestServerQueue(int expectedCalls, int readers, int callQ, int handlers,
+ Configuration conf) throws IOException {
+ super(ADDRESS, 0, LongWritable.class, handlers, readers, callQ, conf, null, null);
+ }
+
+ @Override
+ public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param,
+ long receiveTime) throws IOException {
+ firstCallLatch.countDown();
+ try {
+ callBlockLatch.await();
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ return param;
+ }
+ }
+
+ /**
+ * Check that reader queueing works
+ * @throws BrokenBarrierException
+ * @throws InterruptedException
+ */
+ @Test(timeout=60000)
+ public void testIpcWithReaderQueuing() throws Exception {
+ // 1 reader, 1 connectionQ slot, 1 callq
+ for (int i=0; i < 10; i++) {
+ checkBlocking(1, 1, 1);
+ }
+ // 4 readers, 5 connectionQ slots, 2 callq
+ for (int i=0; i < 10; i++) {
+ checkBlocking(4, 5, 2);
+ }
+ }
+
+ // goal is to jam a handler with a connection, fill the callq with
+ // connections, in turn jamming the readers - then flood the server and
+ // ensure that the listener blocks when the reader connection queues fill
+ private void checkBlocking(int readers, int readerQ, int callQ) throws Exception {
+ int handlers = 1; // makes it easier
+
+ final Configuration conf = new Configuration();
+ conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, readerQ);
+
+ // send in enough clients to block up the handlers, callq, and readers
+ int initialClients = readers + callQ + handlers;
+ // max connections we should ever end up accepting at once
+ int maxAccept = initialClients + readers*readerQ + 1; // 1 = listener
+ // stress it with 2X the max
+ int clients = maxAccept*2;
+
+ final AtomicInteger failures = new AtomicInteger(0);
+ final CountDownLatch callFinishedLatch = new CountDownLatch(clients);
+
+ // start server
+ final TestServerQueue server =
+ new TestServerQueue(clients, readers, callQ, handlers, conf);
+ final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ server.start();
+
+ Client.setConnectTimeout(conf, 10000);
+
+ // instantiate the threads, will start in batches
+ Thread[] threads = new Thread[clients];
+ for (int i=0; i 1; i -= killMax) {
+ Thread.sleep(cleanupInterval);
+ assertFalse(error.get());
+ assertEquals(i, server.getNumOpenConnections());
+ }
+
+ // connection for the first blocked call should still be open
+ Thread.sleep(cleanupInterval);
+ assertFalse(error.get());
+ assertEquals(1, server.getNumOpenConnections());
+
+ // wake up call and ensure connection times out
+ firstCallBarrier.await();
+ Thread.sleep(maxIdle*2);
+ assertFalse(error.get());
+ assertEquals(0, server.getNumOpenConnections());
+ } finally {
+ for (Thread t : threads) {
+ if (t != null) {
+ t.interrupt();
+ t.join();
+ }
+ server.stop();
+ }
+ }
+ }
+
/**
* Make a call from a client and verify if header info is changed in server side
*/
@@ -622,7 +882,7 @@ private void callAndVerify(Server server, InetSocketAddress addr,
client.call(new LongWritable(RANDOM.nextLong()),
addr, null, null, MIN_SLEEP_TIME, serviceClass, conf);
- Connection connection = server.getConnections().get(0);
+ Connection connection = server.getConnections()[0];
int serviceClass2 = connection.getServiceClass();
assertFalse(noChanged ^ serviceClass == serviceClass2);
client.stop();
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestGenericOptionsParser.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestGenericOptionsParser.java
index 87bc83b4192..48a419b3a52 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestGenericOptionsParser.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestGenericOptionsParser.java
@@ -282,4 +282,12 @@ private void assertDOptionParsing(String[] args,
Arrays.toString(remainingArgs) + Arrays.toString(expectedRemainingArgs),
expectedRemainingArgs, remainingArgs);
}
+
+ /** Test passing null as args. Some classes still call
+ * Tool interface from java passing null.
+ */
+ public void testNullArgs() throws IOException {
+ GenericOptionsParser parser = new GenericOptionsParser(conf, null);
+ parser.getRemainingArgs();
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestZKUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestZKUtil.java
index 1d14326d2ab..52d10ca2fca 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestZKUtil.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestZKUtil.java
@@ -24,7 +24,6 @@
import java.io.IOException;
import java.util.List;
-import org.apache.hadoop.util.ZKUtil;
import org.apache.hadoop.util.ZKUtil.BadAclFormatException;
import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
import org.apache.zookeeper.ZooDefs.Perms;
@@ -75,6 +74,14 @@ private static void badAcl(String acls, String expectedErr) {
}
}
+ @Test
+ public void testRemoveSpecificPerms() {
+ int perms = Perms.ALL;
+ int remove = Perms.CREATE;
+ int newPerms = ZKUtil.removeSpecificPerms(perms, remove);
+ assertEquals("Removal failed", 0, newPerms & Perms.CREATE);
+ }
+
@Test
public void testGoodACLs() {
List<ACL> result = ZKUtil.parseACLs(
diff --git a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/libexec/httpfs-config.sh b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/libexec/httpfs-config.sh
index 02e1a7172c7..621a2fa96f5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/libexec/httpfs-config.sh
+++ b/hadoop-hdfs-project/hadoop-hdfs-httpfs/src/main/libexec/httpfs-config.sh
@@ -55,7 +55,7 @@ print "Setting HTTPFS_HOME: ${HTTPFS_HOME}"
#
if [ -e "${HTTPFS_HOME}/bin/httpfs-env.sh" ]; then
print "Sourcing: ${HTTPFS_HOME}/bin/httpfs-env.sh"
- source ${HTTPFS_HOME}/bin/HTTPFS-env.sh
+ source ${HTTPFS_HOME}/bin/httpfs-env.sh
grep "^ *export " ${HTTPFS_HOME}/bin/httpfs-env.sh | sed 's/ *export/ setting/'
fi
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 7586f950a56..2eef17a5056 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -13,6 +13,9 @@ Trunk (Unreleased)
HDFS-3125. Add JournalService to enable Journal Daemon. (suresh)
+ HDFS-5444. Choose default web UI based on browser capabilities. (Haohui Mai
+ via jing9)
+
IMPROVEMENTS
HDFS-4665. Move TestNetworkTopologyWithNodeGroup to common.
@@ -194,6 +197,8 @@ Trunk (Unreleased)
HDFS-5485. Add command-line support for modifyDirective. (cmccabe)
+ HDFS-5366. recaching improvements (cmccabe)
+
OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
@@ -475,6 +480,20 @@ Release 2.3.0 - UNRELEASED
HDFS-5495. Remove further JUnit3 usages from HDFS.
(Jarek Jarcec Cecho via wang)
+ HDFS-5325. Remove WebHdfsFileSystem#ConnRunner. (Haohui Mai via jing9)
+
+ HDFS-5488. Clean up TestHftpURLTimeout. (Haohui Mai via jing9)
+
+ HDFS-5440. Extract the logic of handling delegation tokens in HftpFileSystem
+ to the TokenAspect class. (Haohui Mai via jing9)
+
+ HDFS-5487. Introduce unit test for TokenAspect. (Haohui Mai via jing9)
+
+ HDFS-4995. Make getContentSummary less expensive. (kihwal)
+
+ HDFS-5506. Use URLConnectionFactory in DelegationTokenFetcher. (Haohui Mai
+ via jing9)
+
OPTIMIZATIONS
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
@@ -529,13 +548,20 @@ Release 2.3.0 - UNRELEASED
HDFS-5476. Snapshot: clean the blocks/files/directories under a renamed
file/directory while deletion. (jing9)
- HDFS-5325. Remove WebHdfsFileSystem#ConnRunner. (Haohui Mai via jing9)
-
- HDFS-5488. Clean up TestHftpURLTimeout. (Haohui Mai via jing9)
-
HDFS-5425. Renaming underconstruction file with snapshots can make NN failure on
restart. (jing9 and Vinay)
+ HDFS-5474. Deletesnapshot can make Namenode in safemode on NN restarts.
+ (Sathish via jing9)
+
+ HDFS-5075. httpfs-config.sh calls out incorrect env script name
+ (Timothy St. Clair via stevel)
+
+ HDFS-5504. In HA mode, OP_DELETE_SNAPSHOT is not decrementing the safemode threshold,
+ leads to NN safemode. (Vinay via jing9)
+
+ HDFS-5438. Flaws in block report processing can cause data loss. (kihwal)
+
Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
index 1738a4c1bc7..c5c6d5c5492 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -42,4 +42,8 @@ public boolean corruptPacket() {
public boolean uncorruptPacket() {
return false;
}
+
+ public boolean failPacket() {
+ return false;
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 0483ce8de0b..f3489361240 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -198,6 +198,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_LIST_LIMIT = "dfs.ls.limit";
public static final int DFS_LIST_LIMIT_DEFAULT = 1000;
+ public static final String DFS_CONTENT_SUMMARY_LIMIT_KEY = "dfs.content-summary.limit";
+ public static final int DFS_CONTENT_SUMMARY_LIMIT_DEFAULT = 0;
public static final String DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY = "dfs.datanode.failed.volumes.tolerated";
public static final int DFS_DATANODE_FAILED_VOLUMES_TOLERATED_DEFAULT = 0;
public static final String DFS_DATANODE_SYNCONCLOSE_KEY = "dfs.datanode.synconclose";
@@ -342,6 +344,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
public static final String DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval";
public static final long DFS_HEARTBEAT_INTERVAL_DEFAULT = 3;
+ public static final String DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS = "dfs.namenode.path.based.cache.retry.interval.ms";
+ public static final long DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT = 60000L;
public static final String DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY = "dfs.namenode.decommission.interval";
public static final int DFS_NAMENODE_DECOMMISSION_INTERVAL_DEFAULT = 30;
public static final String DFS_NAMENODE_DECOMMISSION_NODES_PER_INTERVAL_KEY = "dfs.namenode.decommission.nodes.per.interval";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 7f5b6ed5323..c8bbf8fd841 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -151,6 +151,7 @@ public class DFSOutputStream extends FSOutputSummer
private final short blockReplication; // replication factor of file
private boolean shouldSyncBlock = false; // force blocks to disk upon close
private CachingStrategy cachingStrategy;
+ private boolean failPacket = false;
private static class Packet {
private static final long HEART_BEAT_SEQNO = -1L;
@@ -752,6 +753,16 @@ public void run() {
one.seqno + " but received " + seqno);
}
isLastPacketInBlock = one.lastPacketInBlock;
+
+ // Fail the packet write for testing in order to force a
+ // pipeline recovery.
+ if (DFSClientFaultInjector.get().failPacket() &&
+ isLastPacketInBlock) {
+ failPacket = true;
+ throw new IOException(
+ "Failing the last packet for testing.");
+ }
+
// update bytesAcked
block.setNumBytes(one.getLastByteOffsetBlock());
@@ -1044,7 +1055,18 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
accessToken = lb.getBlockToken();
// set up the pipeline again with the remaining nodes
- success = createBlockOutputStream(nodes, newGS, isRecovery);
+ if (failPacket) { // for testing
+ success = createBlockOutputStream(nodes, newGS-1, isRecovery);
+ failPacket = false;
+ try {
+ // Give DNs time to send in bad reports. In real situations,
+ // good reports should follow bad ones, if client committed
+ // with those nodes.
+ Thread.sleep(2000);
+ } catch (InterruptedException ie) {}
+ } else {
+ success = createBlockOutputStream(nodes, newGS, isRecovery);
+ }
}
if (success) {
@@ -1904,7 +1926,9 @@ public synchronized void close() throws IOException {
// be called during unit tests
private void completeFile(ExtendedBlock last) throws IOException {
long localstart = Time.now();
+ long localTimeout = 400;
boolean fileComplete = false;
+ int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
while (!fileComplete) {
fileComplete =
dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);
@@ -1920,7 +1944,13 @@ private void completeFile(ExtendedBlock last) throws IOException {
throw new IOException(msg);
}
try {
- Thread.sleep(400);
+ Thread.sleep(localTimeout);
+ if (retries == 0) {
+ throw new IOException("Unable to close file because the last block"
+ + " does not have enough number of replicas.");
+ }
+ retries--;
+ localTimeout *= 2;
if (Time.now() - localstart > 5000) {
DFSClient.LOG.info("Could not complete " + src + " retrying...");
}
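The completeFile() change above bounds the retry loop by nBlockWriteLocateFollowingRetry and doubles the sleep each round. A generic sketch of that capped exponential-backoff pattern (names and limits here are illustrative, not the DFSClient configuration):

    import java.io.IOException;

    public class BackoffRetrySketch {
      interface Attempt {
        boolean tryOnce() throws IOException;   // true once the operation completed
      }

      // Retry with a doubling delay, failing once the retry budget is spent.
      static void retryWithBackoff(Attempt attempt, int retries, long initialSleepMs)
          throws IOException, InterruptedException {
        long sleepMs = initialSleepMs;
        while (!attempt.tryOnce()) {
          if (retries == 0) {
            throw new IOException("operation did not complete before retries ran out");
          }
          retries--;
          Thread.sleep(sleepMs);
          sleepMs *= 2;                         // back off a little longer each round
        }
      }
    }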
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
index a2202feefba..b13d7e3252d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java
@@ -229,6 +229,29 @@ public long getBlockRecoveryId() {
return blockRecoveryId;
}
+ /**
+ * Process the recorded replicas. When about to commit or finish the
+ * pipeline recovery sort out bad replicas.
+ * @param genStamp The final generation stamp for the block.
+ */
+ public void setGenerationStampAndVerifyReplicas(long genStamp) {
+ if (replicas == null)
+ return;
+
+ // Remove the replicas with wrong gen stamp.
+ // The replica list is unchanged.
+ for (ReplicaUnderConstruction r : replicas) {
+ if (genStamp != r.getGenerationStamp()) {
+ r.getExpectedStorageLocation().removeBlock(this);
+ NameNode.blockStateChangeLog.info("BLOCK* Removing stale replica "
+ + "from location: " + r);
+ }
+ }
+
+ // Set the generation stamp for the block.
+ setGenerationStamp(genStamp);
+ }
+
/**
* Commit block's length and generation stamp as reported by the client.
* Set block state to {@link BlockUCState#COMMITTED}.
@@ -301,6 +324,8 @@ void addReplicaIfNotPresent(DatanodeStorageInfo storage,
while (it.hasNext()) {
ReplicaUnderConstruction r = it.next();
if(r.getExpectedStorageLocation() == storage) {
+ // Record the gen stamp from the report
+ r.setGenerationStamp(block.getGenerationStamp());
return;
} else if (r.getExpectedStorageLocation().getDatanodeDescriptor() ==
storage.getDatanodeDescriptor()) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index dc540d4e675..af21b7c44cb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -59,6 +59,7 @@
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.hdfs.server.blockmanagement.CorruptReplicasMap.Reason;
import org.apache.hadoop.hdfs.server.blockmanagement.PendingDataNodeMessages.ReportedBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -1051,7 +1052,8 @@ public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
+ blk + " not found");
return;
}
- markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn, storageID);
+ markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason,
+ Reason.CORRUPTION_REPORTED), dn, storageID);
}
private void markBlockAsCorrupt(BlockToMarkCorrupt b,
@@ -1074,7 +1076,8 @@ private void markBlockAsCorrupt(BlockToMarkCorrupt b,
node.addBlock(storageID, b.stored);
// Add this replica to corruptReplicas Map
- corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason);
+ corruptReplicas.addToCorruptReplicasMap(b.corrupted, node, b.reason,
+ b.reasonCode);
if (countNodes(b.stored).liveReplicas() >= bc.getBlockReplication()) {
// the block is over-replicated so invalidate the replicas immediately
invalidateBlock(b, node);
@@ -1574,22 +1577,27 @@ private static class BlockToMarkCorrupt {
final BlockInfo stored;
/** The reason to mark corrupt. */
final String reason;
-
- BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason) {
+ /** The reason code to be stored */
+ final Reason reasonCode;
+
+ BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason,
+ Reason reasonCode) {
Preconditions.checkNotNull(corrupted, "corrupted is null");
Preconditions.checkNotNull(stored, "stored is null");
this.corrupted = corrupted;
this.stored = stored;
this.reason = reason;
+ this.reasonCode = reasonCode;
}
- BlockToMarkCorrupt(BlockInfo stored, String reason) {
- this(stored, stored, reason);
+ BlockToMarkCorrupt(BlockInfo stored, String reason, Reason reasonCode) {
+ this(stored, stored, reason, reasonCode);
}
- BlockToMarkCorrupt(BlockInfo stored, long gs, String reason) {
- this(new BlockInfo(stored), stored, reason);
+ BlockToMarkCorrupt(BlockInfo stored, long gs, String reason,
+ Reason reasonCode) {
+ this(new BlockInfo(stored), stored, reason, reasonCode);
//the corrupted block in datanode has a different generation stamp
corrupted.setGenerationStamp(gs);
}
@@ -1946,9 +1954,11 @@ private BlockInfo processReportedBlock(final DatanodeDescriptor dn,
return storedBlock;
}
- //add replica if appropriate
+ // Add replica if appropriate. If the replica was previously corrupt
+ // but now okay, it might need to be updated.
if (reportedState == ReplicaState.FINALIZED
- && storedBlock.findDatanode(dn) < 0) {
+ && (storedBlock.findDatanode(dn) < 0
+ || corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
toAdd.add(storedBlock);
}
return storedBlock;
@@ -2039,12 +2049,13 @@ private BlockToMarkCorrupt checkReplicaCorrupt(
return new BlockToMarkCorrupt(storedBlock, reportedGS,
"block is " + ucState + " and reported genstamp " + reportedGS
+ " does not match genstamp in block map "
- + storedBlock.getGenerationStamp());
+ + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
} else if (storedBlock.getNumBytes() != reported.getNumBytes()) {
return new BlockToMarkCorrupt(storedBlock,
"block is " + ucState + " and reported length " +
reported.getNumBytes() + " does not match " +
- "length in block map " + storedBlock.getNumBytes());
+ "length in block map " + storedBlock.getNumBytes(),
+ Reason.SIZE_MISMATCH);
} else {
return null; // not corrupt
}
@@ -2060,7 +2071,7 @@ private BlockToMarkCorrupt checkReplicaCorrupt(
return new BlockToMarkCorrupt(storedBlock, reportedGS,
"reported " + reportedState + " replica with genstamp " + reportedGS
+ " does not match COMPLETE block's genstamp in block map "
- + storedBlock.getGenerationStamp());
+ + storedBlock.getGenerationStamp(), Reason.GENSTAMP_MISMATCH);
} else { // COMPLETE block, same genstamp
if (reportedState == ReplicaState.RBW) {
// If it's a RBW report for a COMPLETE block, it may just be that
@@ -2073,7 +2084,8 @@ private BlockToMarkCorrupt checkReplicaCorrupt(
return null;
} else {
return new BlockToMarkCorrupt(storedBlock,
- "reported replica has invalid state " + reportedState);
+ "reported replica has invalid state " + reportedState,
+ Reason.INVALID_STATE);
}
}
case RUR: // should not be reported
@@ -2084,7 +2096,7 @@ private BlockToMarkCorrupt checkReplicaCorrupt(
" on " + dn + " size " + storedBlock.getNumBytes();
// log here at WARN level since this is really a broken HDFS invariant
LOG.warn(msg);
- return new BlockToMarkCorrupt(storedBlock, msg);
+ return new BlockToMarkCorrupt(storedBlock, msg, Reason.INVALID_STATE);
}
}
@@ -2201,6 +2213,11 @@ private Block addStoredBlock(final BlockInfo block,
logAddStoredBlock(storedBlock, node);
}
} else {
+ // if the same block is added again and the replica was corrupt
+ // previously because of a wrong gen stamp, remove it from the
+ // corrupt block list.
+ corruptReplicas.removeFromCorruptReplicasMap(block, node,
+ Reason.GENSTAMP_MISMATCH);
curReplicaDelta = 0;
blockLog.warn("BLOCK* addStoredBlock: "
+ "Redundant addStoredBlock request received for " + storedBlock
@@ -2297,7 +2314,8 @@ private void invalidateCorruptReplicas(BlockInfo blk) {
DatanodeDescriptor[] nodesCopy = nodes.toArray(new DatanodeDescriptor[0]);
for (DatanodeDescriptor node : nodesCopy) {
try {
- if (!invalidateBlock(new BlockToMarkCorrupt(blk, null), node)) {
+ if (!invalidateBlock(new BlockToMarkCorrupt(blk, null,
+ Reason.ANY), node)) {
removedFromBlocksMap = false;
}
} catch (IOException e) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
index 5d9c39c16a0..1cef3ee9251 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
@@ -204,6 +204,7 @@ private void rescan() {
namesystem.writeLock();
try {
rescanCachedBlockMap();
+ blockManager.getDatanodeManager().resetLastCachingDirectiveSentTime();
} finally {
namesystem.writeUnlock();
}
@@ -316,17 +317,21 @@ private void rescanCachedBlockMap() {
int numCached = cached.size();
if (numCached >= neededCached) {
// If we have enough replicas, drop all pending cached.
- for (DatanodeDescriptor datanode : pendingCached) {
+ for (Iterator<DatanodeDescriptor> iter = pendingCached.iterator();
+ iter.hasNext(); ) {
+ DatanodeDescriptor datanode = iter.next();
datanode.getPendingCached().remove(cblock);
+ iter.remove();
}
- pendingCached.clear();
}
if (numCached < neededCached) {
// If we don't have enough replicas, drop all pending uncached.
- for (DatanodeDescriptor datanode : pendingUncached) {
+ for (Iterator<DatanodeDescriptor> iter = pendingUncached.iterator();
+ iter.hasNext(); ) {
+ DatanodeDescriptor datanode = iter.next();
datanode.getPendingUncached().remove(cblock);
+ iter.remove();
}
- pendingUncached.clear();
}
int neededUncached = numCached -
(pendingUncached.size() + neededCached);
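The loops above switch from a bulk clear() to removing each entry through the iterator at the same moment the matching datanode-side list is updated, so the two views cannot drift apart. A stand-alone reminder of the Iterator.remove() idiom the new code relies on:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    public class IteratorRemoveSketch {
      public static void main(String[] args) {
        List<String> pending = new ArrayList<String>();
        pending.add("dn1");
        pending.add("dn2");
        pending.add("dn3");

        // Remove elements one at a time via the iterator; calling
        // pending.remove(...) directly inside this loop would throw
        // ConcurrentModificationException.
        for (Iterator<String> iter = pending.iterator(); iter.hasNext(); ) {
          String datanode = iter.next();
          if (!datanode.equals("dn2")) {
            // ...update the per-datanode side here, then...
            iter.remove();
          }
        }
        System.out.println(pending);   // [dn2]
      }
    }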
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
index 4613199ee6e..cb9f79ab448 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CorruptReplicasMap.java
@@ -36,8 +36,18 @@
@InterfaceAudience.Private
public class CorruptReplicasMap{
- private SortedMap<Block, Collection<DatanodeDescriptor>> corruptReplicasMap =
- new TreeMap<Block, Collection<DatanodeDescriptor>>();
+ /** The corruption reason code */
+ public static enum Reason {
+ NONE, // not specified.
+ ANY, // wildcard reason
+ GENSTAMP_MISMATCH, // mismatch in generation stamps
+ SIZE_MISMATCH, // mismatch in sizes
+ INVALID_STATE, // invalid state
+ CORRUPTION_REPORTED // client or datanode reported the corruption
+ }
+
+ private SortedMap<Block, Map<DatanodeDescriptor, Reason>> corruptReplicasMap =
+ new TreeMap<Block, Map<DatanodeDescriptor, Reason>>();
/**
* Mark the block belonging to datanode as corrupt.
@@ -48,9 +58,22 @@ public class CorruptReplicasMap{
*/
public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
String reason) {
- Collection<DatanodeDescriptor> nodes = getNodes(blk);
+ addToCorruptReplicasMap(blk, dn, reason, Reason.NONE);
+ }
+
+ /**
+ * Mark the block belonging to datanode as corrupt.
+ *
+ * @param blk Block to be added to CorruptReplicasMap
+ * @param dn DatanodeDescriptor which holds the corrupt replica
+ * @param reason a textual reason (for logging purposes)
+ * @param reasonCode the enum representation of the reason
+ */
+ public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
+ String reason, Reason reasonCode) {
+ Map<DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
if (nodes == null) {
- nodes = new TreeSet<DatanodeDescriptor>();
+ nodes = new HashMap<DatanodeDescriptor, Reason>();
corruptReplicasMap.put(blk, nodes);
}
@@ -61,8 +84,7 @@ public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
reasonText = "";
}
- if (!nodes.contains(dn)) {
- nodes.add(dn);
+ if (!nodes.keySet().contains(dn)) {
NameNode.blockStateChangeLog.info("BLOCK NameSystem.addToCorruptReplicasMap: "+
blk.getBlockName() +
" added as corrupt on " + dn +
@@ -76,6 +98,8 @@ public void addToCorruptReplicasMap(Block blk, DatanodeDescriptor dn,
" by " + Server.getRemoteIp() +
reasonText);
}
+ // Add the node or update the reason.
+ nodes.put(dn, reasonCode);
}
/**
@@ -97,10 +121,24 @@ void removeFromCorruptReplicasMap(Block blk) {
false if the replica is not in the map
*/
boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode) {
- Collection<DatanodeDescriptor> datanodes = corruptReplicasMap.get(blk);
+ return removeFromCorruptReplicasMap(blk, datanode, Reason.ANY);
+ }
+
+ boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode,
+ Reason reason) {
+ Map<DatanodeDescriptor, Reason> datanodes = corruptReplicasMap.get(blk);
+ boolean removed = false;
if (datanodes==null)
return false;
- if (datanodes.remove(datanode)) { // remove the replicas
+
+ // if reasons can be compared but don't match, return false.
+ Reason storedReason = datanodes.get(datanode);
+ if (reason != Reason.ANY && storedReason != null &&
+ reason != storedReason) {
+ return false;
+ }
+
+ if (datanodes.remove(datanode) != null) { // remove the replicas
if (datanodes.isEmpty()) {
// remove the block if there is no more corrupted replicas
corruptReplicasMap.remove(blk);
@@ -118,7 +156,10 @@ boolean removeFromCorruptReplicasMap(Block blk, DatanodeDescriptor datanode) {
* @return collection of nodes. Null if it does not exist
*/
Collection<DatanodeDescriptor> getNodes(Block blk) {
- return corruptReplicasMap.get(blk);
+ Map<DatanodeDescriptor, Reason> nodes = corruptReplicasMap.get(blk);
+ if (nodes == null)
+ return null;
+ return nodes.keySet();
}
/**
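
Switching the per-block value from a Collection<DatanodeDescriptor> to a Map<DatanodeDescriptor, Reason> is what makes the new reason-aware overloads possible: a removal is skipped when the stored reason and the requested reason both exist but differ. A rough standalone sketch of that reason-gated removal, with simplified types (String keys standing in for DatanodeDescriptor, and a trimmed-down Reason enum):

```java
import java.util.HashMap;
import java.util.Map;

public class ReasonGatedRemoval {
  enum Reason { NONE, ANY, GENSTAMP_MISMATCH, SIZE_MISMATCH }

  private final Map<String, Reason> corruptReplicas = new HashMap<>();

  void add(String datanode, Reason reason) {
    // Adding an existing datanode simply updates its stored reason.
    corruptReplicas.put(datanode, reason);
  }

  boolean remove(String datanode, Reason reason) {
    Reason stored = corruptReplicas.get(datanode);
    // If both sides carry a concrete reason and they differ, keep the entry.
    if (reason != Reason.ANY && stored != null && reason != stored) {
      return false;
    }
    return corruptReplicas.remove(datanode) != null;
  }

  public static void main(String[] args) {
    ReasonGatedRemoval map = new ReasonGatedRemoval();
    map.add("dn1", Reason.GENSTAMP_MISMATCH);
    System.out.println(map.remove("dn1", Reason.SIZE_MISMATCH));      // false
    System.out.println(map.remove("dn1", Reason.GENSTAMP_MISMATCH));  // true
  }
}
```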
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index d0aef08cb4b..5d64e0ca235 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -170,6 +170,21 @@ public CachedBlocksList getPendingUncached() {
return pendingUncached;
}
+ /**
+ * The time when the last batch of caching directives was sent, in
+ * monotonic milliseconds.
+ */
+ private long lastCachingDirectiveSentTimeMs;
+
+ /**
+ * Head of the list of blocks on the datanode
+ */
+ private volatile BlockInfo blockList = null;
+ /**
+ * Number of blocks on the datanode
+ */
+ private int numBlocks = 0;
+
// isAlive == heartbeats.contains(this)
// This is an optimization, because contains takes O(n) time on Arraylist
public boolean isAlive = false;
@@ -661,5 +676,21 @@ public DatanodeStorageInfo updateStorage(DatanodeStorage s) {
return storage;
}
}
+
+ /**
+ * @return The time at which we last sent caching directives to this
+ * DataNode, in monotonic milliseconds.
+ */
+ public long getLastCachingDirectiveSentTimeMs() {
+ return this.lastCachingDirectiveSentTimeMs;
+ }
+
+ /**
+ * @param time The time at which we last sent caching directives to this
+ * DataNode, in monotonic milliseconds.
+ */
+ public void setLastCachingDirectiveSentTimeMs(long time) {
+ this.lastCachingDirectiveSentTimeMs = time;
+ }
}
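
The new timestamp is kept in monotonic milliseconds so the resend-interval check in DatanodeManager is unaffected by wall-clock adjustments. Hadoop's Time.monotonicNow() is, to my understanding, backed by System.nanoTime(); a minimal equivalent helper (the name monotonicNowMs is illustrative, not a Hadoop API):

```java
import java.util.concurrent.TimeUnit;

public class MonotonicClock {
  /** Milliseconds from a monotonic source; only differences are meaningful. */
  static long monotonicNowMs() {
    return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
  }

  public static void main(String[] args) throws InterruptedException {
    long start = monotonicNowMs();
    Thread.sleep(50);
    // The elapsed value stays correct even if the system clock is stepped back.
    System.out.println("elapsed ms: " + (monotonicNowMs() - start));
  }
}
```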
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index f5560b680bc..3bb1f150bcb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -149,7 +149,7 @@ public class DatanodeManager {
* Whether we should tell datanodes what to cache in replies to
* heartbeat messages.
*/
- private boolean sendCachingCommands = false;
+ private boolean shouldSendCachingCommands = false;
/**
* The number of datanodes for each software version. This list should change
@@ -159,6 +159,16 @@ public class DatanodeManager {
private HashMap<String, Integer> datanodesSoftwareVersions =
new HashMap<String, Integer>(4, 0.75f);
+ /**
+ * The minimum time between resending caching directives to Datanodes,
+ * in milliseconds.
+ *
+ * Note that when a rescan happens, we will send the new directives
+ * as soon as possible. This timeout only applies to resending
+ * directives that we've already sent.
+ */
+ private final long timeBetweenResendingCachingDirectivesMs;
+
DatanodeManager(final BlockManager blockManager, final Namesystem namesystem,
final Configuration conf) throws IOException {
this.namesystem = namesystem;
@@ -241,6 +251,9 @@ public class DatanodeManager {
DFSConfigKeys.DFS_NAMENODE_USE_STALE_DATANODE_FOR_WRITE_RATIO_KEY +
" = '" + ratioUseStaleDataNodesForWrite + "' is invalid. " +
"It should be a positive non-zero float value, not greater than 1.0f.");
+ this.timeBetweenResendingCachingDirectivesMs = conf.getLong(
+ DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS,
+ DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_RETRY_INTERVAL_MS_DEFAULT);
}
private static long getStaleIntervalFromConf(Configuration conf,
@@ -1297,17 +1310,28 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
blockPoolId, blks));
}
- DatanodeCommand pendingCacheCommand =
- getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
- DatanodeProtocol.DNA_CACHE, blockPoolId);
- if (pendingCacheCommand != null) {
- cmds.add(pendingCacheCommand);
- }
- DatanodeCommand pendingUncacheCommand =
- getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
- DatanodeProtocol.DNA_UNCACHE, blockPoolId);
- if (pendingUncacheCommand != null) {
- cmds.add(pendingUncacheCommand);
+ boolean sendingCachingCommands = false;
+ long nowMs = Time.monotonicNow();
+ if (shouldSendCachingCommands &&
+ ((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
+ timeBetweenResendingCachingDirectivesMs)) {
+ DatanodeCommand pendingCacheCommand =
+ getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
+ DatanodeProtocol.DNA_CACHE, blockPoolId);
+ if (pendingCacheCommand != null) {
+ cmds.add(pendingCacheCommand);
+ sendingCachingCommands = true;
+ }
+ DatanodeCommand pendingUncacheCommand =
+ getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
+ DatanodeProtocol.DNA_UNCACHE, blockPoolId);
+ if (pendingUncacheCommand != null) {
+ cmds.add(pendingUncacheCommand);
+ sendingCachingCommands = true;
+ }
+ if (sendingCachingCommands) {
+ nodeinfo.setLastCachingDirectiveSentTimeMs(nowMs);
+ }
}
blockManager.addKeyUpdateCommand(cmds, nodeinfo);
@@ -1345,19 +1369,13 @@ private DatanodeCommand getCacheCommand(CachedBlocksList list,
if (length == 0) {
return null;
}
- // Read and clear the existing cache commands.
+ // Read the existing cache commands.
long[] blockIds = new long[length];
int i = 0;
for (Iterator<CachedBlock> iter = list.iterator();
iter.hasNext(); ) {
CachedBlock cachedBlock = iter.next();
blockIds[i++] = cachedBlock.getBlockId();
- iter.remove();
- }
- if (!sendCachingCommands) {
- // Do not send caching commands unless the FSNamesystem told us we
- // should.
- return null;
}
return new BlockIdCommand(action, poolId, blockIds);
}
@@ -1408,13 +1426,25 @@ public void clearPendingQueues() {
}
}
+ /**
+ * Reset the lastCachingDirectiveSentTimeMs field of all the DataNodes we
+ * know about.
+ */
+ public void resetLastCachingDirectiveSentTime() {
+ synchronized (datanodeMap) {
+ for (DatanodeDescriptor dn : datanodeMap.values()) {
+ dn.setLastCachingDirectiveSentTimeMs(0L);
+ }
+ }
+ }
+
@Override
public String toString() {
return getClass().getSimpleName() + ": " + host2DatanodeMap;
}
- public void setSendCachingCommands(boolean sendCachingCommands) {
- this.sendCachingCommands = sendCachingCommands;
+ public void setShouldSendCachingCommands(boolean shouldSendCachingCommands) {
+ this.shouldSendCachingCommands = shouldSendCachingCommands;
}
}
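
Putting the pieces together, the heartbeat path now attaches DNA_CACHE/DNA_UNCACHE commands only when caching commands are enabled and the per-datanode resend interval has elapsed, and it records the send time only when something was actually sent. A condensed sketch of that throttle outside the HDFS types (every name here is an illustrative placeholder, not the DatanodeManager API):

```java
import java.util.concurrent.TimeUnit;

public class CachingCommandThrottle {
  private final long resendIntervalMs;
  private volatile boolean shouldSendCachingCommands = false;
  private Long lastSentMs = null;   // null => never sent, so send immediately

  CachingCommandThrottle(long resendIntervalMs) {
    this.resendIntervalMs = resendIntervalMs;
  }

  void setShouldSendCachingCommands(boolean enabled) {
    this.shouldSendCachingCommands = enabled;
  }

  /** Forget the last send time so the next heartbeat resends immediately. */
  void reset() {
    lastSentMs = null;
  }

  /** Returns true when caching directives should ride along on this heartbeat. */
  boolean onHeartbeat() {
    long nowMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    boolean intervalElapsed =
        lastSentMs == null || (nowMs - lastSentMs) >= resendIntervalMs;
    if (shouldSendCachingCommands && intervalElapsed) {
      lastSentMs = nowMs;   // only updated when something was actually sent
      return true;
    }
    return false;
  }

  public static void main(String[] args) {
    CachingCommandThrottle throttle = new CachingCommandThrottle(30_000);
    throttle.setShouldSendCachingCommands(true);
    System.out.println(throttle.onHeartbeat()); // true: nothing sent yet
    System.out.println(throttle.onHeartbeat()); // false: within the interval
  }
}
```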
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
index 3e12168e22d..48ac9507b17 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
@@ -289,6 +289,10 @@ synchronized void cacheBlock(long blockId, String bpid,
mappableBlockMap.put(key, new Value(null, State.CACHING));
volumeExecutor.execute(
new CachingTask(key, blockFileName, length, genstamp));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Initiating caching for Block with id " + blockId +
+ ", pool " + bpid);
+ }
}
synchronized void uncacheBlock(String bpid, long blockId) {
@@ -427,6 +431,10 @@ public void run() {
mappableBlock.close();
}
numBlocksFailedToCache.incrementAndGet();
+
+ synchronized (FsDatasetCache.this) {
+ mappableBlockMap.remove(key);
+ }
}
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
index 09d2ed6d5e4..29d472335a4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/MappableBlock.java
@@ -44,20 +44,6 @@
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class MappableBlock implements Closeable {
- public static interface Mlocker {
- void mlock(MappedByteBuffer mmap, long length) throws IOException;
- }
-
- private static class PosixMlocker implements Mlocker {
- public void mlock(MappedByteBuffer mmap, long length)
- throws IOException {
- NativeIO.POSIX.mlock(mmap, length);
- }
- }
-
- @VisibleForTesting
- public static Mlocker mlocker = new PosixMlocker();
-
private MappedByteBuffer mmap;
private final long length;
@@ -96,7 +82,7 @@ public static MappableBlock load(long length,
throw new IOException("Block InputStream has no FileChannel.");
}
mmap = blockChannel.map(MapMode.READ_ONLY, 0, length);
- mlocker.mlock(mmap, length);
+ NativeIO.POSIX.cacheManipulator.mlock(blockFileName, mmap, length);
verifyChecksum(length, metaIn, blockChannel, blockFileName);
mappableBlock = new MappableBlock(mmap, length);
} finally {
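
The load path still maps the block file read-only and then pins it; only the pinning hook changed, from the class-local Mlocker to the shared NativeIO.POSIX cache manipulator that tests can override in one place. A small sketch of the mapping half using only standard NIO; the mlock step itself is platform-specific and goes through Hadoop's native code, so it appears here only as a comment:

```java
import java.io.IOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileChannel.MapMode;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class MapBlockFile {
  public static void main(String[] args) throws IOException {
    Path blockFile = Paths.get(args[0]);   // path to a local block file
    try (FileChannel channel =
        FileChannel.open(blockFile, StandardOpenOption.READ)) {
      long length = channel.size();
      MappedByteBuffer mmap = channel.map(MapMode.READ_ONLY, 0, length);
      // In HDFS this is the point where the NativeIO.POSIX cache manipulator
      // would mlock(blockFileName, mmap, length) to pin the pages in memory.
      System.out.println("mapped " + length + " bytes of " + blockFile);
    }
  }
}
```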
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryComputationContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryComputationContext.java
new file mode 100644
index 00000000000..dab64ec769b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ContentSummaryComputationContext.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class ContentSummaryComputationContext {
+ private FSDirectory dir = null;
+ private FSNamesystem fsn = null;
+ private Content.Counts counts = null;
+ private long nextCountLimit = 0;
+ private long limitPerRun = 0;
+ private long yieldCount = 0;
+
+ /**
+ * Constructor
+ *
+ * @param dir The FSDirectory instance
+ * @param fsn The FSNamesystem instance
+ * @param limitPerRun allowed number of operations in one
+ * locking period. 0 or a negative number means
+ * no limit (i.e. no yielding)
+ */
+ public ContentSummaryComputationContext(FSDirectory dir,
+ FSNamesystem fsn, long limitPerRun) {
+ this.dir = dir;
+ this.fsn = fsn;
+ this.limitPerRun = limitPerRun;
+ this.nextCountLimit = limitPerRun;
+ this.counts = Content.Counts.newInstance();
+ }
+
+ /** Constructor for blocking computation. */
+ public ContentSummaryComputationContext() {
+ this(null, null, 0);
+ }
+
+ /** Return current yield count */
+ public long getYieldCount() {
+ return yieldCount;
+ }
+
+ /**
+ * Relinquish locks held during computation for a short while
+ * and reacquire them. This will give other threads a chance
+ * to acquire the contended locks and run.
+ *
+ * @return true if locks were released and reacquired.
+ */
+ public boolean yield() {
+ // Are we set up to do this?
+ if (limitPerRun <= 0 || dir == null || fsn == null) {
+ return false;
+ }
+
+ // Have we reached the limit?
+ long currentCount = counts.get(Content.FILE) +
+ counts.get(Content.SYMLINK) +
+ counts.get(Content.DIRECTORY) +
+ counts.get(Content.SNAPSHOTTABLE_DIRECTORY);
+ if (currentCount <= nextCountLimit) {
+ return false;
+ }
+
+ // Update the next limit
+ nextCountLimit = currentCount + limitPerRun;
+
+ boolean hadDirReadLock = dir.hasReadLock();
+ boolean hadDirWriteLock = dir.hasWriteLock();
+ boolean hadFsnReadLock = fsn.hasReadLock();
+ boolean hadFsnWriteLock = fsn.hasWriteLock();
+
+ // sanity check.
+ if (!hadDirReadLock || !hadFsnReadLock || hadDirWriteLock ||
+ hadFsnWriteLock || dir.getReadHoldCount() != 1 ||
+ fsn.getReadHoldCount() != 1) {
+ // cannot relinquish
+ return false;
+ }
+
+ // unlock
+ dir.readUnlock();
+ fsn.readUnlock();
+
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException ie) {
+ } finally {
+ // reacquire
+ fsn.readLock();
+ dir.readLock();
+ }
+ yieldCount++;
+ return true;
+ }
+
+ /** Get the content counts */
+ public Content.Counts getCounts() {
+ return counts;
+ }
+}
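
The heart of the new class is yield(): drop the read locks briefly (only when the thread holds exactly one read lock and no write lock), sleep, and reacquire. A standalone sketch of that release-and-reacquire pattern on a single ReentrantReadWriteLock, with a hypothetical longScan caller; the HDFS version does the same dance for both the FSDirectory and FSNamesystem locks:

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class YieldingReader {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  /** Briefly release and reacquire the read lock so writers can get in. */
  boolean yieldReadLock() throws InterruptedException {
    // Only safe when we hold exactly one read lock and no write lock;
    // otherwise the caller's locking assumptions would be violated.
    if (lock.getReadHoldCount() != 1 || lock.isWriteLockedByCurrentThread()) {
      return false;
    }
    lock.readLock().unlock();
    try {
      Thread.sleep(1);          // give contending writers a chance to run
    } finally {
      lock.readLock().lock();   // always reacquire before returning
    }
    return true;
  }

  void longScan(int items, int limitPerRun) throws InterruptedException {
    lock.readLock().lock();
    try {
      for (int i = 1; i <= items; i++) {
        // ... examine item i under the read lock ...
        if (limitPerRun > 0 && i % limitPerRun == 0) {
          yieldReadLock();
        }
      }
    } finally {
      lock.readLock().unlock();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    new YieldingReader().longScan(10_000, 1_000);
    System.out.println("scan finished");
  }
}
```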
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 5402b0478a9..bce528fe7af 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -114,7 +114,9 @@ private static INodeDirectoryWithQuota createRoot(FSNamesystem namesystem) {
private final int maxComponentLength;
private final int maxDirItems;
private final int lsLimit; // max list limit
+ private final int contentCountLimit; // max content summary counts per run
private final INodeMap inodeMap; // Synchronized by dirLock
+ private long yieldCount = 0; // keep track of lock yield count.
// lock to protect the directory and BlockMap
private ReentrantReadWriteLock dirLock;
@@ -145,6 +147,14 @@ boolean hasReadLock() {
return this.dirLock.getReadHoldCount() > 0;
}
+ public int getReadHoldCount() {
+ return this.dirLock.getReadHoldCount();
+ }
+
+ public int getWriteHoldCount() {
+ return this.dirLock.getWriteHoldCount();
+ }
+
/**
* Caches frequently used file names used in {@link INode} to reuse
* byte[] objects and reduce heap usage.
@@ -161,6 +171,10 @@ boolean hasReadLock() {
DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
this.lsLimit = configuredLimit>0 ?
configuredLimit : DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT;
+
+ this.contentCountLimit = conf.getInt(
+ DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY,
+ DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_DEFAULT);
// filesystem limits
this.maxComponentLength = conf.getInt(
@@ -2296,13 +2310,26 @@ ContentSummary getContentSummary(String src)
throw new FileNotFoundException("File does not exist: " + srcs);
}
else {
- return targetNode.computeContentSummary();
+ // Make it relinquish locks everytime contentCountLimit entries are
+ // processed. 0 means disabled. I.e. blocking for the entire duration.
+ ContentSummaryComputationContext cscc =
+ new ContentSummaryComputationContext(this, getFSNamesystem(),
+ contentCountLimit);
+ ContentSummary cs = targetNode.computeAndConvertContentSummary(cscc);
+ yieldCount += cscc.getYieldCount();
+ return cs;
}
} finally {
readUnlock();
}
}
+ @VisibleForTesting
+ public long getYieldCount() {
+ return yieldCount;
+ }
+
public INodeMap getINodeMap() {
return inodeMap;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
index aaa82cfa870..ec11e5d8f61 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
@@ -593,7 +593,7 @@ private long applyEditLogOp(FSEditLogOp op, FSDirectory fsDir,
fsNamesys.getSnapshotManager().deleteSnapshot(
deleteSnapshotOp.snapshotRoot, deleteSnapshotOp.snapshotName,
collectedBlocks, removedINodes);
- fsNamesys.removeBlocks(collectedBlocks);
+ fsNamesys.removeBlocksAndUpdateSafemodeTotal(collectedBlocks);
collectedBlocks.clear();
fsNamesys.dir.removeFromInodeMap(removedINodes);
removedINodes.clear();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index fddeb6f5ec3..160b8cc2d63 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -1009,7 +1009,7 @@ void startActiveServices() throws IOException {
nnEditLogRoller.start();
cacheManager.activate();
- blockManager.getDatanodeManager().setSendCachingCommands(true);
+ blockManager.getDatanodeManager().setShouldSendCachingCommands(true);
} finally {
writeUnlock();
startingActiveService = false;
@@ -1060,7 +1060,7 @@ void stopActiveServices() {
dir.fsImage.updateLastAppliedTxIdFromWritten();
}
cacheManager.deactivate();
- blockManager.getDatanodeManager().setSendCachingCommands(false);
+ blockManager.getDatanodeManager().setShouldSendCachingCommands(false);
} finally {
writeUnlock();
}
@@ -1297,6 +1297,14 @@ public boolean hasReadOrWriteLock() {
return hasReadLock() || hasWriteLock();
}
+ public int getReadHoldCount() {
+ return this.fsLock.getReadHoldCount();
+ }
+
+ public int getWriteHoldCount() {
+ return this.fsLock.getWriteHoldCount();
+ }
+
NamespaceInfo getNamespaceInfo() {
readLock();
try {
@@ -3305,6 +3313,18 @@ void removePathAndBlocks(String src, BlocksMapUpdateInfo blocks,
return;
}
+ removeBlocksAndUpdateSafemodeTotal(blocks);
+ }
+
+ /**
+ * Removes the blocks from blocksmap and updates the safemode blocks total
+ *
+ * @param blocks
+ * An instance of {@link BlocksMapUpdateInfo} which contains a list
+ * of blocks that need to be removed from blocksMap
+ */
+ void removeBlocksAndUpdateSafemodeTotal(BlocksMapUpdateInfo blocks) {
+ assert hasWriteLock();
// In the case that we are a Standby tailing edits from the
// active while in safe-mode, we need to track the total number
// of blocks and safe blocks in the system.
@@ -3325,9 +3345,9 @@ void removePathAndBlocks(String src, BlocksMapUpdateInfo blocks,
}
if (trackBlockCounts) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Adjusting safe-mode totals for deletion of " + src + ":" +
- "decreasing safeBlocks by " + numRemovedSafe +
- ", totalBlocks by " + numRemovedComplete);
+ LOG.debug("Adjusting safe-mode totals for deletion."
+ + "decreasing safeBlocks by " + numRemovedSafe
+ + ", totalBlocks by " + numRemovedComplete);
}
adjustSafeModeBlockTotals(-numRemovedSafe, -numRemovedComplete);
}
@@ -5883,8 +5903,8 @@ private void updatePipelineInternal(String clientName, ExtendedBlock oldBlock,
}
// Update old block with the new generation stamp and new length
- blockinfo.setGenerationStamp(newBlock.getGenerationStamp());
blockinfo.setNumBytes(newBlock.getNumBytes());
+ blockinfo.setGenerationStampAndVerifyReplicas(newBlock.getGenerationStamp());
// find the DatanodeDescriptor objects
final DatanodeStorageInfo[] storages = blockManager.getDatanodeManager()
@@ -6953,6 +6973,7 @@ void deleteSnapshot(String snapshotRoot, String snapshotName)
return; // Return previous response
}
boolean success = false;
+ BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@@ -6961,7 +6982,6 @@ void deleteSnapshot(String snapshotRoot, String snapshotName)
checkOwner(pc, snapshotRoot);
}
- BlocksMapUpdateInfo collectedBlocks = new BlocksMapUpdateInfo();
- List<INode> removedINodes = new ChunkedArrayList<INode>();
dir.writeLock();
try {
@@ -6972,8 +6992,6 @@ void deleteSnapshot(String snapshotRoot, String snapshotName)
dir.writeUnlock();
}
removedINodes.clear();
- this.removeBlocks(collectedBlocks);
- collectedBlocks.clear();
getEditLog().logDeleteSnapshot(snapshotRoot, snapshotName,
cacheEntry != null);
success = true;
@@ -6982,7 +7000,10 @@ void deleteSnapshot(String snapshotRoot, String snapshotName)
RetryCache.setState(cacheEntry, success);
}
getEditLog().logSync();
-
+
+ removeBlocks(collectedBlocks);
+ collectedBlocks.clear();
+
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
String rootPath = Snapshot.getSnapshotPath(snapshotRoot, snapshotName);
logAuditEvent(true, "deleteSnapshot", rootPath, null, null);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
index 977c801013e..1aff9784aba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INode.java
@@ -371,10 +371,18 @@ public abstract Quota.Counts cleanSubtree(final Snapshot snapshot,
public abstract void destroyAndCollectBlocks(
BlocksMapUpdateInfo collectedBlocks, List<INode> removedINodes);
- /** Compute {@link ContentSummary}. */
+ /** Compute {@link ContentSummary}. Blocking call */
public final ContentSummary computeContentSummary() {
- final Content.Counts counts = computeContentSummary(
- Content.Counts.newInstance());
+ return computeAndConvertContentSummary(
+ new ContentSummaryComputationContext());
+ }
+
+ /**
+ * Compute {@link ContentSummary}.
+ */
+ public final ContentSummary computeAndConvertContentSummary(
+ ContentSummaryComputationContext summary) {
+ Content.Counts counts = computeContentSummary(summary).getCounts();
return new ContentSummary(counts.get(Content.LENGTH),
counts.get(Content.FILE) + counts.get(Content.SYMLINK),
counts.get(Content.DIRECTORY), getNsQuota(),
@@ -384,10 +392,12 @@ public final ContentSummary computeContentSummary() {
/**
* Count subtree content summary with a {@link Content.Counts}.
*
- * @param counts The subtree counts for returning.
- * @return The same objects as the counts parameter.
+ * @param summary the context object holding counts for the subtree.
+ * @return The same objects as summary.
*/
- public abstract Content.Counts computeContentSummary(Content.Counts counts);
+ public abstract ContentSummaryComputationContext computeContentSummary(
+ ContentSummaryComputationContext summary);
+
/**
* Check and add namespace/diskspace consumed to itself and the ancestors.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
index 2460b687b26..cf9c232f7b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectory.java
@@ -466,12 +466,45 @@ public Quota.Counts computeQuotaUsage4CurrentDirectory(Quota.Counts counts) {
}
@Override
- public Content.Counts computeContentSummary(final Content.Counts counts) {
- for (INode child : getChildrenList(null)) {
- child.computeContentSummary(counts);
+ public ContentSummaryComputationContext computeContentSummary(
+ ContentSummaryComputationContext summary) {
+ ReadOnlyList<INode> childrenList = getChildrenList(null);
+ // Explicit traversing is done to enable repositioning after relinquishing
+ // and reacquiring locks.
+ for (int i = 0; i < childrenList.size(); i++) {
+ INode child = childrenList.get(i);
+ byte[] childName = child.getLocalNameBytes();
+
+ long lastYieldCount = summary.getYieldCount();
+ child.computeContentSummary(summary);
+
+ // Check whether the computation was paused in the subtree.
+ // The counts may be off, but traversing the rest of children
+ // should be made safe.
+ if (lastYieldCount == summary.getYieldCount()) {
+ continue;
+ }
+
+ // The locks were released and reacquired. Check parent first.
+ if (getParent() == null) {
+ // Stop further counting and return whatever we have so far.
+ break;
+ }
+
+ // Obtain the children list again since it may have been modified.
+ childrenList = getChildrenList(null);
+ // Reposition in case the children list is changed. Decrement by 1
+ // since it will be incremented when loops.
+ i = nextChild(childrenList, childName) - 1;
}
- counts.add(Content.DIRECTORY, 1);
- return counts;
+
+ // Increment the directory count for this directory.
+ summary.getCounts().add(Content.DIRECTORY, 1);
+
+ // Relinquish and reacquire locks if necessary.
+ summary.yield();
+
+ return summary;
}
/**
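
After a yield the children list may have changed underneath the traversal, so the loop above re-fetches the list and repositions itself by the last processed child's name via nextChild (children are kept sorted by name). A rough sketch of name-based repositioning over a sorted list, with String names standing in for the byte[] iNode names and reposition standing in for nextChild:

```java
import java.util.Collections;
import java.util.List;

public class ResumeByName {
  /**
   * Index of the first element greater than lastProcessed in a sorted list,
   * i.e. where a paused traversal should resume.
   */
  static int reposition(List<String> sortedChildren, String lastProcessed) {
    int idx = Collections.binarySearch(sortedChildren, lastProcessed);
    // If found, resume after it; if not found, binarySearch returns
    // (-(insertionPoint) - 1), and the insertion point is the next element.
    return idx >= 0 ? idx + 1 : -(idx + 1);
  }

  public static void main(String[] args) {
    List<String> children = List.of("a", "c", "e", "g");
    // "c" was the last child processed before the list could have changed.
    System.out.println(reposition(children, "c"));   // 2 -> resume at "e"
    // "d" no longer exists (or never did); resume at the next name, "e".
    System.out.println(reposition(children, "d"));   // 2
  }
}
```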
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
index f2b607ccaa7..e18420df4fb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeDirectoryWithQuota.java
@@ -107,12 +107,16 @@ public Quota.Counts computeQuotaUsage(Quota.Counts counts, boolean useCache,
}
@Override
- public Content.Counts computeContentSummary(
- final Content.Counts counts) {
- final long original = counts.get(Content.DISKSPACE);
- super.computeContentSummary(counts);
- checkDiskspace(counts.get(Content.DISKSPACE) - original);
- return counts;
+ public ContentSummaryComputationContext computeContentSummary(
+ final ContentSummaryComputationContext summary) {
+ final long original = summary.getCounts().get(Content.DISKSPACE);
+ long oldYieldCount = summary.getYieldCount();
+ super.computeContentSummary(summary);
+ // Check only when the content has not changed in the middle.
+ if (oldYieldCount == summary.getYieldCount()) {
+ checkDiskspace(summary.getCounts().get(Content.DISKSPACE) - original);
+ }
+ return summary;
}
private void checkDiskspace(final long computed) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
index 455d808a37f..1b6478d922f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
@@ -342,11 +342,11 @@ public final Quota.Counts computeQuotaUsage(Quota.Counts counts,
}
@Override
- public final Content.Counts computeContentSummary(
- final Content.Counts counts) {
- computeContentSummary4Snapshot(counts);
- computeContentSummary4Current(counts);
- return counts;
+ public final ContentSummaryComputationContext computeContentSummary(
+ final ContentSummaryComputationContext summary) {
+ computeContentSummary4Snapshot(summary.getCounts());
+ computeContentSummary4Current(summary.getCounts());
+ return summary;
}
private void computeContentSummary4Snapshot(final Content.Counts counts) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
index 3b7b4b6e42b..6b5b0f529a2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeMap.java
@@ -107,7 +107,8 @@ public Counts computeQuotaUsage(Counts counts, boolean useCache,
}
@Override
- public Content.Counts computeContentSummary(Content.Counts counts) {
+ public ContentSummaryComputationContext computeContentSummary(
+ ContentSummaryComputationContext summary) {
return null;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
index 1f4ff7de18d..a049c012917 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeReference.java
@@ -278,8 +278,9 @@ public void destroyAndCollectBlocks(
}
@Override
- public Content.Counts computeContentSummary(Content.Counts counts) {
- return referred.computeContentSummary(counts);
+ public ContentSummaryComputationContext computeContentSummary(
+ ContentSummaryComputationContext summary) {
+ return referred.computeContentSummary(summary);
}
@Override
@@ -444,12 +445,13 @@ public int getLastSnapshotId() {
}
@Override
- public final Content.Counts computeContentSummary(Content.Counts counts) {
+ public final ContentSummaryComputationContext computeContentSummary(
+ ContentSummaryComputationContext summary) {
//only count diskspace for WithName
final Quota.Counts q = Quota.Counts.newInstance();
computeQuotaUsage(q, false, lastSnapshotId);
- counts.add(Content.DISKSPACE, q.get(Quota.DISKSPACE));
- return counts;
+ summary.getCounts().add(Content.DISKSPACE, q.get(Quota.DISKSPACE));
+ return summary;
}
@Override
@@ -688,4 +690,4 @@ private Snapshot getSelfSnapshot(final Snapshot prior) {
}
}
}
-}
\ No newline at end of file
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
index ba41a2ecdb0..21a04a3b1ab 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeSymlink.java
@@ -98,9 +98,10 @@ public Quota.Counts computeQuotaUsage(Quota.Counts counts,
}
@Override
- public Content.Counts computeContentSummary(final Content.Counts counts) {
- counts.add(Content.SYMLINK, 1);
- return counts;
+ public ContentSummaryComputationContext computeContentSummary(
+ final ContentSummaryComputationContext summary) {
+ summary.getCounts().add(Content.SYMLINK, 1);
+ return summary;
}
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
index 75331615c37..1e2c5dd3db2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectorySnapshottable.java
@@ -38,6 +38,7 @@
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
import org.apache.hadoop.hdfs.server.namenode.Content;
+import org.apache.hadoop.hdfs.server.namenode.ContentSummaryComputationContext;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;
@@ -342,11 +343,12 @@ Snapshot removeSnapshot(String snapshotName,
}
@Override
- public Content.Counts computeContentSummary(final Content.Counts counts) {
- super.computeContentSummary(counts);
- counts.add(Content.SNAPSHOT, snapshotsByNames.size());
- counts.add(Content.SNAPSHOTTABLE_DIRECTORY, 1);
- return counts;
+ public ContentSummaryComputationContext computeContentSummary(
+ final ContentSummaryComputationContext summary) {
+ super.computeContentSummary(summary);
+ summary.getCounts().add(Content.SNAPSHOT, snapshotsByNames.size());
+ summary.getCounts().add(Content.SNAPSHOTTABLE_DIRECTORY, 1);
+ return summary;
}
/**
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
index dfbf22c7c05..296cd4c7698 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/INodeDirectoryWithSnapshot.java
@@ -32,6 +32,7 @@
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
import org.apache.hadoop.hdfs.server.namenode.Content;
+import org.apache.hadoop.hdfs.server.namenode.ContentSummaryComputationContext;
import org.apache.hadoop.hdfs.server.namenode.FSImageSerialization;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
@@ -883,18 +884,27 @@ public Quota.Counts computeQuotaUsage4CurrentDirectory(Quota.Counts counts) {
}
@Override
- public Content.Counts computeContentSummary(final Content.Counts counts) {
- super.computeContentSummary(counts);
- computeContentSummary4Snapshot(counts);
- return counts;
+ public ContentSummaryComputationContext computeContentSummary(
+ final ContentSummaryComputationContext summary) {
+ // Snapshot summary calc won't be relinquishing locks in the middle.
+ // Do this first and handover to parent.
+ computeContentSummary4Snapshot(summary.getCounts());
+ super.computeContentSummary(summary);
+ return summary;
}
private void computeContentSummary4Snapshot(final Content.Counts counts) {
+ // Create a new blank summary context for blocking processing of subtree.
+ ContentSummaryComputationContext summary =
+ new ContentSummaryComputationContext();
for(DirectoryDiff d : diffs) {
for(INode deleted : d.getChildrenDiff().getList(ListType.DELETED)) {
- deleted.computeContentSummary(counts);
+ deleted.computeContentSummary(summary);
}
}
+ // Add the counts from deleted trees.
+ counts.add(summary.getCounts());
+ // Add the deleted directory count.
counts.add(Content.DIRECTORY, diffs.asList().size());
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
index 0685d860dd6..c420c551680 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
@@ -26,8 +26,8 @@
import java.io.PrintStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
+import java.net.URI;
import java.net.URL;
-import java.net.URLConnection;
import java.security.PrivilegedExceptionAction;
import java.util.Collection;
import java.util.Date;
@@ -47,11 +47,14 @@
import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
import org.apache.hadoop.hdfs.web.HftpFileSystem;
+import org.apache.hadoop.hdfs.web.HsftpFileSystem;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.GenericOptionsParser;
@@ -142,11 +145,11 @@ public static void main(final String[] args) throws Exception {
// default to using the local file system
FileSystem local = FileSystem.getLocal(conf);
final Path tokenFile = new Path(local.getWorkingDirectory(), remaining[0]);
+ final URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
// Login the current user
UserGroupInformation.getCurrentUser().doAs(
new PrivilegedExceptionAction