Merging r1535122 through r1535532 from trunk to HDFS-2832

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1535534 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2013-10-24 20:33:04 +00:00
commit 1fcc24d56a
8 changed files with 165 additions and 48 deletions

View File

@ -369,6 +369,9 @@ Release 2.3.0 - UNRELEASED
HADOOP-9291. enhance unit-test coverage of package o.a.h.metrics2 (Ivan A. HADOOP-9291. enhance unit-test coverage of package o.a.h.metrics2 (Ivan A.
Veselovsky via jeagles) Veselovsky via jeagles)
HADOOP-10064. Upgrade to maven antrun plugin version 1.7 (Arpit Agarwal via
jeagles)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn) HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)

View File

@ -335,10 +335,16 @@ Release 2.3.0 - UNRELEASED
HDFS-4885. Improve the verifyBlockPlacement() API in BlockPlacementPolicy. HDFS-4885. Improve the verifyBlockPlacement() API in BlockPlacementPolicy.
(Junping Du via szetszwo) (Junping Du via szetszwo)
HDFS-5363. Refactor WebHdfsFileSystem: move SPENGO-authenticated connection
creation to URLConnectionFactory. (Haohui Mai via jing9)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn) HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
HDFS-5341. Reduce fsdataset lock duration during directory scanning.
(Qus-Jiawei via kihwal)
BUG FIXES BUG FIXES
HDFS-5034. Remove debug prints from GetFileLinkInfo (Andrew Wang via Colin HDFS-5034. Remove debug prints from GetFileLinkInfo (Andrew Wang via Colin
Patrick McCabe) Patrick McCabe)

View File

