svn merge -c 1354316 from trunk for HDFS-3551. WebHDFS CREATE should use client location for HTTP redirection.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1354317 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-06-27 02:53:20 +00:00
parent 4bb61ecbd7
commit 078670046d
6 changed files with 198 additions and 12 deletions

View File

@ -243,6 +243,9 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3428. Move DelegationTokenRenewer to common (tucu)
HDFS-3551. WebHDFS CREATE should use client location for HTTP redirection.
(szetszwo)
Release 2.0.0-alpha - 05-23-2012
INCOMPATIBLE CHANGES

View File

@ -1035,4 +1035,8 @@ public void clearPendingQueues() {
}
}
@Override
public String toString() {
return getClass().getSimpleName() + ": " + host2DatanodeMap;
}
}

View File

@ -17,7 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -156,4 +158,14 @@ DatanodeDescriptor getDatanodeByHost(String ipAddr) {
hostmapLock.readLock().unlock();
}
}
@Override
public String toString() {
final StringBuilder b = new StringBuilder(getClass().getSimpleName())
.append("[");
for(Map.Entry<String, DatanodeDescriptor[]> e : map.entrySet()) {
b.append("\n " + e.getKey() + " => " + Arrays.asList(e.getValue()));
}
return b.append("\n]").toString();
}
}

View File

