Merge trunk to HDFS-4685.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-4685@1560768 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Chris Nauroth 2014-01-23 17:49:24 +00:00
commit 738b076cc6
58 changed files with 1552 additions and 611 deletions

View File

@ -421,6 +421,9 @@ Release 2.4.0 - UNRELEASED
HADOOP-9420. Add percentile or max metric for rpcQueueTime, processing time.
(Liang Xie via wang)
HADOOP-10143 replace WritableFactories's hashmap with ConcurrentHashMap
(Liang Xie via stack)
OPTIMIZATIONS
HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)
@ -530,6 +533,9 @@ Release 2.4.0 - UNRELEASED
HADOOP-10235. Hadoop tarball has 2 versions of stax-api JARs. (tucu)
HADOOP-10252. HttpServer can't start if hostname is not specified. (Jimmy
Xiang via atm)
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -455,7 +455,7 @@ public class HttpServer implements FilterContainer {
public HttpServer(String name, String bindAddress, int port,
boolean findPort, Configuration conf, AccessControlList adminsAcl,
Connector connector, String[] pathSpecs) throws IOException {
this(new Builder().setName(name)
this(new Builder().setName(name).hostName(bindAddress)
.addEndpoint(URI.create("http://" + bindAddress + ":" + port))
.setFindPort(findPort).setConf(conf).setACL(adminsAcl)
.setConnector(connector).setPathSpec(pathSpecs));

View File

@ -22,25 +22,26 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.ReflectionUtils;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/** Factories for non-public writables. Defining a factory permits {@link
* ObjectWritable} to be able to construct instances of non-public classes. */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class WritableFactories {
private static final HashMap<Class, WritableFactory> CLASS_TO_FACTORY =
new HashMap<Class, WritableFactory>();
private static final Map<Class, WritableFactory> CLASS_TO_FACTORY =
new ConcurrentHashMap<Class, WritableFactory>();
private WritableFactories() {} // singleton
/** Define a factory for a class. */
public static synchronized void setFactory(Class c, WritableFactory factory) {
public static void setFactory(Class c, WritableFactory factory) {
CLASS_TO_FACTORY.put(c, factory);
}
/** Define a factory for a class. */
public static synchronized WritableFactory getFactory(Class c) {
public static WritableFactory getFactory(Class c) {
return CLASS_TO_FACTORY.get(c);
}

View File

@ -524,6 +524,17 @@ public class TestHttpServer extends HttpServerFunctionalTest {
Assert.assertFalse(HttpServer.isInstrumentationAccessAllowed(context, request, response));
}
@Test
@SuppressWarnings("deprecation")
public void testOldConstructor() throws Exception {
HttpServer server = new HttpServer("test", "0.0.0.0", 0, false);
try {
server.start();
} finally {
server.stop();
}
}
@Test public void testBindAddress() throws Exception {
checkBindAddress("localhost", 0, false).stop();
// hang onto this one for a bit more testing

View File

@ -554,6 +554,9 @@
<delete file="${httpfs.tomcat.dist.dir}/conf/server.xml"/>
<copy file="${basedir}/src/main/tomcat/server.xml"
toDir="${httpfs.tomcat.dist.dir}/conf"/>
<delete file="${httpfs.tomcat.dist.dir}/conf/ssl-server.xml"/>
<copy file="${basedir}/src/main/tomcat/ssl-server.xml"
toDir="${httpfs.tomcat.dist.dir}/conf"/>
<delete file="${httpfs.tomcat.dist.dir}/conf/logging.properties"/>
<copy file="${basedir}/src/main/tomcat/logging.properties"
toDir="${httpfs.tomcat.dist.dir}/conf"/>

View File

@ -39,3 +39,15 @@
# The hostname HttpFS server runs on
#
# export HTTPFS_HTTP_HOSTNAME=`hostname -f`
# Indicates if HttpFS is using SSL
#
# export HTTPFS_SSL_ENABLED=false
# The location of the SSL keystore if using SSL
#
# export HTTPFS_SSL_KEYSTORE_FILE=${HOME}/.keystore
# The password of the SSL keystore if using SSL
#
# export HTTPFS_SSL_KEYSTORE_PASS=password

View File

@ -243,7 +243,7 @@ public class HttpFSFileSystem extends FileSystem
if (makeQualified) {
path = makeQualified(path);
}
final URL url = HttpFSUtils.createHttpURL(path, params);
final URL url = HttpFSUtils.createURL(path, params);
return doAsRealUserIfNecessary(new Callable<HttpURLConnection>() {
@Override
public HttpURLConnection call() throws Exception {

View File

@ -123,7 +123,7 @@ public class HttpFSKerberosAuthenticator extends KerberosAuthenticator {
Map<String, String> params = new HashMap<String, String>();
params.put(OP_PARAM, op.toString());
params.put(RENEWER_PARAM,renewer);
URL url = HttpFSUtils.createHttpURL(new Path(fsURI), params);
URL url = HttpFSUtils.createURL(new Path(fsURI), params);
AuthenticatedURL aUrl =
new AuthenticatedURL(new HttpFSKerberosAuthenticator());
try {
@ -150,7 +150,7 @@ public class HttpFSKerberosAuthenticator extends KerberosAuthenticator {
params.put(OP_PARAM,
DelegationTokenOperation.RENEWDELEGATIONTOKEN.toString());
params.put(TOKEN_PARAM, dToken.encodeToUrlString());
URL url = HttpFSUtils.createHttpURL(new Path(fsURI), params);
URL url = HttpFSUtils.createURL(new Path(fsURI), params);
AuthenticatedURL aUrl =
new AuthenticatedURL(new HttpFSKerberosAuthenticator());
try {
@ -172,7 +172,7 @@ public class HttpFSKerberosAuthenticator extends KerberosAuthenticator {
params.put(OP_PARAM,
DelegationTokenOperation.CANCELDELEGATIONTOKEN.toString());
params.put(TOKEN_PARAM, dToken.encodeToUrlString());
URL url = HttpFSUtils.createHttpURL(new Path(fsURI), params);
URL url = HttpFSUtils.createURL(new Path(fsURI), params);
AuthenticatedURL aUrl =
new AuthenticatedURL(new HttpFSKerberosAuthenticator());
try {

View File

@ -55,17 +55,21 @@ public class HttpFSUtils {
*
* @return a <code>URL</code> for the HttpFSServer server,
*
* @throws IOException thrown if an IO error occurrs.
* @throws IOException thrown if an IO error occurs.
*/
static URL createHttpURL(Path path, Map<String, String> params)
static URL createURL(Path path, Map<String, String> params)
throws IOException {
URI uri = path.toUri();
String realScheme;
if (uri.getScheme().equalsIgnoreCase(HttpFSFileSystem.SCHEME)) {
realScheme = "http";
} else if (uri.getScheme().equalsIgnoreCase(HttpsFSFileSystem.SCHEME)) {
realScheme = "https";
} else {
throw new IllegalArgumentException(MessageFormat.format(
"Invalid scheme [{0}] it should be 'webhdfs'", uri));
"Invalid scheme [{0}] it should be '" + HttpFSFileSystem.SCHEME + "' " +
"or '" + HttpsFSFileSystem.SCHEME + "'", uri));
}
StringBuilder sb = new StringBuilder();
sb.append(realScheme).append("://").append(uri.getAuthority()).

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.http.client;
/**
* HttpFSServer implementation of the FileSystemAccess FileSystem for SSL.
* <p/>
* This implementation allows a user to access HDFS over HTTPS via a
* HttpFSServer server.
*/
public class HttpsFSFileSystem extends HttpFSFileSystem {
public static final String SCHEME = "swebhdfs";
@Override
public String getScheme() {
return SCHEME;
}
}

View File

@ -94,11 +94,11 @@ public class HttpFSServerWebApp extends ServerWebApp {
*/
@Override
public void init() throws ServerException {
super.init();
if (SERVER != null) {
throw new RuntimeException("HttpFSServer server already initialized");
}
SERVER = this;
super.init();
adminGroup = getConfig().get(getPrefixedName(CONF_ADMIN_GROUP), "admin");
LOG.info("Connects to Namenode [{}]",
get().get(FileSystemAccess.class).getFileSystemConfiguration().

View File

@ -29,30 +29,33 @@ import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdenti
public class DelegationTokenIdentifier
extends AbstractDelegationTokenIdentifier {
public static final Text KIND_NAME = WebHdfsFileSystem.TOKEN_KIND;
private Text kind = WebHdfsFileSystem.TOKEN_KIND;
public DelegationTokenIdentifier() {
public DelegationTokenIdentifier(Text kind) {
this.kind = kind;
}
/**
* Create a new delegation token identifier
*
* @param kind token kind
* @param owner the effective username of the token owner
* @param renewer the username of the renewer
* @param realUser the real username of the token owner
*/
public DelegationTokenIdentifier(Text owner, Text renewer, Text realUser) {
public DelegationTokenIdentifier(Text kind, Text owner, Text renewer,
Text realUser) {
super(owner, renewer, realUser);
this.kind = kind;
}
/**
* Returns the kind, <code>TOKEN_KIND</code>.
* @return returns <code>TOKEN_KIND</code>.
*/
@Override
public Text getKind() {
return KIND_NAME;
return kind;
}
}

View File

@ -19,6 +19,8 @@ package org.apache.hadoop.lib.service.security;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.http.server.HttpFSServerWebApp;
import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.lib.server.BaseService;
import org.apache.hadoop.lib.server.ServerException;
@ -55,6 +57,8 @@ public class DelegationTokenManagerService extends BaseService
DelegationTokenSecretManager secretManager = null;
private Text tokenKind;
public DelegationTokenManagerService() {
super(PREFIX);
}
@ -70,7 +74,9 @@ public class DelegationTokenManagerService extends BaseService
long updateInterval = getServiceConfig().getLong(UPDATE_INTERVAL, DAY);
long maxLifetime = getServiceConfig().getLong(MAX_LIFETIME, 7 * DAY);
long renewInterval = getServiceConfig().getLong(RENEW_INTERVAL, DAY);
secretManager = new DelegationTokenSecretManager(updateInterval,
tokenKind = (HttpFSServerWebApp.get().isSslEnabled())
? SWebHdfsFileSystem.TOKEN_KIND : WebHdfsFileSystem.TOKEN_KIND;
secretManager = new DelegationTokenSecretManager(tokenKind, updateInterval,
maxLifetime,
renewInterval, HOUR);
try {
@ -122,7 +128,7 @@ public class DelegationTokenManagerService extends BaseService
realUser = new Text(ugi.getRealUser().getUserName());
}
DelegationTokenIdentifier tokenIdentifier =
new DelegationTokenIdentifier(owner, new Text(renewer), realUser);
new DelegationTokenIdentifier(tokenKind, owner, new Text(renewer), realUser);
Token<DelegationTokenIdentifier> token =
new Token<DelegationTokenIdentifier>(tokenIdentifier, secretManager);
try {
@ -188,7 +194,7 @@ public class DelegationTokenManagerService extends BaseService
throws DelegationTokenManagerException {
ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier());
DataInputStream dis = new DataInputStream(buf);
DelegationTokenIdentifier id = new DelegationTokenIdentifier();
DelegationTokenIdentifier id = new DelegationTokenIdentifier(tokenKind);
try {
id.readFields(dis);
dis.close();
@ -203,6 +209,8 @@ public class DelegationTokenManagerService extends BaseService
private static class DelegationTokenSecretManager
extends AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {
private Text tokenKind;
/**
* Create a secret manager
*
@ -215,17 +223,18 @@ public class DelegationTokenManagerService extends BaseService
* scanned
* for expired tokens
*/
public DelegationTokenSecretManager(long delegationKeyUpdateInterval,
public DelegationTokenSecretManager(Text tokenKind, long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime,
long delegationTokenRenewInterval,
long delegationTokenRemoverScanInterval) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
this.tokenKind = tokenKind;
}
@Override
public DelegationTokenIdentifier createIdentifier() {
return new DelegationTokenIdentifier();
return new DelegationTokenIdentifier(tokenKind);
}
}

View File

@ -44,6 +44,7 @@ public abstract class ServerWebApp extends Server implements ServletContextListe
private static final String TEMP_DIR = ".temp.dir";
private static final String HTTP_HOSTNAME = ".http.hostname";
private static final String HTTP_PORT = ".http.port";
public static final String SSL_ENABLED = ".ssl.enabled";
private static ThreadLocal<String> HOME_DIR_TL = new ThreadLocal<String>();
@ -225,4 +226,12 @@ public abstract class ServerWebApp extends Server implements ServletContextListe
public void setAuthority(InetSocketAddress authority) {
this.authority = authority;
}
/**
*
*/
public boolean isSslEnabled() {
return Boolean.valueOf(System.getProperty(getName() + SSL_ENABLED, "false"));
}
}

View File

@ -143,6 +143,27 @@ else
print "Using HTTPFS_HTTP_HOSTNAME: ${HTTPFS_HTTP_HOSTNAME}"
fi
if [ "${HTTPFS_SSL_ENABLED}" = "" ]; then
export HTTPFS_SSL_ENABLED="false"
print "Setting HTTPFS_SSL_ENABLED: ${HTTPFS_SSL_ENABLED}"
else
print "Using HTTPFS_SSL_ENABLED: ${HTTPFS_SSL_ENABLED}"
fi
if [ "${HTTPFS_SSL_KEYSTORE_FILE}" = "" ]; then
export HTTPFS_SSL_KEYSTORE_FILE=${HOME}/.keystore
print "Setting HTTPFS_SSL_KEYSTORE_FILE: ${HTTPFS_SSL_KEYSTORE_FILE}"
else
print "Using HTTPFS_SSL_KEYSTORE_FILE: ${HTTPFS_SSL_KEYSTORE_FILE}"
fi
if [ "${HTTPFS_SSL_KEYSTORE_PASS}" = "" ]; then
export HTTPFS_SSL_KEYSTORE_PASS=password
print "Setting HTTPFS_SSL_KEYSTORE_PASS: ${HTTPFS_SSL_KEYSTORE_PASS}"
else
print "Using HTTPFS_SSL_KEYSTORE_PASS: ${HTTPFS_SSL_KEYSTORE_PASS}"
fi
if [ "${CATALINA_BASE}" = "" ]; then
export CATALINA_BASE=${HTTPFS_HOME}/share/hadoop/httpfs/tomcat
print "Setting CATALINA_BASE: ${CATALINA_BASE}"

View File

@ -43,6 +43,9 @@ catalina_opts="${catalina_opts} -Dhttpfs.temp.dir=${HTTPFS_TEMP}";
catalina_opts="${catalina_opts} -Dhttpfs.admin.port=${HTTPFS_ADMIN_PORT}";
catalina_opts="${catalina_opts} -Dhttpfs.http.port=${HTTPFS_HTTP_PORT}";
catalina_opts="${catalina_opts} -Dhttpfs.http.hostname=${HTTPFS_HTTP_HOSTNAME}";
catalina_opts="${catalina_opts} -Dhttpfs.ssl.enabled=${HTTPFS_SSL_ENABLED}";
catalina_opts="${catalina_opts} -Dhttpfs.ssl.keystore.file=${HTTPFS_SSL_KEYSTORE_FILE}";
catalina_opts="${catalina_opts} -Dhttpfs.ssl.keystore.pass=${HTTPFS_SSL_KEYSTORE_PASS}";
print "Adding to CATALINA_OPTS: ${catalina_opts}"

View File

@ -0,0 +1,135 @@
<?xml version='1.0' encoding='utf-8'?>
<!--
All Rights Reserved.
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<!-- Note: A "Server" is not itself a "Container", so you may not
define subcomponents such as "Valves" at this level.
Documentation at /docs/config/server.html
-->
<Server port="${httpfs.admin.port}" shutdown="SHUTDOWN">
<!--APR library loader. Documentation at /docs/apr.html -->
<Listener className="org.apache.catalina.core.AprLifecycleListener"
SSLEngine="on"/>
<!--Initialize Jasper prior to webapps are loaded. Documentation at /docs/jasper-howto.html -->
<Listener className="org.apache.catalina.core.JasperListener"/>
<!-- Prevent memory leaks due to use of particular java/javax APIs-->
<Listener
className="org.apache.catalina.core.JreMemoryLeakPreventionListener"/>
<!-- JMX Support for the Tomcat server. Documentation at /docs/non-existent.html -->
<Listener className="org.apache.catalina.mbeans.ServerLifecycleListener"/>
<Listener
className="org.apache.catalina.mbeans.GlobalResourcesLifecycleListener"/>
<!-- Global JNDI resources
Documentation at /docs/jndi-resources-howto.html
-->
<GlobalNamingResources>
<!-- Editable user database that can also be used by
UserDatabaseRealm to authenticate users
-->
<Resource name="UserDatabase" auth="Container"
type="org.apache.catalina.UserDatabase"
description="User database that can be updated and saved"
factory="org.apache.catalina.users.MemoryUserDatabaseFactory"
pathname="conf/tomcat-users.xml"/>
</GlobalNamingResources>
<!-- A "Service" is a collection of one or more "Connectors" that share
a single "Container" Note: A "Service" is not itself a "Container",
so you may not define subcomponents such as "Valves" at this level.
Documentation at /docs/config/service.html
-->
<Service name="Catalina">
<!--The connectors can use a shared executor, you can define one or more named thread pools-->
<!--
<Executor name="tomcatThreadPool" namePrefix="catalina-exec-"
maxThreads="150" minSpareThreads="4"/>
-->
<!-- Define a SSL HTTP/1.1 Connector on port 8443
This connector uses the JSSE configuration, when using APR, the
connector should be using the OpenSSL style configuration
described in the APR documentation -->
<Connector port="${httpfs.http.port}" protocol="HTTP/1.1" SSLEnabled="true"
maxThreads="150" scheme="https" secure="true"
clientAuth="false" sslProtocol="TLS"
keystoreFile="${httpfs.ssl.keystore.file}"
keystorePass="${httpfs.ssl.keystore.pass}"/>
<!-- Define an AJP 1.3 Connector on port 8009 -->
<!-- An Engine represents the entry point (within Catalina) that processes
every request. The Engine implementation for Tomcat stand alone
analyzes the HTTP headers included with the request, and passes them
on to the appropriate Host (virtual host).
Documentation at /docs/config/engine.html -->
<!-- You should set jvmRoute to support load-balancing via AJP ie :
<Engine name="Catalina" defaultHost="localhost" jvmRoute="jvm1">
-->
<Engine name="Catalina" defaultHost="localhost">
<!--For clustering, please take a look at documentation at:
/docs/cluster-howto.html (simple how to)
/docs/config/cluster.html (reference documentation) -->
<!--
<Cluster className="org.apache.catalina.ha.tcp.SimpleTcpCluster"/>
-->
<!-- The request dumper valve dumps useful debugging information about
the request and response data received and sent by Tomcat.
Documentation at: /docs/config/valve.html -->
<!--
<Valve className="org.apache.catalina.valves.RequestDumperValve"/>
-->
<!-- This Realm uses the UserDatabase configured in the global JNDI
resources under the key "UserDatabase". Any edits
that are performed against this UserDatabase are immediately
available for use by the Realm. -->
<Realm className="org.apache.catalina.realm.UserDatabaseRealm"
resourceName="UserDatabase"/>
<!-- Define the default virtual host
Note: XML Schema validation will not work with Xerces 2.2.
-->
<Host name="localhost" appBase="webapps"
unpackWARs="true" autoDeploy="true"
xmlValidation="false" xmlNamespaceAware="false">
<!-- SingleSignOn valve, share authentication between web applications
Documentation at: /docs/config/valve.html -->
<!--
<Valve className="org.apache.catalina.authenticator.SingleSignOn" />
-->
<!-- Access log processes all example.
Documentation at: /docs/config/valve.html -->
<!--
<Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
prefix="localhost_access_log." suffix=".txt" pattern="common" resolveHosts="false"/>
-->
</Host>
</Engine>
</Service>
</Server>

View File

@ -118,4 +118,46 @@ Transfer-Encoding: chunked
HttpFS supports the following {{{./httpfs-default.html}configuration properties}}
in the HttpFS's <<<conf/httpfs-site.xml>>> configuration file.
* HttpFS over HTTPS (SSL)
To configure HttpFS to work over SSL edit the {{httpfs-env.sh}} script in the
configuration directory setting the {{HTTPFS_SSL_ENABLED}} to {{true}}.
In addition, the following 2 properties may be defined (shown with default
values):
* HTTPFS_SSL_KEYSTORE_FILE=${HOME}/.keystore
* HTTPFS_SSL_KEYSTORE_PASS=password
In the HttpFS <<<tomcat/conf>>> directory, replace the <<<server.xml>>> file
with the <<<ssl-server.xml>>> file.
You need to create an SSL certificate for the HttpFS server. As the
<<<httpfs>>> Unix user, using the Java <<<keytool>>> command to create the
SSL certificate:
+---+
$ keytool -genkey -alias tomcat -keyalg RSA
+---+
You will be asked a series of questions in an interactive prompt. It will
create the keystore file, which will be named <<.keystore>> and located in the
<<<httpfs>>> user home directory.
The password you enter for "keystore password" must match the value of the
<<<HTTPFS_SSL_KEYSTORE_PASS>>> environment variable set in the
<<<httpfs-env.sh>>> script in the configuration directory.
The answer to "What is your first and last name?" (i.e. "CN") must be the
hostname of the machine where the HttpFS Server will be running.
Start HttpFS. It should work over HTTPS.
Using the Hadoop <<<FileSystem>>> API or the Hadoop FS shell, use the
<<<swebhdfs://>>> scheme. Make sure the JVM is picking up the truststore
containing the public key of the SSL certificate if using a self-signed
certificate.
\[ {{{./index.html}Go Back}} \]

View File

@ -116,10 +116,14 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
return HttpFSFileSystem.class;
}
protected String getScheme() {
return "webhdfs";
}
protected FileSystem getHttpFSFileSystem() throws Exception {
Configuration conf = new Configuration();
conf.set("fs.webhdfs.impl", getFileSystemClass().getName());
URI uri = new URI("webhdfs://" +
URI uri = new URI(getScheme() + "://" +
TestJettyHelper.getJettyURL().toURI().getAuthority());
return FileSystem.get(uri, conf);
}
@ -127,7 +131,7 @@ public abstract class BaseTestHttpFSWith extends HFSTestCase {
protected void testGet() throws Exception {
FileSystem fs = getHttpFSFileSystem();
Assert.assertNotNull(fs);
URI uri = new URI("webhdfs://" +
URI uri = new URI(getScheme() + "://" +
TestJettyHelper.getJettyURL().toURI().getAuthority());
Assert.assertEquals(fs.getUri(), uri);
fs.close();

View File

@ -0,0 +1,99 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.http.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem;
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
import org.apache.hadoop.test.TestJettyHelper;
import org.junit.AfterClass;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.net.URI;
import java.net.URL;
import java.util.UUID;
@RunWith(value = Parameterized.class)
public class TestHttpFSFWithSWebhdfsFileSystem
extends TestHttpFSWithHttpFSFileSystem {
private static String classpathDir;
private static final String BASEDIR = System.getProperty("test.build.dir",
"target/test-dir") + "/" + UUID.randomUUID();
private static Configuration sslConf;
{
URL url = Thread.currentThread().getContextClassLoader().
getResource("classutils.txt");
classpathDir = url.toExternalForm();
if (classpathDir.startsWith("file:")) {
classpathDir = classpathDir.substring("file:".length());
classpathDir = classpathDir.substring(0,
classpathDir.length() - "/classutils.txt".length());
} else {
throw new RuntimeException("Cannot find test classes dir");
}
File base = new File(BASEDIR);
FileUtil.fullyDelete(base);
base.mkdirs();
String keyStoreDir = new File(BASEDIR).getAbsolutePath();
try {
sslConf = new Configuration();
KeyStoreTestUtil.setupSSLConfig(keyStoreDir, classpathDir, sslConf, false);
} catch (Exception ex) {
throw new RuntimeException(ex);
}
jettyTestHelper = new TestJettyHelper("jks", keyStoreDir + "/serverKS.jks",
"serverP");
}
@AfterClass
public static void cleanUp() {
new File(classpathDir, "ssl-client.xml").delete();
new File(classpathDir, "ssl-server.xml").delete();
}
public TestHttpFSFWithSWebhdfsFileSystem(Operation operation) {
super(operation);
}
@Override
protected Class getFileSystemClass() {
return SWebHdfsFileSystem.class;
}
@Override
protected String getScheme() {
return "swebhdfs";
}
@Override
protected FileSystem getHttpFSFileSystem() throws Exception {
Configuration conf = new Configuration(sslConf);
conf.set("fs.swebhdfs.impl", getFileSystemClass().getName());
URI uri = new URI("swebhdfs://" +
TestJettyHelper.getJettyURL().toURI().getAuthority());
return FileSystem.get(uri, conf);
}
}

View File

@ -22,9 +22,13 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.http.client.HttpFSFileSystem;
import org.apache.hadoop.fs.http.client.HttpFSKerberosAuthenticator;
import org.apache.hadoop.fs.http.client.HttpFSKerberosAuthenticator.DelegationTokenOperation;
import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.lib.service.DelegationTokenIdentifier;
import org.apache.hadoop.lib.service.DelegationTokenManager;
import org.apache.hadoop.lib.service.DelegationTokenManagerException;
import org.apache.hadoop.lib.servlet.ServerWebApp;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.server.AuthenticationHandler;
@ -51,7 +55,24 @@ public class TestHttpFSKerberosAuthenticationHandler extends HFSTestCase {
@Test
@TestDir
public void testManagementOperations() throws Exception {
public void testManagementOperationsWebHdfsFileSystem() throws Exception {
testManagementOperations(WebHdfsFileSystem.TOKEN_KIND);
}
@Test
@TestDir
public void testManagementOperationsSWebHdfsFileSystem() throws Exception {
try {
System.setProperty(HttpFSServerWebApp.NAME +
ServerWebApp.SSL_ENABLED, "true");
testManagementOperations(SWebHdfsFileSystem.TOKEN_KIND);
} finally {
System.getProperties().remove(HttpFSServerWebApp.NAME +
ServerWebApp.SSL_ENABLED);
}
}
private void testManagementOperations(Text expectedTokenKind) throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
Configuration httpfsConf = new Configuration(false);
@ -67,8 +88,8 @@ public class TestHttpFSKerberosAuthenticationHandler extends HFSTestCase {
testNonManagementOperation(handler);
testManagementOperationErrors(handler);
testGetToken(handler, null);
testGetToken(handler, "foo");
testGetToken(handler, null, expectedTokenKind);
testGetToken(handler, "foo", expectedTokenKind);
testCancelToken(handler);
testRenewToken(handler);
@ -112,8 +133,8 @@ public class TestHttpFSKerberosAuthenticationHandler extends HFSTestCase {
Mockito.contains("requires SPNEGO"));
}
private void testGetToken(AuthenticationHandler handler, String renewer)
throws Exception {
private void testGetToken(AuthenticationHandler handler, String renewer,
Text expectedTokenKind) throws Exception {
DelegationTokenOperation op = DelegationTokenOperation.GETDELEGATIONTOKEN;
HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
@ -154,6 +175,7 @@ public class TestHttpFSKerberosAuthenticationHandler extends HFSTestCase {
Token<DelegationTokenIdentifier> dt = new Token<DelegationTokenIdentifier>();
dt.decodeFromUrlString(tokenStr);
HttpFSServerWebApp.get().get(DelegationTokenManager.class).verifyToken(dt);
Assert.assertEquals(expectedTokenKind, dt.getKind());
}
private void testCancelToken(AuthenticationHandler handler)

View File

@ -23,6 +23,9 @@ import org.apache.hadoop.fs.http.server.HttpFSServerWebApp;
import org.apache.hadoop.lib.server.Server;
import org.apache.hadoop.lib.service.DelegationTokenManager;
import org.apache.hadoop.lib.service.DelegationTokenManagerException;
import org.apache.hadoop.lib.service.hadoop.FileSystemAccessService;
import org.apache.hadoop.lib.service.instrumentation.InstrumentationService;
import org.apache.hadoop.lib.service.scheduler.SchedulerService;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.HTestCase;
@ -43,9 +46,12 @@ public class TestDelegationTokenManagerService extends HTestCase {
public void service() throws Exception {
String dir = TestDirHelper.getTestDir().getAbsolutePath();
Configuration conf = new Configuration(false);
conf.set("server.services", StringUtils.join(",",
Arrays.asList(DelegationTokenManagerService.class.getName())));
Server server = new Server("server", dir, dir, dir, dir, conf);
conf.set("httpfs.services", StringUtils.join(",",
Arrays.asList(InstrumentationService.class.getName(),
SchedulerService.class.getName(),
FileSystemAccessService.class.getName(),
DelegationTokenManagerService.class.getName())));
Server server = new HttpFSServerWebApp(dir, dir, dir, dir, conf);
server.init();
DelegationTokenManager tm = server.get(DelegationTokenManager.class);
Assert.assertNotNull(tm);

View File

@ -28,31 +28,46 @@ import org.junit.Test;
import org.junit.rules.MethodRule;
import org.junit.runners.model.FrameworkMethod;
import org.junit.runners.model.Statement;
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.security.SslSocketConnector;
public class TestJettyHelper implements MethodRule {
private boolean ssl;
private String keyStoreType;
private String keyStore;
private String keyStorePassword;
private Server server;
@Test
public void dummy() {
public TestJettyHelper() {
this.ssl = false;
}
private static ThreadLocal<Server> TEST_SERVLET_TL = new InheritableThreadLocal<Server>();
public TestJettyHelper(String keyStoreType, String keyStore,
String keyStorePassword) {
ssl = true;
this.keyStoreType = keyStoreType;
this.keyStore = keyStore;
this.keyStorePassword = keyStorePassword;
}
private static ThreadLocal<TestJettyHelper> TEST_JETTY_TL =
new InheritableThreadLocal<TestJettyHelper>();
@Override
public Statement apply(final Statement statement, final FrameworkMethod frameworkMethod, final Object o) {
return new Statement() {
@Override
public void evaluate() throws Throwable {
Server server = null;
TestJetty testJetty = frameworkMethod.getAnnotation(TestJetty.class);
if (testJetty != null) {
server = createJettyServer();
}
try {
TEST_SERVLET_TL.set(server);
TEST_JETTY_TL.set(TestJettyHelper.this);
statement.evaluate();
} finally {
TEST_SERVLET_TL.remove();
TEST_JETTY_TL.remove();
if (server != null && server.isRunning()) {
try {
server.stop();
@ -73,8 +88,19 @@ public class TestJettyHelper implements MethodRule {
int port = ss.getLocalPort();
ss.close();
Server server = new Server(0);
server.getConnectors()[0].setHost(host);
server.getConnectors()[0].setPort(port);
if (!ssl) {
server.getConnectors()[0].setHost(host);
server.getConnectors()[0].setPort(port);
} else {
SslSocketConnector c = new SslSocketConnector();
c.setHost(host);
c.setPort(port);
c.setNeedClientAuth(false);
c.setKeystore(keyStore);
c.setKeystoreType(keyStoreType);
c.setKeyPassword(keyStorePassword);
server.setConnectors(new Connector[] {c});
}
return server;
} catch (Exception ex) {
throw new RuntimeException("Could not start embedded servlet container, " + ex.getMessage(), ex);
@ -109,11 +135,11 @@ public class TestJettyHelper implements MethodRule {
* @return a Jetty server ready to be configured and the started.
*/
public static Server getJettyServer() {
Server server = TEST_SERVLET_TL.get();
if (server == null) {
TestJettyHelper helper = TEST_JETTY_TL.get();
if (helper == null || helper.server == null) {
throw new IllegalStateException("This test does not use @TestJetty");
}
return server;
return helper.server;
}
/**
@ -123,12 +149,15 @@ public class TestJettyHelper implements MethodRule {
* @return the base URL (SCHEMA://HOST:PORT) of the test Jetty server.
*/
public static URL getJettyURL() {
Server server = TEST_SERVLET_TL.get();
if (server == null) {
TestJettyHelper helper = TEST_JETTY_TL.get();
if (helper == null || helper.server == null) {
throw new IllegalStateException("This test does not use @TestJetty");
}
try {
return new URL("http://" + server.getConnectors()[0].getHost() + ":" + server.getConnectors()[0].getPort());
String scheme = (helper.ssl) ? "https" : "http";
return new URL(scheme + "://" +
helper.server.getConnectors()[0].getHost() + ":" +
helper.server.getConnectors()[0].getPort());
} catch (MalformedURLException ex) {
throw new RuntimeException("It should never happen, " + ex.getMessage(), ex);
}

View File

@ -120,94 +120,9 @@ Trunk (Unreleased)
HDFS-5041. Add the time of last heartbeat to dead server Web UI (Shinichi
Yamashita via brandonli)
HDFS-5049. Add JNI mlock support. (Andrew Wang via Colin Patrick McCabe)
HDFS-5051. Propagate cache status information from the DataNode to the
NameNode (Andrew Wang via Colin Patrick McCabe)
HDFS-5052. Add cacheRequest/uncacheRequest support to NameNode.
(contributed by Colin Patrick McCabe)
HDFS-5050. Add DataNode support for mlock and munlock
(Andrew Wang via Colin Patrick McCabe)
HDFS-5141. Add cache status information to datanode heartbeat.
(Contributed by Andrew Wang)
HDFS-5121. Add RPCs for creating and manipulating cache pools.
(Contributed by Colin Patrick McCabe)
HDFS-5163. Miscellaneous cache pool RPC fixes. (Contributed by Colin
Patrick McCabe)
HDFS-5120. Add command-line support for manipulating cache pools.
(Contributed by Colin Patrick McCabe)
HDFS-5158. Add command-line support for manipulating cache directives.
(Contributed by Colin Patrick McCabe)
HDFS-5053. NameNode should invoke DataNode APIs to coordinate caching.
(Andrew Wang)
HDFS-5197. Document dfs.cachereport.intervalMsec in hdfs-default.xml.
(cnauroth)
HDFS-5213. Separate PathBasedCacheEntry and PathBasedCacheDirectiveWithId.
(Contributed by Colin Patrick McCabe)
HDFS-5236. Change PathBasedCacheDirective APIs to be a single value
rather than batch. (Contributed by Andrew Wang)
HDFS-5191. Revisit zero-copy API in FSDataInputStream to make it more
intuitive. (Contributed by Colin Patrick McCabe)
HDFS-5119. Persist CacheManager state in the edit log.
(Contributed by Andrew Wang)
HDFS-5190. Move cache pool related CLI commands to CacheAdmin.
(Contributed by Andrew Wang)
HDFS-5304. Expose if a block replica is cached in getFileBlockLocations.
(Contributed by Andrew Wang)
HDFS-5224. Refactor PathBasedCache* methods to use a Path rather than a
String. (cnauroth)
HDFS-5358. Add replication field to PathBasedCacheDirective.
(Contributed by Colin Patrick McCabe)
HDFS-5359. Allow LightWeightGSet#Iterator to remove elements.
(Contributed by Colin Patrick McCabe)
HDFS-5096. Automatically cache new data added to a cached path.
(Contributed by Colin Patrick McCabe)
HDFS-5378. In CacheReport, don't send genstamp and length on the wire
(Contributed by Colin Patrick McCabe)
HDFS-5386. Add feature documentation for datanode caching.
(Colin Patrick McCabe via cnauroth)
HDFS-5326. add modifyDirective to cacheAdmin. (cmccabe)
HDFS-5450. Better API for getting the cached blocks locations. (wang)
HDFS-5485. Add command-line support for modifyDirective. (cmccabe)
HDFS-5366. recaching improvements (cmccabe)
HDFS-5511. improve CacheManipulator interface to allow better unit testing
(cmccabe)
HDFS-5451. Add byte and file statistics to PathBasedCacheEntry.
(Colin Patrick McCabe via Andrew Wang)
HDFS-5531. Combine the getNsQuota() and getDsQuota() methods in INode.
(szetszwo)
HDFS-5473. Consistent naming of user-visible caching classes and methods
(cmccabe)
HDFS-5285. Flatten INodeFile hierarchy: Replace INodeFileUnderConstruction
and INodeFileUnderConstructionWithSnapshot with FileUnderContructionFeature.
(jing9 via szetszwo)
@ -215,15 +130,8 @@ Trunk (Unreleased)
HDFS-5286. Flatten INodeDirectory hierarchy: Replace INodeDirectoryWithQuota
with DirectoryWithQuotaFeature. (szetszwo)
HDFS-5556. Add some more NameNode cache statistics, cache pool stats
(cmccabe)
HDFS-5537. Remove FileWithSnapshot interface. (jing9 via szetszwo)
HDFS-5430. Support TTL on CacheDirectives. (wang)
HDFS-5630. Hook up cache directive and pool usage statistics. (wang)
HDFS-5554. Flatten INodeFile hierarchy: Replace INodeFileWithSnapshot with
FileWithSnapshotFeature. (jing9 via szetszwo)
@ -234,14 +142,6 @@ Trunk (Unreleased)
INodeDirectoryWithSnapshot with DirectoryWithSnapshotFeature.
(jing9 via szetszwo)
HDFS-5431. Support cachepool-based limit management in path-based caching
(awang via cmccabe)
HDFS-5636. Enforce a max TTL per cache pool. (awang via cmccabe)
HDFS-5651. Remove dfs.namenode.caching.enabled and improve CRM locking.
(cmccabe via wang)
HDFS-5715. Use Snapshot ID to indicate the corresponding Snapshot for a
FileDiff/DirectoryDiff. (jing9)
@ -250,11 +150,6 @@ Trunk (Unreleased)
OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
HDFS-5665. Remove the unnecessary writeLock while initializing CacheManager
in FsNameSystem Ctor. (Uma Maheswara Rao G via Andrew Wang)
BUG FIXES
HADOOP-9635 Fix potential Stack Overflow in DomainSocket.c (V. Karthik Kumar
@ -372,110 +267,12 @@ Trunk (Unreleased)
HDFS-4366. Block Replication Policy Implementation May Skip Higher-Priority
Blocks for Lower-Priority Blocks (Derek Dagit via kihwal)
HDFS-5169. hdfs.c: translateZCRException: null pointer deref when
translating some exceptions. (Contributed by Colin Patrick McCabe)
HDFS-5198. NameNodeRpcServer must not send back DNA_FINALIZE in reply to a
cache report. (Contributed by Colin Patrick McCabe)
HDFS-5195. Prevent passing null pointer to mlock and munlock. (cnauroth)
HDFS-5201. NativeIO: consolidate getrlimit into NativeIO#getMemlockLimit
(Contributed by Colin Patrick McCabe)
HDFS-5210. Fix some failing unit tests on HDFS-4949 branch.
(Contributed by Andrew Wang)
HDFS-5266. ElasticByteBufferPool#Key does not implement equals. (cnauroth)
HDFS-5309. Fix failing caching unit tests. (Andrew Wang)
HDFS-5314. Do not expose CachePool type in AddCachePoolOp (Colin Patrick
McCabe)
HDFS-5348. Fix error message when dfs.datanode.max.locked.memory is
improperly configured. (Colin Patrick McCabe)
HDFS-5373. hdfs cacheadmin -addDirective short usage does not mention
-replication parameter. (cnauroth)
HDFS-5383. fix broken caching unit tests. (Andrew Wang)
HDFS-5388. Loading fsimage fails to find cache pools during namenode
startup. (Chris Nauroth via Colin Patrick McCabe)
HDFS-5203. Concurrent clients that add a cache directive on the same path
may prematurely uncache from each other. (Chris Nauroth via Colin Patrick
McCabe)
HDFS-5385. Caching RPCs are AtMostOnce, but do not persist client ID and
call ID to edit log. (Chris Nauroth via Colin Patrick McCabe)
HDFS-5404. Resolve regressions in Windows compatibility on HDFS-4949
branch. (Chris Nauroth via Andrew Wang)
HDFS-5405. Fix possible RetryCache hang for caching RPC handlers in
FSNamesystem. (wang)
HDFS-5419. Fixup test-patch.sh warnings on HDFS-4949 branch. (wang)
HDFS-5468. CacheAdmin help command does not recognize commands (Stephen
Chu via Colin Patrick McCabe)
HDFS-5394. Fix race conditions in DN caching and uncaching (cmccabe)
HDFS-5482. DistributedFileSystem#listPathBasedCacheDirectives must support
relative paths. (Colin Patrick McCabe via cnauroth)
HDFS-5320. Add datanode caching metrics. (wang)
HDFS-5520. loading cache path directives from edit log doesn't update
nextEntryId (cmccabe)
HDFS-5512. CacheAdmin -listPools fails with NPE when user lacks permissions
to view all pools (wang via cmccabe)
HDFS-5513. CacheAdmin commands fail when using . as the path. (wang)
HDFS-5543. Fix narrow race condition in TestPathBasedCacheRequests
(cmccabe)
HDFS-5565. CacheAdmin help should match against non-dashed commands
(wang via cmccabe)
HDFS-5562. TestCacheDirectives and TestFsDatasetCache should stub out
native mlock. (Colin McCabe and Akira Ajisaka via wang)
HDFS-5555. CacheAdmin commands fail when first listed NameNode is in
Standby (jxiang via cmccabe)
HDFS-5626. dfsadmin -report shows incorrect cache values. (cmccabe)
HDFS-5679. TestCacheDirectives should handle the case where native code
is not available. (wang)
HDFS-5701. Fix the CacheAdmin -addPool -maxTtl option name.
(Stephen Chu via wang)
HDFS-5708. The CacheManager throws a NPE in the DataNode logs when
processing cache reports that refer to a block not known to the
BlockManager. (cmccabe via wang)
HDFS-5659. dfsadmin -report doesn't output cache information properly.
(wang)
HDFS-5705. TestSecondaryNameNodeUpgrade#testChangeNsIDFails may fail due
to ConcurrentModificationException. (Ted Yu via brandonli)
HDFS-5719. FSImage#doRollback() should close prevState before return
(Ted Yu via brandonli)
HDFS-5589. Namenode loops caching and uncaching when data should be
uncached (awang via cmccabe)
HDFS-5724. modifyCacheDirective logging audit log command wrongly as
addCacheDirective (Uma Maheswara Rao G via Colin Patrick McCabe)
HDFS-5726. Fix compilation error in AbstractINodeDiff for JDK7. (jing9)
HDFS-5768. Consolidate the serialization code in DelegationTokenSecretManager
@ -524,6 +321,10 @@ Release 2.4.0 - UNRELEASED
HDFS-5784. reserve space in edit log header and fsimage header for feature
flag section (cmccabe)
HDFS-5703. Add support for HTTPS and swebhdfs to HttpFS. (tucu)
HDFS-4949. Centralized cache management in HDFS. (wang and cmccabe)
IMPROVEMENTS
HDFS-5267. Remove volatile from LightWeightHashSet. (Junping Du via llu)
@ -695,6 +496,12 @@ Release 2.4.0 - UNRELEASED
HDFS-5704. Change OP_UPDATE_BLOCKS with a new OP_ADD_BLOCK. (jing9)
HDFS-5434. Change block placement policy constructors from package private
to protected. (Buddy Taylor via Arpit Agarwal)
HDFS-5788. listLocatedStatus response can be very large. (Nathan Roberts
via kihwal)
OPTIMIZATIONS
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
@ -704,6 +511,9 @@ Release 2.4.0 - UNRELEASED
HDFS-5681. renewLease should not hold fsn write lock. (daryn via Kihwal)
HDFS-5241. Provide alternate queuing audit logger to reduce logging
contention (daryn)
BUG FIXES
HDFS-5034. Remove debug prints from GetFileLinkInfo (Andrew Wang via Colin
@ -778,6 +588,12 @@ Release 2.4.0 - UNRELEASED
HDFS-5800. Fix a typo in DFSClient.renewLease(). (Kousuke Saruta
via szetszwo)
HDFS-5748. Too much information shown in the dfs health page.
(Haohui Mai via brandonli)
HDFS-5806. balancer should set SoTimeout to avoid indefinite hangs.
(Nathan Roberts via Andrew Wang).
BREAKDOWN OF HDFS-2832 SUBTASKS AND RELATED JIRAS
HDFS-4985. Add storage type to the protocol and expose it in block report
@ -911,6 +727,211 @@ Release 2.4.0 - UNRELEASED
HDFS-5667. Include DatanodeStorage in StorageReport. (Arpit Agarwal)
BREAKDOWN OF HDFS-4949 SUBTASKS AND RELATED JIRAS
HDFS-5049. Add JNI mlock support. (Andrew Wang via Colin Patrick McCabe)
HDFS-5051. Propagate cache status information from the DataNode to the
NameNode (Andrew Wang via Colin Patrick McCabe)
HDFS-5052. Add cacheRequest/uncacheRequest support to NameNode.
(Contributed by Colin Patrick McCabe.)
HDFS-5050. Add DataNode support for mlock and munlock (contributed by
Andrew Wang)
HDFS-5141. Add cache status information to datanode heartbeat. (Contributed
by Andrew Wang)
HDFS-5121. Add RPCs for creating and manipulating cache pools.
(Contributed by Colin Patrick McCabe)
HDFS-5163. Miscellaneous cache pool RPC fixes (Contributed by Colin Patrick
McCabe)
HDFS-5169. hdfs.c: translateZCRException: null pointer deref when
translating some exceptions (Contributed by Colin Patrick McCabe)
HDFS-5120. Add command-line support for manipulating cache pools. (cmccabe)
HDFS-5158. Add command-line support for manipulating cache directives.
(cmccabe)
HDFS-5198. NameNodeRpcServer must not send back DNA_FINALIZE in reply to a
cache report. (cmccabe)
HDFS-5195. Prevent passing null pointer to mlock and munlock. Contributed
by Chris Nauroth.
HDFS-5053. NameNode should invoke DataNode APIs to coordinate caching.
(Andrew Wang)
HDFS-5201. NativeIO: consolidate getrlimit into NativeIO#getMemlockLimit.
(Contributed by Colin Patrick McCabe)
HDFS-5197. Document dfs.cachereport.intervalMsec in hdfs-default.xml.
Contributed by Chris Nauroth.
HDFS-5210. Fix some failing unit tests on HDFS-4949 branch. (Contributed by
Andrew Wang)
HDFS-5213. Separate PathBasedCacheEntry and PathBasedCacheDirectiveWithId.
Contributed by Colin Patrick McCabe.
HDFS-5236. Change PathBasedCacheDirective APIs to be a single value rather
than batch. (Contributed by Andrew Wang)
HDFS-5119. Persist CacheManager state in the edit log. (Contributed by
Andrew Wang)
HDFS-5190. Move cache pool related CLI commands to CacheAdmin. (Contributed
by Andrew Wang)
HDFS-5309. Fix failing caching unit tests. (Andrew Wang)
HDFS-5314. Do not expose CachePool type in AddCachePoolOp (Colin Patrick
McCabe)
HDFS-5304. Expose if a block replica is cached in getFileBlockLocations.
(Contributed by Andrew Wang)
HDFS-5224. Refactor PathBasedCache* methods to use a Path rather than a
String. Contributed by Chris Nauroth.
HDFS-5348. Fix error message when dfs.datanode.max.locked.memory is
improperly configured. (Contributed by Colin Patrick McCabe)
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only (cmccabe)
HDFS-5358. Add replication field to PathBasedCacheDirective. (Contributed
by Colin Patrick McCabe)
HDFS-5359. Allow LightWeightGSet#Iterator to remove elements. (Contributed
by Colin Patrick McCabe)
HDFS-5373. hdfs cacheadmin -addDirective short usage does not mention
-replication parameter. Contributed by Chris Nauroth.
HDFS-5096. Automatically cache new data added to a cached path (contributed
by Colin Patrick McCabe)
HDFS-5383. fix broken caching unit tests (Andrew Wang)
HDFS-5388. Loading fsimage fails to find cache pools during namenode
startup (Chris Nauroth via Colin Patrick McCabe)
HDFS-5203. Concurrent clients that add a cache directive on the same path
may prematurely uncache each other. (Chris Nauroth via Colin Patrick McCabe)
HDFS-5378. In CacheReport, don't send genstamp and length on the wire
(Contributed by Colin Patrick McCabe)
HDFS-5385. Caching RPCs are AtMostOnce, but do not persist client ID and
call ID to edit log. (Chris Nauroth via Colin Patrick McCabe)
HDFS-5404 Resolve regressions in Windows compatibility on HDFS-4949 branch.
Contributed by Chris Nauroth.
HDFS-5405. Fix possible RetryCache hang for caching RPC handlers in
FSNamesystem. (Contributed by Andrew Wang)
HDFS-5419. Fixup test-patch.sh warnings on HDFS-4949 branch. (wang)
HDFS-5386. Add feature documentation for datanode caching. Contributed by
Colin Patrick McCabe.
HDFS-5468. CacheAdmin help command does not recognize commands (Stephen
Chu via Colin Patrick McCabe)
HDFS-5326. add modifyDirective to cacheAdmin (cmccabe)
HDFS-5394: Fix race conditions in DN caching and uncaching (cmccabe)
HDFS-5320. Add datanode caching metrics. Contributed by Andrew Wang.
HDFS-5482. DistributedFileSystem#listPathBasedCacheDirectives must support
relative paths. Contributed by Colin Patrick McCabe.
HDFS-5471. CacheAdmin -listPools fails when user lacks permissions to view
all pools (Andrew Wang via Colin Patrick McCabe)
HDFS-5450. better API for getting the cached blocks locations. Contributed
by Andrew Wang.
HDFS-5485. add command-line support for modifyDirective (cmccabe)
HDFS-5366. recaching improvements (cmccabe)
HDFS-5520. loading cache path directives from edit log doesnt update
nextEntryId (cmccabe)
HDFS-5512. CacheAdmin -listPools fails with NPE when user lacks permissions
to view all pools (awang via cmccabe)
HDFS-5513. CacheAdmin commands fail when using . as the path. Contributed
by Andrew Wang.
HDFS-5511. improve CacheManipulator interface to allow better unit testing
(cmccabe)
HDFS-5451. Add byte and file statistics to PathBasedCacheEntry. Contributed
by Colin Patrick McCabe.
HDFS-5473. Consistent naming of user-visible caching classes and methods
(cmccabe)
HDFS-5543. Fix narrow race condition in TestPathBasedCacheRequests
(cmccabe)
HDFS-5565. CacheAdmin help should match against non-dashed commands (wang
via cmccabe)
HDFS-5556. Add some more NameNode cache statistics, cache pool stats
(cmccabe)
HDFS-5562. TestCacheDirectives and TestFsDatasetCache should stub out
native mlock. Contributed by Colin Patrick McCabe and Akira Ajisaka.
HDFS-5430. Support TTL on CacheDirectives. Contributed by Andrew Wang.
HDFS-5555. CacheAdmin commands fail when first listed NameNode is in
Standby (jxiang via cmccabe)
HDFS-5626. dfsadmin report shows incorrect values (cmccabe)
HDFS-5630. Hook up cache directive and pool usage statistics. (wang)
HDFS-5665. Remove the unnecessary writeLock while initializing CacheManager
in FsNameSystem Ctor. (Uma Maheswara Rao G via Andrew Wang)
HDFS-5431. Support cachepool-based limit management in path-based caching.
(awang via cmccabe)
HDFS-5679. TestCacheDirectives should handle the case where native code is
not available. (wang)
HDFS-5636. Enforce a max TTL per cache pool (awang via cmccabe)
HDFS-5701. Fix the CacheAdmin -addPool -maxTtl option name. Contributed by
Stephen Chu.
HDFS-5708. The CacheManager throws a NPE in the DataNode logs when
processing cache reports that refer to a block not known to the BlockManager.
Contributed by Colin Patrick McCabe.
HDFS-5659. dfsadmin -report doesn't output cache information properly.
Contributed by Andrew Wang.
HDFS-5651. Remove dfs.namenode.caching.enabled and improve CRM locking.
Contributed by Colin Patrick McCabe.
HDFS-5589. Namenode loops caching and uncaching when data should be
uncached. (awang via cmccabe)
HDFS-5724. modifyCacheDirective logging audit log command wrongly as
addCacheDirective (Uma Maheswara Rao G via Colin Patrick McCabe)
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES
@ -1111,6 +1132,8 @@ Release 2.3.0 - UNRELEASED
HDFS-5649. Unregister NFS and Mount service when NFS gateway is shutting down.
(brandonli)
HDFS-5789. Some of snapshot APIs missing checkOperation double check in fsn. (umamahesh)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES
@ -1278,7 +1301,7 @@ Release 2.1.1-beta - 2013-09-23
HDFS-5091. Support for spnego keytab separate from the JournalNode keytab
for secure HA. (jing9)
HDFS-5051. nn fails to download checkpointed image from snn in some
HDFS-5055. nn fails to download checkpointed image from snn in some
setups. (Vinay and suresh via suresh)
HDFS-4898. BlockPlacementPolicyWithNodeGroup.chooseRemoteRack() fails to

View File

@ -304,6 +304,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final String DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME = "default";
public static final String DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY = "dfs.namenode.audit.log.token.tracking.id";
public static final boolean DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT = false;
public static final String DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY = "dfs.namenode.audit.log.async";
public static final boolean DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT = false;
// Much code in hdfs is not yet updated to use these keys.
public static final String DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY = "dfs.client.block.write.locateFollowingBlock.retries";

View File

@ -337,6 +337,7 @@ public class Balancer {
sock.connect(
NetUtils.createSocketAddr(target.datanode.getXferAddr()),
HdfsServerConstants.READ_TIMEOUT);
sock.setSoTimeout(HdfsServerConstants.READ_TIMEOUT);
sock.setKeepAlive(true);
OutputStream unbufOut = sock.getOutputStream();

View File

@ -79,7 +79,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
*/
protected int tolerateHeartbeatMultiplier;
BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
protected BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap) {
initialize(conf, stats, clusterMap);
}

View File

@ -46,12 +46,12 @@ import org.apache.hadoop.net.NodeBase;
*/
public class BlockPlacementPolicyWithNodeGroup extends BlockPlacementPolicyDefault {
BlockPlacementPolicyWithNodeGroup(Configuration conf, FSClusterStats stats,
protected BlockPlacementPolicyWithNodeGroup(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap) {
initialize(conf, stats, clusterMap);
}
BlockPlacementPolicyWithNodeGroup() {
protected BlockPlacementPolicyWithNodeGroup() {
}
@Override

View File

@ -176,7 +176,6 @@ public class FSDirectory implements Closeable {
DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
this.lsLimit = configuredLimit>0 ?
configuredLimit : DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT;
this.contentCountLimit = conf.getInt(
DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_KEY,
DFSConfigKeys.DFS_CONTENT_SUMMARY_LIMIT_DEFAULT);
@ -1503,6 +1502,11 @@ public class FSDirectory implements Closeable {
/**
* Get a partial listing of the indicated directory
*
* We will stop when any of the following conditions is met:
* 1) this.lsLimit files have been added
* 2) needLocation is true AND enough files have been added such
* that at least this.lsLimit block locations are in the response
*
* @param src the directory name
* @param startAfter the name to start listing after
* @param needLocation if block locations are returned
@ -1534,14 +1538,30 @@ public class FSDirectory implements Closeable {
int startChild = INodeDirectory.nextChild(contents, startAfter);
int totalNumChildren = contents.size();
int numOfListing = Math.min(totalNumChildren-startChild, this.lsLimit);
int locationBudget = this.lsLimit;
int listingCnt = 0;
HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
for (int i=0; i<numOfListing; i++) {
for (int i=0; i<numOfListing && locationBudget>0; i++) {
INode cur = contents.get(startChild+i);
listing[i] = createFileStatus(cur.getLocalNameBytes(), cur,
needLocation, snapshot);
listingCnt++;
if (needLocation) {
// Once we hit lsLimit locations, stop.
// This helps to prevent excessively large response payloads.
// Approximate #locations with locatedBlockCount() * repl_factor
LocatedBlocks blks =
((HdfsLocatedFileStatus)listing[i]).getBlockLocations();
locationBudget -= (blks == null) ? 0 :
blks.locatedBlockCount() * listing[i].getReplication();
}
}
// truncate return array if necessary
if (listingCnt < numOfListing) {
listing = Arrays.copyOf(listing, listingCnt);
}
return new DirectoryListing(
listing, totalNumChildren-startChild-numOfListing);
listing, totalNumChildren-startChild-listingCnt);
} finally {
readUnlock();
}

View File

@ -38,6 +38,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECI
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOGGERS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_TOKEN_TRACKING_ID_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_DEFAULT_AUDIT_LOGGER_NAME;
@ -122,6 +124,7 @@ import javax.management.StandardMBean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@ -252,6 +255,9 @@ import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.log4j.Appender;
import org.apache.log4j.AsyncAppender;
import org.apache.log4j.Logger;
import org.mortbay.util.ajax.JSON;
import com.google.common.annotations.VisibleForTesting;
@ -656,6 +662,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache)
throws IOException {
if (conf.getBoolean(DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY,
DFS_NAMENODE_AUDIT_LOG_ASYNC_DEFAULT)) {
LOG.info("Enabling async auditlog");
enableAsyncAuditLog();
}
boolean fair = conf.getBoolean("dfs.namenode.fslock.fair", true);
LOG.info("fsLock is fair:" + fair);
fsLock = new FSNamesystemLock(fair);
@ -6796,6 +6807,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
/** Allow snapshot on a directroy. */
void allowSnapshot(String path) throws SafeModeException, IOException {
checkOperation(OperationCategory.WRITE);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -6821,6 +6833,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
/** Disallow snapshot on a directory. */
void disallowSnapshot(String path) throws SafeModeException, IOException {
checkOperation(OperationCategory.WRITE);
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -6944,6 +6957,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
throws IOException {
SnapshottableDirectoryStatus[] status = null;
checkOperation(OperationCategory.READ);
final FSPermissionChecker checker = getPermissionChecker();
readLock();
try {
@ -6977,6 +6991,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
SnapshotDiffReport getSnapshotDiffReport(String path,
String fromSnapshot, String toSnapshot) throws IOException {
SnapshotDiffInfo diffs = null;
checkOperation(OperationCategory.READ);
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
@ -7498,5 +7513,26 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
auditLog.info(message);
}
}
private static void enableAsyncAuditLog() {
if (!(auditLog instanceof Log4JLogger)) {
LOG.warn("Log4j is required to enable async auditlog");
return;
}
Logger logger = ((Log4JLogger)auditLog).getLogger();
@SuppressWarnings("unchecked")
List<Appender> appenders = Collections.list(logger.getAllAppenders());
// failsafe against trying to async it more than once
if (!appenders.isEmpty() && !(appenders.get(0) instanceof AsyncAppender)) {
AsyncAppender asyncAppender = new AsyncAppender();
// change logger to have an async appender containing all the
// previously configured appenders
for (Appender appender : appenders) {
logger.removeAppender(appender);
asyncAppender.addAppender(appender);
}
logger.addAppender(asyncAppender);
}
}
}

View File

@ -23,25 +23,53 @@
<title>Namenode information</title>
</head>
<body>
<header class="navbar navbar-inverse bs-docs-nav" role="banner">
<div class="container">
<div class="alert alert-danger" id="alert-panel" style="display:none">
<button type="button" class="close" onclick="$('#alert-panel').hide();">&times;</button>
<div class="alert-body" id="alert-panel-body"></div>
<div class="navbar-header">
<a href="http://hadoop.apache.org/core" class="navbar-brand">Hadoop</a>
</div>
<ul class="nav navbar-nav" id="ui-tabs">
<li><a href="#tab-overview">Overview</a></li>
<li><a href="#tab-datanode">Datanodes</a></li>
<li><a href="#tab-snapshot">Snapshot</a></li>
<li><a href="#tab-startup-progress">Startup Progress</a></li>
<li class="dropdown">
<a href="#" class="dropdown-toggle" data-toggle="dropdown">Utilities <b class="caret"></b></a>
<ul class="dropdown-menu">
<li><a href="explorer.html">Browse the file system</a></li>
<li><a href="logs">Logs</a></li>
</ul>
</li>
</ul>
</div>
<div id="panel"></div>
</header>
<div class="container">
<div id="alert-panel">
<div class="alert alert-danger">
<button type="button" class="close" onclick="$('#alert-panel').hide();">&times;</button>
<div class="alert-body" id="alert-panel-body"></div>
</div>
</div>
<div class="tab-content">
<div class="tab-pane" id="tab-overview"></div>
<div class="tab-pane" id="tab-datanode"></div>
<div class="tab-pane" id="tab-snapshot"></div>
<div class="tab-pane" id="tab-startup-progress"></div>
</div>
<div class="row">
<hr />
<div class="col-xs-2"><p><a href="http://hadoop.apache.org/core">Hadoop</a>, 2013.</p></div>
<div class="col-xs-1 pull-right"><a style="color: #ddd" href="dfshealth.jsp">Legacy UI</a></div>
<hr />
<div class="col-xs-2"><p><a href="http://hadoop.apache.org/core">Hadoop</a>, 2014.</p></div>
<div class="col-xs-1 pull-right"><a style="color: #ddd" href="dfshealth.jsp">Legacy UI</a></div>
</div>
</div>
<script type="text/x-dust-template" id="tmpl-dfshealth">
<div class="page-header">
{#nnstat}
<h1>NameNode '{HostAndPort}' ({State})</h1>
{/nnstat}
</div>
{#nn}
{@if cond="{DistinctVersionCount} > 1"}
@ -71,238 +99,193 @@
{/if}
{/nn}
<div class="panel panel-success">
<div class="panel-heading">Overview</div>
<div class="panel-body">
{#nn}
<table class="table table-bordered">
<tr><th>Started:</th><td>{NNStarted}</td></tr>
<tr><th>Version:</th><td>{Version}</td></tr>
<tr><th>Compiled:</th><td>{CompileInfo}</td></tr>
<tr><th>Cluster ID:</th><td>{ClusterId}</td></tr>
<tr><th>Block Pool ID:</th><td>{BlockPoolId}</td></tr>
</table>
{/nn}
</div>
</div>
<div class="page-header"><h1>Overview {#nnstat}<small>'{HostAndPort}' ({State})</small>{/nnstat}</h1></div>
{#nn}
<table class="table table-bordered table-striped">
<tr><th>Started:</th><td>{NNStarted}</td></tr>
<tr><th>Version:</th><td>{Version}</td></tr>
<tr><th>Compiled:</th><td>{CompileInfo}</td></tr>
<tr><th>Cluster ID:</th><td>{ClusterId}</td></tr>
<tr><th>Block Pool ID:</th><td>{BlockPoolId}</td></tr>
</table>
{/nn}
<p><a href="explorer.html">Browse the filesystem</a></p>
<p><a href="/logs/">NameNode Logs</a></p>
<div class="page-header"><h1>Summary</h1></div>
<p>
Security is {#nnstat}{#SecurityEnabled}on{:else}off{/SecurityEnabled}{/nnstat}.</p>
<p>{#nn}{#Safemode}{.}{:else}Safemode is off.{/Safemode}{/nn}</p>
<hr/>
<p>
{#fs}
{FilesTotal} files and directories, {BlocksTotal} blocks = {@math key="{FilesTotal}" method="add" operand="{BlocksTotal}"/} total filesystem object(s).
{#helper_fs_max_objects/}
{/fs}
</p>
{#mem.HeapMemoryUsage}
<p>Heap Memory used {used|fmt_bytes} of {committed|fmt_bytes} Heap Memory. Max Heap Memory is {max|fmt_bytes}. </p>
{/mem.HeapMemoryUsage}
<div class="panel panel-success">
<div class="panel-heading">Cluster Summary</div>
<div class="panel-body">
{#mem.NonHeapMemoryUsage}
<p>Non Heap Memory used {used|fmt_bytes} of {committed|fmt_bytes} Commited Non Heap Memory. Max Non Heap Memory is {max|fmt_bytes}. </p>
{/mem.NonHeapMemoryUsage}
<p>
Security is {#nnstat}{#SecurityEnabled}on{:else}off{/SecurityEnabled}{/nnstat}.</p>
<p>{#nn}{#Safemode}{.}{:else}Safemode is off.{/Safemode}{/nn}</p>
<p>
{#fs}
{FilesTotal} files and directories, {BlocksTotal} blocks = {@math key="{FilesTotal}" method="add" operand="{BlocksTotal}"/} total filesystem object(s).
{#helper_fs_max_objects/}
{/fs}
</p>
{#mem.HeapMemoryUsage}
<p>Heap Memory used {used|fmt_bytes} of {committed|fmt_bytes} Heap Memory. Max Heap Memory is {max|fmt_bytes}. </p>
{/mem.HeapMemoryUsage}
{#mem.NonHeapMemoryUsage}
<p>Non Heap Memory used {used|fmt_bytes} of {committed|fmt_bytes} Commited Non Heap Memory. Max Non Heap Memory is {max|fmt_bytes}. </p>
{/mem.NonHeapMemoryUsage}
{#nn}
<table class="table table-bordered table-striped">
<tr><th> Configured Capacity:</th><td>{Total|fmt_bytes}</td></tr>
<tr><th> DFS Used:</th><td>{Used|fmt_bytes}</td></tr>
<tr><th> Non DFS Used:</th><td>{NonDfsUsedSpace|fmt_bytes}</td></tr>
<tr><th> DFS Remaining:</th><td>{Free|fmt_bytes}</td></tr>
<tr><th> DFS Used%:</th><td>{PercentUsed|fmt_percentage}</td></tr>
<tr><th> DFS Remaining%:</th><td>{PercentRemaining|fmt_percentage}</td></tr>
<tr><th> Block Pool Used:</th><td>{BlockPoolUsedSpace|fmt_bytes}</td></tr>
<tr><th> Block Pool Used%:</th><td>{PercentBlockPoolUsed|fmt_percentage}</td></tr>
<tr><th> DataNodes usages% (Min/Median/Max/stdDev): </th>
{#nn}
<table class="table table-bordered table-striped">
<tr><th> Configured Capacity:</th><td>{Total|fmt_bytes}</td></tr>
<tr><th> DFS Used:</th><td>{Used|fmt_bytes}</td></tr>
<tr><th> Non DFS Used:</th><td>{NonDfsUsedSpace|fmt_bytes}</td></tr>
<tr><th> DFS Remaining:</th><td>{Free|fmt_bytes}</td></tr>
<tr><th> DFS Used%:</th><td>{PercentUsed|fmt_percentage}</td></tr>
<tr><th> DFS Remaining%:</th><td>{PercentRemaining|fmt_percentage}</td></tr>
<tr><th> Block Pool Used:</th><td>{BlockPoolUsedSpace|fmt_bytes}</td></tr>
<tr><th> Block Pool Used%:</th><td>{PercentBlockPoolUsed|fmt_percentage}</td></tr>
<tr><th> DataNodes usages% (Min/Median/Max/stdDev): </th>
<td>{#NodeUsage.nodeUsage}{min} / {median} / {max} / {stdDev}{/NodeUsage.nodeUsage}</td></tr>
{/nn}
{/nn}
{#fs}
<tr><th><a href="#nodelist-operation">Live Nodes</a></th><td>{NumLiveDataNodes} (Decommissioned: {NumDecomLiveDataNodes})</td></tr>
<tr><th><a href="#nodelist-operation">Dead Nodes</a></th><td>{NumDeadDataNodes} (Decommissioned: {NumDecomDeadDataNodes})</td></tr>
<tr><th><a href="#nodelist-decom">Decommissioning Nodes</a></th><td>{NumDecommissioningDataNodes}</td></tr>
<tr><th title="Excludes missing blocks.">Number of Under-Replicated Blocks</th><td>{UnderReplicatedBlocks}</td></tr>
{/fs}
</table>
</div>
</div>
{#fs}
<tr><th><a href="#tab-datanode">Live Nodes</a></th><td>{NumLiveDataNodes} (Decommissioned: {NumDecomLiveDataNodes})</td></tr>
<tr><th><a href="#tab-datanode">Dead Nodes</a></th><td>{NumDeadDataNodes} (Decommissioned: {NumDecomDeadDataNodes})</td></tr>
<tr><th><a href="#tab-datanode">Decommissioning Nodes</a></th><td>{NumDecommissioningDataNodes}</td></tr>
<tr><th title="Excludes missing blocks.">Number of Under-Replicated Blocks</th><td>{UnderReplicatedBlocks}</td></tr>
{/fs}
</table>
<hr/>
<div class="panel panel-success">
<div class="panel-heading">NameNode Journal Status</div>
<div class="panel-body">
<p><b>Current transaction ID:</b> {nn.JournalTransactionInfo.LastAppliedOrWrittenTxId}</p>
<table class="table" title="NameNode Journals">
<thead>
<div class="page-header"><h1>Namenode Journal Status</h1></div>
<p><b>Current transaction ID:</b> {nn.JournalTransactionInfo.LastAppliedOrWrittenTxId}</p>
<table class="table" title="NameNode Journals">
<thead>
<tr><th>Journal Manager</th><th>State</th></tr>
</thead>
<tbody>
</thead>
<tbody>
{#nn.NameJournalStatus}
<tr><td>{manager}</td><td>{stream}</td></tr>
{/nn.NameJournalStatus}
</tbody>
</table>
</div>
</div>
</tbody>
</table>
<hr/>
<div class="panel panel-success">
<div class="panel-heading">NameNode Storage</div>
<div class="panel-body">
<table class="table" title="NameNode Storage">
<thead><tr><td><b>Storage Directory</b></td><td><b>Type</b></td><td><b>State</b></td></tr></thead>
{#nn.NameDirStatuses}
{#active}{#helper_dir_status type="Active"/}{/active}
{#failed}{#helper_dir_status type="Failed"/}{/failed}
{/nn.NameDirStatuses}
</table>
</div>
</div>
<hr/>
<div class="page-header"><h1>NameNode Storage</h1></div>
<table class="table" title="NameNode Storage">
<thead><tr><td><b>Storage Directory</b></td><td><b>Type</b></td><td><b>State</b></td></tr></thead>
{#nn.NameDirStatuses}
{#active}{#helper_dir_status type="Active"/}{/active}
{#failed}{#helper_dir_status type="Failed"/}{/failed}
{/nn.NameDirStatuses}
</table>
</script>
<div class="panel panel-success">
<div class="panel-heading">Snapshot Summary</div>
<div class="panel-body">
{#fs.SnapshotStats}
<table class="table" title="Snapshot Summary">
<thead><tr><td><b>Snapshottable directories</b></td>
<td><b>Snapshotted directories</b></td></tr>
</thead>
<tbody>
<tr>
<td>{SnapshottableDirectories}</td>
<td>{Snapshots}</td>
</tr>
</tbody>
</table>
{/fs.SnapshotStats}
</div>
</div>
<hr/>
<script type="text/x-dust-template" id="tmpl-snapshot">
<div class="page-header"><h1>Snapshot Summary</h1></div>
<p><b>Snapshottable directories</b>: {SnapshottableDirectories}</p>
<p><b>Snapshotted directories</b>: {Snapshots}</p>
</script>
{#startup}
<div class="panel panel-success">
<div class="panel-heading">Startup Progress</div>
<div class="panel-body">
<p>Elapsed Time: {elapsedTime|fmt_time}, Percent Complete: {percentComplete|fmt_percentage}</p>
<table class="table">
<thead>
<tr>
<script type="text/x-dust-template" id="tmpl-datanode">
<div class="page-header"><h1>Datanode Information</h1></div>
<div class="page-header"><h1><small>In operation</small></h1></div>
<small>
<table class="table">
<thead>
<tr>
<th>Node</th>
<th>Last contact</th>
<th>Admin State</th>
<th>Capacity</th>
<th>Used</th>
<th>Non DFS Used</th>
<th>Remaining</th>
<th>Blocks</th>
<th>Block pool used</th>
<th>Failed Volumes</th>
<th>Version</th>
</tr>
</thead>
{#LiveNodes}
<tr>
<td>{name} ({xferaddr})</td>
<td>{lastContact}</td>
<td>{adminState}</td>
<td>{capacity|fmt_bytes}</td>
<td>{used|fmt_bytes}</td>
<td>{nonDfsUsedSpace|fmt_bytes}</td>
<td>{remaining|fmt_bytes}</td>
<td>{numBlocks}</td>
<td>{blockPoolUsed|fmt_bytes} ({blockPoolUsedPercent|fmt_percentage})</td>
<td>{volfails}</td>
<td>{version}</td>
</tr>
{/LiveNodes}
{#DeadNodes}
<tr class="danger">
<td>{name} ({xferaddr})</td>
<td>{lastContact}</td>
<td>Dead{?decomissioned}, Decomissioned{/decomissioned}</td>
<td>-</td>
<td>-</td>
<td>-</td>
<td>-</td>
<td>-</td>
<td>-</td>
<td>-</td>
<td>-</td>
</tr>
{/DeadNodes}
</table>
</small>
<div class="page-header"><h1><small>Decomissioning</small></h1></div>
<small>
<table class="table">
<thead>
<tr>
<th>Node</th>
<th>Last contact</th>
<th>Under replicated blocks</th>
<th>Blocks with no live replicas</th>
<th>Under Replicated Blocks <br/>In files under construction</th>
</tr>
</thead>
{#DecomNodes}
<tr>
<td>{name} ({xferaddr})</td>
<td>{lastContact}</td>
<td>{underReplicatedBlocks}</td>
<td>{decommissionOnlyReplicas}</td>
<td>{underReplicateInOpenFiles}</td>
</tr>
{/DecomNodes}
</table>
</small>
</script>
<script type="text/x-dust-template" id="tmpl-startup-progress">
<div class="page-header"><h1>Startup Progress</h1></div>
<p>Elapsed Time: {elapsedTime|fmt_time}, Percent Complete: {percentComplete|fmt_percentage}</p>
<table class="table">
<thead>
<tr class="active">
<th>Phase</th>
<th>Completion</th>
<th>Elapsed Time</th>
<th style="text-align:center">Completion</th>
<th style="text-align:center">Elapsed Time</th>
</tr>
</thead>
<tbody>
</thead>
<tbody>
{#phases}
<tr class="phase">
<td class="startupdesc">{desc} {file} {size|fmt_bytes}</td>
<td>{percentComplete|fmt_percentage}</td>
<td>{elapsedTime|fmt_time}</td>
<td style="text-align:center">{percentComplete|fmt_percentage}</td>
<td style="text-align:center">{elapsedTime|fmt_time}</td>
</tr>
{#steps root_file=file}
<tr class="step">
<td class="startupdesc">{stepDesc} {stepFile} {stepSize|fmt_bytes} ({count}/{total})</td>
<td>{percentComplete|fmt_percentage}</td>
<td style="text-align:center">{percentComplete|fmt_percentage}</td>
<td></td>
</tr>
{/steps}
{/phases}
</table>
</div>
</div>
{/startup}
<hr/>
<div class="panel panel-success">
<div class="panel-heading">Datanode Information</div>
<div class="panel-body">
<div class="panel panel-default" id="nodelist-operation">
<div class="panel-heading">Nodes in operation</div>
<div class="panel-body">
<table class="table">
<thead>
<tr>
<th>Node</th>
<th>Last contact</th>
<th>Admin State</th>
<th>Capacity</th>
<th>Used</th>
<th>Non DFS Used</th>
<th>Remaining</th>
<th>Blocks</th>
<th>Block pool used</th>
<th>Failed Volumes</th>
</tr>
</thead>
{#nn.LiveNodes}
<tr>
<td>{name} ({xferaddr})</td>
<td>{lastContact}</td>
<td>{adminState}</td>
<td>{capacity|fmt_bytes}</td>
<td>{used|fmt_bytes}</td>
<td>{nonDfsUsedSpace|fmt_bytes}</td>
<td>{remaining|fmt_bytes}</td>
<td>{numBlocks}</td>
<td>{blockPoolUsed|fmt_bytes} ({blockPoolUsedPercent|fmt_percentage})</td>
<td>{volfails}</td>
</tr>
{/nn.LiveNodes}
{#nn.DeadNodes}
<tr class="danger">
<td>{name} ({xferaddr})</td>
<td>{lastContact}</td>
<td>Dead{?decomissioned}, Decomissioned{/decomissioned}</td>
<td>-</td>
<td>-</td>
<td>-</td>
<td>-</td>
<td>-</td>
<td>-</td>
<td>-</td>
</tr>
{/nn.DeadNodes}
</table>
</div>
</div>
<div class="panel panel-default" id="nodelist-decom">
<div class="panel-heading">Nodes being decomissioned</div>
<div class="panel-body">
<table class="table">
<thead>
<tr>
<th>Node</th>
<th>Last contact</th>
<th>Under replicated blocks</th>
<th>Blocks with no live replicas</th>
<th>Under Replicated Blocks <br/>In files under construction</th>
</tr>
</thead>
{#nn.DecomNodes}
<tr>
<td>{name} ({xferaddr})</td>
<td>{lastContact}</td>
<td>{underReplicatedBlocks}</td>
<td>{decommissionOnlyReplicas}</td>
<td>{underReplicateInOpenFiles}</td>
</tr>
{/nn.DecomNodes}
</table>
</div>
</div>
</div>
</div>
</tbody>
</table>
</script>
<script type="text/javascript" src="/static/jquery-1.10.2.min.js">

View File

@ -18,10 +18,20 @@
(function () {
"use strict";
var data = {};
dust.loadSource(dust.compile($('#tmpl-dfshealth').html(), 'dfshealth'));
dust.loadSource(dust.compile($('#tmpl-startup-progress').html(), 'startup-progress'));
dust.loadSource(dust.compile($('#tmpl-datanode').html(), 'datanode-info'));
dust.loadSource(dust.compile($('#tmpl-snapshot').html(), 'snapshot-info'));
function render() {
var helpers = {
function load_overview() {
var BEANS = [
{"name": "nn", "url": "/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo"},
{"name": "nnstat", "url": "/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus"},
{"name": "fs", "url": "/jmx?qry=Hadoop:service=NameNode,name=FSNamesystemState"},
{"name": "mem", "url": "/jmx?qry=java.lang:type=Memory"},
];
var HELPERS = {
'helper_fs_max_objects': function (chunk, ctx, bodies, params) {
var o = ctx.current();
if (o.MaxObjects > 0) {
@ -37,35 +47,53 @@
}
};
var base = dust.makeBase(helpers);
var data = {};
dust.loadSource(dust.compile($('#tmpl-dfshealth').html(), 'dfshealth'));
dust.render('dfshealth', base.push(data), function(err, out) {
$('#panel').html(out);
});
}
var BEANS = [
{"name": "nn", "url": "/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo"},
{"name": "nnstat", "url": "/jmx?qry=Hadoop:service=NameNode,name=NameNodeStatus"},
{"name": "fs", "url": "/jmx?qry=Hadoop:service=NameNode,name=FSNamesystemState"},
{"name": "mem", "url": "/jmx?qry=java.lang:type=Memory"},
{"name": "startup", "url": "/startupProgress"}
];
// Workarounds for the fact that JMXJsonServlet returns non-standard JSON strings
function data_workaround(d) {
function node_map_to_array(nodes) {
var res = [];
for (var n in nodes) {
var p = nodes[n];
p.name = n;
res.push(p);
}
return res;
// Workarounds for the fact that JMXJsonServlet returns non-standard JSON strings
function data_workaround(d) {
d.nn.JournalTransactionInfo = JSON.parse(d.nn.JournalTransactionInfo);
d.nn.NameJournalStatus = JSON.parse(d.nn.NameJournalStatus);
d.nn.NameDirStatuses = JSON.parse(d.nn.NameDirStatuses);
d.nn.NodeUsage = JSON.parse(d.nn.NodeUsage);
d.nn.CorruptFiles = JSON.parse(d.nn.CorruptFiles);
return d;
}
function startup_progress_workaround(r) {
load_json(
BEANS,
function(d) {
for (var k in d) {
data[k] = d[k].beans[0];
}
data = data_workaround(data);
render();
},
function (url, jqxhr, text, err) {
show_err_msg('<p>Failed to retrieve data from ' + url + ', cause: ' + err + '</p>');
});
function render() {
var base = dust.makeBase(HELPERS);
dust.render('dfshealth', base.push(data), function(err, out) {
$('#tab-overview').html(out);
$('a[href="#tab-datanode"]').click(load_datanode_info);
$('#ui-tabs a[href="#tab-overview"]').tab('show');
});
}
}
$('#ui-tabs a[href="#tab-overview"]').click(load_overview);
function show_err_msg(msg) {
$('#alert-panel-body').html(msg);
$('#alert-panel').show();
}
function ajax_error_handler(url, jqxhr, text, err) {
show_err_msg('<p>Failed to retrieve data from ' + url + ', cause: ' + err + '</p>');
}
function load_startup_progress() {
function workaround(r) {
function rename_property(o, s, d) {
if (o[s] !== undefined) {
o[d] = o[s];
@ -86,36 +114,66 @@
});
return r;
}
d.nn.JournalTransactionInfo = JSON.parse(d.nn.JournalTransactionInfo);
d.nn.NameJournalStatus = JSON.parse(d.nn.NameJournalStatus);
d.nn.NameDirStatuses = JSON.parse(d.nn.NameDirStatuses);
d.nn.NodeUsage = JSON.parse(d.nn.NodeUsage);
d.nn.LiveNodes = node_map_to_array(JSON.parse(d.nn.LiveNodes));
d.nn.DeadNodes = node_map_to_array(JSON.parse(d.nn.DeadNodes));
d.nn.DecomNodes = node_map_to_array(JSON.parse(d.nn.DecomNodes));
d.nn.CorruptFiles = JSON.parse(d.nn.CorruptFiles);
d.fs.SnapshotStats = JSON.parse(d.fs.SnapshotStats);
d.startup = startup_progress_workaround(d.startup);
return d;
$.get('/startupProgress', function (resp) {
var data = workaround(resp);
dust.render('startup-progress', data, function(err, out) {
$('#tab-startup-progress').html(out);
$('#ui-tabs a[href="#tab-startup-progress"]').tab('show');
});
}).error(ajax_error_handler);
}
function show_err_msg(msg) {
$('#alert-panel-body').html(msg);
$('#alert-panel').show();
}
$('#ui-tabs a[href="#tab-startup-progress"]').click(load_startup_progress);
load_json(
BEANS,
function(d) {
for (var k in d) {
data[k] = k === "startup" ? d[k] : d[k].beans[0];
function load_datanode_info() {
function workaround(r) {
function node_map_to_array(nodes) {
var res = [];
for (var n in nodes) {
var p = nodes[n];
p.name = n;
res.push(p);
}
return res;
}
data = data_workaround(data);
render();
},
function (url, jqxhr, text, err) {
show_err_msg('<p>Failed to retrieve data from ' + url + ', cause: ' + err + '</p>');
});
r.LiveNodes = node_map_to_array(JSON.parse(r.LiveNodes));
r.DeadNodes = node_map_to_array(JSON.parse(r.DeadNodes));
r.DecomNodes = node_map_to_array(JSON.parse(r.DecomNodes));
return r;
}
$.get('/jmx?qry=Hadoop:service=NameNode,name=NameNodeInfo', function (resp) {
var data = workaround(resp.beans[0]);
dust.render('datanode-info', data, function(err, out) {
$('#tab-datanode').html(out);
$('#ui-tabs a[href="#tab-datanode"]').tab('show');
});
}).error(ajax_error_handler);
}
$('a[href="#tab-datanode"]').click(load_datanode_info);
function load_snapshot_info() {
$.get('/jmx?qry=Hadoop:service=NameNode,name=FSNamesystemState', function (resp) {
var data = JSON.parse(resp.beans[0].SnapshotStats);
dust.render('snapshot-info', data, function(err, out) {
$('#tab-snapshot').html(out);
$('#ui-tabs a[href="#tab-snapshot"]').tab('show');
});
}).error(ajax_error_handler);
}
$('#ui-tabs a[href="#tab-snapshot"]').click(load_snapshot_info);
var hash = window.location.hash;
if (hash === "#tab-datanode") {
load_datanode_info();
} else if (hash === "#tab-snapshot") {
load_snapshot_info();
} else if (hash === "#tab-startup-progress") {
load_startup_progress();
} else {
load_overview();
}
})();

View File

@ -192,4 +192,24 @@ div.security {
.panel-success > .panel-heading {
color: #fff !important;
background-color: #5FA33E !important;
}
header.bs-docs-nav, header.bs-docs-nav .navbar-brand {
border-radius: 0px;
background-color: #5fa33e;
color: #fff;
}
#ui-tabs > li > a {
color: #dcf0d3;
}
#ui-tabs .active a {
color: #fff;
background-color: #446633;
}
#alert-panel {
margin-top:20px;
display: none;
}

View File

@ -18,16 +18,17 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.junit.Assert.*;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.commons.logging.impl.Log4JLogger;
@ -46,6 +47,8 @@ import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.PathUtils;
import org.apache.log4j.Appender;
import org.apache.log4j.AsyncAppender;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@ -54,13 +57,30 @@ import org.apache.log4j.RollingFileAppender;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
/**
* A JUnit test that audit logs are generated
*/
@RunWith(Parameterized.class)
public class TestAuditLogs {
static final String auditLogFile = PathUtils.getTestDirName(TestAuditLogs.class) + "/TestAuditLogs-audit.log";
boolean useAsyncLog;
@Parameters
public static Collection<Object[]> data() {
Collection<Object[]> params = new ArrayList<Object[]>();
params.add(new Object[]{new Boolean(false)});
params.add(new Object[]{new Boolean(true)});
return params;
}
public TestAuditLogs(boolean useAsyncLog) {
this.useAsyncLog = useAsyncLog;
}
// Pattern for:
// allowed=(true|false) ugi=name ip=/address cmd={cmd} src={path} dst=null perm=null
static final Pattern auditPattern = Pattern.compile(
@ -84,17 +104,28 @@ public class TestAuditLogs {
@Before
public void setupCluster() throws Exception {
// must configure prior to instantiating the namesystem because it
// will reconfigure the logger if async is enabled
configureAuditLogs();
conf = new HdfsConfiguration();
final long precision = 1L;
conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, precision);
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L);
conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY, useAsyncLog);
util = new DFSTestUtil.Builder().setName("TestAuditAllowed").
setNumFiles(20).build();
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build();
fs = cluster.getFileSystem();
util.createFiles(fs, fileName);
// make sure the appender is what it's supposed to be
Logger logger = ((Log4JLogger) FSNamesystem.auditLog).getLogger();
@SuppressWarnings("unchecked")
List<Appender> appenders = Collections.list(logger.getAllAppenders());
assertEquals(1, appenders.size());
assertEquals(useAsyncLog, appenders.get(0) instanceof AsyncAppender);
fnames = util.getFileNames(fileName);
util.waitReplication(fs, fileName, (short)3);
userGroupInfo = UserGroupInformation.createUserForTesting(username, groups);
@ -203,6 +234,7 @@ public class TestAuditLogs {
try {
hftpFs = (HftpFileSystem) new Path(hftpUri).getFileSystem(conf);
InputStream istream = hftpFs.open(file);
@SuppressWarnings("unused")
int val = istream.read();
istream.close();
@ -234,6 +266,12 @@ public class TestAuditLogs {
/** Sets up log4j logger for auditlogs */
private void setupAuditLogs() throws IOException {
Logger logger = ((Log4JLogger) FSNamesystem.auditLog).getLogger();
// enable logging now that the test is ready to run
logger.setLevel(Level.INFO);
}
private void configureAuditLogs() throws IOException {
// Shutdown the LogManager to release all logger open file handles.
// Unfortunately, Apache commons logging library does not provide
// means to release underlying loggers. For additional info look up
@ -245,7 +283,8 @@ public class TestAuditLogs {
assertTrue(file.delete());
}
Logger logger = ((Log4JLogger) FSNamesystem.auditLog).getLogger();
logger.setLevel(Level.INFO);
// disable logging while the cluster startup preps files
logger.setLevel(Level.OFF);
PatternLayout layout = new PatternLayout("%m%n");
RollingFileAppender appender = new RollingFileAppender(layout, auditLogFile);
logger.addAppender(appender);

View File

@ -26,6 +26,7 @@ import static org.junit.Assert.fail;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@ -952,7 +953,76 @@ public class TestINodeFile {
}
}
}
@Test
public void testLocationLimitInListingOps() throws Exception {
final Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_LIST_LIMIT, 9); // 3 blocks * 3 replicas
MiniDFSCluster cluster = null;
try {
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
cluster.waitActive();
final DistributedFileSystem hdfs = cluster.getFileSystem();
ArrayList<String> source = new ArrayList<String>();
// tmp1 holds files with 3 blocks, 3 replicas
// tmp2 holds files with 3 blocks, 1 replica
hdfs.mkdirs(new Path("/tmp1"));
hdfs.mkdirs(new Path("/tmp2"));
source.add("f1");
source.add("f2");
int numEntries = source.size();
for (int j=0;j<numEntries;j++) {
DFSTestUtil.createFile(hdfs, new Path("/tmp1/"+source.get(j)), 4096,
3*1024-100, 1024, (short) 3, 0);
}
byte[] start = HdfsFileStatus.EMPTY_NAME;
for (int j=0;j<numEntries;j++) {
DirectoryListing dl = cluster.getNameNodeRpc().getListing("/tmp1",
start, true);
assertTrue(dl.getPartialListing().length == 1);
for (int i=0;i<dl.getPartialListing().length; i++) {
source.remove(dl.getPartialListing()[i].getLocalName());
}
start = dl.getLastName();
}
// Verify we have listed all entries in the directory.
assertTrue(source.size() == 0);
// Now create 6 files, each with 3 locations. Should take 2 iterations of 3
source.add("f1");
source.add("f2");
source.add("f3");
source.add("f4");
source.add("f5");
source.add("f6");
numEntries = source.size();
for (int j=0;j<numEntries;j++) {
DFSTestUtil.createFile(hdfs, new Path("/tmp2/"+source.get(j)), 4096,
3*1024-100, 1024, (short) 1, 0);
}
start = HdfsFileStatus.EMPTY_NAME;
for (int j=0;j<numEntries/3;j++) {
DirectoryListing dl = cluster.getNameNodeRpc().getListing("/tmp2",
start, true);
assertTrue(dl.getPartialListing().length == 3);
for (int i=0;i<dl.getPartialListing().length; i++) {
source.remove(dl.getPartialListing()[i].getLocalName());
}
start = dl.getLastName();
}
// Verify we have listed all entries in tmp2.
assertTrue(source.size() == 0);
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
@Test
public void testFilesInGetListingOps() throws Exception {
final Configuration conf = new Configuration();

View File

@ -203,6 +203,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5672. Provide optional RollingFileAppender for container log4j
(syslog) (Gera Shegalov via jlowe)
MAPREDUCE-5725. Make explicit that TestNetworkedJob relies on the Capacity
Scheduler (Sandy Ryza)
OPTIMIZATIONS
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)
@ -280,6 +283,14 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5724. JobHistoryServer does not start if HDFS is not running.
(tucu)
MAPREDUCE-5729. mapred job -list throws NPE (kasha)
MAPREDUCE-5693. Restore MRv1 behavior for log flush (Gera Shegalov via
jlowe)
MAPREDUCE-5723. MR AM container log can be truncated or empty.
(Mohammad Kamrul Islam via kasha)
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -27,6 +27,7 @@ import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -61,7 +62,6 @@ import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.log4j.LogManager;
/**
* The main() for MapReduce task processes.
@ -123,6 +123,7 @@ class YarnChild {
LOG.debug("PID: " + System.getenv().get("JVM_PID"));
Task task = null;
UserGroupInformation childUGI = null;
ScheduledExecutorService logSyncer = null;
try {
int idleLoopCount = 0;
@ -161,6 +162,8 @@ class YarnChild {
// set job classloader if configured before invoking the task
MRApps.setJobClassLoader(job);
logSyncer = TaskLog.createLogSyncer();
// Create a final reference to the task for the doAs block
final Task taskFinal = task;
childUGI.doAs(new PrivilegedExceptionAction<Object>() {
@ -214,10 +217,7 @@ class YarnChild {
} finally {
RPC.stopProxy(umbilical);
DefaultMetricsSystem.shutdown();
// Shutting down log4j of the child-vm...
// This assumes that on return from Task.run()
// there is no more logging done.
LogManager.shutdown();
TaskLog.syncLogsShutdown(logSyncer);
}
}

View File

@ -31,6 +31,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.io.IOUtils;
@ -45,6 +46,7 @@ import org.apache.hadoop.mapred.FileOutputCommitter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LocalContainerLauncher;
import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
import org.apache.hadoop.mapred.TaskLog;
import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
@ -119,6 +121,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringInterner;
@ -212,6 +215,7 @@ public class MRAppMaster extends CompositeService {
boolean errorHappenedShutDown = false;
private String shutDownMessage = null;
JobStateInternal forcedState = null;
private final ScheduledExecutorService logSyncer;
private long recoveredJobStartTime = 0;
@ -240,6 +244,7 @@ public class MRAppMaster extends CompositeService {
this.nmHttpPort = nmHttpPort;
this.metrics = MRAppMetrics.create();
this.maxAppAttempts = maxAppAttempts;
logSyncer = TaskLog.createLogSyncer();
LOG.info("Created MRAppMaster for application " + applicationAttemptId);
}
@ -1078,6 +1083,12 @@ public class MRAppMaster extends CompositeService {
// All components have started, start the job.
startJobs();
}
@Override
public void stop() {
super.stop();
TaskLog.syncLogsShutdown(logSyncer);
}
private void processRecovery() {
if (appAttemptID.getAttemptId() == 1) {
@ -1395,9 +1406,7 @@ public class MRAppMaster extends CompositeService {
initAndStartAppMaster(appMaster, conf, jobUserName);
} catch (Throwable t) {
LOG.fatal("Error starting MRAppMaster", t);
System.exit(1);
} finally {
LogManager.shutdown();
ExitUtil.terminate(1, t);
}
}
@ -1473,4 +1482,11 @@ public class MRAppMaster extends CompositeService {
}
});
}
@Override
protected void serviceStop() throws Exception {
super.serviceStop();
LogManager.shutdown();
}
}

View File

@ -43,6 +43,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.QueueACL;
@ -445,11 +446,18 @@ public class TypeConverter {
jobStatus.setStartTime(application.getStartTime());
jobStatus.setFinishTime(application.getFinishTime());
jobStatus.setFailureInfo(application.getDiagnostics());
jobStatus.setNeededMem(application.getApplicationResourceUsageReport().getNeededResources().getMemory());
jobStatus.setNumReservedSlots(application.getApplicationResourceUsageReport().getNumReservedContainers());
jobStatus.setNumUsedSlots(application.getApplicationResourceUsageReport().getNumUsedContainers());
jobStatus.setReservedMem(application.getApplicationResourceUsageReport().getReservedResources().getMemory());
jobStatus.setUsedMem(application.getApplicationResourceUsageReport().getUsedResources().getMemory());
ApplicationResourceUsageReport resourceUsageReport =
application.getApplicationResourceUsageReport();
if (resourceUsageReport != null) {
jobStatus.setNeededMem(
resourceUsageReport.getNeededResources().getMemory());
jobStatus.setNumReservedSlots(
resourceUsageReport.getNumReservedContainers());
jobStatus.setNumUsedSlots(resourceUsageReport.getNumUsedContainers());
jobStatus.setReservedMem(
resourceUsageReport.getReservedResources().getMemory());
jobStatus.setUsedMem(resourceUsageReport.getUsedResources().getMemory());
}
return jobStatus;
}

View File

@ -23,8 +23,6 @@ import static org.mockito.Mockito.when;
import java.util.ArrayList;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.JobStatus.State;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@ -40,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.util.Records;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@ -112,6 +111,14 @@ public class TestTypeConverter {
when(mockReport.getUser()).thenReturn("dummy-user");
when(mockReport.getQueue()).thenReturn("dummy-queue");
String jobFile = "dummy-path/job.xml";
try {
JobStatus status = TypeConverter.fromYarn(mockReport, jobFile);
} catch (NullPointerException npe) {
Assert.fail("Type converstion from YARN fails for jobs without " +
"ApplicationUsageReport");
}
ApplicationResourceUsageReport appUsageRpt = Records
.newRecord(ApplicationResourceUsageReport.class);
Resource r = Records.newRecord(Resource.class);

View File

@ -23,12 +23,17 @@ import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.Flushable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -44,6 +49,8 @@ import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.util.ProcessTree;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.log4j.Appender;
import org.apache.log4j.LogManager;
@ -262,7 +269,86 @@ public class TaskLog {
}
writeToIndexFile(logLocation, isCleanup);
}
public static synchronized void syncLogsShutdown(
ScheduledExecutorService scheduler)
{
// flush standard streams
//
System.out.flush();
System.err.flush();
if (scheduler != null) {
scheduler.shutdownNow();
}
// flush & close all appenders
LogManager.shutdown();
}
@SuppressWarnings("unchecked")
public static synchronized void syncLogs() {
// flush standard streams
//
System.out.flush();
System.err.flush();
// flush flushable appenders
//
final Logger rootLogger = Logger.getRootLogger();
flushAppenders(rootLogger);
final Enumeration<Logger> allLoggers = rootLogger.getLoggerRepository().
getCurrentLoggers();
while (allLoggers.hasMoreElements()) {
final Logger l = allLoggers.nextElement();
flushAppenders(l);
}
}
@SuppressWarnings("unchecked")
private static void flushAppenders(Logger l) {
final Enumeration<Appender> allAppenders = l.getAllAppenders();
while (allAppenders.hasMoreElements()) {
final Appender a = allAppenders.nextElement();
if (a instanceof Flushable) {
try {
((Flushable) a).flush();
} catch (IOException ioe) {
System.err.println(a + ": Failed to flush!"
+ StringUtils.stringifyException(ioe));
}
}
}
}
public static ScheduledExecutorService createLogSyncer() {
final ScheduledExecutorService scheduler =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
final Thread t = Executors.defaultThreadFactory().newThread(r);
t.setDaemon(true);
t.setName("Thread for syncLogs");
return t;
}
});
ShutdownHookManager.get().addShutdownHook(new Runnable() {
@Override
public void run() {
TaskLog.syncLogsShutdown(scheduler);
}
}, 50);
scheduler.scheduleWithFixedDelay(
new Runnable() {
@Override
public void run() {
TaskLog.syncLogs();
}
}, 0L, 5L, TimeUnit.SECONDS);
return scheduler;
}
/**
* The filter for userlogs.
*/

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapred;
import java.io.Flushable;
import java.util.LinkedList;
import java.util.Queue;
@ -31,7 +32,7 @@ import org.apache.log4j.spi.LoggingEvent;
*
*/
@InterfaceStability.Unstable
public class TaskLogAppender extends FileAppender {
public class TaskLogAppender extends FileAppender implements Flushable {
private String taskId; //taskId should be managed as String rather than TaskID object
//so that log4j can configure it from the configuration(log4j.properties).
private Integer maxEvents;
@ -92,6 +93,7 @@ public class TaskLogAppender extends FileAppender {
}
}
@Override
public void flush() {
if (qw != null) {
qw.flush();

View File

@ -45,7 +45,9 @@ import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.junit.Test;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
@ -76,8 +78,7 @@ public class TestNetworkedJob {
FileSystem fileSys = null;
try {
mr = MiniMRClientClusterFactory.create(this.getClass(), 2,
new Configuration());
mr = createMiniClusterWithCapacityScheduler();
JobConf job = new JobConf(mr.getConfig());
@ -129,8 +130,7 @@ public class TestNetworkedJob {
FileSystem fileSys = null;
try {
Configuration conf = new Configuration();
mr = MiniMRClientClusterFactory.create(this.getClass(), 2, conf);
mr = createMiniClusterWithCapacityScheduler();
JobConf job = new JobConf(mr.getConfig());
@ -315,8 +315,7 @@ public class TestNetworkedJob {
FileSystem fileSys = null;
PrintStream oldOut = System.out;
try {
Configuration conf = new Configuration();
mr = MiniMRClientClusterFactory.create(this.getClass(), 2, conf);
mr = createMiniClusterWithCapacityScheduler();
JobConf job = new JobConf(mr.getConfig());
@ -392,4 +391,13 @@ public class TestNetworkedJob {
}
}
}
private MiniMRClientCluster createMiniClusterWithCapacityScheduler()
throws IOException {
Configuration conf = new Configuration();
// Expected queue names depending on Capacity Scheduler queue naming
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
CapacityScheduler.class);
return MiniMRClientClusterFactory.create(this.getClass(), 2, conf);
}
}

View File

@ -229,6 +229,17 @@ Release 2.4.0 - UNRELEASED
YARN-1567. In Fair Scheduler, allow empty queues to change between leaf and
parent on allocation file reload (Sandy Ryza)
YARN-1616. RMFatalEventDispatcher should log the cause of the event (kasha)
YARN-1624. QueuePlacementPolicy format is not easily readable via a JAXB
parser (Aditya Acharya via Sandy Ryza)
YARN-1623. Include queue name in RegisterApplicationMasterResponse (Sandy
Ryza)
YARN-1573. ZK store should use a private password for root-node-acls.
(kasha).
OPTIMIZATIONS
BUG FIXES
@ -352,6 +363,9 @@ Release 2.4.0 - UNRELEASED
YARN-1606. Fix the default value of yarn.resourcemanager.zk-timeout-ms
in yarn-default.xml (kasha)
YARN-1607. TestRM relies on the scheduler assigning multiple containers in
a single node update (Sandy Ryza)
Release 2.3.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -55,13 +55,14 @@ public abstract class RegisterApplicationMasterResponse {
public static RegisterApplicationMasterResponse newInstance(
Resource minCapability, Resource maxCapability,
Map<ApplicationAccessType, String> acls, ByteBuffer key,
List<Container> containersFromPreviousAttempt) {
List<Container> containersFromPreviousAttempt, String queue) {
RegisterApplicationMasterResponse response =
Records.newRecord(RegisterApplicationMasterResponse.class);
response.setMaximumResourceCapability(maxCapability);
response.setApplicationACLs(acls);
response.setClientToAMTokenMasterKey(key);
response.setContainersFromPreviousAttempt(containersFromPreviousAttempt);
response.setQueue(queue);
return response;
}
@ -111,6 +112,20 @@ public abstract class RegisterApplicationMasterResponse {
@Stable
public abstract void setClientToAMTokenMasterKey(ByteBuffer key);
/**
* <p>Get the queue that the application was placed in.<p>
*/
@Public
@Stable
public abstract String getQueue();
/**
* <p>Set the queue that the application was placed in.<p>
*/
@Public
@Stable
public abstract void setQueue(String queue);
/**
* <p>
* Get the list of running containers as viewed by

View File

@ -45,6 +45,7 @@ message RegisterApplicationMasterResponseProto {
optional bytes client_to_am_token_master_key = 2;
repeated ApplicationACLMapProto application_ACLs = 3;
repeated ContainerProto containers_from_previous_attempt = 4;
optional string queue = 5;
}
message FinishApplicationMasterRequestProto {

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn;
import java.io.File;
import java.io.Flushable;
import java.util.LinkedList;
import java.util.Queue;
@ -33,7 +34,9 @@ import org.apache.log4j.spi.LoggingEvent;
*/
@Public
@Unstable
public class ContainerLogAppender extends FileAppender {
public class ContainerLogAppender extends FileAppender
implements Flushable
{
private String containerLogDir;
//so that log4j can configure it from the configuration(log4j.properties).
private int maxEvents;
@ -65,6 +68,7 @@ public class ContainerLogAppender extends FileAppender {
}
}
@Override
public void flush() {
if (qw != null) {
qw.flush();

View File

@ -252,6 +252,25 @@ public class RegisterApplicationMasterResponsePBImpl extends
this.containersFromPreviousAttempt = new ArrayList<Container>();
this.containersFromPreviousAttempt.addAll(containers);
}
@Override
public String getQueue() {
RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasQueue()) {
return null;
}
return p.getQueue();
}
@Override
public void setQueue(String queue) {
maybeInitBuilder();
if (queue == null) {
builder.clearQueue();
} else {
builder.setQueue(queue);
}
}
private void initRunningContainersList() {
RegisterApplicationMasterResponseProtoOrBuilder p = viaProto ? proto : builder;

View File

@ -357,7 +357,7 @@
By default, when this property is not set, we use the ACLs from
yarn.resourcemanager.zk-acl for shared admin access and
rm-address:cluster-timestamp for username-based exclusive create-delete
rm-address:random-number for username-based exclusive create-delete
access.
This property allows users to set ACLs of their choice instead of using

View File

@ -267,6 +267,7 @@ public class ApplicationMasterService extends AbstractService implements
.getMaximumResourceCapability());
response.setApplicationACLs(app.getRMAppAttempt(applicationAttemptId)
.getSubmissionContext().getAMContainerSpec().getApplicationACLs());
response.setQueue(app.getQueue());
if (UserGroupInformation.isSecurityEnabled()) {
LOG.info("Setting client token master key");
response.setClientToAMTokenMasterKey(java.nio.ByteBuffer.wrap(rmContext

View File

@ -607,7 +607,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
@Override
public void handle(RMFatalEvent event) {
LOG.fatal("Received a " + RMFatalEvent.class.getName() + " of type " +
event.getType().name());
event.getType().name() + ". Cause:\n" + event.getCause());
if (event.getType() == RMFatalEventType.STATE_STORE_FENCED) {
LOG.info("RMStateStore has been fenced");

View File

@ -24,6 +24,7 @@ import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@ -47,7 +48,6 @@ import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.Appli
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
@ -74,6 +74,7 @@ import com.google.common.annotations.VisibleForTesting;
public class ZKRMStateStore extends RMStateStore {
public static final Log LOG = LogFactory.getLog(ZKRMStateStore.class);
private final SecureRandom random = new SecureRandom();
protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
@ -136,6 +137,8 @@ public class ZKRMStateStore extends RMStateStore {
private String fencingNodePath;
private Op createFencingNodePathOp;
private Op deleteFencingNodePathOp;
private String zkRootNodeUsername;
private final String zkRootNodePassword = Long.toString(random.nextLong());
@VisibleForTesting
List<ACL> zkRootNodeAcl;
@ -145,9 +148,6 @@ public class ZKRMStateStore extends RMStateStore {
private final String zkRootNodeAuthScheme =
new DigestAuthenticationProvider().getScheme();
private String zkRootNodeUsername;
private String zkRootNodePassword;
/**
* Given the {@link Configuration} and {@link ACL}s used (zkAcl) for
* ZooKeeper access, construct the {@link ACL}s for the store's root node.
@ -172,7 +172,6 @@ public class ZKRMStateStore extends RMStateStore {
zkRootNodeUsername = HAUtil.getConfValueForRMInstance(
YarnConfiguration.RM_ADDRESS,
YarnConfiguration.DEFAULT_RM_ADDRESS, conf);
zkRootNodePassword = Long.toString(ResourceManager.getClusterTimeStamp());
Id rmId = new Id(zkRootNodeAuthScheme,
DigestAuthenticationProvider.generateDigest(
zkRootNodeUsername + ":" + zkRootNodePassword));

View File

@ -80,7 +80,13 @@ public class QueuePlacementPolicy {
Node node = elements.item(i);
if (node instanceof Element) {
Element element = (Element)node;
String ruleName = element.getTagName();
String ruleName = element.getAttribute("name");
if ("".equals(ruleName)) {
throw new AllocationConfigurationException("No name provided for a " +
"rule element");
}
Class<? extends QueuePlacementRule> clazz = ruleClasses.get(ruleName);
if (clazz == null) {
throw new AllocationConfigurationException("No rule class found for "

View File

@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import junit.framework.Assert;
@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.NMToken;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
@ -46,6 +48,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
@ -56,6 +59,9 @@ public class TestRM {
private static final Log LOG = LogFactory.getLog(TestRM.class);
// Milliseconds to sleep for when waiting for something to happen
private final static int WAIT_SLEEP_MS = 100;
@Test
public void testGetNewAppId() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
@ -69,7 +75,7 @@ public class TestRM {
rm.stop();
}
@Test
@Test (timeout = 30000)
public void testAppWithNoContainers() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
@ -91,7 +97,7 @@ public class TestRM {
rm.stop();
}
@Test
@Test (timeout = 30000)
public void testAppOnMultiNode() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.DEBUG);
@ -116,30 +122,30 @@ public class TestRM {
am.allocate("h1" , 1000, request, new ArrayList<ContainerId>());
//kick the scheduler
nm1.nodeHeartbeat(true);
List<Container> conts = am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
int contReceived = conts.size();
while (contReceived < 3) {//only 3 containers are available on node1
nm1.nodeHeartbeat(true);
conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
contReceived = conts.size();
LOG.info("Got " + contReceived + " containers. Waiting to get " + 3);
Thread.sleep(2000);
Thread.sleep(WAIT_SLEEP_MS);
}
Assert.assertEquals(3, conts.size());
//send node2 heartbeat
nm2.nodeHeartbeat(true);
conts = am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers();
contReceived = conts.size();
while (contReceived < 10) {
nm2.nodeHeartbeat(true);
conts.addAll(am.allocate(new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>()).getAllocatedContainers());
contReceived = conts.size();
LOG.info("Got " + contReceived + " containers. Waiting to get " + 10);
Thread.sleep(2000);
Thread.sleep(WAIT_SLEEP_MS);
}
Assert.assertEquals(10, conts.size());
@ -150,7 +156,7 @@ public class TestRM {
rm.stop();
}
@Test
@Test (timeout = 40000)
public void testNMToken() throws Exception {
MockRM rm = new MockRM();
try {
@ -187,19 +193,17 @@ public class TestRM {
// initially requesting 2 containers.
AllocateResponse response =
am.allocate("h1", 1000, 2, releaseContainerList);
nm1.nodeHeartbeat(true);
Assert.assertEquals(0, response.getAllocatedContainers().size());
allocateContainersAndValidateNMTokens(am, containersReceivedForNM1, 2,
nmTokens);
nmTokens, nm1);
Assert.assertEquals(1, nmTokens.size());
// requesting 2 more containers.
response = am.allocate("h1", 1000, 2, releaseContainerList);
nm1.nodeHeartbeat(true);
Assert.assertEquals(0, response.getAllocatedContainers().size());
allocateContainersAndValidateNMTokens(am, containersReceivedForNM1, 4,
nmTokens);
nmTokens, nm1);
Assert.assertEquals(1, nmTokens.size());
@ -211,23 +215,27 @@ public class TestRM {
new ArrayList<Container>();
response = am.allocate("h2", 1000, 2, releaseContainerList);
nm2.nodeHeartbeat(true);
Assert.assertEquals(0, response.getAllocatedContainers().size());
allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 2,
nmTokens);
nmTokens, nm2);
Assert.assertEquals(2, nmTokens.size());
// Simulating NM-2 restart.
nm2 = rm.registerNode("h2:1234", 10000);
nm2.nodeHeartbeat(true);
// Wait for reconnect to make it through the RM and create a new RMNode
Map<NodeId, RMNode> nodes = rm.getRMContext().getRMNodes();
while (nodes.get(nm2.getNodeId()).getLastNodeHeartBeatResponse()
.getResponseId() > 0) {
Thread.sleep(WAIT_SLEEP_MS);
}
int interval = 40;
// Wait for nm Token to be cleared.
while (nmTokenSecretManager
.isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
nm2.getNodeId()) && interval-- > 0) {
LOG.info("waiting for nmToken to be cleared for : " + nm2.getNodeId());
Thread.sleep(1000);
Thread.sleep(WAIT_SLEEP_MS);
}
Assert.assertTrue(nmTokenSecretManager
.isApplicationAttemptRegistered(attempt.getAppAttemptId()));
@ -238,10 +246,9 @@ public class TestRM {
// We should again receive the NMToken.
response = am.allocate("h2", 1000, 2, releaseContainerList);
nm2.nodeHeartbeat(true);
Assert.assertEquals(0, response.getAllocatedContainers().size());
allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 4,
nmTokens);
nmTokens, nm2);
Assert.assertEquals(2, nmTokens.size());
// Now rolling over NMToken masterKey. it should resend the NMToken in
@ -270,10 +277,9 @@ public class TestRM {
Assert.assertEquals(0, nmTokens.size());
// We should again receive the NMToken.
response = am.allocate("h2", 1000, 1, releaseContainerList);
nm2.nodeHeartbeat(true);
Assert.assertEquals(0, response.getAllocatedContainers().size());
allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 5,
nmTokens);
nmTokens, nm2);
Assert.assertEquals(1, nmTokens.size());
Assert.assertTrue(nmTokenSecretManager
.isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
@ -305,12 +311,14 @@ public class TestRM {
protected void allocateContainersAndValidateNMTokens(MockAM am,
ArrayList<Container> containersReceived, int totalContainerRequested,
HashMap<String, Token> nmTokens) throws Exception, InterruptedException {
HashMap<String, Token> nmTokens, MockNM nm) throws Exception,
InterruptedException {
ArrayList<ContainerId> releaseContainerList = new ArrayList<ContainerId>();
AllocateResponse response;
ArrayList<ResourceRequest> resourceRequest =
new ArrayList<ResourceRequest>();
while (containersReceived.size() < totalContainerRequested) {
nm.nodeHeartbeat(true);
LOG.info("requesting containers..");
response =
am.allocate(resourceRequest, releaseContainerList);
@ -326,7 +334,7 @@ public class TestRM {
}
LOG.info("Got " + containersReceived.size()
+ " containers. Waiting to get " + totalContainerRequested);
Thread.sleep(500);
Thread.sleep(WAIT_SLEEP_MS);
}
}

View File

@ -23,6 +23,7 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -36,6 +37,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestFifoScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.BeforeClass;
@ -150,4 +152,33 @@ public class TestApplicationMasterService {
}
}
}
@Test (timeout = 60000)
public void testNotifyAMOfPlacedQueue() throws Exception {
// By default, FairScheduler assigns queue by user name
conf.setClass(YarnConfiguration.RM_SCHEDULER, FairScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(conf);
try {
rm.start();
// Register node1
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 6 * GB);
// Submit an application
RMApp app1 = rm.submitApp(1024, "somename", "user1");
// kick the scheduling
nm1.nodeHeartbeat(true);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
RegisterApplicationMasterResponse response = am1.registerAppAttempt();
Assert.assertEquals("root.user1", response.getQueue());
} finally {
if (rm != null) {
rm.stop();
}
}
}
}

View File

@ -75,7 +75,7 @@ public class TestAllocationFileLoaderService {
out.println(" </queue>");
out.println(" <queue name=\"queueB\" />");
out.println(" <queuePlacementPolicy>");
out.println(" <default />");
out.println(" <rule name='default' />");
out.println(" </queuePlacementPolicy>");
out.println("</allocations>");
out.close();
@ -113,8 +113,8 @@ public class TestAllocationFileLoaderService {
out.println(" <maxRunningApps>3</maxRunningApps>");
out.println(" </queue>");
out.println(" <queuePlacementPolicy>");
out.println(" <specified />");
out.println(" <default />");
out.println(" <rule name='specified' />");
out.println(" <rule name='default' />");
out.println(" </queuePlacementPolicy>");
out.println("</allocations>");
out.close();

View File

@ -49,8 +49,8 @@ public class TestQueuePlacementPolicy {
public void testSpecifiedUserPolicy() throws Exception {
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <specified />");
sb.append(" <user />");
sb.append(" <rule name='specified' />");
sb.append(" <rule name='user' />");
sb.append("</queuePlacementPolicy>");
QueuePlacementPolicy policy = parse(sb.toString());
assertEquals("root.specifiedq",policy.assignAppToQueue("specifiedq", "someuser"));
@ -62,9 +62,9 @@ public class TestQueuePlacementPolicy {
public void testNoCreate() throws Exception {
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <specified />");
sb.append(" <user create=\"false\" />");
sb.append(" <default />");
sb.append(" <rule name='specified' />");
sb.append(" <rule name='user' create=\"false\" />");
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
QueuePlacementPolicy policy = parse(sb.toString());
assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "someuser"));
@ -77,8 +77,8 @@ public class TestQueuePlacementPolicy {
public void testSpecifiedThenReject() throws Exception {
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <specified />");
sb.append(" <reject />");
sb.append(" <rule name='specified' />");
sb.append(" <rule name='reject' />");
sb.append("</queuePlacementPolicy>");
QueuePlacementPolicy policy = parse(sb.toString());
assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "someuser"));
@ -89,8 +89,8 @@ public class TestQueuePlacementPolicy {
public void testOmittedTerminalRule() throws Exception {
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <specified />");
sb.append(" <user create=\"false\" />");
sb.append(" <rule name='specified' />");
sb.append(" <rule name='user' create=\"false\" />");
sb.append("</queuePlacementPolicy>");
parse(sb.toString());
}
@ -99,9 +99,9 @@ public class TestQueuePlacementPolicy {
public void testTerminalRuleInMiddle() throws Exception {
StringBuffer sb = new StringBuffer();
sb.append("<queuePlacementPolicy>");
sb.append(" <specified />");
sb.append(" <default />");
sb.append(" <user />");
sb.append(" <rule name='specified' />");
sb.append(" <rule name='default' />");
sb.append(" <rule name='user' />");
sb.append("</queuePlacementPolicy>");
parse(sb.toString());
}

View File

@ -318,9 +318,9 @@ Allocation file format
<userMaxAppsDefault>5</userMaxAppsDefault>
<queuePlacementPolicy>
<specified />
<primarygroup create="false" />
<default />
<rule name="specified" />
<rule name="primaryGroup" create="false" />
<rule name="default" />
</queuePlacementPolicy>
</allocations>
---