Merge r1408927 through r1410997 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1411007 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-11-18 22:31:28 +00:00
commit d8ffea5943
574 changed files with 2138 additions and 14777 deletions

View File

@ -710,7 +710,7 @@ runTests () {
ordered_modules="$ordered_modules $module"
fi
done
if [ -n $hdfs_modules ]; then
if [ -n "$hdfs_modules" ]; then
ordered_modules="$ordered_modules $hdfs_modules"
if [[ $building_common -eq 0 ]]; then
echo " Building hadoop-common with -Pnative in order to provide \

View File

@ -277,6 +277,9 @@ Trunk (Unreleased)
HADOOP-8974. TestDFVariations fails on Windows. (Chris Nauroth via suresh)
HADOOP-9037. Bug in test-patch.sh and precommit build process (Kihwal Lee
via jlowe)
OPTIMIZATIONS
HADOOP-7761. Improve the performance of raw comparisons. (todd)
@ -364,6 +367,10 @@ Release 2.0.3-alpha - Unreleased
HADOOP-9021. Enforce configured SASL method on the server (daryn via
bobby)
HADOO-8998. set Cache-Control no-cache header on all dynamic content. (tucu)
HADOOP-9035. Generalize setup of LoginContext (daryn via bobby)
OPTIMIZATIONS
HADOOP-8866. SampleQuantiles#query is O(N^2) instead of O(N). (Andrew Wang
@ -426,6 +433,8 @@ Release 2.0.3-alpha - Unreleased
HADOOP-8999. SASL negotiation is flawed (daryn)
HADOOP-6607. Add different variants of non caching HTTP headers. (tucu)
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
@ -1116,6 +1125,18 @@ Release 2.0.0-alpha - 05-23-2012
HADOOP-8655. Fix TextInputFormat for large deliminators. (Gelesh via
bobby)
Release 0.23.6 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
OPTIMIZATIONS
BUG FIXES
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES
@ -1151,7 +1172,7 @@ Release 0.23.5 - UNRELEASED
HADOOP-9025. org.apache.hadoop.tools.TestCopyListing failing (Jonathan
Eagles via jlowe)
Release 0.23.4 - UNRELEASED
Release 0.23.4
INCOMPATIBLE CHANGES

View File

@ -91,8 +91,6 @@ class Test extends FsCommand {
@Override
protected void processNonexistentPath(PathData item) throws IOException {
// NOTE: errors for FNF is not how the shell works!
if (flag != 'e') displayError(new PathNotFoundException(item.toString()));
exitCode = 1;
}
}

View File

@ -26,6 +26,7 @@ import java.net.InetSocketAddress;
import java.net.URL;
import java.security.GeneralSecurityException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
@ -103,6 +104,7 @@ public class HttpServer implements FilterContainer {
public static final String CONF_CONTEXT_ATTRIBUTE = "hadoop.conf";
public static final String ADMINS_ACL = "admins.acl";
public static final String SPNEGO_FILTER = "SpnegoFilter";
public static final String NO_CACHE_FILTER = "NoCacheFilter";
public static final String BIND_ADDRESS = "bind.address";
@ -255,6 +257,7 @@ public class HttpServer implements FilterContainer {
webAppContext.setWar(appDir + "/" + name);
webAppContext.getServletContext().setAttribute(CONF_CONTEXT_ATTRIBUTE, conf);
webAppContext.getServletContext().setAttribute(ADMINS_ACL, adminsAcl);
addNoCacheFilter(webAppContext);
webServer.addHandler(webAppContext);
addDefaultApps(contexts, appDir, conf);
@ -279,6 +282,12 @@ public class HttpServer implements FilterContainer {
}
}
@SuppressWarnings("unchecked")
private void addNoCacheFilter(WebAppContext ctxt) {
defineFilter(ctxt, NO_CACHE_FILTER,
NoCacheFilter.class.getName(), Collections.EMPTY_MAP, new String[] { "/*"});
}
/**
* Create a required listener for the Jetty instance listening on the port
* provided. This wrapper and all subclasses must create at least one
@ -338,6 +347,7 @@ public class HttpServer implements FilterContainer {
}
logContext.setDisplayName("logs");
setContextAttributes(logContext, conf);
addNoCacheFilter(webAppContext);
defaultContexts.put(logContext, true);
}
// set up the context for "/static/*"
@ -369,6 +379,7 @@ public class HttpServer implements FilterContainer {
public void addContext(Context ctxt, boolean isFiltered)
throws IOException {
webServer.addHandler(ctxt);
addNoCacheFilter(webAppContext);
defaultContexts.put(ctxt, isFiltered);
}
@ -462,7 +473,7 @@ public class HttpServer implements FilterContainer {
holder.setName(name);
}
webAppContext.addServlet(holder, pathSpec);
if(requireAuth && UserGroupInformation.isSecurityEnabled()) {
LOG.info("Adding Kerberos (SPNEGO) filter to " + name);
ServletHandler handler = webAppContext.getServletHandler();
@ -954,7 +965,7 @@ public class HttpServer implements FilterContainer {
@Override
public Enumeration<String> getParameterNames() {
return new Enumeration<String>() {
private Enumeration<String> rawIterator =
private Enumeration<String> rawIterator =
rawRequest.getParameterNames();
@Override
public boolean hasMoreElements() {

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.http;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.FilterConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
public class NoCacheFilter implements Filter {
@Override
public void init(FilterConfig filterConfig) throws ServletException {
}
@Override
public void doFilter(ServletRequest req, ServletResponse res,
FilterChain chain)
throws IOException, ServletException {
HttpServletResponse httpRes = (HttpServletResponse) res;
httpRes.setHeader("Cache-Control", "no-cache");
long now = System.currentTimeMillis();
httpRes.addDateHeader("Expires", now);
httpRes.addDateHeader("Date", now);
httpRes.addHeader("Pragma", "no-cache");
chain.doFilter(req, res);
}
@Override
public void destroy() {
}
}

View File

@ -294,14 +294,15 @@ public class Client {
}
}
AuthenticationMethod authentication;
if (token != null) {
authMethod = AuthenticationMethod.TOKEN.getAuthMethod();
} else if (UserGroupInformation.isSecurityEnabled()) {
// eventually just use the ticket's authMethod
authMethod = AuthMethod.KERBEROS;
} else {
authMethod = AuthMethod.SIMPLE;
authentication = AuthenticationMethod.TOKEN;
} else if (ticket != null) {
authentication = ticket.getRealAuthenticationMethod();
} else { // this only happens in lazy tests
authentication = AuthenticationMethod.SIMPLE;
}
authMethod = authentication.getAuthMethod();
if (LOG.isDebugEnabled())
LOG.debug("Use " + authMethod + " authentication for protocol "

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.security;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN_DEFAULT;
@ -133,7 +132,7 @@ public class UserGroupInformation {
}
Principal user = null;
// if we are using kerberos, try it out
if (useKerberos) {
if (isAuthenticationMethodEnabled(AuthenticationMethod.KERBEROS)) {
user = getCanonicalUser(KerberosPrincipal.class);
if (LOG.isDebugEnabled()) {
LOG.debug("using kerberos user:"+user);
@ -191,8 +190,8 @@ public class UserGroupInformation {
static UgiMetrics metrics = UgiMetrics.create();
/** Are the static variables that depend on configuration initialized? */
private static boolean isInitialized = false;
/** Should we use Kerberos configuration? */
private static boolean useKerberos;
/** The auth method to use */
private static AuthenticationMethod authenticationMethod;
/** Server-side groups fetching service */
private static Groups groups;
/** Min time (in seconds) before relogin for Kerberos */
@ -237,20 +236,7 @@ public class UserGroupInformation {
* @param conf the configuration to use
*/
private static synchronized void initUGI(Configuration conf) {
AuthenticationMethod auth = SecurityUtil.getAuthenticationMethod(conf);
switch (auth) {
case SIMPLE:
case TOKEN:
useKerberos = false;
break;
case KERBEROS:
useKerberos = true;
break;
default:
throw new IllegalArgumentException("Invalid attribute value for " +
HADOOP_SECURITY_AUTHENTICATION +
" of " + auth);
}
authenticationMethod = SecurityUtil.getAuthenticationMethod(conf);
try {
kerberosMinSecondsBeforeRelogin = 1000L * conf.getLong(
HADOOP_KERBEROS_MIN_SECONDS_BEFORE_RELOGIN,
@ -288,8 +274,14 @@ public class UserGroupInformation {
* @return true if UGI is working in a secure environment
*/
public static boolean isSecurityEnabled() {
return !isAuthenticationMethodEnabled(AuthenticationMethod.SIMPLE);
}
@InterfaceAudience.Private
@InterfaceStability.Evolving
private static boolean isAuthenticationMethodEnabled(AuthenticationMethod method) {
ensureInitialized();
return useKerberos;
return (authenticationMethod == method);
}
/**
@ -585,7 +577,7 @@ public class UserGroupInformation {
@InterfaceStability.Evolving
public static UserGroupInformation getUGIFromTicketCache(
String ticketCache, String user) throws IOException {
if (!isSecurityEnabled()) {
if (!isAuthenticationMethodEnabled(AuthenticationMethod.KERBEROS)) {
return getBestUGI(null, user);
}
try {
@ -638,19 +630,12 @@ public class UserGroupInformation {
public synchronized
static UserGroupInformation getLoginUser() throws IOException {
if (loginUser == null) {
ensureInitialized();
try {
Subject subject = new Subject();
LoginContext login;
AuthenticationMethod authenticationMethod;
if (isSecurityEnabled()) {
authenticationMethod = AuthenticationMethod.KERBEROS;
login = newLoginContext(HadoopConfiguration.USER_KERBEROS_CONFIG_NAME,
subject, new HadoopConfiguration());
} else {
authenticationMethod = AuthenticationMethod.SIMPLE;
login = newLoginContext(HadoopConfiguration.SIMPLE_CONFIG_NAME,
subject, new HadoopConfiguration());
}
LoginContext login =
newLoginContext(authenticationMethod.getLoginAppName(),
subject, new HadoopConfiguration());
login.login();
loginUser = new UserGroupInformation(subject);
loginUser.setLogin(login);
@ -675,6 +660,14 @@ public class UserGroupInformation {
return loginUser;
}
@InterfaceAudience.Private
@InterfaceStability.Unstable
synchronized static void setLoginUser(UserGroupInformation ugi) {
// if this is to become stable, should probably logout the currently
// logged in ugi if it's different
loginUser = ugi;
}
/**
* Is this user logged in from a keytab file?
* @return true if the credentials are from a keytab file.
@ -1027,22 +1020,38 @@ public class UserGroupInformation {
public static enum AuthenticationMethod {
// currently we support only one auth per method, but eventually a
// subtype is needed to differentiate, ex. if digest is token or ldap
SIMPLE(AuthMethod.SIMPLE),
KERBEROS(AuthMethod.KERBEROS),
SIMPLE(AuthMethod.SIMPLE,
HadoopConfiguration.SIMPLE_CONFIG_NAME),
KERBEROS(AuthMethod.KERBEROS,
HadoopConfiguration.USER_KERBEROS_CONFIG_NAME),
TOKEN(AuthMethod.DIGEST),
CERTIFICATE(null),
KERBEROS_SSL(null),
PROXY(null);
private final AuthMethod authMethod;
private final String loginAppName;
private AuthenticationMethod(AuthMethod authMethod) {
this(authMethod, null);
}
private AuthenticationMethod(AuthMethod authMethod, String loginAppName) {
this.authMethod = authMethod;
this.loginAppName = loginAppName;
}
public AuthMethod getAuthMethod() {
return authMethod;
}
String getLoginAppName() {
if (loginAppName == null) {
throw new UnsupportedOperationException(
this + " login authentication is not supported");
}
return loginAppName;
}
public static AuthenticationMethod valueOf(AuthMethod authMethod) {
for (AuthenticationMethod value : values()) {
if (value.getAuthMethod() == authMethod) {
@ -1334,7 +1343,21 @@ public class UserGroupInformation {
public synchronized AuthenticationMethod getAuthenticationMethod() {
return user.getAuthenticationMethod();
}
/**
* Get the authentication method from the real user's subject. If there
* is no real user, return the given user's authentication method.
*
* @return AuthenticationMethod in the subject, null if not present.
*/
public synchronized AuthenticationMethod getRealAuthenticationMethod() {
UserGroupInformation ugi = getRealUser();
if (ugi == null) {
ugi = this;
}
return ugi.getAuthenticationMethod();
}
/**
* Returns the authentication method of a ugi. If the authentication method is
* PROXY, returns the authentication method of the real user.

View File

@ -539,4 +539,17 @@ public class TestHttpServer extends HttpServerFunctionalTest {
}
return server;
}
@Test
public void testNoCacheHeader() throws Exception {
URL url = new URL(baseUrl, "/echo?a=b&c=d");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
assertEquals(HttpURLConnection.HTTP_OK, conn.getResponseCode());
assertEquals("no-cache", conn.getHeaderField("Cache-Control"));
assertEquals("no-cache", conn.getHeaderField("Pragma"));
assertNotNull(conn.getHeaderField("Expires"));
assertNotNull(conn.getHeaderField("Date"));
assertEquals(conn.getHeaderField("Expires"), conn.getHeaderField("Date"));
}
}

View File

@ -70,16 +70,75 @@ public class TestUserGroupInformation {
/** configure ugi */
@BeforeClass
public static void setup() {
javax.security.auth.login.Configuration.setConfiguration(
new DummyLoginConfiguration());
}
@Before
public void setupUgi() {
conf = new Configuration();
conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTH_TO_LOCAL,
"RULE:[2:$1@$0](.*@HADOOP.APACHE.ORG)s/@.*//" +
"RULE:[1:$1@$0](.*@HADOOP.APACHE.ORG)s/@.*//"
+ "DEFAULT");
UserGroupInformation.setConfiguration(conf);
javax.security.auth.login.Configuration.setConfiguration(
new DummyLoginConfiguration());
UserGroupInformation.setLoginUser(null);
}
@After
public void resetUgi() {
UserGroupInformation.setLoginUser(null);
}
@Test
public void testSimpleLogin() throws IOException {
tryLoginAuthenticationMethod(AuthenticationMethod.SIMPLE, true);
}
@Test
public void testTokenLogin() throws IOException {
tryLoginAuthenticationMethod(AuthenticationMethod.TOKEN, false);
}
@Test
public void testProxyLogin() throws IOException {
tryLoginAuthenticationMethod(AuthenticationMethod.PROXY, false);
}
private void tryLoginAuthenticationMethod(AuthenticationMethod method,
boolean expectSuccess)
throws IOException {
SecurityUtil.setAuthenticationMethod(method, conf);
UserGroupInformation.setConfiguration(conf); // pick up changed auth
UserGroupInformation ugi = null;
Exception ex = null;
try {
ugi = UserGroupInformation.getLoginUser();
} catch (Exception e) {
ex = e;
}
if (expectSuccess) {
assertNotNull(ugi);
assertEquals(method, ugi.getAuthenticationMethod());
} else {
assertNotNull(ex);
assertEquals(UnsupportedOperationException.class, ex.getClass());
assertEquals(method + " login authentication is not supported",
ex.getMessage());
}
}
@Test
public void testGetRealAuthenticationMethod() {
UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user1");
ugi.setAuthenticationMethod(AuthenticationMethod.SIMPLE);
assertEquals(AuthenticationMethod.SIMPLE, ugi.getAuthenticationMethod());
assertEquals(AuthenticationMethod.SIMPLE, ugi.getRealAuthenticationMethod());
ugi = UserGroupInformation.createProxyUser("user2", ugi);
assertEquals(AuthenticationMethod.PROXY, ugi.getAuthenticationMethod());
assertEquals(AuthenticationMethod.SIMPLE, ugi.getRealAuthenticationMethod());
}
/** Test login method */
@Test
public void testLogin() throws Exception {

View File

@ -31,6 +31,7 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.ext.Provider;
import java.lang.reflect.Type;
import java.security.Principal;
import java.text.MessageFormat;
import java.util.regex.Pattern;
@Provider
@ -40,13 +41,26 @@ public class UserProvider extends AbstractHttpContextInjectable<Principal> imple
public static final String USER_NAME_PARAM = "user.name";
public static final Pattern USER_PATTERN = Pattern.compile("[_a-zA-Z0-9]+");
public static final Pattern USER_PATTERN = Pattern.compile("^[A-Za-z_][A-Za-z0-9._-]*[$]?$");
private static class UserParam extends StringParam {
static class UserParam extends StringParam {
public UserParam(String user) {
super(USER_NAME_PARAM, user, USER_PATTERN);
}
@Override
public String parseParam(String str) {
if (str != null) {
int len = str.length();
if (len < 1 || len > 31) {
throw new IllegalArgumentException(MessageFormat.format(
"Parameter [{0}], invalid value [{1}], it's length must be between 1 and 31",
getName(), str));
}
}
return super.parseParam(str);
}
}
@Override

View File

@ -19,13 +19,18 @@
package org.apache.hadoop.lib.wsrs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import java.security.Principal;
import javax.ws.rs.core.MultivaluedMap;
import org.apache.hadoop.test.TestException;
import org.apache.hadoop.test.TestExceptionHelper;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.MethodRule;
import org.mockito.Mockito;
import org.slf4j.MDC;
@ -35,6 +40,9 @@ import com.sun.jersey.core.spi.component.ComponentScope;
public class TestUserProvider {
@Rule
public MethodRule exceptionHelper = new TestExceptionHelper();
@Test
@SuppressWarnings("unchecked")
public void noUser() {
@ -92,4 +100,51 @@ public class TestUserProvider {
assertEquals(up.getInjectable(null, null, Principal.class), up);
assertNull(up.getInjectable(null, null, String.class));
}
@Test
@TestException(exception = IllegalArgumentException.class)
public void userNameEmpty() {
UserProvider.UserParam userParam = new UserProvider.UserParam("username");
userParam.parseParam("");
}
@Test
@TestException(exception = IllegalArgumentException.class)
public void userNameTooLong() {
UserProvider.UserParam userParam = new UserProvider.UserParam("username");
userParam.parseParam("a123456789012345678901234567890x");
}
@Test
@TestException(exception = IllegalArgumentException.class)
public void userNameInvalidStart() {
UserProvider.UserParam userParam = new UserProvider.UserParam("username");
userParam.parseParam("1x");
}
@Test
@TestException(exception = IllegalArgumentException.class)
public void userNameInvalidDollarSign() {
UserProvider.UserParam userParam = new UserProvider.UserParam("username");
userParam.parseParam("1$x");
}
@Test
public void userNameMinLength() {
UserProvider.UserParam userParam = new UserProvider.UserParam("username");
assertNotNull(userParam.parseParam("a"));
}
@Test
public void userNameMaxLength() {
UserProvider.UserParam userParam = new UserProvider.UserParam("username");
assertNotNull(userParam.parseParam("a123456789012345678901234567890"));
}
@Test
public void userNameValidDollarSign() {
UserProvider.UserParam userParam = new UserProvider.UserParam("username");
assertNotNull(userParam.parseParam("a$"));
}
}

View File

@ -159,6 +159,11 @@ Trunk (Unreleased)
HDFS-4153. Add START_MSG/SHUTDOWN_MSG for JournalNode. (liang xie via atm)
HDFS-3935. Add JournalNode to the start/stop scripts (Andy Isaacson via todd)
HDFS-4206. Change the fields in INode and its subclasses to private.
(szetszwo)
OPTIMIZATIONS
BUG FIXES
@ -245,12 +250,12 @@ Trunk (Unreleased)
HDFS-4115. TestHDFSCLI.testAll fails one test due to number format.
(Trevor Robinson via suresh)
HDFS-4106. BPServiceActor#lastHeartbeat, lastBlockReport and
lastDeletedReport should be volatile. (Jing Zhao via suresh)
HDFS-4165. Faulty sanity check in FsDirectory.unprotectedSetQuota.
(Binglin Chang via suresh)
HDFS-4105. The SPNEGO user for secondary namenode should use the web
keytab. (Arpit Gupta via jitendra)
BREAKDOWN OF HDFS-3077 SUBTASKS
HDFS-3077. Quorum-based protocol for reading and writing edit logs.
@ -467,6 +472,9 @@ Release 2.0.3-alpha - Unreleased
HDFS-1322. Document umask in DistributedFileSystem#mkdirs javadocs.
(Colin Patrick McCabe via eli)
HDFS-4038. Override toString() for BookKeeperEditLogInputStream.
(Vinay via umamahesh)
OPTIMIZATIONS
BUG FIXES
@ -586,6 +594,29 @@ Release 2.0.3-alpha - Unreleased
HDFS-3921. NN will prematurely consider blocks missing when entering active
state while still in safe mode. (atm)
HDFS-4106. BPServiceActor#lastHeartbeat, lastBlockReport and
lastDeletedReport should be volatile. (Jing Zhao via suresh)
HDFS-4139. fuse-dfs RO mode still allows file truncation.
(Colin Patrick McCabe via eli)
HDFS-4104. dfs -test -d prints inappropriate error on nonexistent directory
(Andy Isaacson via daryn)
HDFS-3623. BKJM: zkLatchWaitTimeout hard coded to 6000. Make use of ZKSessionTimeout instead.
(umamahesh)
HDFS-4100. Fix all findbug security warings. (Liang Xie via eli)
HDFS-3507. DFS#isInSafeMode needs to execute only on Active NameNode.
(Vinay via atm)
HDFS-4156. Seeking to a negative position should throw an IOE.
(Eli Reisman via eli)
HDFS-4171. WebHDFS and HttpFs should accept only valid Unix user
names. (tucu)
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
@ -1963,6 +1994,18 @@ Release 2.0.0-alpha - 05-23-2012
HDFS-3039. Address findbugs and javadoc warnings on branch. (todd via atm)
Release 0.23.6 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
OPTIMIZATIONS
BUG FIXES
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES
@ -2000,7 +2043,12 @@ Release 0.23.5 - UNRELEASED
HDFS-4172. namenode does not URI-encode parameters when building URI for
datanode request (Derek Dagit via bobby)
Release 0.23.4 - UNRELEASED
HDFS-4182. SecondaryNameNode leaks NameCache entries (bobby)
HDFS-4186. logSync() is called with the write lock held while releasing
lease (Kihwal Lee via daryn)
Release 0.23.4
INCOMPATIBLE CHANGES

View File

@ -129,8 +129,9 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
@Override
public String getName() {
return String.format("BookKeeper[%s,first=%d,last=%d]",
lh.toString(), firstTxId, lastTxId);
return String.format(
"BookKeeperLedger[ledgerId=%d,firstTxId=%d,lastTxId=%d]", lh.getId(),
firstTxId, lastTxId);
}
@Override
@ -157,6 +158,11 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
}
}
@Override
public String toString() {
return ("BookKeeperEditLogInputStream {" + this.getName() + "}");
}
/**
* Input stream implementation which can be used by
* FSEditLogOp.Reader

View File

@ -180,9 +180,16 @@ public class BookKeeperJournalManager implements JournalManager {
try {
zkConnectLatch = new CountDownLatch(1);
zkc = new ZooKeeper(zkConnect, conf.getInt(BKJM_ZK_SESSION_TIMEOUT,
BKJM_ZK_SESSION_TIMEOUT_DEFAULT), new ZkConnectionWatcher());
if (!zkConnectLatch.await(6000, TimeUnit.MILLISECONDS)) {
int bkjmZKSessionTimeout = conf.getInt(BKJM_ZK_SESSION_TIMEOUT,
BKJM_ZK_SESSION_TIMEOUT_DEFAULT);
zkc = new ZooKeeper(zkConnect, bkjmZKSessionTimeout,
new ZkConnectionWatcher());
// Configured zk session timeout + some extra grace period (here
// BKJM_ZK_SESSION_TIMEOUT_DEFAULT used as grace period)
int zkConnectionLatchTimeout = bkjmZKSessionTimeout
+ BKJM_ZK_SESSION_TIMEOUT_DEFAULT;
if (!zkConnectLatch
.await(zkConnectionLatchTimeout, TimeUnit.MILLISECONDS)) {
throw new IOException("Error connecting to zookeeper");
}

View File

@ -85,6 +85,21 @@ if [ -n "$SECONDARY_NAMENODES" ]; then
--script "$bin/hdfs" start secondarynamenode
fi
#---------------------------------------------------------
# quorumjournal nodes (if any)
SHARED_EDITS_DIR=$($HADOOP_PREFIX/bin/hdfs getconf -confKey dfs.namenode.shared.edits.dir 2>&-)
case "$SHARED_EDITS_DIR" in
qjournal://*)
JOURNAL_NODES=$(echo "$SHARED_EDITS_DIR" | sed 's,qjournal://\([^/]*\)/.*,\1,g; s/;/ /g; s/:[0-9]*//g')
echo "Starting journal nodes [$JOURNAL_NODES]"
"$HADOOP_PREFIX/sbin/hadoop-daemons.sh" \
--config "$HADOOP_CONF_DIR" \
--hostnames "$JOURNAL_NODES" \
--script "$bin/hdfs" start journalnode ;;
esac
#---------------------------------------------------------
# ZK Failover controllers, if auto-HA is enabled
AUTOHA_ENABLED=$($HADOOP_PREFIX/bin/hdfs getconf -confKey dfs.ha.automatic-failover.enabled)

View File

@ -61,6 +61,21 @@ if [ -n "$SECONDARY_NAMENODES" ]; then
--script "$bin/hdfs" stop secondarynamenode
fi
#---------------------------------------------------------
# quorumjournal nodes (if any)
SHARED_EDITS_DIR=$($HADOOP_PREFIX/bin/hdfs getconf -confKey dfs.namenode.shared.edits.dir 2>&-)
case "$SHARED_EDITS_DIR" in
qjournal://*)
JOURNAL_NODES=$(echo "$SHARED_EDITS_DIR" | sed 's,qjournal://\([^/]*\)/.*,\1,g; s/;/ /g; s/:[0-9]*//g')
echo "Stopping journal nodes [$JOURNAL_NODES]"
"$HADOOP_PREFIX/sbin/hadoop-daemons.sh" \
--config "$HADOOP_CONF_DIR" \
--hostnames "$JOURNAL_NODES" \
--script "$bin/hdfs" stop journalnode ;;
esac
#---------------------------------------------------------
# ZK Failover controllers, if auto-HA is enabled
AUTOHA_ENABLED=$($HADOOP_PREFIX/bin/hdfs getconf -confKey dfs.ha.automatic-failover.enabled)

View File

@ -1883,10 +1883,25 @@ public class DFSClient implements java.io.Closeable {
/**
* Enter, leave or get safe mode.
*
* @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction)
* @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeAction,boolean)
*/
public boolean setSafeMode(SafeModeAction action) throws IOException {
return namenode.setSafeMode(action);
return setSafeMode(action, false);
}
/**
* Enter, leave or get safe mode.
*
* @param action
* One of SafeModeAction.GET, SafeModeAction.ENTER and
* SafeModeActiob.LEAVE
* @param isChecked
* If true, then check only active namenode's safemode status, else
* check first namenode's status.
* @see ClientProtocol#setSafeMode(HdfsConstants.SafeModeActio,boolean)
*/
public boolean setSafeMode(SafeModeAction action, boolean isChecked) throws IOException{
return namenode.setSafeMode(action, isChecked);
}
/**

View File

@ -1076,6 +1076,9 @@ public class DFSInputStream extends FSInputStream implements ByteBufferReadable
if (targetPos > getFileLength()) {
throw new IOException("Cannot seek after EOF");
}
if (targetPos < 0) {
throw new IOException("Cannot seek to negative offset");
}
if (closed) {
throw new IOException("Stream is closed!");
}

View File

@ -627,11 +627,27 @@ public class DistributedFileSystem extends FileSystem {
* Enter, leave or get safe mode.
*
* @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setSafeMode(
* HdfsConstants.SafeModeAction)
* HdfsConstants.SafeModeAction,boolean)
*/
public boolean setSafeMode(HdfsConstants.SafeModeAction action)
throws IOException {
return dfs.setSafeMode(action);
return setSafeMode(action, false);
}
/**
* Enter, leave or get safe mode.
*
* @param action
* One of SafeModeAction.ENTER, SafeModeAction.LEAVE and
* SafeModeAction.GET
* @param isChecked
* If true check only for Active NNs status, else check first NN's
* status
* @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setSafeMode(SafeModeAction, boolean)
*/
public boolean setSafeMode(HdfsConstants.SafeModeAction action,
boolean isChecked) throws IOException {
return dfs.setSafeMode(action, isChecked);
}
/**
@ -878,13 +894,15 @@ public class DistributedFileSystem extends FileSystem {
}
/**
* Utility function that returns if the NameNode is in safemode or not.
*
* Utility function that returns if the NameNode is in safemode or not. In HA
* mode, this API will return only ActiveNN's safemode status.
*
* @return true if NameNode is in safemode, false otherwise.
* @throws IOException when there is an issue communicating with the NameNode
* @throws IOException
* when there is an issue communicating with the NameNode
*/
public boolean isInSafeMode() throws IOException {
return setSafeMode(SafeModeAction.SAFEMODE_GET);
return setSafeMode(SafeModeAction.SAFEMODE_GET, true);
}
/**

View File

@ -621,7 +621,7 @@ public interface ClientProtocol {
* <p>
* Safe mode is entered automatically at name node startup.
* Safe mode can also be entered manually using
* {@link #setSafeMode(HdfsConstants.SafeModeAction) setSafeMode(SafeModeAction.SAFEMODE_GET)}.
* {@link #setSafeMode(HdfsConstants.SafeModeAction,boolean) setSafeMode(SafeModeAction.SAFEMODE_ENTER,false)}.
* <p>
* At startup the name node accepts data node reports collecting
* information about block locations.
@ -637,11 +637,11 @@ public interface ClientProtocol {
* Then the name node leaves safe mode.
* <p>
* If safe mode is turned on manually using
* {@link #setSafeMode(HdfsConstants.SafeModeAction) setSafeMode(SafeModeAction.SAFEMODE_ENTER)}
* {@link #setSafeMode(HdfsConstants.SafeModeAction,boolean) setSafeMode(SafeModeAction.SAFEMODE_ENTER,false)}
* then the name node stays in safe mode until it is manually turned off
* using {@link #setSafeMode(HdfsConstants.SafeModeAction) setSafeMode(SafeModeAction.SAFEMODE_LEAVE)}.
* using {@link #setSafeMode(HdfsConstants.SafeModeAction,boolean) setSafeMode(SafeModeAction.SAFEMODE_LEAVE,false)}.
* Current state of the name node can be verified using
* {@link #setSafeMode(HdfsConstants.SafeModeAction) setSafeMode(SafeModeAction.SAFEMODE_GET)}
* {@link #setSafeMode(HdfsConstants.SafeModeAction,boolean) setSafeMode(SafeModeAction.SAFEMODE_GET,false)}
* <h4>Configuration parameters:</h4>
* <tt>dfs.safemode.threshold.pct</tt> is the threshold parameter.<br>
* <tt>dfs.safemode.extension</tt> is the safe mode extension parameter.<br>
@ -659,12 +659,15 @@ public interface ClientProtocol {
* @param action <ul> <li>0 leave safe mode;</li>
* <li>1 enter safe mode;</li>
* <li>2 get safe mode state.</li></ul>
* @param isChecked If true then action will be done only in ActiveNN.
*
* @return <ul><li>0 if the safe mode is OFF or</li>
* <li>1 if the safe mode is ON.</li></ul>
*
* @throws IOException
*/
public boolean setSafeMode(HdfsConstants.SafeModeAction action)
@Idempotent
public boolean setSafeMode(HdfsConstants.SafeModeAction action, boolean isChecked)
throws IOException;
/**

View File

@ -535,7 +535,8 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
public SetSafeModeResponseProto setSafeMode(RpcController controller,
SetSafeModeRequestProto req) throws ServiceException {
try {
boolean result = server.setSafeMode(PBHelper.convert(req.getAction()));
boolean result = server.setSafeMode(PBHelper.convert(req.getAction()),
req.getChecked());
return SetSafeModeResponseProto.newBuilder().setResult(result).build();
} catch (IOException e) {
throw new ServiceException(e);

View File

@ -511,9 +511,9 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
@Override
public boolean setSafeMode(SafeModeAction action) throws IOException {
SetSafeModeRequestProto req = SetSafeModeRequestProto.newBuilder().
setAction(PBHelper.convert(action)).build();
public boolean setSafeMode(SafeModeAction action, boolean isChecked) throws IOException {
SetSafeModeRequestProto req = SetSafeModeRequestProto.newBuilder()
.setAction(PBHelper.convert(action)).setChecked(isChecked).build();
try {
return rpcProxy.setSafeMode(null, req).getResult();
} catch (ServiceException e) {

View File

@ -31,6 +31,7 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -139,8 +140,9 @@ public class GetJournalEditServlet extends HttpServlet {
HttpServletRequest request, HttpServletResponse response)
throws IOException {
String myStorageInfoString = storage.toColonSeparatedString();
String theirStorageInfoString = request.getParameter(STORAGEINFO_PARAM);
String theirStorageInfoString = StringEscapeUtils.escapeHtml(
request.getParameter(STORAGEINFO_PARAM));
if (theirStorageInfoString != null
&& !myStorageInfoString.equals(theirStorageInfoString)) {
String msg = "This node has storage info '" + myStorageInfoString

View File

@ -259,7 +259,8 @@ public class DatanodeJspHelper {
int namenodeInfoPort = -1;
if (namenodeInfoPortStr != null)
namenodeInfoPort = Integer.parseInt(namenodeInfoPortStr);
final String nnAddr = req.getParameter(JspHelper.NAMENODE_ADDRESS);
final String nnAddr = StringEscapeUtils.escapeHtml(
req.getParameter(JspHelper.NAMENODE_ADDRESS));
if (nnAddr == null){
out.print(JspHelper.NAMENODE_ADDRESS + " url param is null");
return;
@ -637,7 +638,7 @@ public class DatanodeJspHelper {
UserGroupInformation ugi = JspHelper.getUGI(req, conf);
String namenodeInfoPortStr = req.getParameter("namenodeInfoPort");
String nnAddr = req.getParameter(JspHelper.NAMENODE_ADDRESS);
String nnAddr = StringEscapeUtils.escapeHtml(req.getParameter(JspHelper.NAMENODE_ADDRESS));
int namenodeInfoPort = -1;
if (namenodeInfoPortStr != null)
namenodeInfoPort = Integer.parseInt(namenodeInfoPortStr);

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.util.ByteArray;
import org.apache.hadoop.hdfs.util.ReadOnlyList;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/*************************************************
@ -126,6 +127,12 @@ public class FSDirectory implements Closeable {
this.cond = dirLock.writeLock().newCondition();
this.namesystem = ns;
int threshold = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY,
DFSConfigKeys.DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT);
NameNode.LOG.info("Caching file names occuring more than " + threshold
+ " times");
this.nameCache = new NameCache<ByteArray>(threshold);
reset();
this.fsImage = fsImage;
@ -141,13 +148,6 @@ public class FSDirectory implements Closeable {
this.maxDirItems = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY,
DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_DEFAULT);
int threshold = conf.getInt(
DFSConfigKeys.DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY,
DFSConfigKeys.DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT);
NameNode.LOG.info("Caching file names occuring more than " + threshold
+ " times");
nameCache = new NameCache<ByteArray>(threshold);
}
private FSNamesystem getFSNamesystem() {
@ -178,6 +178,12 @@ public class FSDirectory implements Closeable {
writeUnlock();
}
}
//This is for testing purposes only
@VisibleForTesting
boolean isReady() {
return ready;
}
// exposed for unit tests
protected void setReady(boolean flag) {
@ -303,14 +309,14 @@ public class FSDirectory implements Closeable {
return newNode;
}
INodeDirectory addToParent(byte[] src, INodeDirectory parentINode,
INodeDirectory addToParent(INodeDirectory parentINode,
INode newNode, boolean propagateModTime) {
// NOTE: This does not update space counts for parents
INodeDirectory newParent = null;
writeLock();
try {
try {
newParent = rootDir.addToParent(src, newNode, parentINode,
newParent = rootDir.addToParent(newNode, parentINode,
propagateModTime);
cacheName(newNode);
} catch (FileNotFoundException e) {
@ -539,7 +545,7 @@ public class FSDirectory implements Closeable {
return true;
}
if (srcInode.isSymlink() &&
dst.equals(((INodeSymlink)srcInode).getLinkValue())) {
dst.equals(((INodeSymlink)srcInode).getSymlinkString())) {
throw new FileAlreadyExistsException(
"Cannot rename symlink "+src+" to its target "+dst);
}
@ -667,7 +673,7 @@ public class FSDirectory implements Closeable {
"The source "+src+" and destination "+dst+" are the same");
}
if (srcInode.isSymlink() &&
dst.equals(((INodeSymlink)srcInode).getLinkValue())) {
dst.equals(((INodeSymlink)srcInode).getSymlinkString())) {
throw new FileAlreadyExistsException(
"Cannot rename symlink "+src+" to its target "+dst);
}
@ -1291,7 +1297,7 @@ public class FSDirectory implements Closeable {
HdfsFileStatus listing[] = new HdfsFileStatus[numOfListing];
for (int i=0; i<numOfListing; i++) {
INode cur = contents.get(startChild+i);
listing[i] = createFileStatus(cur.name, cur, needLocation);
listing[i] = createFileStatus(cur.getLocalNameBytes(), cur, needLocation);
}
return new DirectoryListing(
listing, totalNumChildren-startChild-numOfListing);
@ -1519,7 +1525,7 @@ public class FSDirectory implements Closeable {
for(int i=0; i < numOfINodes; i++) {
if (inodes[i].isQuotaSet()) { // a directory with quota
INodeDirectoryWithQuota node =(INodeDirectoryWithQuota)inodes[i];
node.unprotectedUpdateNumItemsInTree(nsDelta, dsDelta);
node.addSpaceConsumed(nsDelta, dsDelta);
}
}
}
@ -2142,11 +2148,18 @@ public class FSDirectory implements Closeable {
* Reset the entire namespace tree.
*/
void reset() {
final INodeDirectoryWithQuota r = new INodeDirectoryWithQuota(
INodeDirectory.ROOT_NAME,
getFSNamesystem().createFsOwnerPermissions(new FsPermission((short)0755)),
Long.MAX_VALUE, UNKNOWN_DISK_SPACE);
rootDir = INodeDirectorySnapshottable.newInstance(r, 0);
writeLock();
try {
setReady(false);
final INodeDirectoryWithQuota r = new INodeDirectoryWithQuota(
INodeDirectory.ROOT_NAME,
getFSNamesystem().createFsOwnerPermissions(new FsPermission((short)0755)),
Long.MAX_VALUE, UNKNOWN_DISK_SPACE);
rootDir = INodeDirectorySnapshottable.newInstance(r, 0);
nameCache.reset();
} finally {
writeUnlock();
}
}
/**

View File

@ -257,7 +257,8 @@ class FSImageFormat {
INode newNode = loadINode(in); // read rest of inode
// add to parent
namesystem.dir.addToParent(localName, parent, newNode, false);
newNode.setLocalName(localName);
namesystem.dir.addToParent(parent, newNode, false);
}
return numChildren;
}
@ -291,8 +292,8 @@ class FSImageFormat {
}
// add new inode
parentINode = fsDir.addToParent(pathComponents[pathComponents.length-1],
parentINode, newNode, false);
newNode.setLocalName(pathComponents[pathComponents.length-1]);
parentINode = fsDir.addToParent(parentINode, newNode, false);
}
}

View File

@ -168,7 +168,7 @@ public class FSImageSerialization {
out.writeLong(0); // access time
out.writeLong(0); // preferred block size
out.writeInt(-2); // # of blocks
Text.writeString(out, ((INodeSymlink)node).getLinkValue());
Text.writeString(out, ((INodeSymlink)node).getSymlinkString());
filePerm.fromShort(node.getFsPermissionShort());
PermissionStatus.write(out, node.getUserName(),
node.getGroupName(),

View File

@ -1731,16 +1731,25 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
short replication, long blockSize) throws AccessControlException,
SafeModeException, FileAlreadyExistsException, UnresolvedLinkException,
FileNotFoundException, ParentNotDirectoryException, IOException {
boolean skipSync = false;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
startFileInternal(src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize);
} catch (StandbyException se) {
skipSync = true;
throw se;
} finally {
writeUnlock();
}
getEditLog().logSync();
// There might be transactions logged while trying to recover the lease.
// They need to be sync'ed even when an exception was thrown.
if (!skipSync) {
getEditLog().logSync();
}
}
if (auditLog.isInfoEnabled() && isExternalInvocation()) {
final HdfsFileStatus stat = dir.getFileInfo(src, false);
logAuditEvent(UserGroupInformation.getCurrentUser(),
@ -1922,6 +1931,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
boolean recoverLease(String src, String holder, String clientMachine)
throws IOException {
boolean skipSync = false;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
@ -1943,8 +1953,16 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
recoverLeaseInternal(inode, src, holder, clientMachine, true);
} catch (StandbyException se) {
skipSync = true;
throw se;
} finally {
writeUnlock();
// There might be transactions logged while trying to recover the lease.
// They need to be sync'ed even when an exception was thrown.
if (!skipSync) {
getEditLog().logSync();
}
}
return false;
}
@ -2047,6 +2065,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
throws AccessControlException, SafeModeException,
FileAlreadyExistsException, FileNotFoundException,
ParentNotDirectoryException, IOException {
boolean skipSync = false;
if (!supportAppends) {
throw new UnsupportedOperationException(
"Append is not enabled on this NameNode. Use the " +
@ -2060,10 +2079,17 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
lb = startFileInternal(src, null, holder, clientMachine,
EnumSet.of(CreateFlag.APPEND),
false, blockManager.maxReplication, 0);
} catch (StandbyException se) {
skipSync = true;
throw se;
} finally {
writeUnlock();
// There might be transactions logged while trying to recover the lease.
// They need to be sync'ed even when an exception was thrown.
if (!skipSync) {
getEditLog().logSync();
}
}
getEditLog().logSync();
if (lb != null) {
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.appendFile: file "
@ -3027,7 +3053,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
* RecoveryInProgressException if lease recovery is in progress.<br>
* IOException in case of an error.
* @return true if file has been successfully finalized and closed or
* false if block recovery has been initiated
* false if block recovery has been initiated. Since the lease owner
* has been changed and logged, caller should call logSync().
*/
boolean internalReleaseLease(Lease lease, String src,
String recoveryLeaseHolder) throws AlreadyBeingCreatedException,
@ -3148,6 +3175,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
assert hasWriteLock();
if(newHolder == null)
return lease;
// The following transaction is not synced. Make sure it's sync'ed later.
logReassignLease(lease.getHolder(), src, newHolder);
return reassignLeaseInternal(lease, src, newHolder, pendingFile);
}
@ -5257,13 +5285,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
private void logReassignLease(String leaseHolder, String src,
String newHolder) {
writeLock();
try {
getEditLog().logReassignLease(leaseHolder, src, newHolder);
} finally {
writeUnlock();
}
getEditLog().logSync();
assert hasWriteLock();
getEditLog().logReassignLease(leaseHolder, src, newHolder);
}
/**

View File

@ -49,23 +49,12 @@ public abstract class INode implements Comparable<byte[]> {
static final ReadOnlyList<INode> EMPTY_READ_ONLY_LIST
= ReadOnlyList.Util.emptyList();
/**
* The inode name is in java UTF8 encoding;
* The name in HdfsFileStatus should keep the same encoding as this.
* if this encoding is changed, implicitly getFileInfo and listStatus in
* clientProtocol are changed; The decoding at the client
* side should change accordingly.
*/
protected byte[] name;
protected INodeDirectory parent;
protected long modificationTime;
protected long accessTime;
/** Simple wrapper for two counters :
* nsCount (namespace consumed) and dsCount (diskspace consumed).
*/
/** Wrapper of two counters for namespace consumed and diskspace consumed. */
static class DirCounts {
/** namespace count */
long nsCount = 0;
/** diskspace count */
long dsCount = 0;
/** returns namespace count */
@ -78,10 +67,6 @@ public abstract class INode implements Comparable<byte[]> {
}
}
//Only updated by updatePermissionStatus(...).
//Other codes should not modify it.
private long permission;
private static enum PermissionStatusFormat {
MODE(0, 16),
GROUP(MODE.OFFSET + MODE.LENGTH, 25),
@ -104,31 +89,67 @@ public abstract class INode implements Comparable<byte[]> {
long combine(long bits, long record) {
return (record & ~MASK) | (bits << OFFSET);
}
/** Set the {@link PermissionStatus} */
static long toLong(PermissionStatus ps) {
long permission = 0L;
final int user = SerialNumberManager.INSTANCE.getUserSerialNumber(
ps.getUserName());
permission = PermissionStatusFormat.USER.combine(user, permission);
final int group = SerialNumberManager.INSTANCE.getGroupSerialNumber(
ps.getGroupName());
permission = PermissionStatusFormat.GROUP.combine(group, permission);
final int mode = ps.getPermission().toShort();
permission = PermissionStatusFormat.MODE.combine(mode, permission);
return permission;
}
}
INode(PermissionStatus permissions, long mTime, long atime) {
this.name = null;
this.parent = null;
this.modificationTime = mTime;
setAccessTime(atime);
setPermissionStatus(permissions);
/**
* The inode name is in java UTF8 encoding;
* The name in HdfsFileStatus should keep the same encoding as this.
* if this encoding is changed, implicitly getFileInfo and listStatus in
* clientProtocol are changed; The decoding at the client
* side should change accordingly.
*/
private byte[] name = null;
/**
* Permission encoded using PermissionStatusFormat.
* Codes other than {@link #updatePermissionStatus(PermissionStatusFormat, long)}.
* should not modify it.
*/
private long permission = 0L;
protected INodeDirectory parent = null;
protected long modificationTime = 0L;
protected long accessTime = 0L;
private INode(byte[] name, long permission, INodeDirectory parent,
long modificationTime, long accessTime) {
this.name = name;
this.permission = permission;
this.parent = parent;
this.modificationTime = modificationTime;
this.accessTime = accessTime;
}
INode(byte[] name, PermissionStatus permissions, INodeDirectory parent,
long modificationTime, long accessTime) {
this(name, PermissionStatusFormat.toLong(permissions), parent,
modificationTime, accessTime);
}
INode(PermissionStatus permissions, long mtime, long atime) {
this(null, permissions, null, mtime, atime);
}
protected INode(String name, PermissionStatus permissions) {
this(permissions, 0L, 0L);
setLocalName(name);
this(DFSUtil.string2Bytes(name), permissions, null, 0L, 0L);
}
/** copy constructor
*
* @param other Other node to be copied
*/
/** @param other Other node to be copied */
INode(INode other) {
setLocalName(other.getLocalName());
this.parent = other.getParent();
setPermissionStatus(other.getPermissionStatus());
setModificationTime(other.getModificationTime());
setAccessTime(other.getAccessTime());
this(other.getLocalNameBytes(), other.permission, other.getParent(),
other.getModificationTime(), other.getAccessTime());
}
/**

View File

@ -68,9 +68,8 @@ public class INodeDirectory extends INode {
}
/** constructor */
INodeDirectory(byte[] localName, PermissionStatus permissions, long mTime) {
this(permissions, mTime);
this.name = localName;
INodeDirectory(byte[] name, PermissionStatus permissions, long mtime) {
super(name, permissions, null, mtime, 0L);
}
/** copy constructor
@ -93,25 +92,30 @@ public class INodeDirectory extends INode {
return false;
}
INode removeChild(INode node) {
assert children != null;
int low = Collections.binarySearch(children, node.name);
if (low >= 0) {
return children.remove(low);
} else {
return null;
private void assertChildrenNonNull() {
if (children == null) {
throw new AssertionError("children is null: " + this);
}
}
private int searchChildren(INode inode) {
return Collections.binarySearch(children, inode.getLocalNameBytes());
}
INode removeChild(INode node) {
assertChildrenNonNull();
final int i = searchChildren(node);
return i >= 0? children.remove(i): null;
}
/** Replace a child that has the same name as newChild by newChild.
*
* @param newChild Child node to be added
*/
void replaceChild(INode newChild) {
if ( children == null ) {
throw new IllegalArgumentException("The directory is empty");
}
int low = Collections.binarySearch(children, newChild.name);
assertChildrenNonNull();
final int low = searchChildren(newChild);
if (low>=0) { // an old child exists so replace by the newChild
children.set(low, newChild);
} else {
@ -248,7 +252,7 @@ public class INodeDirectory extends INode {
final String remainder =
constructPath(components, count + 1, components.length);
final String link = DFSUtil.bytes2String(components[count]);
final String target = ((INodeSymlink)curNode).getLinkValue();
final String target = ((INodeSymlink)curNode).getSymlinkString();
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("UnresolvedPathException " +
" path: " + path + " preceding: " + preceding +
@ -360,7 +364,7 @@ public class INodeDirectory extends INode {
if (children == null) {
children = new ArrayList<INode>(DEFAULT_FILES_PER_DIRECTORY);
}
int low = Collections.binarySearch(children, node.name);
final int low = searchChildren(node);
if(low >= 0)
return null;
node.parent = this;
@ -400,13 +404,9 @@ public class INodeDirectory extends INode {
* @throws FileNotFoundException if parent does not exist or
* is not a directory.
*/
INodeDirectory addToParent( byte[] localname,
INode newNode,
INodeDirectory parent,
boolean propagateModTime
) throws FileNotFoundException {
INodeDirectory addToParent(INode newNode, INodeDirectory parent,
boolean propagateModTime) throws FileNotFoundException {
// insert into the parent children list
newNode.name = localname;
if(parent.addChild(newNode, propagateModTime) == null)
return null;
return parent;
@ -444,7 +444,7 @@ public class INodeDirectory extends INode {
if (pathComponents.length < 2) { // add root
return null;
}
newNode.name = pathComponents[pathComponents.length - 1];
newNode.setLocalName(pathComponents[pathComponents.length - 1]);
// insert into the parent children list
INodeDirectory parent = getParent(pathComponents);
return parent.addChild(newNode, propagateModTime) == null? null: parent;

View File

@ -27,9 +27,9 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
*/
public class INodeDirectoryWithQuota extends INodeDirectory {
private long nsQuota; /// NameSpace quota
private long nsCount;
private long nsCount = 1L;
private long dsQuota; /// disk space quota
private long diskspace;
private long diskspace = 0L;
/** Convert an existing directory inode to one with the given quota
*
@ -44,7 +44,8 @@ public class INodeDirectoryWithQuota extends INodeDirectory {
other.spaceConsumedInTree(counts);
this.nsCount = counts.getNsCount();
this.diskspace = counts.getDsCount();
setQuota(nsQuota, dsQuota);
this.nsQuota = nsQuota;
this.dsQuota = dsQuota;
}
/** constructor with no quota verification */
@ -53,7 +54,6 @@ public class INodeDirectoryWithQuota extends INodeDirectory {
super(permissions, modificationTime);
this.nsQuota = nsQuota;
this.dsQuota = dsQuota;
this.nsCount = 1;
}
/** constructor with no quota verification */
@ -62,7 +62,6 @@ public class INodeDirectoryWithQuota extends INodeDirectory {
super(name, permissions);
this.nsQuota = nsQuota;
this.dsQuota = dsQuota;
this.nsCount = 1;
}
/** Get this directory's namespace quota
@ -116,19 +115,8 @@ public class INodeDirectoryWithQuota extends INodeDirectory {
* @param nsDelta the change of the tree size
* @param dsDelta change to disk space occupied
*/
void updateNumItemsInTree(long nsDelta, long dsDelta) {
nsCount += nsDelta;
diskspace += dsDelta;
}
/** Update the size of the tree
*
* @param nsDelta the change of the tree size
* @param dsDelta change to disk space occupied
**/
void unprotectedUpdateNumItemsInTree(long nsDelta, long dsDelta) {
nsCount = nsCount + nsDelta;
diskspace = diskspace + dsDelta;
void addSpaceConsumed(long nsDelta, long dsDelta) {
setSpaceConsumed(nsCount + nsDelta, diskspace + dsDelta);
}
/**

View File

@ -45,14 +45,43 @@ public class INodeFile extends INode implements BlockCollection {
static final FsPermission UMASK = FsPermission.createImmutable((short)0111);
//Number of bits for Block size
static final short BLOCKBITS = 48;
//Header mask 64-bit representation
//Format: [16 bits for replication][48 bits for PreferredBlockSize]
static final long HEADERMASK = 0xffffL << BLOCKBITS;
/** Format: [16 bits for replication][48 bits for PreferredBlockSize] */
private static class HeaderFormat {
/** Number of bits for Block size */
static final int BLOCKBITS = 48;
/** Header mask 64-bit representation */
static final long HEADERMASK = 0xffffL << BLOCKBITS;
static final long MAX_BLOCK_SIZE = ~HEADERMASK;
static short getReplication(long header) {
return (short) ((header & HEADERMASK) >> BLOCKBITS);
}
private long header;
static long combineReplication(long header, short replication) {
if (replication <= 0) {
throw new IllegalArgumentException(
"Unexpected value for the replication: " + replication);
}
return ((long)replication << BLOCKBITS) | (header & MAX_BLOCK_SIZE);
}
static long getPreferredBlockSize(long header) {
return header & MAX_BLOCK_SIZE;
}
static long combinePreferredBlockSize(long header, long blockSize) {
if (blockSize < 0) {
throw new IllegalArgumentException("Block size < 0: " + blockSize);
} else if (blockSize > MAX_BLOCK_SIZE) {
throw new IllegalArgumentException("Block size = " + blockSize
+ " > MAX_BLOCK_SIZE = " + MAX_BLOCK_SIZE);
}
return (header & HEADERMASK) | (blockSize & MAX_BLOCK_SIZE);
}
}
private long header = 0L;
private BlockInfo[] blocks;
@ -60,15 +89,15 @@ public class INodeFile extends INode implements BlockCollection {
short replication, long modificationTime,
long atime, long preferredBlockSize) {
super(permissions, modificationTime, atime);
this.setFileReplication(replication);
this.setPreferredBlockSize(preferredBlockSize);
header = HeaderFormat.combineReplication(header, replication);
header = HeaderFormat.combinePreferredBlockSize(header, preferredBlockSize);
this.blocks = blklist;
}
protected INodeFile(INodeFile f) {
this(f.getPermissionStatus(), f.getBlocks(), f.getFileReplication(),
f.getModificationTime(), f.getAccessTime(), f.getPreferredBlockSize());
this.name = f.getLocalNameBytes();
this.setLocalName(f.getLocalNameBytes());
}
/**
@ -83,7 +112,7 @@ public class INodeFile extends INode implements BlockCollection {
/** @return the replication factor of the file. */
public final short getFileReplication() {
return (short) ((header & HEADERMASK) >> BLOCKBITS);
return HeaderFormat.getReplication(header);
}
@Override
@ -92,21 +121,13 @@ public class INodeFile extends INode implements BlockCollection {
}
protected void setFileReplication(short replication) {
if(replication <= 0)
throw new IllegalArgumentException("Unexpected value for the replication");
header = ((long)replication << BLOCKBITS) | (header & ~HEADERMASK);
header = HeaderFormat.combineReplication(header, replication);
}
/** @return preferred block size (in bytes) of the file. */
@Override
public long getPreferredBlockSize() {
return header & ~HEADERMASK;
}
private void setPreferredBlockSize(long preferredBlkSize) {
if((preferredBlkSize < 0) || (preferredBlkSize > ~HEADERMASK ))
throw new IllegalArgumentException("Unexpected value for the block size");
header = (header & HEADERMASK) | (preferredBlkSize & ~HEADERMASK);
return HeaderFormat.getPreferredBlockSize(header);
}
/** @return the blocks of the file. */

View File

@ -22,19 +22,16 @@ import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.DFSUtil;
/**
* An INode representing a symbolic link.
* An {@link INode} representing a symbolic link.
*/
@InterfaceAudience.Private
public class INodeSymlink extends INode {
private byte[] symlink; // The target URI
private final byte[] symlink; // The target URI
INodeSymlink(String value, long modTime, long atime,
INodeSymlink(String value, long mtime, long atime,
PermissionStatus permissions) {
super(permissions, modTime, atime);
assert value != null;
setLinkValue(value);
setModificationTimeForce(modTime);
setAccessTime(atime);
super(permissions, mtime, atime);
this.symlink = DFSUtil.string2Bytes(value);
}
public INodeSymlink(INodeSymlink that) {
@ -49,12 +46,8 @@ public class INodeSymlink extends INode {
public boolean isSymlink() {
return true;
}
void setLinkValue(String value) {
this.symlink = DFSUtil.string2Bytes(value);
}
public String getLinkValue() {
public String getSymlinkString() {
return DFSUtil.bytes2String(symlink);
}

View File

@ -401,17 +401,21 @@ public class LeaseManager {
@Override
public void run() {
for(; shouldRunMonitor && fsnamesystem.isRunning(); ) {
boolean needSync = false;
try {
fsnamesystem.writeLockInterruptibly();
try {
if (!fsnamesystem.isInSafeMode()) {
checkLeases();
needSync = checkLeases();
}
} finally {
fsnamesystem.writeUnlock();
// lease reassignments should to be sync'ed.
if (needSync) {
fsnamesystem.getEditLog().logSync();
}
}
Thread.sleep(HdfsServerConstants.NAMENODE_LEASE_RECHECK_INTERVAL);
} catch(InterruptedException ie) {
if (LOG.isDebugEnabled()) {
@ -422,13 +426,16 @@ public class LeaseManager {
}
}
/** Check the leases beginning from the oldest. */
private synchronized void checkLeases() {
/** Check the leases beginning from the oldest.
* @return true is sync is needed.
*/
private synchronized boolean checkLeases() {
boolean needSync = false;
assert fsnamesystem.hasWriteLock();
for(; sortedLeases.size() > 0; ) {
final Lease oldest = sortedLeases.first();
if (!oldest.expiredHardLimit()) {
return;
return needSync;
}
LOG.info(oldest + " has expired hard limit");
@ -451,6 +458,10 @@ public class LeaseManager {
LOG.debug("Started block recovery " + p + " lease " + oldest);
}
}
// If a lease recovery happened, we need to sync later.
if (!needSync && !completed) {
needSync = true;
}
} catch (IOException e) {
LOG.error("Cannot release the path " + p + " in the lease "
+ oldest, e);
@ -462,6 +473,7 @@ public class LeaseManager {
removeLease(oldest, p);
}
}
return needSync;
}
@Override

View File

@ -152,4 +152,14 @@ class NameCache<K> {
cache.put(name, name);
lookups += useThreshold;
}
public void reset() {
initialized = false;
cache.clear();
if (transientMap == null) {
transientMap = new HashMap<K, UseCount>();
} else {
transientMap.clear();
}
}
}

View File

@ -714,8 +714,17 @@ class NameNodeRpcServer implements NamenodeProtocols {
}
@Override // ClientProtocol
public boolean setSafeMode(SafeModeAction action) throws IOException {
namesystem.checkOperation(OperationCategory.UNCHECKED);
public boolean setSafeMode(SafeModeAction action, boolean isChecked)
throws IOException {
OperationCategory opCategory = OperationCategory.UNCHECKED;
if (isChecked) {
if (action == SafeModeAction.SAFEMODE_GET) {
opCategory = OperationCategory.READ;
} else {
opCategory = OperationCategory.WRITE;
}
}
namesystem.checkOperation(opCategory);
return namesystem.setSafeMode(action);
}

View File

@ -250,8 +250,15 @@ public class SecondaryNameNode implements Runnable {
new AccessControlList(conf.get(DFS_ADMIN, " "))) {
{
if (UserGroupInformation.isSecurityEnabled()) {
initSpnego(conf, DFSConfigKeys.DFS_SECONDARY_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY,
DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY);
String httpKeytabKey = DFSConfigKeys.
DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY;
if (null == conf.get(httpKeytabKey)) {
httpKeytabKey = DFSConfigKeys.DFS_SECONDARY_NAMENODE_KEYTAB_FILE_KEY;
}
initSpnego(
conf,
DFSConfigKeys.DFS_SECONDARY_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY,
httpKeytabKey);
}
}
};
@ -886,6 +893,7 @@ public class SecondaryNameNode implements Runnable {
"just been downloaded");
}
dstImage.reloadFromImageFile(file, dstNamesystem);
dstNamesystem.dir.imageLoadComplete();
}
Checkpointer.rollForwardByApplyingLogs(manifest, dstImage, dstNamesystem);

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.ipc.RPC;
@ -399,7 +400,7 @@ public class DFSAdmin extends FsShell {
} catch (java.lang.InterruptedException e) {
throw new IOException("Wait Interrupted");
}
inSafeMode = dfs.isInSafeMode();
inSafeMode = dfs.setSafeMode(SafeModeAction.SAFEMODE_GET);
}
}

View File

@ -48,7 +48,7 @@ abstract class StringParam extends Param<String, StringParam.Domain> {
@Override
final String parse(final String str) {
if (pattern != null) {
if (str != null && pattern != null) {
if (!pattern.matcher(str).matches()) {
throw new IllegalArgumentException("Invalid value: \"" + str
+ "\" does not belong to the domain " + getDomain());

View File

@ -19,6 +19,9 @@ package org.apache.hadoop.hdfs.web.resources;
import org.apache.hadoop.security.UserGroupInformation;
import java.text.MessageFormat;
import java.util.regex.Pattern;
/** User parameter. */
public class UserParam extends StringParam {
/** Parameter name. */
@ -26,14 +29,29 @@ public class UserParam extends StringParam {
/** Default parameter value. */
public static final String DEFAULT = "";
private static final Domain DOMAIN = new Domain(NAME, null);
private static final Domain DOMAIN = new Domain(NAME,
Pattern.compile("^[A-Za-z_][A-Za-z0-9._-]*[$]?$"));
private static String validateLength(String str) {
if (str == null) {
throw new IllegalArgumentException(
MessageFormat.format("Parameter [{0}], cannot be NULL", NAME));
}
int len = str.length();
if (len < 1 || len > 31) {
throw new IllegalArgumentException(MessageFormat.format(
"Parameter [{0}], invalid value [{1}], it's length must be between 1 and 31",
NAME, str));
}
return str;
}
/**
* Constructor.
* @param str a string representation of the parameter value.
*/
public UserParam(final String str) {
super(DOMAIN, str == null || str.equals(DEFAULT)? null: str);
super(DOMAIN, str == null || str.equals(DEFAULT)? null : validateLength(str));
}
/**

View File

@ -31,7 +31,6 @@
//
typedef struct dfs_context_struct {
int debug;
int read_only;
int usetrash;
int direct_io;
char **protectedpaths;

View File

@ -93,6 +93,18 @@ int main(int argc, char *argv[])
if (!options.no_permissions) {
fuse_opt_add_arg(&args, "-odefault_permissions");
}
/*
* FUSE already has a built-in parameter for mounting the filesystem as
* read-only, -r. We defined our own parameter for doing this called -oro.
* We support it by translating it into -r internally.
* The kernel intercepts and returns an error message for any "write"
* operations that the user attempts to perform on a read-only filesystem.
* That means that we don't have to write any code to handle read-only mode.
* See HDFS-4139 for more details.
*/
if (options.read_only) {
fuse_opt_add_arg(&args, "-r");
}
{
char buf[80];

View File

@ -39,11 +39,6 @@ int dfs_mkdir(const char *path, mode_t mode)
return -EACCES;
}
if (dfs->read_only) {
ERROR("HDFS is configured read-only, cannot create directory %s", path);
return -EACCES;
}
ret = fuseConnectAsThreadUid(&conn);
if (ret) {
fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "

View File

@ -43,11 +43,6 @@ int dfs_rename(const char *from, const char *to)
return -EACCES;
}
if (dfs->read_only) {
ERROR("HDFS configured read-only, cannot rename directory %s", from);
return -EACCES;
}
ret = fuseConnectAsThreadUid(&conn);
if (ret) {
fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "

View File

@ -44,12 +44,6 @@ int dfs_rmdir(const char *path)
goto cleanup;
}
if (dfs->read_only) {
ERROR("HDFS configured read-only, cannot delete directory %s", path);
ret = -EACCES;
goto cleanup;
}
ret = fuseConnectAsThreadUid(&conn);
if (ret) {
fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "

View File

@ -40,12 +40,6 @@ int dfs_unlink(const char *path)
goto cleanup;
}
if (dfs->read_only) {
ERROR("HDFS configured read-only, cannot create directory %s", path);
ret = -EACCES;
goto cleanup;
}
ret = fuseConnectAsThreadUid(&conn);
if (ret) {
fprintf(stderr, "fuseConnectAsThreadUid: failed to open a libhdfs "

View File

@ -114,7 +114,6 @@ void *dfs_init(void)
// initialize the context
dfs->debug = options.debug;
dfs->read_only = options.read_only;
dfs->usetrash = options.usetrash;
dfs->protectedpaths = NULL;
dfs->rdbuffer_size = options.rdbuffer_size;

View File

@ -265,6 +265,7 @@ enum SafeModeActionProto {
message SetSafeModeRequestProto {
required SafeModeActionProto action = 1;
optional bool checked = 2 [default = false];
}
message SetSafeModeResponseProto {

View File

@ -452,6 +452,15 @@ public class TestDFSShell {
assertEquals(" no error ", 0, ret);
assertTrue("empty path specified",
(returned.lastIndexOf("empty string") == -1));
out.reset();
argv = new String[3];
argv[0] = "-test";
argv[1] = "-d";
argv[2] = "/no/such/dir";
ret = ToolRunner.run(shell, argv);
returned = out.toString();
assertEquals(" -test -d wrong result ", 1, ret);
assertTrue(returned.isEmpty());
} finally {
if (bak != null) {
System.setErr(bak);

View File

@ -61,9 +61,11 @@ public class TestFetchImage {
fs.mkdirs(new Path("/foo2"));
fs.mkdirs(new Path("/foo3"));
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
cluster.getNameNodeRpc()
.setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
cluster.getNameNodeRpc().saveNamespace();
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
cluster.getNameNodeRpc()
.setSafeMode(SafeModeAction.SAFEMODE_LEAVE, false);
runFetchImage(dfsAdmin, cluster);
} finally {

View File

@ -133,14 +133,16 @@ public class TestLeaseRecovery {
filestr = "/foo.safemode";
filepath = new Path(filestr);
dfs.create(filepath, (short)1);
cluster.getNameNodeRpc().setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_ENTER);
cluster.getNameNodeRpc().setSafeMode(
HdfsConstants.SafeModeAction.SAFEMODE_ENTER, false);
assertTrue(dfs.dfs.exists(filestr));
DFSTestUtil.waitReplication(dfs, filepath, (short)1);
waitLeaseRecovery(cluster);
// verify that we still cannot recover the lease
LeaseManager lm = NameNodeAdapter.getLeaseManager(cluster.getNamesystem());
assertTrue("Found " + lm.countLease() + " lease, expected 1", lm.countLease() == 1);
cluster.getNameNodeRpc().setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
cluster.getNameNodeRpc().setSafeMode(
HdfsConstants.SafeModeAction.SAFEMODE_LEAVE, false);
}
finally {
if (cluster != null) {cluster.shutdown();}

View File

@ -133,7 +133,69 @@ public class TestSeekBug {
cluster.shutdown();
}
}
/**
* Test (expected to throw IOE) for negative
* <code>FSDataInpuStream#seek</code> argument
*/
@Test (expected=IOException.class)
public void testNegativeSeek() throws IOException {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = cluster.getFileSystem();
try {
Path seekFile = new Path("seekboundaries.dat");
DFSTestUtil.createFile(
fs,
seekFile,
ONEMB,
ONEMB,
fs.getDefaultBlockSize(seekFile),
fs.getDefaultReplication(seekFile),
seed);
FSDataInputStream stream = fs.open(seekFile);
// Perform "safe seek" (expected to pass)
stream.seek(65536);
assertEquals(65536, stream.getPos());
// expect IOE for this call
stream.seek(-73);
} finally {
fs.close();
cluster.shutdown();
}
}
/**
* Test (expected to throw IOE) for <code>FSDataInpuStream#seek</code>
* when the position argument is larger than the file size.
*/
@Test (expected=IOException.class)
public void testSeekPastFileSize() throws IOException {
Configuration conf = new HdfsConfiguration();
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
FileSystem fs = cluster.getFileSystem();
try {
Path seekFile = new Path("seekboundaries.dat");
DFSTestUtil.createFile(
fs,
seekFile,
ONEMB,
ONEMB,
fs.getDefaultBlockSize(seekFile),
fs.getDefaultReplication(seekFile),
seed);
FSDataInputStream stream = fs.open(seekFile);
// Perform "safe seek" (expected to pass)
stream.seek(65536);
assertEquals(65536, stream.getPos());
// expect IOE for this call
stream.seek(ONEMB + ONEMB + ONEMB);
} finally {
fs.close();
cluster.shutdown();
}
}
/**
* Tests if the seek bug exists in FSDataInputStream in LocalFS.
*/

View File

@ -142,9 +142,9 @@ public class UpgradeUtilities {
writeFile(fs, new Path(baseDir, "file2"), buffer, bufferSize);
// save image
namenode.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
namenode.setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
namenode.saveNamespace();
namenode.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
namenode.setSafeMode(SafeModeAction.SAFEMODE_LEAVE, false);
// write more files
writeFile(fs, new Path(baseDir, "file3"), buffer, bufferSize);

View File

@ -278,7 +278,8 @@ public class NNThroughputBenchmark {
}
void cleanUp() throws IOException {
nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
false);
if(!keepResults)
nameNodeProto.delete(getBaseDir(), true);
}
@ -479,7 +480,8 @@ public class NNThroughputBenchmark {
@Override
long executeOp(int daemonId, int inputIdx, String ignore)
throws IOException {
nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
false);
long start = Time.now();
nameNodeProto.delete(BASE_DIR_NAME, true);
long end = Time.now();
@ -547,7 +549,8 @@ public class NNThroughputBenchmark {
@Override
void generateInputs(int[] opsPerThread) throws IOException {
assert opsPerThread.length == numThreads : "Error opsPerThread.length";
nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
false);
// int generatedFileIdx = 0;
LOG.info("Generate " + numOpsRequired + " intputs for " + getOpName());
fileNames = new String[numThreads][];
@ -1035,7 +1038,8 @@ public class NNThroughputBenchmark {
FileNameGenerator nameGenerator;
nameGenerator = new FileNameGenerator(getBaseDir(), 100);
String clientName = getClientName(007);
nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
nameNodeProto.setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE,
false);
for(int idx=0; idx < nrFiles; idx++) {
String fileName = nameGenerator.getNextFileName("ThroughputBench");
nameNodeProto.create(fileName, FsPermission.getDefault(), clientName,

View File

@ -1828,11 +1828,11 @@ public class TestCheckpoint {
// Now primary NN saves namespace 3 times
NamenodeProtocols nn = cluster.getNameNodeRpc();
nn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
nn.setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
for (int i = 0; i < 3; i++) {
nn.saveNamespace();
}
nn.setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
nn.setSafeMode(SafeModeAction.SAFEMODE_LEAVE, false);
// Now the secondary tries to checkpoint again with its
// old image in memory.
@ -1919,9 +1919,9 @@ public class TestCheckpoint {
// Perform a saveNamespace, so that the NN has a new fsimage, and the 2NN
// therefore needs to download a new fsimage the next time it performs a
// checkpoint.
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
cluster.getNameNodeRpc().saveNamespace();
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_LEAVE, false);
// Ensure that the 2NN can still perform a checkpoint.
secondary.doCheckpoint();
@ -1966,9 +1966,9 @@ public class TestCheckpoint {
// Perform a saveNamespace, so that the NN has a new fsimage, and the 2NN
// therefore needs to download a new fsimage the next time it performs a
// checkpoint.
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
cluster.getNameNodeRpc().saveNamespace();
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_LEAVE, false);
// Ensure that the 2NN can still perform a checkpoint.
secondary.doCheckpoint();

View File

@ -447,7 +447,7 @@ public class TestEditLog {
// Now ask to sync edit from B, which should sync both edits.
doCallLogSync(threadB, editLog);
assertEquals("logSync from second thread should bump txid up to 2",
assertEquals("logSync from second thread should bump txid up to 3",
3, editLog.getSyncTxId());
// Now ask to sync edit from A, which was already batched in - thus

View File

@ -81,7 +81,6 @@ public class TestFSDirectory {
DFSTestUtil.createFile(hdfs, file5, 1024, REPLICATION, seed);
hdfs.mkdirs(sub2);
}
@After
@ -132,6 +131,16 @@ public class TestFSDirectory {
Assert.assertTrue(diff.contains(file4.getName()));
}
@Test
public void testReset() throws Exception {
fsdir.reset();
Assert.assertFalse(fsdir.isReady());
final INodeDirectory root = (INodeDirectory) fsdir.getINode("/");
Assert.assertTrue(root.getChildrenList(null).isEmpty());
fsdir.imageLoadComplete();
Assert.assertTrue(fsdir.isReady());
}
static void checkClassName(String line) {
int i = line.lastIndexOf('(');
int j = line.lastIndexOf('@');

View File

@ -149,8 +149,8 @@ public class TestListCorruptFileBlocks {
conf.setFloat(DFSConfigKeys.DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,
0f);
cluster = new MiniDFSCluster.Builder(conf).waitSafeMode(false).build();
cluster.getNameNodeRpc().
setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
cluster.getNameNodeRpc().setSafeMode(
HdfsConstants.SafeModeAction.SAFEMODE_LEAVE, false);
FileSystem fs = cluster.getFileSystem();
// create two files with one block each
@ -247,8 +247,8 @@ public class TestListCorruptFileBlocks {
cluster.getNameNode().isInSafeMode());
// now leave safe mode so that we can clean up
cluster.getNameNodeRpc().
setSafeMode(HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
cluster.getNameNodeRpc().setSafeMode(
HdfsConstants.SafeModeAction.SAFEMODE_LEAVE, false);
util.cleanup(fs, "/srcdat10");
} catch (Exception e) {

View File

@ -149,8 +149,8 @@ public class TestNNStorageRetentionFunctional {
private static void doSaveNamespace(NameNode nn) throws IOException {
LOG.info("Saving namespace...");
nn.getRpcServer().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
nn.getRpcServer().setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
nn.getRpcServer().saveNamespace();
nn.getRpcServer().setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
nn.getRpcServer().setSafeMode(SafeModeAction.SAFEMODE_LEAVE, false);
}
}

View File

@ -58,6 +58,17 @@ public class TestNameCache {
for (String s : notMatching) {
verifyNameReuse(cache, s, false);
}
cache.reset();
cache.initialized();
for (String s : matching) {
verifyNameReuse(cache, s, false);
}
for (String s : notMatching) {
verifyNameReuse(cache, s, false);
}
}
private void verifyNameReuse(NameCache<String> cache, String s, boolean reused) {

View File

@ -384,7 +384,7 @@ public class TestStartup {
new PermissionStatus("hairong", null, FsPermission.getDefault()), true);
NamenodeProtocols nnRpc = namenode.getRpcServer();
assertTrue(nnRpc.getFileInfo("/test").isDir());
nnRpc.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
nnRpc.setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
nnRpc.saveNamespace();
namenode.stop();
namenode.join();
@ -414,7 +414,7 @@ public class TestStartup {
NameNode namenode = new NameNode(conf);
NamenodeProtocols nnRpc = namenode.getRpcServer();
assertTrue(nnRpc.getFileInfo("/test").isDir());
nnRpc.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
nnRpc.setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
nnRpc.saveNamespace();
namenode.stop();
namenode.join();

View File

@ -17,10 +17,11 @@
*/
package org.apache.hadoop.hdfs.server.namenode.ha;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.List;
import org.apache.commons.logging.Log;
@ -34,6 +35,7 @@ import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
@ -43,6 +45,8 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.After;
@ -603,9 +607,9 @@ public class TestHASafeMode {
HATestUtil.waitForStandbyToCatchUp(nn0, nn1);
// get some blocks in the SBN's image
nn1.getRpcServer().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
nn1.getRpcServer().setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
NameNodeAdapter.saveNamespace(nn1);
nn1.getRpcServer().setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
nn1.getRpcServer().setSafeMode(SafeModeAction.SAFEMODE_LEAVE, false);
// and some blocks in the edit logs
DFSTestUtil.createFile(fs, new Path("/test2"), 15*BLOCK_SIZE, (short)3, 1L);
@ -664,5 +668,50 @@ public class TestHASafeMode {
string + "\n" +
"==================================================\n\n");
}
/**
* DFS#isInSafeMode should check the ActiveNNs safemode in HA enabled cluster. HDFS-3507
*
* @throws Exception
*/
@Test
public void testIsInSafemode() throws Exception {
// Check for the standby nn without client failover.
NameNode nn2 = cluster.getNameNode(1);
assertTrue("nn2 should be in standby state", nn2.isStandbyState());
InetSocketAddress nameNodeAddress = nn2.getNameNodeAddress();
Configuration conf = new Configuration();
DistributedFileSystem dfs = new DistributedFileSystem();
try {
dfs.initialize(
URI.create("hdfs://" + nameNodeAddress.getHostName() + ":"
+ nameNodeAddress.getPort()), conf);
dfs.isInSafeMode();
fail("StandBy should throw exception for isInSafeMode");
} catch (IOException e) {
if (e instanceof RemoteException) {
IOException sbExcpetion = ((RemoteException) e).unwrapRemoteException();
assertTrue("StandBy nn should not support isInSafeMode",
sbExcpetion instanceof StandbyException);
} else {
throw e;
}
} finally {
if (null != dfs) {
dfs.close();
}
}
// Check with Client FailOver
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
cluster.getNameNodeRpc(1).setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
DistributedFileSystem dfsWithFailOver = (DistributedFileSystem) fs;
assertTrue("ANN should be in SafeMode", dfsWithFailOver.isInSafeMode());
cluster.getNameNodeRpc(1).setSafeMode(SafeModeAction.SAFEMODE_LEAVE, false);
assertFalse("ANN should be out of SafeMode", dfsWithFailOver.isInSafeMode());
}
}

View File

@ -403,9 +403,9 @@ public class TestNameNodeMetrics {
assertGauge("TransactionsSinceLastCheckpoint", 4L, getMetrics(NS_METRICS));
assertGauge("TransactionsSinceLastLogRoll", 1L, getMetrics(NS_METRICS));
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
cluster.getNameNodeRpc().saveNamespace();
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_LEAVE);
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_LEAVE, false);
long newLastCkptTime = MetricsAsserts.getLongGauge("LastCheckpointTime",
getMetrics(NS_METRICS));

View File

@ -133,7 +133,8 @@ public class TestOfflineImageViewer {
}
// Write results to the fsimage file
cluster.getNameNodeRpc().setSafeMode(SafeModeAction.SAFEMODE_ENTER);
cluster.getNameNodeRpc()
.setSafeMode(SafeModeAction.SAFEMODE_ENTER, false);
cluster.getNameNodeRpc().saveNamespace();
// Determine location of fsimage file

View File

@ -26,6 +26,9 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.junit.Assert;
import org.junit.Test;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
public class TestParam {
public static final Log LOG = LogFactory.getLog(TestParam.class);
@ -234,4 +237,43 @@ public class TestParam {
final String actual = Param.toSortedString(sep, equalParam, ampParam);
Assert.assertEquals(expected, actual);
}
@Test
public void userNameEmpty() {
UserParam userParam = new UserParam("");
assertNull(userParam.getValue());
}
@Test(expected = IllegalArgumentException.class)
public void userNameTooLong() {
new UserParam("a123456789012345678901234567890x");
}
@Test(expected = IllegalArgumentException.class)
public void userNameInvalidStart() {
new UserParam("1x");
}
@Test(expected = IllegalArgumentException.class)
public void userNameInvalidDollarSign() {
new UserParam("1$x");
}
@Test
public void userNameMinLength() {
UserParam userParam = new UserParam("a");
assertNotNull(userParam.getValue());
}
@Test
public void userNameMaxLength() {
UserParam userParam = new UserParam("a123456789012345678901234567890");
assertNotNull(userParam.getValue());
}
@Test
public void userNameValidDollarSign() {
UserParam userParam = new UserParam("a$");
assertNotNull(userParam.getValue());
}
}

View File

@ -6264,8 +6264,8 @@
</cleanup-commands>
<comparators>
<comparator>
<type>RegexpComparator</type>
<expected-output>^test: `/dir0/file': No such file or directory</expected-output>
<type>ExactComparator</type>
<expected-output></expected-output>
</comparator>
</comparators>
</test>
@ -6280,8 +6280,8 @@
</cleanup-commands>
<comparators>
<comparator>
<type>RegexpComparator</type>
<expected-output>^test: `file': No such file or directory</expected-output>
<type>ExactComparator</type>
<expected-output></expected-output>
</comparator>
</comparators>
</test>
@ -6296,8 +6296,8 @@
</cleanup-commands>
<comparators>
<comparator>
<type>RegexpComparator</type>
<expected-output>^test: `/dir': No such file or directory</expected-output>
<type>ExactComparator</type>
<expected-output></expected-output>
</comparator>
</comparators>
</test>
@ -6312,8 +6312,8 @@
</cleanup-commands>
<comparators>
<comparator>
<type>RegexpComparator</type>
<expected-output>^test: `dir0': No such file or directory</expected-output>
<type>ExactComparator</type>
<expected-output></expected-output>
</comparator>
</comparators>
</test>
@ -6328,8 +6328,8 @@
</cleanup-commands>
<comparators>
<comparator>
<type>RegexpComparator</type>
<expected-output>^test: `hdfs:///dir0/file': No such file or directory</expected-output>
<type>ExactComparator</type>
<expected-output></expected-output>
</comparator>
</comparators>
</test>
@ -6344,8 +6344,8 @@
</cleanup-commands>
<comparators>
<comparator>
<type>RegexpComparator</type>
<expected-output>^test: `hdfs:///dir': No such file or directory</expected-output>
<type>ExactComparator</type>
<expected-output></expected-output>
</comparator>
</comparators>
</test>
@ -6360,8 +6360,8 @@
</cleanup-commands>
<comparators>
<comparator>
<type>RegexpComparator</type>
<expected-output>^test: `hdfs://\w+[-.a-z0-9]*:[0-9]+/dir0/file': No such file or directory</expected-output>
<type>ExactComparator</type>
<expected-output></expected-output>
</comparator>
</comparators>
</test>
@ -6376,8 +6376,8 @@
</cleanup-commands>
<comparators>
<comparator>
<type>RegexpComparator</type>
<expected-output>^test: `hdfs://\w+[-.a-z0-9]*:[0-9]+/dir': No such file or directory</expected-output>
<type>ExactComparator</type>
<expected-output></expected-output>
</comparator>
</comparators>
</test>

View File

@ -166,6 +166,8 @@ Release 2.0.3-alpha - Unreleased
HADOOP-8911. CRLF characters in source and text files.
(Raja Aluri via suresh)
MAPREDUCE-4723. Fix warnings found by findbugs 2. (Sandy Ryza via eli)
OPTIMIZATIONS
BUG FIXES
@ -197,6 +199,9 @@ Release 2.0.3-alpha - Unreleased
MAPREDUCE-4777. In TestIFile, testIFileReaderWithCodec relies on
testIFileWriterWithCodec. (Sandy Ryza via tomwhite)
MAPREDUCE-4800. Cleanup o.a.h.mapred.MapTaskStatus - remove unused
code. (kkambatl via tucu)
Release 2.0.2-alpha - 2012-09-07
INCOMPATIBLE CHANGES
@ -573,6 +578,21 @@ Release 2.0.0-alpha - 05-23-2012
MAPREDUCE-4444. nodemanager fails to start when one of the local-dirs is
bad (Jason Lowe via bobby)
Release 0.23.6 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
OPTIMIZATIONS
BUG FIXES
MAPREDUCE-4802. Takes a long time to load the task list on the AM for
large jobs (Ravi Prakash via bobby)
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES
@ -593,6 +613,9 @@ Release 0.23.5 - UNRELEASED
OPTIMIZATIONS
MAPREDUCE-4720. Browser thinks History Server main page JS is taking too
long (Ravi Prakash via bobby)
BUG FIXES
MAPREDUCE-4554. Job Credentials are not transmitted if security is turned
@ -660,8 +683,17 @@ Release 0.23.5 - UNRELEASED
MAPREDUCE-4786. Job End Notification retry interval is 5 milliseconds by
default (Ravi Prakash via bobby)
MAPREDUCE-4517. Too many INFO messages written out during AM to RM heartbeat
(Jason Lowe via tgraves)
MAPREDUCE-4797. LocalContainerAllocator can loop forever trying to contact
the RM (jlowe via bobby)
MAPREDUCE-4801. ShuffleHandler can generate large logs due to prematurely
closed channels (jlowe via bobby)
Release 0.23.4 - UNRELEASED
Release 0.23.4
INCOMPATIBLE CHANGES

View File

@ -479,4 +479,28 @@
<Field name="sslFileBufferSize" />
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
<Match>
<Class name="org.apache.hadoop.mapreduce.util.ProcessTree" />
<Method name="sendSignal" />
<Bug pattern="NP_GUARANTEED_DEREF_ON_EXCEPTION_PATH" />
</Match>
<Match>
<Class name="org.apache.hadoop.mapreduce.util.ProcessTree" />
<Method name="isSetsidSupported" />
<Bug pattern="NP_GUARANTEED_DEREF_ON_EXCEPTION_PATH" />
</Match>
<Match>
<Class name="org.apache.hadoop.mapreduce.util.ProcessTree" />
<Method name="isSetsidSupported" />
<Bug pattern="NP_NULL_ON_SOME_PATH_EXCEPTION" />
</Match>
<Match>
<Class name="org.apache.hadoop.mapreduce.v2.hs.CachedHistoryStorage$1" />
<Bug pattern="SE_BAD_FIELD_INNER_CLASS" />
</Match>
</FindBugsFilter>

View File

@ -359,9 +359,8 @@ public class LocalContainerLauncher extends AbstractService implements
+ StringUtils.stringifyException(e));
}
// Report back any failures, for diagnostic purposes
ByteArrayOutputStream baos = new ByteArrayOutputStream();
exception.printStackTrace(new PrintStream(baos));
umbilical.reportDiagnosticInfo(classicAttemptID, baos.toString());
umbilical.reportDiagnosticInfo(classicAttemptID,
StringUtils.stringifyException(exception));
throw new RuntimeException();
} catch (Throwable throwable) {

View File

@ -315,8 +315,6 @@ public class TaskAttemptListenerImpl extends CompositeService
+ taskStatus.getProgress());
// Task sends the updated state-string to the TT.
taskAttemptStatus.stateString = taskStatus.getStateString();
// Set the output-size when map-task finishes. Set by the task itself.
taskAttemptStatus.outputSize = taskStatus.getOutputSize();
// Task sends the updated phase to the TT.
taskAttemptStatus.phase = TypeConverter.toYarn(taskStatus.getPhase());
// Counters are updated by the task. Convert counters into new format as

View File

@ -184,10 +184,8 @@ class YarnChild {
LOG.info("Exception cleaning up: " + StringUtils.stringifyException(e));
}
// Report back any failures, for diagnostic purposes
ByteArrayOutputStream baos = new ByteArrayOutputStream();
exception.printStackTrace(new PrintStream(baos));
if (taskid != null) {
umbilical.fatalError(taskid, baos.toString());
umbilical.fatalError(taskid, StringUtils.stringifyException(exception));
}
} catch (Throwable throwable) {
LOG.fatal("Error running child : "

View File

@ -600,6 +600,8 @@ public class JobHistoryEventHandler extends AbstractService
summary.setJobFinishTime(juce.getFinishTime());
setSummarySlotSeconds(summary, context.getJob(jobId).getAllCounters());
break;
default:
throw new YarnException("Invalid event type");
}
}

View File

@ -49,7 +49,6 @@ public class TaskAttemptStatusUpdateEvent extends TaskAttemptEvent {
public Counters counters;
public String stateString;
public Phase phase;
public long outputSize;
public List<TaskAttemptId> fetchFailedMaps;
public long mapFinishTime;
public long shuffleFinishTime;

View File

@ -833,6 +833,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
break;
case SUCCEEDED:
metrics.completedJob(this);
break;
default:
throw new IllegalArgumentException("Illegal job state: " + finalState);
}
return finalState;
}
@ -1311,6 +1314,9 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
case REDUCE:
this.finalReduceCounters.incrAllCounters(counters);
break;
default:
throw new IllegalStateException("Task type neither map nor reduce: " +
t.getType());
}
this.fullCounters.incrAllCounters(counters);
}

View File

@ -1335,6 +1335,8 @@ public abstract class TaskAttemptImpl implements
taskAttempt.attemptId,
TaskEventType.T_ATTEMPT_KILLED));
break;
default:
LOG.error("Task final state is not FAILED or KILLED: " + finalState);
}
if (taskAttempt.getLaunchTime() != 0) {
TaskAttemptUnsuccessfulCompletionEvent tauce =

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.mapreduce.v2.app.local;
import java.util.ArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -62,7 +61,6 @@ public class LocalContainerAllocator extends RMCommunicator
@SuppressWarnings("rawtypes")
private final EventHandler eventHandler;
private AtomicInteger containerCount = new AtomicInteger();
private long retryInterval;
private long retrystartTime;
private String nmHost;
@ -102,9 +100,9 @@ public class LocalContainerAllocator extends RMCommunicator
this.applicationAttemptId, this.lastResponseID, super
.getApplicationProgress(), new ArrayList<ResourceRequest>(),
new ArrayList<ContainerId>());
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
AMResponse response;
try {
AllocateResponse allocateResponse = scheduler.allocate(allocateRequest);
response = allocateResponse.getAMResponse();
// Reset retry count if no exception occurred.
retrystartTime = System.currentTimeMillis();

View File

@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.util.RackResolver;
@ -145,6 +146,8 @@ public class RMContainerAllocator extends RMContainerRequestor
BlockingQueue<ContainerAllocatorEvent> eventQueue
= new LinkedBlockingQueue<ContainerAllocatorEvent>();
private ScheduleStats scheduleStats = new ScheduleStats();
public RMContainerAllocator(ClientService clientService, AppContext context) {
super(clientService, context);
this.stopped = new AtomicBoolean(false);
@ -208,13 +211,10 @@ public class RMContainerAllocator extends RMContainerRequestor
@Override
protected synchronized void heartbeat() throws Exception {
LOG.info("Before Scheduling: " + getStat());
scheduleStats.updateAndLogIfChanged("Before Scheduling: ");
List<Container> allocatedContainers = getResources();
LOG.info("After Scheduling: " + getStat());
if (allocatedContainers.size() > 0) {
LOG.info("Before Assign: " + getStat());
scheduledRequests.assign(allocatedContainers);
LOG.info("After Assign: " + getStat());
}
int completedMaps = getJob().getCompletedMaps();
@ -235,6 +235,8 @@ public class RMContainerAllocator extends RMContainerRequestor
maxReduceRampupLimit, reduceSlowStart);
recalculateReduceSchedule = false;
}
scheduleStats.updateAndLogIfChanged("After Scheduling: ");
}
@Override
@ -245,7 +247,7 @@ public class RMContainerAllocator extends RMContainerRequestor
}
eventHandlingThread.interrupt();
super.stop();
LOG.info("Final Stats: " + getStat());
scheduleStats.log("Final Stats: ");
}
public boolean getIsReduceStarted() {
@ -427,7 +429,9 @@ public class RMContainerAllocator extends RMContainerRequestor
return;
}
LOG.info("Recalculating schedule...");
int headRoom = getAvailableResources() != null ?
getAvailableResources().getMemory() : 0;
LOG.info("Recalculating schedule, headroom=" + headRoom);
//check for slow start
if (!getIsReduceStarted()) {//not set yet
@ -536,24 +540,6 @@ public class RMContainerAllocator extends RMContainerRequestor
}
}
/**
* Synchronized to avoid findbugs warnings
*/
private synchronized String getStat() {
return "PendingReduces:" + pendingReduces.size() +
" ScheduledMaps:" + scheduledRequests.maps.size() +
" ScheduledReduces:" + scheduledRequests.reduces.size() +
" AssignedMaps:" + assignedRequests.maps.size() +
" AssignedReduces:" + assignedRequests.reduces.size() +
" completedMaps:" + getJob().getCompletedMaps() +
" completedReduces:" + getJob().getCompletedReduces() +
" containersAllocated:" + containersAllocated +
" containersReleased:" + containersReleased +
" hostLocalAssigned:" + hostLocalAssigned +
" rackLocalAssigned:" + rackLocalAssigned +
" availableResources(headroom):" + getAvailableResources();
}
@SuppressWarnings("unchecked")
private List<Container> getResources() throws Exception {
int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;//first time it would be null
@ -595,6 +581,9 @@ public class RMContainerAllocator extends RMContainerRequestor
if (newContainers.size() + finishedContainers.size() > 0 || headRoom != newHeadRoom) {
//something changed
recalculateReduceSchedule = true;
if (LOG.isDebugEnabled() && headRoom != newHeadRoom) {
LOG.debug("headroom=" + newHeadRoom);
}
}
if (LOG.isDebugEnabled()) {
@ -1123,4 +1112,60 @@ public class RMContainerAllocator extends RMContainerRequestor
}
}
}
private class ScheduleStats {
int numPendingReduces;
int numScheduledMaps;
int numScheduledReduces;
int numAssignedMaps;
int numAssignedReduces;
int numCompletedMaps;
int numCompletedReduces;
int numContainersAllocated;
int numContainersReleased;
public void updateAndLogIfChanged(String msgPrefix) {
boolean changed = false;
// synchronized to fix findbug warnings
synchronized (RMContainerAllocator.this) {
changed |= (numPendingReduces != pendingReduces.size());
numPendingReduces = pendingReduces.size();
changed |= (numScheduledMaps != scheduledRequests.maps.size());
numScheduledMaps = scheduledRequests.maps.size();
changed |= (numScheduledReduces != scheduledRequests.reduces.size());
numScheduledReduces = scheduledRequests.reduces.size();
changed |= (numAssignedMaps != assignedRequests.maps.size());
numAssignedMaps = assignedRequests.maps.size();
changed |= (numAssignedReduces != assignedRequests.reduces.size());
numAssignedReduces = assignedRequests.reduces.size();
changed |= (numCompletedMaps != getJob().getCompletedMaps());
numCompletedMaps = getJob().getCompletedMaps();
changed |= (numCompletedReduces != getJob().getCompletedReduces());
numCompletedReduces = getJob().getCompletedReduces();
changed |= (numContainersAllocated != containersAllocated);
numContainersAllocated = containersAllocated;
changed |= (numContainersReleased != containersReleased);
numContainersReleased = containersReleased;
}
if (changed) {
log(msgPrefix);
}
}
public void log(String msgPrefix) {
LOG.info(msgPrefix + "PendingReds:" + numPendingReduces +
" ScheduledMaps:" + numScheduledMaps +
" ScheduledReds:" + numScheduledReduces +
" AssignedMaps:" + numAssignedMaps +
" AssignedReds:" + numAssignedReduces +
" CompletedMaps:" + numCompletedMaps +
" CompletedReds:" + numCompletedReduces +
" ContAlloc:" + numContainersAllocated +
" ContRel:" + numContainersReleased +
" HostLocal:" + hostLocalAssigned +
" RackLocal:" + rackLocalAssigned);
}
}
}

View File

@ -210,7 +210,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
return; //already blacklisted
}
Integer failures = nodeFailures.remove(hostName);
failures = failures == null ? 0 : failures;
failures = failures == null ? Integer.valueOf(0) : failures;
failures++;
LOG.info(failures + " failures on node " + hostName);
if (failures >= maxTaskFailuresPerNode) {

View File

@ -43,7 +43,6 @@ abstract class StartEndTimesBase implements TaskRuntimeEstimator {
static final int MINIMUM_COMPLETE_NUMBER_TO_SPECULATE
= 1;
protected Configuration conf = null;
protected AppContext context = null;
protected final Map<TaskAttemptId, Long> startTimes
@ -82,7 +81,6 @@ abstract class StartEndTimesBase implements TaskRuntimeEstimator {
@Override
public void contextualize(Configuration conf, AppContext context) {
this.conf = conf;
this.context = context;
Map<JobId, Job> allJobs = context.getAllJobs();

View File

@ -35,7 +35,6 @@ public class AppView extends TwoColumnLayout {
protected void commonPreHead(Page.HTML<_> html) {
set(ACCORDION_ID, "nav");
set(initID(ACCORDION, "nav"), "{autoHeight:false, active:1}");
set(THEMESWITCHER_ID, "themeswitcher");
}
@Override

View File

@ -83,7 +83,6 @@ public class NavBlock extends HtmlBlock {
li().a("/conf", "Configuration")._().
li().a("/logs", "Local logs")._().
li().a("/stacks", "Server stacks")._().
li().a("/metrics", "Server metrics")._()._()._().
div("#themeswitcher")._();
li().a("/metrics", "Server metrics")._()._()._();
}
}

View File

@ -21,15 +21,13 @@ package org.apache.hadoop.mapreduce.v2.app.webapp;
import static org.apache.hadoop.mapreduce.v2.app.webapp.AMParams.TASK_TYPE;
import static org.apache.hadoop.yarn.util.StringHelper.join;
import static org.apache.hadoop.yarn.util.StringHelper.percent;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR_VALUE;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR;
import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR_VALUE;
import org.apache.hadoop.mapreduce.v2.api.records.TaskType;
import org.apache.hadoop.mapreduce.v2.app.job.Task;
import org.apache.hadoop.mapreduce.v2.app.webapp.dao.TaskInfo;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.util.Times;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TABLE;
import org.apache.hadoop.yarn.webapp.hamlet.Hamlet.TBODY;
@ -66,6 +64,8 @@ public class TasksBlock extends HtmlBlock {
th("Finish Time").
th("Elapsed Time")._()._().
tbody();
StringBuilder tasksTableData = new StringBuilder("[\n");
for (Task task : app.getJob().getTasks().values()) {
if (type != null && task.getType() != type) {
continue;
@ -73,31 +73,28 @@ public class TasksBlock extends HtmlBlock {
TaskInfo info = new TaskInfo(task);
String tid = info.getId();
String pct = percent(info.getProgress() / 100);
long startTime = info.getStartTime();
long finishTime = info.getFinishTime();
long elapsed = info.getElapsedTime();
tbody.
tr().
td().
br().$title(String.valueOf(info.getTaskNum()))._(). // sorting
a(url("task", tid), tid)._().
td().
br().$title(pct)._().
div(_PROGRESSBAR).
$title(join(pct, '%')). // tooltip
div(_PROGRESSBAR_VALUE).
$style(join("width:", pct, '%'))._()._()._().
td(info.getState()).
td().
br().$title(String.valueOf(startTime))._().
_(Times.format(startTime))._().
td().
br().$title(String.valueOf(finishTime))._().
_(Times.format(finishTime))._().
td().
br().$title(String.valueOf(elapsed))._().
_(StringUtils.formatTime(elapsed))._()._();
tasksTableData.append("[\"<a href='").append(url("task", tid))
.append("'>").append(tid).append("</a>\",\"")
//Progress bar
.append("<br title='").append(pct)
.append("'> <div class='").append(C_PROGRESSBAR).append("' title='")
.append(join(pct, '%')).append("'> ").append("<div class='")
.append(C_PROGRESSBAR_VALUE).append("' style='")
.append(join("width:", pct, '%')).append("'> </div> </div>\",\"")
.append(info.getState()).append("\",\"")
.append(info.getStartTime()).append("\",\"")
.append(info.getFinishTime()).append("\",\"")
.append(info.getElapsedTime()).append("\"],\n");
}
//Remove the last comma and close off the array of arrays
if(tasksTableData.charAt(tasksTableData.length() - 2) == ',') {
tasksTableData.delete(tasksTableData.length()-2, tasksTableData.length()-1);
}
tasksTableData.append("]");
html.script().$type("text/javascript").
_("var tasksTableData=" + tasksTableData)._();
tbody._()._();
}
}

View File

@ -37,11 +37,26 @@ public class TasksPage extends AppView {
}
private String tasksTableInit() {
return tableInit().
// Sort by id upon page load
append(", aaSorting: [[0, 'asc']]").
append(",aoColumns:[{sType:'title-numeric'},{sType:'title-numeric',").
append("bSearchable:false},null,{sType:'title-numeric'},").
append("{sType:'title-numeric'},{sType:'title-numeric'}]}").toString();
return tableInit()
.append(", 'aaData': tasksTableData")
.append(", bDeferRender: true")
.append(", bProcessing: true")
.append("\n, aoColumnDefs: [\n")
.append("{'sType':'numeric', 'aTargets': [0]")
.append(", 'mRender': parseHadoopID }")
.append("\n, {'sType':'numeric', bSearchable:false, 'aTargets': [1]")
.append(", 'mRender': parseHadoopProgress }")
.append("\n, {'sType':'numeric', 'aTargets': [3, 4]")
.append(", 'mRender': renderHadoopDate }")
.append("\n, {'sType':'numeric', 'aTargets': [5]")
.append(", 'mRender': renderHadoopElapsedTime }]")
// Sort by id upon page load
.append(", aaSorting: [[0, 'asc']] }").toString();
}
}

View File

@ -285,6 +285,8 @@ public class JobInfo {
case SCHEDULED:
++this.mapsPending;
break;
default:
break;
}
break;
case REDUCE:
@ -296,8 +298,13 @@ public class JobInfo {
case SCHEDULED:
++this.reducesPending;
break;
default:
break;
}
break;
default:
throw new IllegalStateException(
"Task type is neither map nor reduce: " + task.getType());
}
// Attempts counts
Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
@ -337,6 +344,9 @@ public class JobInfo {
this.failedReduceAttempts += failed;
this.killedReduceAttempts += killed;
break;
default:
throw new IllegalStateException("Task type neither map nor reduce: " +
task.getType());
}
}
}

View File

@ -412,7 +412,6 @@ public class TestFetchFailure {
status.fetchFailedMaps = new ArrayList<TaskAttemptId>();
status.id = attempt.getID();
status.mapFinishTime = 0;
status.outputSize = 0;
status.phase = phase;
status.progress = 0.5f;
status.shuffleFinishTime = 0;

View File

@ -86,7 +86,6 @@ public class TestMRClientService {
taskAttemptStatus.stateString = "RUNNING";
taskAttemptStatus.taskState = TaskAttemptState.RUNNING;
taskAttemptStatus.phase = Phase.MAP;
taskAttemptStatus.outputSize = 3;
// send the status update
app.getContext().getEventHandler().handle(
new TaskAttemptStatusUpdateEvent(attempt.getID(), taskAttemptStatus));

View File

@ -0,0 +1,108 @@
package org.apache.hadoop.mapreduce.v2.app.local;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.AppContext;
import org.apache.hadoop.mapreduce.v2.app.client.ClientService;
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.AMRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRemoteException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.Assert;
import org.junit.Test;
public class TestLocalContainerAllocator {
@Test
public void testRMConnectionRetry() throws Exception {
// verify the connection exception is thrown
// if we haven't exhausted the retry interval
Configuration conf = new Configuration();
LocalContainerAllocator lca = new StubbedLocalContainerAllocator();
lca.init(conf);
lca.start();
try {
lca.heartbeat();
Assert.fail("heartbeat was supposed to throw");
} catch (YarnRemoteException e) {
// YarnRemoteException is expected
} finally {
lca.stop();
}
// verify YarnException is thrown when the retry interval has expired
conf.setLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, 0);
lca = new StubbedLocalContainerAllocator();
lca.init(conf);
lca.start();
try {
lca.heartbeat();
Assert.fail("heartbeat was supposed to throw");
} catch (YarnException e) {
// YarnException is expected
} finally {
lca.stop();
}
}
private static class StubbedLocalContainerAllocator
extends LocalContainerAllocator {
public StubbedLocalContainerAllocator() {
super(mock(ClientService.class), createAppContext(),
"nmhost", 1, 2, null);
}
@Override
protected void register() {
}
@Override
protected void startAllocatorThread() {
allocatorThread = new Thread();
}
@Override
protected AMRMProtocol createSchedulerProxy() {
AMRMProtocol scheduler = mock(AMRMProtocol.class);
try {
when(scheduler.allocate(isA(AllocateRequest.class)))
.thenThrow(RPCUtil.getRemoteException(new IOException("forcefail")));
} catch (YarnRemoteException e) {
}
return scheduler;
}
private static AppContext createAppContext() {
ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
ApplicationAttemptId attemptId =
BuilderUtils.newApplicationAttemptId(appId, 1);
Job job = mock(Job.class);
@SuppressWarnings("rawtypes")
EventHandler eventHandler = mock(EventHandler.class);
AppContext ctx = mock(AppContext.class);
when(ctx.getApplicationID()).thenReturn(appId);
when(ctx.getApplicationAttemptId()).thenReturn(attemptId);
when(ctx.getJob(isA(JobId.class))).thenReturn(job);
when(ctx.getClusterInfo()).thenReturn(
new ClusterInfo(BuilderUtils.newResource(1024), BuilderUtils
.newResource(10240)));
when(ctx.getEventHandler()).thenReturn(eventHandler);
return ctx;
}
}
}

View File

@ -59,6 +59,8 @@ import org.apache.hadoop.yarn.util.Apps;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import com.google.common.base.Charsets;
/**
* Helper class for MR applications
*/
@ -159,7 +161,8 @@ public class MRApps extends Apps {
}
if (classpathFileStream != null) {
reader = new BufferedReader(new InputStreamReader(classpathFileStream));
reader = new BufferedReader(new InputStreamReader(classpathFileStream,
Charsets.UTF_8));
String cp = reader.readLine();
if (cp != null) {
Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),

View File

@ -420,6 +420,8 @@ public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {
}
break;
}
default:
continue; // nothing special to do for this character
}
}
pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.Writer;
import java.util.List;
@ -30,6 +31,8 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.base.Charsets;
/**
* <code>JobQueueClient</code> is interface provided to the user in order to get
* JobQueue related information from the {@link JobTracker}
@ -144,7 +147,8 @@ class JobQueueClient extends Configured implements Tool {
private void displayQueueList() throws IOException {
JobQueueInfo[] rootQueues = jc.getRootQueues();
for (JobQueueInfo queue : rootQueues) {
printJobQueueInfo(queue, new PrintWriter(System.out));
printJobQueueInfo(queue, new PrintWriter(new OutputStreamWriter(
System.out, Charsets.UTF_8)));
}
}
@ -182,7 +186,8 @@ class JobQueueClient extends Configured implements Tool {
System.out.println("Queue \"" + queue + "\" does not exist.");
return;
}
printJobQueueInfo(jobQueueInfo, new PrintWriter(System.out));
printJobQueueInfo(jobQueueInfo, new PrintWriter(new OutputStreamWriter(
System.out, Charsets.UTF_8)));
if (showJobs && (jobQueueInfo.getChildren() == null ||
jobQueueInfo.getChildren().size() == 0)) {
JobStatus[] jobs = jobQueueInfo.getJobStatuses();
@ -223,10 +228,10 @@ class JobQueueClient extends Configured implements Tool {
if ("-queueinfo".equals(cmd)) {
System.err.println(prefix + "[" + cmd + "<job-queue-name> [-showJobs]]");
} else {
System.err.printf(prefix + "<command> <args>\n");
System.err.printf("\t[-list]\n");
System.err.printf("\t[-info <job-queue-name> [-showJobs]]\n");
System.err.printf("\t[-showacls] \n\n");
System.err.printf(prefix + "<command> <args>%n");
System.err.printf("\t[-list]%n");
System.err.printf("\t[-info <job-queue-name> [-showJobs]]%n");
System.err.printf("\t[-showacls] %n%n");
ToolRunner.printGenericCommandUsage(System.out);
}
}

View File

@ -25,8 +25,7 @@ import java.io.IOException;
class MapTaskStatus extends TaskStatus {
private long mapFinishTime;
private long sortFinishTime;
private long mapFinishTime = 0;
public MapTaskStatus() {}
@ -49,10 +48,10 @@ class MapTaskStatus extends TaskStatus {
@Override
void setFinishTime(long finishTime) {
super.setFinishTime(finishTime);
if (mapFinishTime == 0) {
mapFinishTime = finishTime;
// set mapFinishTime if it hasn't been set before
if (getMapFinishTime() == 0) {
setMapFinishTime(finishTime);
}
setSortFinishTime(finishTime);
}
@Override
@ -74,16 +73,6 @@ class MapTaskStatus extends TaskStatus {
void setMapFinishTime(long mapFinishTime) {
this.mapFinishTime = mapFinishTime;
}
@Override
public long getSortFinishTime() {
return sortFinishTime;
}
@Override
void setSortFinishTime(long sortFinishTime) {
this.sortFinishTime = sortFinishTime;
}
@Override
synchronized void statusUpdate(TaskStatus status) {

View File

@ -49,6 +49,8 @@ import org.apache.log4j.Appender;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import com.google.common.base.Charsets;
/**
* A simple logger to handle the task-specific user logs.
* This class uses the system property <code>hadoop.log.dir</code>.
@ -104,7 +106,8 @@ public class TaskLog {
throws IOException {
File indexFile = getIndexFile(taskid, isCleanup);
BufferedReader fis = new BufferedReader(new InputStreamReader(
SecureIOUtils.openForRead(indexFile, obtainLogDirOwner(taskid), null)));
SecureIOUtils.openForRead(indexFile, obtainLogDirOwner(taskid), null),
Charsets.UTF_8));
//the format of the index file is
//LOG_DIR: <the dir where the task logs are really stored>
//stdout:<start-offset in the stdout file> <length>

View File

@ -27,6 +27,8 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.*;
import com.google.common.base.Charsets;
/**
* An {@link InputFormat} for plain text files. Files are broken into lines.
* Either linefeed or carriage-return are used to signal end of line. Keys are
@ -59,7 +61,9 @@ public class TextInputFormat extends FileInputFormat<LongWritable, Text>
reporter.setStatus(genericSplit.toString());
String delimiter = job.get("textinputformat.record.delimiter");
byte[] recordDelimiterBytes = null;
if (null != delimiter) recordDelimiterBytes = delimiter.getBytes();
if (null != delimiter) {
recordDelimiterBytes = delimiter.getBytes(Charsets.UTF_8);
}
return new LineRecordReader(job, (FileSplit) genericSplit,
recordDelimiterBytes);
}

View File

@ -49,9 +49,7 @@ public class CombineFileRecordReader<K, V> implements RecordReader<K, V> {
protected CombineFileSplit split;
protected JobConf jc;
protected Reporter reporter;
protected Class<RecordReader<K, V>> rrClass;
protected Constructor<RecordReader<K, V>> rrConstructor;
protected FileSystem fs;
protected int idx;
protected long progress;
@ -106,7 +104,6 @@ public class CombineFileRecordReader<K, V> implements RecordReader<K, V> {
throws IOException {
this.split = split;
this.jc = job;
this.rrClass = rrClass;
this.reporter = reporter;
this.idx = 0;
this.curReader = null;

View File

@ -56,6 +56,8 @@ import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import com.google.common.base.Charsets;
@InterfaceAudience.Private
@InterfaceStability.Unstable
class JobSubmitter {
@ -550,7 +552,7 @@ class JobSubmitter {
for(Map.Entry<String, String> ent: nm.entrySet()) {
credentials.addSecretKey(new Text(ent.getKey()), ent.getValue()
.getBytes());
.getBytes(Charsets.UTF_8));
}
} catch (JsonMappingException e) {
json_error = true;

View File

@ -188,7 +188,7 @@ public class HistoryViewer {
decimal.format(counter.getValue());
buff.append(
String.format("\n|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s",
String.format("%n|%1$-30s|%2$-30s|%3$-10s|%4$-10s|%5$-10s",
totalGroup.getDisplayName(),
counter.getDisplayName(),
mapValue, reduceValue, totalValue));

View File

@ -30,6 +30,8 @@ import java.sql.Statement;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputFormat;
@ -58,6 +60,8 @@ import org.apache.hadoop.conf.Configuration;
public class DBInputFormat<T extends DBWritable>
extends InputFormat<LongWritable, T> implements Configurable {
private static final Log LOG = LogFactory.getLog(DBInputFormat.class);
private String dbProductName = "DEFAULT";
/**
@ -354,6 +358,8 @@ public class DBInputFormat<T extends DBWritable>
this.connection.close();
this.connection = null;
}
} catch (SQLException sqlE) { } // ignore exception on close.
} catch (SQLException sqlE) {
LOG.debug("Exception on close", sqlE);
}
}
}

View File

@ -219,7 +219,6 @@ public abstract class CombineFileInputFormat<K, V>
Path p = fs.makeQualified(paths[i]);
newpaths.add(p);
}
paths = null;
// In one single iteration, process all the paths in a single pool.
// Processing one pool at a time ensures that a split contains paths

View File

@ -46,9 +46,7 @@ public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
Integer.class};
protected CombineFileSplit split;
protected Class<? extends RecordReader<K,V>> rrClass;
protected Constructor<? extends RecordReader<K,V>> rrConstructor;
protected FileSystem fs;
protected TaskAttemptContext context;
protected int idx;
@ -111,7 +109,6 @@ public class CombineFileRecordReader<K, V> extends RecordReader<K, V> {
throws IOException {
this.split = split;
this.context = context;
this.rrClass = rrClass;
this.idx = 0;
this.curReader = null;
this.progress = 0;

View File

@ -425,6 +425,8 @@ public abstract class FileInputFormat<K, V> extends InputFormat<K, V> {
}
break;
}
default:
continue; // nothing special to do for this character
}
}
pathStrings.add(commaSeparatedPaths.substring(pathStart, length));

Some files were not shown because too many files have changed in this diff Show More