@ -56,6 +56,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@ -115,6 +116,11 @@ public static String getRemoteAddress() {
return REMOTE_ADDRESS.get();
}
/** Set the remote client address. */
static void setRemoteAddress(String remoteAddress) {
REMOTE_ADDRESS.set(remoteAddress);
}
private @Context ServletContext context;
private @Context HttpServletRequest request;
private @Context HttpServletResponse response;
@ -134,12 +140,26 @@ private void init(final UserGroupInformation ugi,
response.setContentType(null);
}
private static DatanodeInfo chooseDatanode(final NameNode namenode,
static DatanodeInfo chooseDatanode(final NameNode namenode,
final String path, final HttpOpParam.Op op, final long openOffset,
Configuration conf) throws IOException {
if (op == GetOpParam.Op.OPEN
final long blocksize, Configuration conf) throws IOException {
final BlockManager bm = namenode.getNamesystem().getBlockManager();
if (op == PutOpParam.Op.CREATE) {
//choose a datanode near to client
final DatanodeDescriptor clientNode = bm.getDatanodeManager(
).getDatanodeByHost(getRemoteAddress());
if (clientNode != null) {
final DatanodeDescriptor[] datanodes = bm.getBlockPlacementPolicy(
).chooseTarget(path, 1, clientNode, null, blocksize);
if (datanodes.length > 0) {
return datanodes[0];
}
}
} else if (op == GetOpParam.Op.OPEN
|| op == GetOpParam.Op.GETFILECHECKSUM
|| op == PostOpParam.Op.APPEND) {
//choose a datanode containing a replica
final NamenodeProtocols np = namenode.getRpcServer();
final HdfsFileStatus status = np.getFileInfo(path);
if (status == null) {
@ -158,14 +178,13 @@ private static DatanodeInfo chooseDatanode(final NameNode namenode,
final LocatedBlocks locations = np.getBlockLocations(path, offset, 1);
final int count = locations.locatedBlockCount();
if (count > 0) {
return JspHelper.bestNode(locations.get(0), conf);
return JspHelper.bestNode(locations.get(0).getLocations(), false, conf);
}
}
}
return (DatanodeDescriptor)namenode.getNamesystem().getBlockManager(
).getDatanodeManager().getNetworkTopology().chooseRandom(
NodeBase.ROOT);
return (DatanodeDescriptor)bm.getDatanodeManager().getNetworkTopology(
).chooseRandom(NodeBase.ROOT);
}
private Token<? extends TokenIdentifier> generateDelegationToken(
@ -183,9 +202,11 @@ private URI redirectURI(final NameNode namenode,
final UserGroupInformation ugi, final DelegationParam delegation,
final UserParam username, final DoAsParam doAsUser,
final String path, final HttpOpParam.Op op, final long openOffset,
final long blocksize,
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset, conf);
final DatanodeInfo dn = chooseDatanode(namenode, path, op, openOffset,
blocksize, conf);
final String delegationQuery;
if (!UserGroupInformation.isSecurityEnabled()) {
@ -356,7 +377,7 @@ private Response put(
case CREATE:
{
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
fullpath, op.getValue(), -1L,
fullpath, op.getValue(), -1L, blockSize.getValue(conf),
permission, overwrite, bufferSize, replication, blockSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
@ -502,7 +523,7 @@ private Response post(
case APPEND:
{
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
fullpath, op.getValue(), -1L, bufferSize);
fullpath, op.getValue(), -1L, -1L, bufferSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
default:
@ -598,7 +619,7 @@ private Response get(
case OPEN:
{
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
fullpath, op.getValue(), offset.getValue(), offset, length, bufferSize);
fullpath, op.getValue(), offset.getValue(), -1L, offset, length, bufferSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case GET_BLOCK_LOCATIONS:
@ -634,7 +655,7 @@ private Response get(
case GETFILECHECKSUM:
{
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
fullpath, op.getValue(), -1L);
fullpath, op.getValue(), -1L, -1L);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case GETDELEGATIONTOKEN:

View File

@ -0,0 +1,140 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode.web.resources;
import java.util.Arrays;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.PostOpParam;
import org.apache.hadoop.hdfs.web.resources.PutOpParam;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;
/**
* Test WebHDFS which provides data locality using HTTP redirection.
*/
public class TestWebHdfsDataLocality {
static final Log LOG = LogFactory.getLog(TestWebHdfsDataLocality.class);
{
((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.OFF);
((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.OFF);
((Log4JLogger)LogFactory.getLog(FSNamesystem.class)).getLogger().setLevel(Level.OFF);
}
private static final String RACK0 = "/rack0";
private static final String RACK1 = "/rack1";
private static final String RACK2 = "/rack2";
@Test
public void testDataLocality() throws Exception {
final Configuration conf = WebHdfsTestUtil.createConf();
final String[] racks = {RACK0, RACK0, RACK1, RACK1, RACK2, RACK2};
final int nDataNodes = racks.length;
LOG.info("nDataNodes=" + nDataNodes + ", racks=" + Arrays.asList(racks));
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(nDataNodes)
.racks(racks)
.build();
try {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final NameNode namenode = cluster.getNameNode();
final DatanodeManager dm = namenode.getNamesystem().getBlockManager(
).getDatanodeManager();
LOG.info("dm=" + dm);
final long blocksize = DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
final String f = "/foo";
{ //test CREATE
for(int i = 0; i < nDataNodes; i++) {
//set client address to a particular datanode
final DataNode dn = cluster.getDataNodes().get(i);
final String ipAddr = dm.getDatanode(dn.getDatanodeId()).getIpAddr();
NamenodeWebHdfsMethods.setRemoteAddress(ipAddr);
//The chosen datanode must be the same as the client address
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, PutOpParam.Op.CREATE, -1L, blocksize, conf);
Assert.assertEquals(ipAddr, chosen.getIpAddr());
}
}
//create a file with one replica.
final Path p = new Path(f);
final FSDataOutputStream out = dfs.create(p, (short)1);
out.write(1);
out.close();
//get replica location.
final LocatedBlocks locatedblocks = NameNodeAdapter.getBlockLocations(
namenode, f, 0, 1);
final List<LocatedBlock> lb = locatedblocks.getLocatedBlocks();
Assert.assertEquals(1, lb.size());
final DatanodeInfo[] locations = lb.get(0).getLocations();
Assert.assertEquals(1, locations.length);
final DatanodeInfo expected = locations[0];
//For GETFILECHECKSUM, OPEN and APPEND,
//the chosen datanode must be the same as the replica location.
{ //test GETFILECHECKSUM
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, GetOpParam.Op.GETFILECHECKSUM, -1L, blocksize, conf);
Assert.assertEquals(expected, chosen);
}
{ //test OPEN
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, GetOpParam.Op.OPEN, 0, blocksize, conf);
Assert.assertEquals(expected, chosen);
}
{ //test APPEND
final DatanodeInfo chosen = NamenodeWebHdfsMethods.chooseDatanode(
namenode, f, PostOpParam.Op.APPEND, -1L, blocksize, conf);
Assert.assertEquals(expected, chosen);
}
} finally {
cluster.shutdown();
}
}
}

View File

@ -40,6 +40,12 @@
public class WebHdfsTestUtil {
public static final Log LOG = LogFactory.getLog(WebHdfsTestUtil.class);
public static Configuration createConf() {
final Configuration conf = new Configuration();
conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
return conf;
}
public static WebHdfsFileSystem getWebHdfsFileSystem(final Configuration conf
) throws IOException, URISyntaxException {
final String uri = WebHdfsFileSystem.SCHEME + "://"