@ -191,6 +191,11 @@ public class DirectoryScanner implements Runnable {
private final FsVolumeSpi volume; private final FsVolumeSpi volume;
/**
* Get the file's length in async block scan
*/
private final long blockFileLength;
private final static Pattern CONDENSED_PATH_REGEX = private final static Pattern CONDENSED_PATH_REGEX =
Pattern.compile("(?<!^)(\\\\|/){2,}"); Pattern.compile("(?<!^)(\\\\|/){2,}");
@ -235,6 +240,7 @@ public class DirectoryScanner implements Runnable {
getCondensedPath(vol.getBasePath()); getCondensedPath(vol.getBasePath());
this.blockSuffix = blockFile == null ? null : this.blockSuffix = blockFile == null ? null :
getSuffix(blockFile, condensedVolPath); getSuffix(blockFile, condensedVolPath);
this.blockFileLength = (blockFile != null) ? blockFile.length() : 0;
if (metaFile == null) { if (metaFile == null) {
this.metaSuffix = null; this.metaSuffix = null;
} else if (blockFile == null) { } else if (blockFile == null) {
@ -251,6 +257,10 @@ public class DirectoryScanner implements Runnable {
new File(volume.getBasePath(), blockSuffix); new File(volume.getBasePath(), blockSuffix);
} }
long getBlockFileLength() {
return blockFileLength;
}
File getMetaFile() { File getMetaFile() {
if (metaSuffix == null) { if (metaSuffix == null) {
return null; return null;
@ -458,7 +468,7 @@ public class DirectoryScanner implements Runnable {
// Block metadata file exits and block file is missing // Block metadata file exits and block file is missing
addDifference(diffRecord, statsRecord, info); addDifference(diffRecord, statsRecord, info);
} else if (info.getGenStamp() != memBlock.getGenerationStamp() } else if (info.getGenStamp() != memBlock.getGenerationStamp()
|| info.getBlockFile().length() != memBlock.getNumBytes()) { || info.getBlockFileLength() != memBlock.getNumBytes()) {
// Block metadata file is missing or has wrong generation stamp, // Block metadata file is missing or has wrong generation stamp,
// or block file length is different than expected // or block file length is different than expected
statsRecord.mismatchBlocks++; statsRecord.mismatchBlocks++;

View File

@ -19,11 +19,19 @@
package org.apache.hadoop.hdfs.web; package org.apache.hadoop.hdfs.web;
import java.io.IOException; import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.net.URLConnection; import java.net.URLConnection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
/** /**
* Utilities for handling URLs * Utilities for handling URLs
@ -31,37 +39,94 @@ import org.apache.hadoop.classification.InterfaceStability;
@InterfaceAudience.LimitedPrivate({ "HDFS" }) @InterfaceAudience.LimitedPrivate({ "HDFS" })
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class URLConnectionFactory { public class URLConnectionFactory {
private static final Log LOG = LogFactory.getLog(URLConnectionFactory.class);
/** SPNEGO authenticator */
private static final KerberosUgiAuthenticator AUTH = new KerberosUgiAuthenticator();
/** /**
* Timeout for socket connects and reads * Timeout for socket connects and reads
*/ */
public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
public static final URLConnectionFactory DEFAULT_CONNECTION_FACTORY = new URLConnectionFactory(DEFAULT_SOCKET_TIMEOUT); public static final URLConnectionFactory DEFAULT_CONNECTION_FACTORY = new URLConnectionFactory(
DEFAULT_SOCKET_TIMEOUT);
private int socketTimeout; private int socketTimeout;
/** Configure connections for AuthenticatedURL */
private ConnectionConfigurator connConfigurator = new ConnectionConfigurator() {
@Override
public HttpURLConnection configure(HttpURLConnection conn)
throws IOException {
URLConnectionFactory.setTimeouts(conn, socketTimeout);
return conn;
}
};
public URLConnectionFactory(int socketTimeout) { public URLConnectionFactory(int socketTimeout) {
this.socketTimeout = socketTimeout; this.socketTimeout = socketTimeout;
} }
/** /**
* Opens a url with read and connect timeouts * Opens a url with read and connect timeouts
* @param url to open *
* @param url
* to open
* @return URLConnection * @return URLConnection
* @throws IOException * @throws IOException
*/ */
public URLConnection openConnection(URL url) throws IOException { public URLConnection openConnection(URL url) throws IOException {
URLConnection connection = url.openConnection(); URLConnection connection = url.openConnection();
setTimeouts(connection); if (connection instanceof HttpURLConnection) {
connConfigurator.configure((HttpURLConnection) connection);
}
return connection; return connection;
} }
/**
* Opens a url with read and connect timeouts
*
* @param url URL to open
* @return URLConnection
* @throws IOException
* @throws AuthenticationException
*/
public URLConnection openConnection(HttpOpParam.Op op, URL url)
throws IOException, AuthenticationException {
if (op.getRequireAuth()) {
if (LOG.isDebugEnabled()) {
LOG.debug("open AuthenticatedURL connection" + url);
}
UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
return new AuthenticatedURL(AUTH, connConfigurator).openConnection(url,
authToken);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("open URL connection");
}
return openConnection(url);
}
}
public ConnectionConfigurator getConnConfigurator() {
return connConfigurator;
}
public void setConnConfigurator(ConnectionConfigurator connConfigurator) {
this.connConfigurator = connConfigurator;
}
/** /**
* Sets timeout parameters on the given URLConnection. * Sets timeout parameters on the given URLConnection.
* *
* @param connection URLConnection to set * @param connection
* URLConnection to set
* @param socketTimeout
* the connection and read timeout of the connection.
*/ */
public void setTimeouts(URLConnection connection) { static void setTimeouts(URLConnection connection, int socketTimeout) {
connection.setConnectTimeout(socketTimeout); connection.setConnectTimeout(socketTimeout);
connection.setReadTimeout(socketTimeout); connection.setReadTimeout(socketTimeout);
} }

View File

@ -94,9 +94,7 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenRenewer; import org.apache.hadoop.security.token.TokenRenewer;
@ -119,20 +117,9 @@ public class WebHdfsFileSystem extends FileSystem
/** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */ /** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
public static final String PATH_PREFIX = "/" + SCHEME + "/v" + VERSION; public static final String PATH_PREFIX = "/" + SCHEME + "/v" + VERSION;
/** SPNEGO authenticator */
private static final KerberosUgiAuthenticator AUTH = new KerberosUgiAuthenticator();
/** Default connection factory may be overridden in tests to use smaller timeout values */ /** Default connection factory may be overridden in tests to use smaller timeout values */
URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY; URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
/** Configures connections for AuthenticatedURL */
private final ConnectionConfigurator CONN_CONFIGURATOR =
new ConnectionConfigurator() {
@Override
public HttpURLConnection configure(HttpURLConnection conn)
throws IOException {
connectionFactory.setTimeouts(conn);
return conn;
}
};
/** Delegation token kind */ /** Delegation token kind */
public static final Text TOKEN_KIND = new Text("WEBHDFS delegation"); public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
/** Token selector */ /** Token selector */
@ -504,16 +491,7 @@ public class WebHdfsFileSystem extends FileSystem
throws IOException { throws IOException {
final HttpURLConnection conn; final HttpURLConnection conn;
try { try {
if (op.getRequireAuth()) { conn = (HttpURLConnection) connectionFactory.openConnection(op, url);
LOG.debug("open AuthenticatedURL connection");
UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
conn = new AuthenticatedURL(AUTH, CONN_CONFIGURATOR).openConnection(
url, authToken);
} else {
LOG.debug("open URL connection");
conn = (HttpURLConnection)connectionFactory.openConnection(url);
}
} catch (AuthenticationException e) { } catch (AuthenticationException e) {
throw new IOException(e); throw new IOException(e);
} }
@ -635,8 +613,10 @@ public class WebHdfsFileSystem extends FileSystem
checkRetry = false; checkRetry = false;
//Step 2) Submit another Http request with the URL from the Location header with data. //Step 2) Submit another Http request with the URL from the Location header with data.
conn = (HttpURLConnection)connectionFactory.openConnection(new URL(redirect)); conn = (HttpURLConnection) connectionFactory.openConnection(new URL(
conn.setRequestProperty("Content-Type", MediaType.APPLICATION_OCTET_STREAM); redirect));
conn.setRequestProperty("Content-Type",
MediaType.APPLICATION_OCTET_STREAM);
conn.setChunkedStreamingMode(32 << 10); //32kB-chunk conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
connect(); connect();
return conn; return conn;
@ -658,7 +638,8 @@ public class WebHdfsFileSystem extends FileSystem
disconnect(); disconnect();
checkRetry = false; checkRetry = false;
conn = (HttpURLConnection)connectionFactory.openConnection(new URL(redirect)); conn = (HttpURLConnection) connectionFactory.openConnection(new URL(
redirect));
connect(); connect();
} }
@ -892,12 +873,6 @@ public class WebHdfsFileSystem extends FileSystem
.write(bufferSize); .write(bufferSize);
} }
@SuppressWarnings("deprecation")
@Override
public boolean delete(final Path f) throws IOException {
return delete(f, true);
}
@Override @Override
public boolean delete(Path f, boolean recursive) throws IOException { public boolean delete(Path f, boolean recursive) throws IOException {
final HttpOpParam.Op op = DeleteOpParam.Op.DELETE; final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.web;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
import org.junit.Test;
import com.google.common.collect.Lists;
public final class TestURLConnectionFactory {
@Test
public void testConnConfiguratior() throws IOException {
final URL u = new URL("http://localhost");
final List<HttpURLConnection> conns = Lists.newArrayList();
URLConnectionFactory fc = new URLConnectionFactory(
URLConnectionFactory.DEFAULT_SOCKET_TIMEOUT);
fc.setConnConfigurator(new ConnectionConfigurator() {
@Override
public HttpURLConnection configure(HttpURLConnection conn)
throws IOException {
Assert.assertEquals(u, conn.getURL());
conns.add(conn);
return conn;
}
});
fc.openConnection(u);
Assert.assertEquals(1, conns.size());
}
}

View File

@ -25,9 +25,11 @@ import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.InputStreamReader; import java.io.InputStreamReader;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.InetAddress;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.nio.channels.SocketChannel; import java.nio.channels.SocketChannel;
import java.util.ArrayList; import java.util.ArrayList;
@ -41,6 +43,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
@ -71,8 +74,9 @@ public class TestWebHdfsTimeouts {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
Configuration conf = WebHdfsTestUtil.createConf(); Configuration conf = WebHdfsTestUtil.createConf();
nnHttpAddress = NameNode.getHttpAddress(conf); serverSocket = new ServerSocket(0, CONNECTION_BACKLOG);
serverSocket = new ServerSocket(nnHttpAddress.getPort(), CONNECTION_BACKLOG); nnHttpAddress = new InetSocketAddress("localhost", serverSocket.getLocalPort());
conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "localhost:" + serverSocket.getLocalPort());
fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf); fs = WebHdfsTestUtil.getWebHdfsFileSystem(conf);
fs.connectionFactory = connectionFactory; fs.connectionFactory = connectionFactory;
clients = new ArrayList<SocketChannel>(); clients = new ArrayList<SocketChannel>();

View File

@ -143,7 +143,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId> <artifactId>maven-antrun-plugin</artifactId>
<version>1.6</version> <version>1.7</version>
</plugin> </plugin>
<plugin> <plugin>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
@ -175,7 +175,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
<pluginExecutionFilter> <pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId> <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId> <artifactId>maven-antrun-plugin</artifactId>
<versionRange>[1.6,)</versionRange> <versionRange>[1.7,)</versionRange>
<goals> <goals>
<goal>run</goal> <goal>run</goal>
</goals> </goals>