Merge r1239398 through r1240449 from 0.23.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23-PB@1240450 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-02-04 03:40:45 +00:00
commit 2f0489edab
227 changed files with 2902 additions and 1766 deletions

View File

@ -78,6 +78,9 @@ Release 0.23.1 - Unreleased
IMPROVEMENTS
HADOOP-8015. ChRootFileSystem should extend FilterFileSystem
(Daryn Sharp via bobby)
HADOOP-7801. HADOOP_PREFIX cannot be overriden. (Bruno Mahé via tomwhite)
HADOOP-7802. Hadoop scripts unconditionally source
@ -158,6 +161,14 @@ Release 0.23.1 - Unreleased
OPTIMIZATIONS
BUG FIXES
HADOOP-8013. ViewFileSystem does not honor setVerifyChecksum
(Daryn Sharp via bobby)
HADOOP-8018. Hudson auto test for HDFS has started throwing javadoc
(Jon Eagles via bobby)
HADOOP-8001 ChecksumFileSystem's rename doesn't correctly handle checksum
files. (Daryn Sharp via bobby)
HADOOP-8006 TestFSInputChecker is failing in trunk.
(Daryn Sharp via bobby)
@ -258,6 +269,9 @@ Release 0.23.1 - Unreleased
HADOOP-7999. "hadoop archive" fails with ClassNotFoundException.
(Jason Lowe via mahadev)
HADOOP-8012. hadoop-daemon.sh and yarn-daemon.sh are trying to mkdir
and chown log/pid dirs which can fail. (Roman Shaposhnik via eli)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -95,8 +95,11 @@ fi
if [ "$HADOOP_LOG_DIR" = "" ]; then
export HADOOP_LOG_DIR="$HADOOP_PREFIX/logs"
fi
mkdir -p "$HADOOP_LOG_DIR"
chown $HADOOP_IDENT_STRING $HADOOP_LOG_DIR
if [ ! -w "$HADOOP_LOG_DIR" ] ; then
mkdir -p "$HADOOP_LOG_DIR"
chown $HADOOP_IDENT_STRING $HADOOP_LOG_DIR
fi
if [ "$HADOOP_PID_DIR" = "" ]; then
HADOOP_PID_DIR=/tmp
@ -118,7 +121,7 @@ case $startStop in
(start)
mkdir -p "$HADOOP_PID_DIR"
[ -w "$HADOOP_PID_DIR" ] || mkdir -p "$HADOOP_PID_DIR"
if [ -f $pid ]; then
if kill -0 `cat $pid` > /dev/null 2>&1; then

View File

@ -474,18 +474,21 @@ public abstract class ChecksumFileSystem extends FilterFileSystem {
if (fs.isDirectory(src)) {
return fs.rename(src, dst);
} else {
if (fs.isDirectory(dst)) {
dst = new Path(dst, src.getName());
}
boolean value = fs.rename(src, dst);
if (!value)
return false;
Path checkFile = getChecksumFile(src);
if (fs.exists(checkFile)) { //try to rename checksum
if (fs.isDirectory(dst)) {
value = fs.rename(checkFile, dst);
} else {
value = fs.rename(checkFile, getChecksumFile(dst));
}
Path srcCheckFile = getChecksumFile(src);
Path dstCheckFile = getChecksumFile(dst);
if (fs.exists(srcCheckFile)) { //try to rename checksum
value = fs.rename(srcCheckFile, dstCheckFile);
} else if (fs.exists(dstCheckFile)) {
// no src checksum, so remove dst checksum
value = fs.delete(dstCheckFile, true);
}
return value;

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.fs;
import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.EnumSet;
import java.util.List;
@ -51,6 +52,7 @@ import org.apache.hadoop.util.Progressable;
public class FilterFileSystem extends FileSystem {
protected FileSystem fs;
private String swapScheme;
/*
* so that extending classes can define it
@ -63,13 +65,25 @@ public class FilterFileSystem extends FileSystem {
this.statistics = fs.statistics;
}
/**
* Get the raw file system
* @return FileSystem being filtered
*/
public FileSystem getRawFileSystem() {
return fs;
}
/** Called after a new FileSystem instance is constructed.
* @param name a uri whose authority section names the host, port, etc.
* for this FileSystem
* @param conf the configuration
*/
public void initialize(URI name, Configuration conf) throws IOException {
fs.initialize(name, conf);
super.initialize(name, conf);
String scheme = name.getScheme();
if (!scheme.equals(fs.getUri().getScheme())) {
swapScheme = scheme;
}
}
/** Returns a URI whose scheme and authority identify this FileSystem.*/
@ -88,7 +102,19 @@ public class FilterFileSystem extends FileSystem {
/** Make sure that a path specifies a FileSystem. */
public Path makeQualified(Path path) {
return fs.makeQualified(path);
Path fqPath = fs.makeQualified(path);
// swap in our scheme if the filtered fs is using a different scheme
if (swapScheme != null) {
try {
// NOTE: should deal with authority, but too much other stuff is broken
fqPath = new Path(
new URI(swapScheme, fqPath.toUri().getSchemeSpecificPart(), null)
);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
}
return fqPath;
}
///////////////////////////////////////////////////////////////

View File

@ -24,6 +24,7 @@ import java.util.*;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
/****************************************************************
* Implement the FileSystem API for the checksumed local filesystem.
@ -34,21 +35,26 @@ import org.apache.hadoop.classification.InterfaceStability;
public class LocalFileSystem extends ChecksumFileSystem {
static final URI NAME = URI.create("file:///");
static private Random rand = new Random();
FileSystem rfs;
public LocalFileSystem() {
this(new RawLocalFileSystem());
}
public FileSystem getRaw() {
return rfs;
return getRawFileSystem();
}
public LocalFileSystem(FileSystem rawLocalFileSystem) {
super(rawLocalFileSystem);
rfs = rawLocalFileSystem;
}
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
// ctor didn't initialize the filtered fs
getRawFileSystem().initialize(uri, conf);
}
/** Convert a path to a File. */
public File pathToFile(Path path) {
return ((RawLocalFileSystem)fs).pathToFile(path);

View File

@ -19,9 +19,6 @@ package org.apache.hadoop.fs.viewfs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@ -31,11 +28,11 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
/**
@ -60,15 +57,14 @@ import org.apache.hadoop.util.Progressable;
@InterfaceAudience.Private
@InterfaceStability.Evolving /*Evolving for a release,to be changed to Stable */
class ChRootedFileSystem extends FileSystem {
private final FileSystem myFs; // the base file system whose root is changed
class ChRootedFileSystem extends FilterFileSystem {
private final URI myUri; // the base URI + the chRoot
private final Path chRootPathPart; // the root below the root of the base
private final String chRootPathPartString;
private Path workingDir;
protected FileSystem getMyFs() {
return myFs;
return getRawFileSystem();
}
/**
@ -84,37 +80,16 @@ class ChRootedFileSystem extends FileSystem {
/**
* Constructor
* @param fs base file system
* @param theRoot chRoot for this file system
* @throws URISyntaxException
* @param uri base file system
* @param conf configuration
* @throws IOException
*/
public ChRootedFileSystem(final FileSystem fs, final Path theRoot)
throws URISyntaxException {
myFs = fs;
myFs.makeQualified(theRoot); //check that root is a valid path for fs
// Would like to call myFs.checkPath(theRoot);
// but not public
chRootPathPart = new Path(theRoot.toUri().getPath());
public ChRootedFileSystem(final URI uri, Configuration conf)
throws IOException {
super(FileSystem.get(uri, conf));
chRootPathPart = new Path(uri.getPath());
chRootPathPartString = chRootPathPart.toUri().getPath();
try {
initialize(fs.getUri(), fs.getConf());
} catch (IOException e) { // This exception should not be thrown
throw new RuntimeException("This should not occur");
}
/*
* We are making URI include the chrootedPath: e.g. file:///chrootedPath.
* This is questionable since Path#makeQualified(uri, path) ignores
* the pathPart of a uri. Since this class is internal we can ignore
* this issue but if we were to make it external then this needs
* to be resolved.
*/
// Handle the two cases:
// scheme:/// and scheme://authority/
myUri = new URI(myFs.getUri().toString() +
(myFs.getUri().getAuthority() == null ? "" : Path.SEPARATOR) +
chRootPathPart.toString().substring(1));
myUri = uri;
workingDir = getHomeDirectory();
// We don't use the wd of the myFs
}
@ -127,7 +102,6 @@ class ChRootedFileSystem extends FileSystem {
*/
public void initialize(final URI name, final Configuration conf)
throws IOException {
myFs.initialize(name, conf);
super.initialize(name, conf);
setConf(conf);
}
@ -137,12 +111,6 @@ class ChRootedFileSystem extends FileSystem {
return myUri;
}
@Override
public Path makeQualified(final Path path) {
return myFs.makeQualified(path);
// NOT myFs.makeQualified(fullPath(path));
}
/**
* Strip out the root from the path.
* @param p - fully qualified path p
@ -175,7 +143,7 @@ class ChRootedFileSystem extends FileSystem {
public Path getResolvedQualifiedPath(final Path f)
throws FileNotFoundException {
return myFs.makeQualified(
return makeQualified(
new Path(chRootPathPartString + f.toUri().toString()));
}
@ -199,14 +167,14 @@ class ChRootedFileSystem extends FileSystem {
public FSDataOutputStream create(final Path f, final FsPermission permission,
final boolean overwrite, final int bufferSize, final short replication,
final long blockSize, final Progressable progress) throws IOException {
return myFs.create(fullPath(f), permission, overwrite, bufferSize,
return super.create(fullPath(f), permission, overwrite, bufferSize,
replication, blockSize, progress);
}
@Override
public boolean delete(final Path f, final boolean recursive)
throws IOException {
return myFs.delete(fullPath(f), recursive);
return super.delete(fullPath(f), recursive);
}
@ -219,95 +187,90 @@ class ChRootedFileSystem extends FileSystem {
@Override
public BlockLocation[] getFileBlockLocations(final FileStatus fs, final long start,
final long len) throws IOException {
return myFs.getFileBlockLocations(
return super.getFileBlockLocations(
new ViewFsFileStatus(fs, fullPath(fs.getPath())), start, len);
}
@Override
public FileChecksum getFileChecksum(final Path f)
throws IOException {
return myFs.getFileChecksum(fullPath(f));
return super.getFileChecksum(fullPath(f));
}
@Override
public FileStatus getFileStatus(final Path f)
throws IOException {
return myFs.getFileStatus(fullPath(f));
return super.getFileStatus(fullPath(f));
}
@Override
public FsStatus getStatus(Path p) throws IOException {
return myFs.getStatus(fullPath(p));
return super.getStatus(fullPath(p));
}
@Override
public FsServerDefaults getServerDefaults() throws IOException {
return myFs.getServerDefaults();
return super.getServerDefaults();
}
@Override
public FileStatus[] listStatus(final Path f)
throws IOException {
return myFs.listStatus(fullPath(f));
return super.listStatus(fullPath(f));
}
@Override
public boolean mkdirs(final Path f, final FsPermission permission)
throws IOException {
return myFs.mkdirs(fullPath(f), permission);
return super.mkdirs(fullPath(f), permission);
}
@Override
public FSDataInputStream open(final Path f, final int bufferSize)
throws IOException {
return myFs.open(fullPath(f), bufferSize);
return super.open(fullPath(f), bufferSize);
}
@Override
public FSDataOutputStream append(final Path f, final int bufferSize,
final Progressable progress) throws IOException {
return myFs.append(fullPath(f), bufferSize, progress);
return super.append(fullPath(f), bufferSize, progress);
}
@Override
public boolean rename(final Path src, final Path dst) throws IOException {
// note fullPath will check that paths are relative to this FileSystem.
// Hence both are in same file system and a rename is valid
return myFs.rename(fullPath(src), fullPath(dst));
return super.rename(fullPath(src), fullPath(dst));
}
@Override
public void setOwner(final Path f, final String username,
final String groupname)
throws IOException {
myFs.setOwner(fullPath(f), username, groupname);
super.setOwner(fullPath(f), username, groupname);
}
@Override
public void setPermission(final Path f, final FsPermission permission)
throws IOException {
myFs.setPermission(fullPath(f), permission);
super.setPermission(fullPath(f), permission);
}
@Override
public boolean setReplication(final Path f, final short replication)
throws IOException {
return myFs.setReplication(fullPath(f), replication);
return super.setReplication(fullPath(f), replication);
}
@Override
public void setTimes(final Path f, final long mtime, final long atime)
throws IOException {
myFs.setTimes(fullPath(f), mtime, atime);
}
@Override
public void setVerifyChecksum(final boolean verifyChecksum) {
myFs.setVerifyChecksum(verifyChecksum);
super.setTimes(fullPath(f), mtime, atime);
}
@Override
public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
return myFs.getDelegationTokens(renewer);
public Path resolvePath(final Path p) throws IOException {
return super.resolvePath(fullPath(p));
}
}

View File

@ -168,8 +168,7 @@ public class ViewFileSystem extends FileSystem {
protected
FileSystem getTargetFileSystem(final URI uri)
throws URISyntaxException, IOException {
return new ChRootedFileSystem(FileSystem.get(uri, config),
new Path(uri.getPath()));
return new ChRootedFileSystem(uri, config);
}
@Override
@ -464,8 +463,11 @@ public class ViewFileSystem extends FileSystem {
@Override
public void setVerifyChecksum(final boolean verifyChecksum) {
// This is a file system level operations, however ViewFileSystem
// points to many file systems. Noop for ViewFileSystem.
List<InodeTree.MountPoint<FileSystem>> mountPoints =
fsState.getMountPoints();
for (InodeTree.MountPoint<FileSystem> mount : mountPoints) {
mount.target.targetFileSystem.setVerifyChecksum(verifyChecksum);
}
}
public MountPoint[] getMountPoints() {

View File

@ -72,14 +72,15 @@ public final class FileSystemTestHelper {
public static String getAbsoluteTestRootDir(FileSystem fSys)
throws IOException {
if (absTestRootDir == null) {
// NOTE: can't cache because of different filesystems!
//if (absTestRootDir == null)
if (TEST_ROOT_DIR.startsWith("/")) {
absTestRootDir = TEST_ROOT_DIR;
} else {
absTestRootDir = fSys.getWorkingDirectory().toString() + "/"
+ TEST_ROOT_DIR;
}
}
//}
return absTestRootDir;
}

View File

@ -203,4 +203,58 @@ public class TestChecksumFileSystem {
String str = readFile(localFs, testPath, 1024);
assertEquals("testing stale checksum", str);
}
@Test
public void testRenameFileToFile() throws Exception {
Path srcPath = new Path(TEST_ROOT_DIR, "testRenameSrc");
Path dstPath = new Path(TEST_ROOT_DIR, "testRenameDst");
verifyRename(srcPath, dstPath, false);
}
@Test
public void testRenameFileIntoDir() throws Exception {
Path srcPath = new Path(TEST_ROOT_DIR, "testRenameSrc");
Path dstPath = new Path(TEST_ROOT_DIR, "testRenameDir");
localFs.mkdirs(dstPath);
verifyRename(srcPath, dstPath, true);
}
@Test
public void testRenameFileIntoDirFile() throws Exception {
Path srcPath = new Path(TEST_ROOT_DIR, "testRenameSrc");
Path dstPath = new Path(TEST_ROOT_DIR, "testRenameDir/testRenameDst");
assertTrue(localFs.mkdirs(dstPath));
verifyRename(srcPath, dstPath, false);
}
void verifyRename(Path srcPath, Path dstPath, boolean dstIsDir)
throws Exception {
localFs.delete(srcPath,true);
localFs.delete(dstPath,true);
Path realDstPath = dstPath;
if (dstIsDir) {
localFs.mkdirs(dstPath);
realDstPath = new Path(dstPath, srcPath.getName());
}
// ensure file + checksum are moved
writeFile(localFs, srcPath, 1);
assertTrue(localFs.exists(localFs.getChecksumFile(srcPath)));
assertTrue(localFs.rename(srcPath, dstPath));
assertTrue(localFs.exists(localFs.getChecksumFile(realDstPath)));
// create a file with no checksum, rename, ensure dst checksum is removed
writeFile(localFs.getRawFileSystem(), srcPath, 1);
assertFalse(localFs.exists(localFs.getChecksumFile(srcPath)));
assertTrue(localFs.rename(srcPath, dstPath));
assertFalse(localFs.exists(localFs.getChecksumFile(realDstPath)));
// create file with checksum, rename over prior dst with no checksum
writeFile(localFs, srcPath, 1);
assertTrue(localFs.exists(localFs.getChecksumFile(srcPath)));
assertTrue(localFs.rename(srcPath, dstPath));
assertTrue(localFs.exists(localFs.getChecksumFile(realDstPath)));
}
}

View File

@ -51,7 +51,7 @@ public class TestChRootedFileSystem {
// ChRoot to the root of the testDirectory
fSys = new ChRootedFileSystem(fSysTarget, chrootedTo);
fSys = new ChRootedFileSystem(chrootedTo.toUri(), conf);
}
@After

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.viewfs;
import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.*;
import static org.junit.Assert.*;
/**
* Verify that viewfs propagates certain methods to the underlying fs
*/
public class TestViewFileSystemDelegation { //extends ViewFileSystemTestSetup {
static Configuration conf;
static FileSystem viewFs;
static FakeFileSystem fs1;
static FakeFileSystem fs2;
@BeforeClass
public static void setup() throws Exception {
conf = ViewFileSystemTestSetup.configWithViewfsScheme();
fs1 = setupFileSystem(new URI("fs1:/"), FakeFileSystem.class);
fs2 = setupFileSystem(new URI("fs2:/"), FakeFileSystem.class);
viewFs = FileSystem.get(FsConstants.VIEWFS_URI, conf);
}
static FakeFileSystem setupFileSystem(URI uri, Class clazz)
throws Exception {
String scheme = uri.getScheme();
conf.set("fs."+scheme+".impl", clazz.getName());
FakeFileSystem fs = (FakeFileSystem)FileSystem.get(uri, conf);
assertEquals(uri, fs.getUri());
Path targetPath = FileSystemTestHelper.getAbsoluteTestRootPath(fs);
ConfigUtil.addLink(conf, "/mounts/"+scheme, targetPath.toUri());
return fs;
}
@Test
public void testSanity() {
assertEquals("fs1:/", fs1.getUri().toString());
assertEquals("fs2:/", fs2.getUri().toString());
}
@Test
public void testVerifyChecksum() throws Exception {
checkVerifyChecksum(false);
checkVerifyChecksum(true);
}
void checkVerifyChecksum(boolean flag) {
viewFs.setVerifyChecksum(flag);
assertEquals(flag, fs1.getVerifyChecksum());
assertEquals(flag, fs2.getVerifyChecksum());
}
static class FakeFileSystem extends LocalFileSystem {
boolean verifyChecksum = true;
URI uri;
@Override
public void initialize(URI uri, Configuration conf) throws IOException {
super.initialize(uri, conf);
this.uri = uri;
}
@Override
public URI getUri() {
return uri;
}
@Override
public void setVerifyChecksum(boolean verifyChecksum) {
this.verifyChecksum = verifyChecksum;
}
public boolean getVerifyChecksum(){
return verifyChecksum;
}
}
}

View File

@ -208,6 +208,12 @@ Release 0.23.1 - UNRELEASED
HDFS-2397. Undeprecate SecondaryNameNode (eli)
HDFS-2814. NamenodeMXBean does not account for svn revision in the version
information. (Hitesh Shah via jitendra)
HDFS-2784. Update hftp and hdfs for host-based token support.
(Kihwal Lee via jitendra)
OPTIMIZATIONS
HDFS-2130. Switch default checksum to CRC32C. (todd)
@ -231,6 +237,8 @@ Release 0.23.1 - UNRELEASED
HDFS-2864. Remove some redundant methods and the constant METADATA_VERSION
from FSDataset. (szetszwo)
HDFS-2879. Change FSDataset to package private. (szetszwo)
BUG FIXES
HDFS-2541. For a sufficiently large value of blocks, the DN Scanner
@ -313,6 +321,9 @@ Release 0.23.1 - UNRELEASED
HDFS-2835. Fix findbugs and javadoc issue with GetConf.java.
(suresh)
HDFS-2889. getNumCurrentReplicas is package private but should be public on
0.23 (see HDFS-2408). (Gregory Chanan via atm)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
@ -1654,8 +1665,13 @@ Release 0.22.1 - Unreleased
OPTIMIZATIONS
HDFS-2718. Optimize OP_ADD in edits loading. (shv)
BUG FIXES
HDFS-2877. If locking of a storage dir fails, it will remove the other
NN's lock file on exit. (todd)
Release 0.22.0 - 2011-11-29
INCOMPATIBLE CHANGES

View File

@ -631,7 +631,7 @@ public class DFSClient implements java.io.Closeable {
DelegationTokenIdentifier.stringifyToken(delToken));
ClientProtocol nn =
DFSUtil.createNamenode
(NameNode.getAddress(token.getService().toString()),
(SecurityUtil.getTokenServiceAddr(delToken),
conf, UserGroupInformation.getCurrentUser());
try {
return nn.renewDelegationToken(delToken);
@ -649,7 +649,7 @@ public class DFSClient implements java.io.Closeable {
LOG.info("Cancelling " +
DelegationTokenIdentifier.stringifyToken(delToken));
ClientProtocol nn = DFSUtil.createNamenode(
NameNode.getAddress(token.getService().toString()), conf,
SecurityUtil.getTokenServiceAddr(delToken), conf,
UserGroupInformation.getCurrentUser());
try {
nn.cancelDelegationToken(delToken);

View File

@ -1538,7 +1538,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
* write pipeline have failed.
* @return the number of valid replicas of the current block
*/
synchronized int getNumCurrentReplicas() throws IOException {
public synchronized int getNumCurrentReplicas() throws IOException {
dfsClient.checkOpen();
isClosed();
if (streamer == null) {

View File

@ -108,45 +108,10 @@ public class DistributedFileSystem extends FileSystem {
InetSocketAddress namenode = NameNode.getAddress(uri.getAuthority());
this.dfs = new DFSClient(namenode, conf, statistics);
this.uri = URI.create(HdfsConstants.HDFS_URI_SCHEME + "://" + uri.getAuthority());
this.uri = URI.create(uri.getScheme()+"://"+uri.getAuthority());
this.workingDir = getHomeDirectory();
}
/** Permit paths which explicitly specify the default port. */
@Override
protected void checkPath(Path path) {
URI thisUri = this.getUri();
URI thatUri = path.toUri();
String thatAuthority = thatUri.getAuthority();
if (thatUri.getScheme() != null
&& thatUri.getScheme().equalsIgnoreCase(thisUri.getScheme())
&& thatUri.getPort() == NameNode.DEFAULT_PORT
&& (thisUri.getPort() == -1 ||
thisUri.getPort() == NameNode.DEFAULT_PORT)
&& thatAuthority.substring(0,thatAuthority.indexOf(":"))
.equalsIgnoreCase(thisUri.getAuthority()))
return;
super.checkPath(path);
}
/** Normalize paths that explicitly specify the default port. */
@Override
public Path makeQualified(Path path) {
URI thisUri = this.getUri();
URI thatUri = path.toUri();
String thatAuthority = thatUri.getAuthority();
if (thatUri.getScheme() != null
&& thatUri.getScheme().equalsIgnoreCase(thisUri.getScheme())
&& thatUri.getPort() == NameNode.DEFAULT_PORT
&& thisUri.getPort() == -1
&& thatAuthority.substring(0,thatAuthority.indexOf(":"))
.equalsIgnoreCase(thisUri.getAuthority())) {
path = new Path(thisUri.getScheme(), thisUri.getAuthority(),
thatUri.getPath());
}
return super.makeQualified(path);
}
@Override
public Path getWorkingDirectory() {
return workingDir;

View File

@ -59,6 +59,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ServletUtil;
import org.xml.sax.Attributes;
@ -89,17 +90,20 @@ public class HftpFileSystem extends FileSystem
public static final Text TOKEN_KIND = new Text("HFTP delegation");
private String nnHttpUrl;
private Text hdfsServiceName;
protected UserGroupInformation ugi;
private URI hftpURI;
protected InetSocketAddress nnAddr;
protected UserGroupInformation ugi;
protected InetSocketAddress nnSecureAddr;
public static final String HFTP_TIMEZONE = "UTC";
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
private Token<?> delegationToken;
private Token<?> renewToken;
private static final HftpDelegationTokenSelector hftpTokenSelector =
new HftpDelegationTokenSelector();
public static final SimpleDateFormat getDateFormat() {
final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT);
df.setTimeZone(TimeZone.getTimeZone(HFTP_TIMEZONE));
@ -115,11 +119,8 @@ public class HftpFileSystem extends FileSystem
@Override
protected int getDefaultPort() {
return getDefaultSecurePort();
//TODO: un-comment the following once HDFS-7510 is committed.
// return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
// DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
}
protected int getDefaultSecurePort() {
@ -127,16 +128,22 @@ public class HftpFileSystem extends FileSystem
DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT);
}
@Override
public String getCanonicalServiceName() {
return SecurityUtil.buildDTServiceName(hftpURI, getDefaultPort());
}
private String buildUri(String schema, String host, int port) {
StringBuilder sb = new StringBuilder(schema);
return sb.append(host).append(":").append(port).toString();
protected InetSocketAddress getNamenodeAddr(URI uri) {
// use authority so user supplied uri can override port
return NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
}
protected InetSocketAddress getNamenodeSecureAddr(URI uri) {
// must only use the host and the configured https port
return NetUtils.createSocketAddrForHost(uri.getHost(), getDefaultSecurePort());
}
@Override
public String getCanonicalServiceName() {
// unlike other filesystems, hftp's service is the secure port, not the
// actual port in the uri
return SecurityUtil.buildTokenService(nnSecureAddr).toString();
}
@Override
public void initialize(final URI name, final Configuration conf)
@ -144,95 +151,51 @@ public class HftpFileSystem extends FileSystem
super.initialize(name, conf);
setConf(conf);
this.ugi = UserGroupInformation.getCurrentUser();
nnAddr = NetUtils.createSocketAddr(name.toString());
// in case we open connection to hftp of a different cluster
// we need to know this cluster https port
// if it is not set we assume it is the same cluster or same port
int urlPort = conf.getInt("dfs.hftp.https.port", -1);
if(urlPort == -1)
urlPort = conf.getInt(DFSConfigKeys.DFS_HTTPS_PORT_KEY,
DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT);
String normalizedNN = NetUtils.normalizeHostName(name.getHost());
nnHttpUrl = buildUri("https://", normalizedNN ,urlPort);
LOG.debug("using url to get DT:" + nnHttpUrl);
this.nnAddr = getNamenodeAddr(name);
this.nnSecureAddr = getNamenodeSecureAddr(name);
try {
hftpURI = new URI(buildUri("hftp://", normalizedNN, urlPort));
} catch (URISyntaxException ue) {
throw new IOException("bad uri for hdfs", ue);
}
// if one uses RPC port different from the Default one,
// one should specify what is the setvice name for this delegation token
// otherwise it is hostname:RPC_PORT
String key = DelegationTokenSelector.SERVICE_NAME_KEY
+ SecurityUtil.buildDTServiceName(name,
DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT);
if(LOG.isDebugEnabled()) {
LOG.debug("Trying to find DT for " + name + " using key=" + key +
"; conf=" + conf.get(key, ""));
}
String nnServiceName = conf.get(key);
int nnPort = NameNode.DEFAULT_PORT;
if (nnServiceName != null) { // get the real port
nnPort = NetUtils.createSocketAddr(nnServiceName,
NameNode.DEFAULT_PORT).getPort();
}
try {
URI hdfsURI = new URI("hdfs://" + normalizedNN + ":" + nnPort);
hdfsServiceName = new Text(SecurityUtil.buildDTServiceName(hdfsURI,
nnPort));
} catch (URISyntaxException ue) {
throw new IOException("bad uri for hdfs", ue);
this.hftpURI = new URI(name.getScheme(), name.getAuthority(),
null, null, null);
} catch (URISyntaxException e) {
throw new IllegalArgumentException(e);
}
if (UserGroupInformation.isSecurityEnabled()) {
//try finding a token for this namenode (esp applicable for tasks
//using hftp). If there exists one, just set the delegationField
String hftpServiceName = getCanonicalServiceName();
for (Token<? extends TokenIdentifier> t : ugi.getTokens()) {
Text kind = t.getKind();
if (DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(kind)) {
if (t.getService().equals(hdfsServiceName)) {
setDelegationToken(t);
break;
}
} else if (TOKEN_KIND.equals(kind)) {
if (hftpServiceName
.equals(normalizeService(t.getService().toString()))) {
setDelegationToken(t);
break;
}
}
}
//since we don't already have a token, go get one over https
if (delegationToken == null) {
setDelegationToken(getDelegationToken(null));
initDelegationToken();
}
}
protected void initDelegationToken() throws IOException {
// look for hftp token, then try hdfs
Token<?> token = selectHftpDelegationToken();
if (token == null) {
token = selectHdfsDelegationToken();
}
// if we don't already have a token, go get one over https
boolean createdToken = false;
if (token == null) {
token = getDelegationToken(null);
createdToken = (token != null);
}
// we already had a token or getDelegationToken() didn't fail.
if (token != null) {
setDelegationToken(token);
if (createdToken) {
dtRenewer.addRenewAction(this);
LOG.debug("Created new DT for " + token.getService());
} else {
LOG.debug("Found existing DT for " + token.getService());
}
}
}
private String normalizeService(String service) {
int colonIndex = service.indexOf(':');
if (colonIndex == -1) {
throw new IllegalArgumentException("Invalid service for hftp token: " +
service);
}
String hostname =
NetUtils.normalizeHostName(service.substring(0, colonIndex));
String port = service.substring(colonIndex + 1);
return hostname + ":" + port;
protected Token<DelegationTokenIdentifier> selectHftpDelegationToken() {
Text serviceName = SecurityUtil.buildTokenService(nnSecureAddr);
return hftpTokenSelector.selectToken(serviceName, ugi.getTokens());
}
//TODO: un-comment the following once HDFS-7510 is committed.
// protected Token<DelegationTokenIdentifier> selectHftpDelegationToken() {
// Text serviceName = SecurityUtil.buildTokenService(nnSecureAddr);
// return hftpTokenSelector.selectToken(serviceName, ugi.getTokens());
// }
protected Token<DelegationTokenIdentifier> selectHdfsDelegationToken() {
return DelegationTokenSelector.selectHdfsDelegationToken(
nnAddr, ugi, getConf());
@ -245,13 +208,17 @@ public class HftpFileSystem extends FileSystem
}
@Override
public <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
public synchronized <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
renewToken = token;
// emulate the 203 usage of the tokens
// by setting the kind and service as if they were hdfs tokens
delegationToken = new Token<T>(token);
// NOTE: the remote nn must be configured to use hdfs
delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
delegationToken.setService(hdfsServiceName);
// no need to change service because we aren't exactly sure what it
// should be. we can guess, but it might be wrong if the local conf
// value is incorrect. the service is a client side field, so the remote
// end does not care about the value
}
@Override
@ -262,6 +229,7 @@ public class HftpFileSystem extends FileSystem
ugi.reloginFromKeytab();
return ugi.doAs(new PrivilegedExceptionAction<Token<?>>() {
public Token<?> run() throws IOException {
final String nnHttpUrl = DFSUtil.createUri("https", nnSecureAddr).toString();
Credentials c;
try {
c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer);
@ -291,12 +259,7 @@ public class HftpFileSystem extends FileSystem
@Override
public URI getUri() {
try {
return new URI("hftp", null, nnAddr.getHostName(), nnAddr.getPort(),
null, null, null);
} catch (URISyntaxException e) {
return null;
}
return hftpURI;
}
/**
@ -722,11 +685,12 @@ public class HftpFileSystem extends FileSystem
public long renew(Token<?> token,
Configuration conf) throws IOException {
// update the kerberos credentials, if they are coming from a keytab
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
UserGroupInformation.getLoginUser().reloginFromKeytab();
// use https to renew the token
InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
return
DelegationTokenFetcher.renewDelegationToken
("https://" + token.getService().toString(),
(DFSUtil.createUri("https", serviceAddr).toString(),
(Token<DelegationTokenIdentifier>) token);
}
@ -737,10 +701,18 @@ public class HftpFileSystem extends FileSystem
// update the kerberos credentials, if they are coming from a keytab
UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
// use https to cancel the token
InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
DelegationTokenFetcher.cancelDelegationToken
("https://" + token.getService().toString(),
(DFSUtil.createUri("https", serviceAddr).toString(),
(Token<DelegationTokenIdentifier>) token);
}
}
private static class HftpDelegationTokenSelector
extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
public HftpDelegationTokenSelector() {
super(TOKEN_KIND);
}
}
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
@ -120,6 +121,16 @@ public class HsftpFileSystem extends HftpFileSystem {
}
}
@Override
protected int getDefaultPort() {
return getDefaultSecurePort();
}
@Override
protected InetSocketAddress getNamenodeSecureAddr(URI uri) {
return getNamenodeAddr(uri);
}
@Override
protected HttpURLConnection openConnection(String path, String query)
throws IOException {
@ -161,16 +172,6 @@ public class HsftpFileSystem extends HftpFileSystem {
return (HttpURLConnection) conn;
}
@Override
public URI getUri() {
try {
return new URI("hsftp", null, nnAddr.getHostName(), nnAddr.getPort(),
null, null, null);
} catch (URISyntaxException e) {
return null;
}
}
/**
* Dummy hostname verifier that is used to bypass hostname checking
*/

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
@ -296,8 +297,7 @@ public class DelegationTokenSecretManager
}
final InetSocketAddress addr = namenode.getNameNodeAddress();
final String s = addr.getAddress().getHostAddress() + ":" + addr.getPort();
token.setService(new Text(s));
SecurityUtil.setTokenService(token, addr);
final Credentials c = new Credentials();
c.addToken(new Text(ugi.getShortUserName()), token);
return c;

View File

@ -157,9 +157,6 @@ public class BlockInfoUnderConstruction extends BlockInfo {
BlockInfo convertToCompleteBlock() throws IOException {
assert getBlockUCState() != BlockUCState.COMPLETE :
"Trying to convert a COMPLETE block";
if(getBlockUCState() != BlockUCState.COMMITTED)
throw new IOException(
"Cannot complete block: block has not been COMMITTED by the client");
return new BlockInfo(this);
}

View File

@ -441,15 +441,23 @@ public class BlockManager {
*/
private BlockInfo completeBlock(final INodeFile fileINode,
final int blkIndex) throws IOException {
return completeBlock(fileINode, blkIndex, false);
}
public BlockInfo completeBlock(final INodeFile fileINode,
final int blkIndex, final boolean force) throws IOException {
if(blkIndex < 0)
return null;
BlockInfo curBlock = fileINode.getBlocks()[blkIndex];
if(curBlock.isComplete())
return curBlock;
BlockInfoUnderConstruction ucBlock = (BlockInfoUnderConstruction)curBlock;
if(ucBlock.numNodes() < minReplication)
if(!force && ucBlock.numNodes() < minReplication)
throw new IOException("Cannot complete block: " +
"block does not satisfy minimal replication requirement.");
if(!force && ucBlock.getBlockUCState() != BlockUCState.COMMITTED)
throw new IOException(
"Cannot complete block: block has not been COMMITTED by the client");
BlockInfo completeBlock = ucBlock.convertToCompleteBlock();
// replace penultimate block in file
fileINode.setBlock(blkIndex, completeBlock);

View File

@ -62,6 +62,7 @@ import org.apache.hadoop.http.HtmlQuoting;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authentication.util.KerberosName;
@ -492,7 +493,7 @@ public class JspHelper {
return UserGroupInformation.createRemoteUser(strings[0]);
}
private static String getNNServiceAddress(ServletContext context,
private static InetSocketAddress getNNServiceAddress(ServletContext context,
HttpServletRequest request) {
String namenodeAddressInUrl = request.getParameter(NAMENODE_ADDRESS);
InetSocketAddress namenodeAddress = null;
@ -503,8 +504,7 @@ public class JspHelper {
context);
}
if (namenodeAddress != null) {
return (namenodeAddress.getAddress().getHostAddress() + ":"
+ namenodeAddress.getPort());
return namenodeAddress;
}
return null;
}
@ -547,9 +547,9 @@ public class JspHelper {
Token<DelegationTokenIdentifier> token =
new Token<DelegationTokenIdentifier>();
token.decodeFromUrlString(tokenString);
String serviceAddress = getNNServiceAddress(context, request);
InetSocketAddress serviceAddress = getNNServiceAddress(context, request);
if (serviceAddress != null) {
token.setService(new Text(serviceAddress));
SecurityUtil.setTokenService(token, serviceAddress);
token.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
}
ByteArrayInputStream buf = new ByteArrayInputStream(token

View File

@ -599,8 +599,12 @@ public abstract class Storage extends StorageInfo {
* @throws IOException if locking fails.
*/
FileLock tryLock() throws IOException {
boolean deletionHookAdded = false;
File lockF = new File(root, STORAGE_FILE_LOCK);
lockF.deleteOnExit();
if (!lockF.exists()) {
lockF.deleteOnExit();
deletionHookAdded = true;
}
RandomAccessFile file = new RandomAccessFile(lockF, "rws");
FileLock res = null;
try {
@ -613,6 +617,12 @@ public abstract class Storage extends StorageInfo {
file.close();
throw e;
}
if (res != null && !deletionHookAdded) {
// If the file existed prior to our startup, we didn't
// call deleteOnExit above. But since we successfully locked
// the dir, we can take care of cleaning it up.
lockF.deleteOnExit();
}
return res;
}

View File

@ -222,7 +222,7 @@ class BlockReceiver implements Closeable {
cleanupBlock();
// check if there is a disk error
IOException cause = FSDataset.getCauseIfDiskError(ioe);
IOException cause = DatanodeUtil.getCauseIfDiskError(ioe);
DataNode.LOG.warn("IOException in BlockReceiver constructor. Cause is ",
cause);

View File

@ -0,0 +1,67 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.Block;
/** Provide utility methods for Datanode. */
@InterfaceAudience.Private
class DatanodeUtil {
private final static String DISK_ERROR = "Possible disk error on file creation: ";
/** Get the cause of an I/O exception if caused by a possible disk error
* @param ioe an I/O exception
* @return cause if the I/O exception is caused by a possible disk error;
* null otherwise.
*/
static IOException getCauseIfDiskError(IOException ioe) {
if (ioe.getMessage()!=null && ioe.getMessage().startsWith(DISK_ERROR)) {
return (IOException)ioe.getCause();
} else {
return null;
}
}
/**
* Create a new file.
* @throws IOException
* if the file already exists or if the file cannot be created.
*/
static File createTmpFile(Block b, File f) throws IOException {
if (f.exists()) {
throw new IOException("Unexpected problem in creating temporary file for "
+ b + ". File " + f + " should not be present, but is.");
}
// Create the zero-length temp file
final boolean fileCreated;
try {
fileCreated = f.createNewFile();
} catch (IOException ioe) {
throw (IOException)new IOException(DISK_ERROR + f).initCause(ioe);
}
if (!fileCreated) {
throw new IOException("Unexpected problem in creating temporary file for "
+ b + ". File " + f + " should be creatable, but is already present.");
}
return f;
}
}

View File

@ -75,7 +75,7 @@ import org.apache.hadoop.util.ReflectionUtils;
*
***************************************************/
@InterfaceAudience.Private
public class FSDataset implements FSDatasetInterface {
class FSDataset implements FSDatasetInterface {
/**
* A node type that can be built into a tree reflecting the
@ -373,7 +373,7 @@ public class FSDataset implements FSDatasetInterface {
*/
File createTmpFile(Block b) throws IOException {
File f = new File(tmpDir, b.getBlockName());
return FSDataset.createTmpFile(b, f);
return DatanodeUtil.createTmpFile(b, f);
}
/**
@ -382,7 +382,7 @@ public class FSDataset implements FSDatasetInterface {
*/
File createRbwFile(Block b) throws IOException {
File f = new File(rbwDir, b.getBlockName());
return FSDataset.createTmpFile(b, f);
return DatanodeUtil.createTmpFile(b, f);
}
File addBlock(Block b, File f) throws IOException {
@ -536,15 +536,15 @@ public class FSDataset implements FSDatasetInterface {
}
/** Return storage directory corresponding to the volume */
public File getDir() {
File getDir() {
return currentDir.getParentFile();
}
public File getCurrentDir() {
File getCurrentDir() {
return currentDir;
}
public File getRbwDir(String bpid) throws IOException {
File getRbwDir(String bpid) throws IOException {
BlockPoolSlice bp = getBlockPoolSlice(bpid);
return bp.getRbwDir();
}
@ -1085,26 +1085,6 @@ public class FSDataset implements FSDatasetInterface {
return new MetaDataInputStream(new FileInputStream(checksumFile),
checksumFile.length());
}
static File createTmpFile(Block b, File f) throws IOException {
if (f.exists()) {
throw new IOException("Unexpected problem in creating temporary file for "+
b + ". File " + f + " should not be present, but is.");
}
// Create the zero-length temp file
//
boolean fileCreated = false;
try {
fileCreated = f.createNewFile();
} catch (IOException ioe) {
throw (IOException)new IOException(DISK_ERROR +f).initCause(ioe);
}
if (!fileCreated) {
throw new IOException("Unexpected problem in creating temporary file for "+
b + ". File " + f + " should be creatable, but is already present.");
}
return f;
}
private final DataNode datanode;
final FSVolumeSet volumes;
@ -1246,7 +1226,7 @@ public class FSDataset implements FSDatasetInterface {
/**
* Get File name for a given block.
*/
public File getBlockFile(ExtendedBlock b) throws IOException {
private File getBlockFile(ExtendedBlock b) throws IOException {
return getBlockFile(b.getBlockPoolId(), b.getLocalBlock());
}
@ -1320,7 +1300,7 @@ public class FSDataset implements FSDatasetInterface {
* @throws ReplicaNotFoundException if no entry is in the map or
* there is a generation stamp mismatch
*/
private ReplicaInfo getReplicaInfo(ExtendedBlock b)
ReplicaInfo getReplicaInfo(ExtendedBlock b)
throws ReplicaNotFoundException {
ReplicaInfo info = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
if (info == null) {
@ -1456,19 +1436,6 @@ public class FSDataset implements FSDatasetInterface {
}
}
private final static String DISK_ERROR = "Possible disk error on file creation: ";
/** Get the cause of an I/O exception if caused by a possible disk error
* @param ioe an I/O exception
* @return cause if the I/O exception is caused by a possible disk error;
* null otherwise.
*/
static IOException getCauseIfDiskError(IOException ioe) {
if (ioe.getMessage()!=null && ioe.getMessage().startsWith(DISK_ERROR)) {
return (IOException)ioe.getCause();
} else {
return null;
}
}
@Override // FSDatasetInterface
public synchronized ReplicaInPipelineInterface append(ExtendedBlock b,

View File

@ -162,7 +162,7 @@ abstract public class ReplicaInfo extends Block implements Replica {
* be recovered (especially on Windows) on datanode restart.
*/
private void unlinkFile(File file, Block b) throws IOException {
File tmpFile = FSDataset.createTmpFile(b, FSDataset.getUnlinkTmpFile(file));
File tmpFile = DatanodeUtil.createTmpFile(b, FSDataset.getUnlinkTmpFile(file));
try {
FileInputStream in = new FileInputStream(file);
try {

View File

@ -262,22 +262,28 @@ public class FSDirectory implements Closeable {
short replication,
long modificationTime,
long atime,
long preferredBlockSize)
long preferredBlockSize,
String clientName,
String clientMachine)
throws UnresolvedLinkException {
INode newNode;
long diskspace = UNKNOWN_DISK_SPACE;
assert hasWriteLock();
if (blocks == null)
newNode = new INodeDirectory(permissions, modificationTime);
else {
else if(blocks.length == 0 || blocks[blocks.length-1].getBlockUCState()
== BlockUCState.UNDER_CONSTRUCTION) {
newNode = new INodeFileUnderConstruction(
permissions, blocks.length, replication,
preferredBlockSize, modificationTime, clientName,
clientMachine, null);
} else {
newNode = new INodeFile(permissions, blocks.length, replication,
modificationTime, atime, preferredBlockSize);
diskspace = ((INodeFile)newNode).diskspaceConsumed(blocks);
}
writeLock();
try {
try {
newNode = addNode(path, newNode, diskspace);
newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE);
if(newNode != null && blocks != null) {
int nrBlocks = blocks.length;
// Add file->block mapping
@ -296,6 +302,74 @@ public class FSDirectory implements Closeable {
}
/**
* Update files in-memory data structures with new block information.
* @throws IOException
*/
void updateFile(INodeFile file,
String path,
PermissionStatus permissions,
BlockInfo[] blocks,
short replication,
long mtime,
long atime,
long preferredBlockSize) throws IOException {
// Update the salient file attributes.
file.setAccessTime(atime);
file.setModificationTimeForce(mtime);
// Update its block list
BlockInfo[] oldBlocks = file.getBlocks();
// Are we only updating the last block's gen stamp.
boolean isGenStampUpdate = oldBlocks.length == blocks.length;
// First, update blocks in common
BlockInfo oldBlock = null;
for (int i = 0; i < oldBlocks.length && i < blocks.length; i++) {
oldBlock = oldBlocks[i];
Block newBlock = blocks[i];
boolean isLastBlock = i == oldBlocks.length - 1;
if (oldBlock.getBlockId() != newBlock.getBlockId() ||
(oldBlock.getGenerationStamp() != newBlock.getGenerationStamp() &&
!(isGenStampUpdate && isLastBlock))) {
throw new IOException("Mismatched block IDs or generation stamps, " +
"attempting to replace block " + oldBlock + " with " + newBlock +
" as block # " + i + "/" + blocks.length + " of " + path);
}
oldBlock.setNumBytes(newBlock.getNumBytes());
oldBlock.setGenerationStamp(newBlock.getGenerationStamp());
}
if (blocks.length < oldBlocks.length) {
// We're removing a block from the file, e.g. abandonBlock(...)
if (!file.isUnderConstruction()) {
throw new IOException("Trying to remove a block from file " +
path + " which is not under construction.");
}
if (blocks.length != oldBlocks.length - 1) {
throw new IOException("Trying to remove more than one block from file "
+ path);
}
unprotectedRemoveBlock(path,
(INodeFileUnderConstruction)file, oldBlocks[oldBlocks.length - 1]);
} else if (blocks.length > oldBlocks.length) {
// We're adding blocks
// First complete last old Block
getBlockManager().completeBlock(file, oldBlocks.length-1, true);
// Add the new blocks
for (int i = oldBlocks.length; i < blocks.length; i++) {
// addBlock();
BlockInfo newBI = blocks[i];
getBlockManager().addINode(newBI, file);
file.addBlock(newBI);
}
}
}
INodeDirectory addToParent(byte[] src, INodeDirectory parentINode,
INode newNode, boolean propagateModTime) throws UnresolvedLinkException {
// NOTE: This does not update space counts for parents
@ -417,28 +491,33 @@ public class FSDirectory implements Closeable {
writeLock();
try {
// modify file-> block and blocksMap
fileNode.removeLastBlock(block);
getBlockManager().removeBlockFromMap(block);
unprotectedRemoveBlock(path, fileNode, block);
// write modified block locations to log
fsImage.getEditLog().logOpenFile(path, fileNode);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
+path+" with "+block
+" block is removed from the file system");
}
// update space consumed
INode[] pathINodes = getExistingPathINodes(path);
updateCount(pathINodes, pathINodes.length-1, 0,
-fileNode.getPreferredBlockSize()*fileNode.getReplication(), true);
} finally {
writeUnlock();
}
return true;
}
void unprotectedRemoveBlock(String path, INodeFileUnderConstruction fileNode,
Block block) throws IOException {
// modify file-> block and blocksMap
fileNode.removeLastBlock(block);
getBlockManager().removeBlockFromMap(block);
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* FSDirectory.removeBlock: "
+path+" with "+block
+" block is removed from the file system");
}
// update space consumed
INode[] pathINodes = getExistingPathINodes(path);
updateCount(pathINodes, pathINodes.length-1, 0,
-fileNode.getPreferredBlockSize()*fileNode.getReplication(), true);
}
/**
* @see #unprotectedRenameTo(String, String, long)
* @deprecated Use {@link #renameTo(String, String, Rename...)} instead.

View File

@ -187,31 +187,53 @@ public class FSEditLogLoader {
" clientMachine " + addCloseOp.clientMachine);
}
fsDir.unprotectedDelete(addCloseOp.path, addCloseOp.mtime);
// There are four cases here:
// 1. OP_ADD to create a new file
// 2. OP_ADD to update file blocks
// 3. OP_ADD to open file for append
// 4. OP_CLOSE to close the file
// add to the file tree
INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
addCloseOp.path, permissions,
blocks, replication,
addCloseOp.mtime, addCloseOp.atime, blockSize);
if (addCloseOp.opCode == FSEditLogOpCodes.OP_ADD) {
//
// Replace current node with a INodeUnderConstruction.
// Recreate in-memory lease record.
//
INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
node.getLocalNameBytes(),
node.getReplication(),
node.getModificationTime(),
node.getPreferredBlockSize(),
node.getBlocks(),
node.getPermissionStatus(),
addCloseOp.clientName,
addCloseOp.clientMachine,
null);
fsDir.replaceNode(addCloseOp.path, node, cons);
fsNamesys.leaseManager.addLease(cons.getClientName(),
addCloseOp.path);
// See if the file already exists
INodeFile oldFile = fsDir.getFileINode(addCloseOp.path);
if (oldFile == null) { // OP_ADD for a new file
assert addCloseOp.opCode == FSEditLogOpCodes.OP_ADD :
"Expected opcode OP_ADD, but got " + addCloseOp.opCode;
fsDir.unprotectedAddFile(
addCloseOp.path, permissions, blocks, replication,
addCloseOp.mtime, addCloseOp.atime, blockSize,
addCloseOp.clientName, addCloseOp.clientMachine);
} else {
fsDir.updateFile(oldFile,
addCloseOp.path, permissions, blocks, replication,
addCloseOp.mtime, addCloseOp.atime, blockSize);
if(addCloseOp.opCode == FSEditLogOpCodes.OP_CLOSE) { // OP_CLOSE
assert oldFile.isUnderConstruction() :
"File is not under construction: " + addCloseOp.path;
fsNamesys.getBlockManager().completeBlock(
oldFile, blocks.length-1, true);
INodeFile newFile =
((INodeFileUnderConstruction)oldFile).convertToInodeFile();
fsDir.replaceNode(addCloseOp.path, oldFile, newFile);
} else if(! oldFile.isUnderConstruction()) { // OP_ADD for append
INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
oldFile.getLocalNameBytes(),
oldFile.getReplication(),
oldFile.getModificationTime(),
oldFile.getPreferredBlockSize(),
oldFile.getBlocks(),
oldFile.getPermissionStatus(),
addCloseOp.clientName,
addCloseOp.clientMachine,
null);
fsDir.replaceNode(addCloseOp.path, oldFile, cons);
}
}
// Update file lease
if(addCloseOp.opCode == FSEditLogOpCodes.OP_ADD) {
fsNamesys.leaseManager.addLease(addCloseOp.clientName, addCloseOp.path);
} else { // Ops.OP_CLOSE
fsNamesys.leaseManager.removeLease(
((INodeFileUnderConstruction)oldFile).getClientName(), addCloseOp.path);
}
break;
}

View File

@ -4249,7 +4249,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
@Override // NameNodeMXBean
public String getVersion() {
return VersionInfo.getVersion();
return VersionInfo.getVersion() + ", r" + VersionInfo.getRevision();
}
@Override // NameNodeMXBean

View File

@ -41,8 +41,20 @@ public class INodeFileUnderConstruction extends INodeFile {
String clientName,
String clientMachine,
DatanodeDescriptor clientNode) {
super(permissions.applyUMask(UMASK), 0, replication, modTime, modTime,
preferredBlockSize);
this(permissions, 0, replication, preferredBlockSize, modTime,
clientName, clientMachine, clientNode);
}
INodeFileUnderConstruction(PermissionStatus permissions,
int nrBlocks,
short replication,
long preferredBlockSize,
long modTime,
String clientName,
String clientMachine,
DatanodeDescriptor clientNode) {
super(permissions.applyUMask(UMASK), nrBlocks, replication,
modTime, modTime, preferredBlockSize);
this.clientName = clientName;
this.clientMachine = clientMachine;
this.clientNode = clientNode;

View File

@ -25,6 +25,7 @@ import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.PrintStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.net.URLConnection;
import java.security.PrivilegedExceptionAction;
@ -49,6 +50,7 @@ import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@ -201,7 +203,8 @@ public class DelegationTokenFetcher {
static public Credentials getDTfromRemote(String nnAddr,
String renewer) throws IOException {
DataInputStream dis = null;
InetSocketAddress serviceAddr = NetUtils.createSocketAddr(nnAddr);
try {
StringBuffer url = new StringBuffer();
if (renewer != null) {
@ -221,9 +224,7 @@ public class DelegationTokenFetcher {
ts.readFields(dis);
for(Token<?> token: ts.getAllTokens()) {
token.setKind(HftpFileSystem.TOKEN_KIND);
token.setService(new Text(SecurityUtil.buildDTServiceName
(remoteURL.toURI(),
DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT)));
SecurityUtil.setTokenService(token, serviceAddr);
}
return ts;
} catch (Exception e) {

View File

@ -882,6 +882,8 @@ public class MiniDFSCluster {
if(dn == null)
throw new IOException("Cannot start DataNode in "
+ dnConf.get(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY));
//NOTE: the following is true if and only if:
// hadoop.security.token.service.use_ip=true
//since the HDFS does things based on IP:port, we need to add the mapping
//for IP:port to rackId
String ipAddr = dn.getSelfAddr().getAddress().getHostAddress();

View File

@ -72,12 +72,20 @@ public class TestAbandonBlock {
// Now abandon the last block
DFSClient dfsclient = DFSClientAdapter.getDFSClient((DistributedFileSystem)fs);
LocatedBlocks blocks = dfsclient.getNamenode().getBlockLocations(src, 0, 1);
LocatedBlocks blocks =
dfsclient.getNamenode().getBlockLocations(src, 0, Integer.MAX_VALUE);
int orginalNumBlocks = blocks.locatedBlockCount();
LocatedBlock b = blocks.getLastLocatedBlock();
dfsclient.getNamenode().abandonBlock(b.getBlock(), src, dfsclient.clientName);
// And close the file
fout.close();
// Close cluster and check the block has been abandoned after restart
cluster.restartNameNode();
blocks = dfsclient.getNamenode().getBlockLocations(src, 0, Integer.MAX_VALUE);
assert orginalNumBlocks == blocks.locatedBlockCount() + 1 :
"Blocks " + b + " has not been abandoned.";
}
@Test

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.PrintStream;
@ -35,16 +36,15 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
import org.apache.hadoop.hdfs.tools.DFSck;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Before;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import junit.framework.Assert;
/**
* Class is used to test client reporting corrupted block replica to name node.
@ -305,9 +305,9 @@ public class TestClientReportBadBlock {
*/
private static void corruptBlock(final ExtendedBlock block, final DataNode dn)
throws FileNotFoundException, IOException {
final FSDataset data = (FSDataset) dn.getFSDataset();
final RandomAccessFile raFile = new RandomAccessFile(
data.getBlockFile(block), "rw");
final File f = DataNodeTestUtils.getBlockFile(
dn, block.getBlockPoolId(), block.getLocalBlock());
final RandomAccessFile raFile = new RandomAccessFile(f, "rw");
final byte[] bytes = new byte[(int) BLOCK_SIZE];
for (int i = 0; i < BLOCK_SIZE; i++) {
bytes[i] = (byte) (i);

View File

@ -46,7 +46,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
@ -1164,9 +1163,9 @@ public class TestDFSShell extends TestCase {
String poolId = cluster.getNamesystem().getBlockPoolId();
Iterable<Block>[] blocks = cluster.getAllBlockReports(poolId);
for(int i = 0; i < blocks.length; i++) {
FSDataset ds = (FSDataset)datanodes.get(i).getFSDataset();
DataNode dn = datanodes.get(i);
for(Block b : blocks[i]) {
files.add(DataNodeTestUtils.getBlockFile(ds, poolId, b.getBlockId()));
files.add(DataNodeTestUtils.getFile(dn, poolId, b.getBlockId()));
}
}
return files;

View File

@ -17,29 +17,27 @@
*/
package org.apache.hadoop.hdfs;
import static org.junit.Assert.*;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.List;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.HardLink;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.junit.Test;
/**
* This class tests the building blocks that are needed to
@ -133,14 +131,13 @@ public class TestFileAppend{
LocatedBlocks locations = client.getNamenode().getBlockLocations(
file1.toString(), 0, Long.MAX_VALUE);
List<LocatedBlock> blocks = locations.getLocatedBlocks();
FSDataset dataset = (FSDataset) dn[0].data;
//
// Create hard links for a few of the blocks
//
for (int i = 0; i < blocks.size(); i = i + 2) {
ExtendedBlock b = blocks.get(i).getBlock();
final File f = DataNodeTestUtils.getBlockFile(dataset,
final File f = DataNodeTestUtils.getFile(dn[0],
b.getBlockPoolId(), b.getLocalBlock().getBlockId());
File link = new File(f.toString() + ".link");
System.out.println("Creating hardlink for File " + f + " to " + link);
@ -154,7 +151,7 @@ public class TestFileAppend{
ExtendedBlock b = blocks.get(i).getBlock();
System.out.println("testCopyOnWrite detaching block " + b);
assertTrue("Detaching block " + b + " should have returned true",
dataset.unlinkBlock(b, 1));
DataNodeTestUtils.unlinkBlock(dn[0], b, 1));
}
// Since the blocks were already detached earlier, these calls should
@ -164,7 +161,7 @@ public class TestFileAppend{
ExtendedBlock b = blocks.get(i).getBlock();
System.out.println("testCopyOnWrite detaching block " + b);
assertTrue("Detaching block " + b + " should have returned false",
!dataset.unlinkBlock(b, 1));
!DataNodeTestUtils.unlinkBlock(dn[0], b, 1));
}
} finally {

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
@ -35,7 +36,7 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@ -199,8 +200,9 @@ public class TestFileAppend3 extends junit.framework.TestCase {
DatanodeInfo[] datanodeinfos = lb.getLocations();
assertEquals(repl, datanodeinfos.length);
final DataNode dn = cluster.getDataNode(datanodeinfos[0].getIpcPort());
final FSDataset data = (FSDataset)dn.getFSDataset();
final RandomAccessFile raf = new RandomAccessFile(data.getBlockFile(blk), "rw");
final File f = DataNodeTestUtils.getBlockFile(
dn, blk.getBlockPoolId(), blk.getLocalBlock());
final RandomAccessFile raf = new RandomAccessFile(f, "rw");
AppendTestUtil.LOG.info("dn=" + dn + ", blk=" + blk + " (length=" + blk.getNumBytes() + ")");
assertEquals(len1, raf.length());
raf.setLength(0);

View File

@ -17,6 +17,21 @@
*/
package org.apache.hadoop.hdfs;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
@ -36,7 +51,6 @@ import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -45,7 +59,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
@ -829,10 +842,9 @@ public class TestFileCreation extends junit.framework.TestCase {
int successcount = 0;
for(DatanodeInfo datanodeinfo: locatedblock.getLocations()) {
DataNode datanode = cluster.getDataNode(datanodeinfo.ipcPort);
FSDataset dataset = (FSDataset)datanode.data;
ExtendedBlock blk = locatedblock.getBlock();
Block b = dataset.getStoredBlock(blk.getBlockPoolId(), blk.getBlockId());
final File blockfile = DataNodeTestUtils.getBlockFile(dataset,
Block b = datanode.data.getStoredBlock(blk.getBlockPoolId(), blk.getBlockId());
final File blockfile = DataNodeTestUtils.getFile(datanode,
blk.getBlockPoolId(), b.getBlockId());
System.out.println("blockfile=" + blockfile);
if (blockfile != null) {

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.net.URISyntaxException;
import java.net.URI;
import java.net.URL;
import java.net.HttpURLConnection;
import java.util.Random;
@ -232,4 +233,164 @@ public class TestHftpFileSystem {
in.seek(7);
assertEquals('7', in.read());
}
public void resetFileSystem() throws IOException {
// filesystem caching has a quirk/bug that it caches based on the user's
// given uri. the result is if a filesystem is instantiated with no port,
// it gets the default port. then if the default port is changed,
// and another filesystem is instantiated with no port, the prior fs
// is returned, not a new one using the changed port. so let's flush
// the cache between tests...
FileSystem.closeAll();
}
@Test
public void testHftpDefaultPorts() throws IOException {
resetFileSystem();
Configuration conf = new Configuration();
URI uri = URI.create("hftp://localhost");
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, fs.getDefaultPort());
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultSecurePort());
assertEquals(uri, fs.getUri());
assertEquals(
"127.0.0.1:"+DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT,
fs.getCanonicalServiceName()
);
}
@Test
public void testHftpCustomDefaultPorts() throws IOException {
resetFileSystem();
Configuration conf = new Configuration();
conf.setInt("dfs.http.port", 123);
conf.setInt("dfs.https.port", 456);
URI uri = URI.create("hftp://localhost");
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
assertEquals(123, fs.getDefaultPort());
assertEquals(456, fs.getDefaultSecurePort());
assertEquals(uri, fs.getUri());
assertEquals(
"127.0.0.1:456",
fs.getCanonicalServiceName()
);
}
@Test
public void testHftpCustomUriPortWithDefaultPorts() throws IOException {
resetFileSystem();
Configuration conf = new Configuration();
URI uri = URI.create("hftp://localhost:123");
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT, fs.getDefaultPort());
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultSecurePort());
assertEquals(uri, fs.getUri());
assertEquals(
"127.0.0.1:"+DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT,
fs.getCanonicalServiceName()
);
}
@Test
public void testHftpCustomUriPortWithCustomDefaultPorts() throws IOException {
resetFileSystem();
Configuration conf = new Configuration();
conf.setInt("dfs.http.port", 123);
conf.setInt("dfs.https.port", 456);
URI uri = URI.create("hftp://localhost:789");
HftpFileSystem fs = (HftpFileSystem) FileSystem.get(uri, conf);
assertEquals(123, fs.getDefaultPort());
assertEquals(456, fs.getDefaultSecurePort());
assertEquals(uri, fs.getUri());
assertEquals(
"127.0.0.1:456",
fs.getCanonicalServiceName()
);
}
///
@Test
public void testHsftpDefaultPorts() throws IOException {
resetFileSystem();
Configuration conf = new Configuration();
URI uri = URI.create("hsftp://localhost");
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultPort());
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultSecurePort());
assertEquals(uri, fs.getUri());
assertEquals(
"127.0.0.1:"+DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT,
fs.getCanonicalServiceName()
);
}
@Test
public void testHsftpCustomDefaultPorts() throws IOException {
resetFileSystem();
Configuration conf = new Configuration();
conf.setInt("dfs.http.port", 123);
conf.setInt("dfs.https.port", 456);
URI uri = URI.create("hsftp://localhost");
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
assertEquals(456, fs.getDefaultPort());
assertEquals(456, fs.getDefaultSecurePort());
assertEquals(uri, fs.getUri());
assertEquals(
"127.0.0.1:456",
fs.getCanonicalServiceName()
);
}
@Test
public void testHsftpCustomUriPortWithDefaultPorts() throws IOException {
resetFileSystem();
Configuration conf = new Configuration();
URI uri = URI.create("hsftp://localhost:123");
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultPort());
assertEquals(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT, fs.getDefaultSecurePort());
assertEquals(uri, fs.getUri());
assertEquals(
"127.0.0.1:123",
fs.getCanonicalServiceName()
);
}
@Test
public void testHsftpCustomUriPortWithCustomDefaultPorts() throws IOException {
resetFileSystem();
Configuration conf = new Configuration();
conf.setInt("dfs.http.port", 123);
conf.setInt("dfs.https.port", 456);
URI uri = URI.create("hsftp://localhost:789");
HsftpFileSystem fs = (HsftpFileSystem) FileSystem.get(uri, conf);
assertEquals(456, fs.getDefaultPort());
assertEquals(456, fs.getDefaultSecurePort());
assertEquals(uri, fs.getUri());
assertEquals(
"127.0.0.1:789",
fs.getCanonicalServiceName()
);
}
}

View File

@ -22,6 +22,8 @@ package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
/**
@ -39,7 +41,18 @@ public class DataNodeTestUtils {
return dn.getDNRegistrationForBP(bpid);
}
public static File getBlockFile(FSDataset fsdataset, String bpid, long bid) {
return fsdataset.getFile(bpid, bid);
public static File getFile(DataNode dn, String bpid, long bid) {
return ((FSDataset)dn.getFSDataset()).getFile(bpid, bid);
}
public static File getBlockFile(DataNode dn, String bpid, Block b
) throws IOException {
return ((FSDataset)dn.getFSDataset()).getBlockFile(bpid, b);
}
public static boolean unlinkBlock(DataNode dn, ExtendedBlock block, int numLinks
) throws IOException {
ReplicaInfo info = ((FSDataset)dn.getFSDataset()).getReplicaInfo(block);
return info.unlinkBlock(numLinks);
}
}

View File

@ -116,10 +116,12 @@ public class TestEditLog extends TestCase {
int numTransactions;
short replication = 3;
long blockSize = 64;
int startIndex;
Transactions(FSNamesystem ns, int num) {
Transactions(FSNamesystem ns, int numTx, int startIdx) {
namesystem = ns;
numTransactions = num;
numTransactions = numTx;
startIndex = startIdx;
}
// add a bunch of transactions.
@ -131,8 +133,8 @@ public class TestEditLog extends TestCase {
for (int i = 0; i < numTransactions; i++) {
INodeFileUnderConstruction inode = new INodeFileUnderConstruction(
p, replication, blockSize, 0, "", "", null);
editLog.logOpenFile("/filename" + i, inode);
editLog.logCloseFile("/filename" + i, inode);
editLog.logOpenFile("/filename" + startIndex + i, inode);
editLog.logCloseFile("/filename" + startIndex + i, inode);
editLog.logSync();
}
}
@ -280,7 +282,8 @@ public class TestEditLog extends TestCase {
// Create threads and make them run transactions concurrently.
Thread threadId[] = new Thread[NUM_THREADS];
for (int i = 0; i < NUM_THREADS; i++) {
Transactions trans = new Transactions(namesystem, NUM_TRANSACTIONS);
Transactions trans =
new Transactions(namesystem, NUM_TRANSACTIONS, i*NUM_TRANSACTIONS);
threadId[i] = new Thread(trans, "TransactionThread-" + i);
threadId[i].start();
}
@ -293,11 +296,16 @@ public class TestEditLog extends TestCase {
i--; // retry
}
}
// Reopen some files as for append
Transactions trans =
new Transactions(namesystem, NUM_TRANSACTIONS, NUM_TRANSACTIONS / 2);
trans.run();
// Roll another time to finalize edits_inprogress_3
fsimage.rollEditLog();
long expectedTxns = (NUM_THREADS * 2 * NUM_TRANSACTIONS) + 2; // +2 for start/end txns
long expectedTxns = ((NUM_THREADS+1) * 2 * NUM_TRANSACTIONS) + 2; // +2 for start/end txns
// Verify that we can read in all the transactions that we have written.
// If there were any corruptions, it is likely that the reading in

View File

@ -25,6 +25,7 @@ import javax.management.ObjectName;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.util.VersionInfo;
import org.junit.Test;
import junit.framework.Assert;
@ -57,6 +58,8 @@ public class TestNameNodeMXBean {
// get attribute "Version"
String version = (String) mbs.getAttribute(mxbeanName, "Version");
Assert.assertEquals(fsn.getVersion(), version);
Assert.assertTrue(version.equals(VersionInfo.getVersion()
+ ", r" + VersionInfo.getRevision()));
// get attribute "Used"
Long used = (Long) mbs.getAttribute(mxbeanName, "Used");
Assert.assertEquals(fsn.getUsed(), used.longValue());

View File

@ -0,0 +1,26 @@
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<configuration>
<!-- Turn off SSL server authentication for tests by default -->
<property>
<name>ssl.client.do.not.authenticate.server</name>
<value>true</value>
</property>
</configuration>

View File

@ -173,6 +173,13 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3774. Moved yarn-default.xml to hadoop-yarn-common from
hadoop-server-common. (Mahadev Konar via vinodkv)
MAPREDUCE-3771. Un-deprecated the old mapred apis, port of MAPREDUCE-1735.
(acmurthy)
MAPREDUCE-3784. Fixed CapacityScheduler so that maxActiveApplications and
maxActiveApplicationsPerUser per queue are not too low for small
clusters. (Arun C Murthy via vinodkv)
OPTIMIZATIONS
MAPREDUCE-3567. Extraneous JobConf objects in AM heap. (Vinod Kumar
@ -621,6 +628,37 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3427. Fix streaming unit tests broken after mavenization.
(Hitesh Shah via acmurthy)
MAPREDUCE-3640. Allow AMRecovery to work with partial JobHistory files.
(Arun C Murthy via sseth)
MAPREDUCE-3752. Modified application limits to include queue max-capacities
besides the usual user limits. (Arun C Murthy via vinodkv)
MAPREDUCE-3744. Fix the yarn logs command line. Improve error messages for
mapred job -logs. (Jason Lowe via sseth)
MAPREDUCE-3780. Fixed a bug where applications killed before getting
activated were not getting cleaned up properly. (Hitesh Shah via acmurthy)
MAPREDUCE-3708. Metrics: Incorrect Apps Submitted Count (Bhallamudi via
mahadev)
MAPREDUCE-3727. jobtoken location property in jobconf refers to wrong
jobtoken file (tucu)
MAPREDUCE-3711. Fixed MR AM recovery so that only single selected task
output is recovered and thus reduce the unnecessarily bloated recovery
time. (Robert Joseph Evans via vinodkv)
MAPREDUCE-3760. Changed active nodes list to not contain unhealthy nodes
on the webUI and metrics. (vinodkv)
MAPREDUCE-3417. Fixed job-access-controls to work with MR AM and
JobHistoryServer web-apps. (Jonathan Eagles via vinodkv)
MAPREDUCE-3803. Fix broken build of raid contrib due to HDFS-2864.
(Ravi Prakash via suresh)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -431,9 +431,6 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
@Override
public boolean checkAccess(UserGroupInformation callerUGI,
JobACL jobOperation) {
if (!UserGroupInformation.isSecurityEnabled()) {
return true;
}
AccessControlList jobACL = jobACLs.get(jobOperation);
return aclsManager.checkAccess(callerUGI, jobOperation, username, jobACL);
}

View File

@ -559,6 +559,7 @@ public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
}
private void internalError(TaskEventType type) {
LOG.error("Invalid event " + type + " on Task " + this.taskId);
eventHandler.handle(new JobDiagnosticsUpdateEvent(
this.taskId.getJobId(), "Invalid event " + type +
" on Task " + this.taskId));

View File

@ -103,6 +103,7 @@ public class LocalContainerAllocator extends RMCommunicator
// This can happen when the connection to the RM has gone down. Keep
// re-trying until the retryInterval has expired.
if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.INTERNAL_ERROR));
throw new YarnException("Could not contact RM after " +

View File

@ -32,8 +32,10 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser;
import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.JobInfo;
@ -191,6 +193,11 @@ public class RecoveryService extends CompositeService implements Recovery {
in = fc.open(historyFile);
JobHistoryParser parser = new JobHistoryParser(in);
jobInfo = parser.parse();
Exception parseException = parser.getParseException();
if (parseException != null) {
LOG.info("Got an error parsing job-history file " + historyFile +
", ignoring incomplete events.", parseException);
}
Map<org.apache.hadoop.mapreduce.TaskID, TaskInfo> taskInfos = jobInfo
.getAllTasks();
for (TaskInfo taskInfo : taskInfos.values()) {
@ -353,16 +360,24 @@ public class RecoveryService extends CompositeService implements Recovery {
//recover the task output
TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
attInfo.getAttemptId());
try {
committer.recoverTask(taskContext);
try {
TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1);
if(type == TaskType.REDUCE || (type == TaskType.MAP && numReducers <= 0)) {
committer.recoverTask(taskContext);
LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
} else {
LOG.info("Will not try to recover output for "
+ taskContext.getTaskAttemptID());
}
} catch (IOException e) {
LOG.error("Caught an exception while trying to recover task "+aId, e);
actualHandler.handle(new JobDiagnosticsUpdateEvent(
aId.getTaskId().getJobId(), "Error in recovering task output " +
e.getMessage()));
actualHandler.handle(new JobEvent(aId.getTaskId().getJobId(),
JobEventType.INTERNAL_ERROR));
}
LOG.info("Recovered output from task attempt " + attInfo.getAttemptId());
// send the done event
LOG.info("Sending done event to " + aId);

View File

@ -543,6 +543,7 @@ public class RMContainerAllocator extends RMContainerRequestor
// This can happen when the connection to the RM has gone down. Keep
// re-trying until the retryInterval has expired.
if (System.currentTimeMillis() - retrystartTime >= retryInterval) {
LOG.error("Could not contact RM after " + retryInterval + " milliseconds.");
eventHandler.handle(new JobEvent(this.getJob().getID(),
JobEventType.INTERNAL_ERROR));
throw new YarnException("Could not contact RM after " +

View File

@ -95,7 +95,13 @@ public class AppController extends Controller implements AMParams {
* Render the /job page
*/
public void job() {
requireJob();
try {
requireJob();
}
catch (Exception e) {
renderText(e.getMessage());
return;
}
render(jobPage());
}
@ -110,7 +116,13 @@ public class AppController extends Controller implements AMParams {
* Render the /jobcounters page
*/
public void jobCounters() {
requireJob();
try {
requireJob();
}
catch (Exception e) {
renderText(e.getMessage());
return;
}
if (app.getJob() != null) {
setTitle(join("Counters for ", $(JOB_ID)));
}
@ -121,7 +133,13 @@ public class AppController extends Controller implements AMParams {
* Display a page showing a task's counters
*/
public void taskCounters() {
requireTask();
try {
requireTask();
}
catch (Exception e) {
renderText(e.getMessage());
return;
}
if (app.getTask() != null) {
setTitle(StringHelper.join("Counters for ", $(TASK_ID)));
}
@ -140,7 +158,13 @@ public class AppController extends Controller implements AMParams {
* @throws IOException on any error.
*/
public void singleJobCounter() throws IOException{
requireJob();
try {
requireJob();
}
catch (Exception e) {
renderText(e.getMessage());
return;
}
set(COUNTER_GROUP, URLDecoder.decode($(COUNTER_GROUP), "UTF-8"));
set(COUNTER_NAME, URLDecoder.decode($(COUNTER_NAME), "UTF-8"));
if (app.getJob() != null) {
@ -155,7 +179,13 @@ public class AppController extends Controller implements AMParams {
* @throws IOException on any error.
*/
public void singleTaskCounter() throws IOException{
requireTask();
try {
requireTask();
}
catch (Exception e) {
renderText(e.getMessage());
return;
}
set(COUNTER_GROUP, URLDecoder.decode($(COUNTER_GROUP), "UTF-8"));
set(COUNTER_NAME, URLDecoder.decode($(COUNTER_NAME), "UTF-8"));
if (app.getTask() != null) {
@ -176,7 +206,13 @@ public class AppController extends Controller implements AMParams {
* Render the /tasks page
*/
public void tasks() {
requireJob();
try {
requireJob();
}
catch (Exception e) {
renderText(e.getMessage());
return;
}
if (app.getJob() != null) {
try {
String tt = $(TASK_TYPE);
@ -201,7 +237,13 @@ public class AppController extends Controller implements AMParams {
* Render the /task page
*/
public void task() {
requireTask();
try {
requireTask();
}
catch (Exception e) {
renderText(e.getMessage());
return;
}
if (app.getTask() != null) {
setTitle(join("Attempts for ", $(TASK_ID)));
}
@ -219,7 +261,13 @@ public class AppController extends Controller implements AMParams {
* Render the attempts page
*/
public void attempts() {
requireJob();
try {
requireJob();
}
catch (Exception e) {
renderText(e.getMessage());
return;
}
if (app.getJob() != null) {
try {
String taskType = $(TASK_TYPE);
@ -252,6 +300,13 @@ public class AppController extends Controller implements AMParams {
*/
public void conf() {
requireJob();
try {
requireJob();
}
catch (Exception e) {
renderText(e.getMessage());
return;
}
render(confPage());
}
@ -280,41 +335,43 @@ public class AppController extends Controller implements AMParams {
void accessDenied(String s) {
setStatus(HttpServletResponse.SC_FORBIDDEN);
setTitle(join("Access denied: ", s));
throw new RuntimeException("Access denied: " + s);
}
/**
* check for job access.
* @param job the job that is being accessed
* @return True if the requesting user has permission to view the job
*/
void checkAccess(Job job) {
boolean checkAccess(Job job) {
UserGroupInformation callerUgi = UserGroupInformation.createRemoteUser(
request().getRemoteUser());
if (!job.checkAccess(callerUgi, JobACL.VIEW_JOB)) {
accessDenied("User " + request().getRemoteUser() + " does not have " +
" permissions.");
}
return job.checkAccess(callerUgi, JobACL.VIEW_JOB);
}
/**
* Ensure that a JOB_ID was passed into the page.
*/
public void requireJob() {
try {
if ($(JOB_ID).isEmpty()) {
throw new RuntimeException("missing job ID");
}
JobId jobID = MRApps.toJobID($(JOB_ID));
app.setJob(app.context.getJob(jobID));
if (app.getJob() == null) {
notFound($(JOB_ID));
}
/* check for acl access */
Job job = app.context.getJob(jobID);
checkAccess(job);
} catch (Exception e) {
badRequest(e.getMessage() == null ?
e.getClass().getName() : e.getMessage());
if ($(JOB_ID).isEmpty()) {
badRequest("missing job ID");
throw new RuntimeException("Bad Request: Missing job ID");
}
JobId jobID = MRApps.toJobID($(JOB_ID));
app.setJob(app.context.getJob(jobID));
if (app.getJob() == null) {
notFound($(JOB_ID));
throw new RuntimeException("Not Found: " + $(JOB_ID));
}
/* check for acl access */
Job job = app.context.getJob(jobID);
if (!checkAccess(job)) {
accessDenied("User " + request().getRemoteUser() + " does not have " +
" permission to view job " + $(JOB_ID));
throw new RuntimeException("Access denied: User " +
request().getRemoteUser() + " does not have permission to view job " +
$(JOB_ID));
}
}
@ -322,24 +379,30 @@ public class AppController extends Controller implements AMParams {
* Ensure that a TASK_ID was passed into the page.
*/
public void requireTask() {
try {
if ($(TASK_ID).isEmpty()) {
throw new RuntimeException("missing task ID");
if ($(TASK_ID).isEmpty()) {
badRequest("missing task ID");
throw new RuntimeException("missing task ID");
}
TaskId taskID = MRApps.toTaskID($(TASK_ID));
Job job = app.context.getJob(taskID.getJobId());
app.setJob(job);
if (app.getJob() == null) {
notFound(MRApps.toString(taskID.getJobId()));
throw new RuntimeException("Not Found: " + $(JOB_ID));
} else {
app.setTask(app.getJob().getTask(taskID));
if (app.getTask() == null) {
notFound($(TASK_ID));
throw new RuntimeException("Not Found: " + $(TASK_ID));
}
TaskId taskID = MRApps.toTaskID($(TASK_ID));
Job job = app.context.getJob(taskID.getJobId());
app.setJob(job);
if (app.getJob() == null) {
notFound(MRApps.toString(taskID.getJobId()));
} else {
app.setTask(app.getJob().getTask(taskID));
if (app.getTask() == null) {
notFound($(TASK_ID));
}
}
checkAccess(job);
} catch (Exception e) {
badRequest(e.getMessage());
}
if (!checkAccess(job)) {
accessDenied("User " + request().getRemoteUser() + " does not have " +
" permission to view job " + $(JOB_ID));
throw new RuntimeException("Access denied: User " +
request().getRemoteUser() + " does not have permission to view job " +
$(JOB_ID));
}
}
}

View File

@ -61,6 +61,7 @@ import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test;
@SuppressWarnings({"unchecked", "rawtypes"})
public class TestRecovery {
private static final Log LOG = LogFactory.getLog(TestRecovery.class);
@ -112,7 +113,7 @@ public class TestRecovery {
Assert.assertEquals("Reduce Task state not correct",
TaskState.RUNNING, reduceTask.getReport().getTaskState());
//send the fail signal to the 1st map task attempt
//send the fail signal to the 1st map task attempt
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task1Attempt1.getID(),
@ -193,7 +194,7 @@ public class TestRecovery {
//RUNNING state
app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
//send the done signal to the 2nd map task
//send the done signal to the 2nd map task
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
mapTask2.getAttempts().values().iterator().next().getID(),
@ -349,6 +350,151 @@ public class TestRecovery {
validateOutput();
}
@Test
public void testOutputRecoveryMapsOnly() throws Exception {
int runCount = 0;
MRApp app = new MRAppWithHistory(2, 1, false, this.getClass().getName(),
true, ++runCount);
Configuration conf = new Configuration();
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask1 = it.next();
Task mapTask2 = it.next();
Task reduceTask1 = it.next();
// all maps must be running
app.waitForState(mapTask1, TaskState.RUNNING);
TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
.next();
//before sending the TA_DONE, event make sure attempt has come to
//RUNNING state
app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
// write output corresponding to map1 (This is just to validate that it is
//no included in the output)
writeBadOutput(task1Attempt1, conf);
//send the done signal to the map
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task1Attempt1.getID(),
TaskAttemptEventType.TA_DONE));
//wait for map task to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
// Verify the shuffle-port
Assert.assertEquals(5467, task1Attempt1.getShufflePort());
//stop the app before the job completes.
app.stop();
//rerun
//in rerun the map will be recovered from previous run
app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", true);
conf.setBoolean("mapred.reducer.new-api", true);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask1 = it.next();
mapTask2 = it.next();
reduceTask1 = it.next();
// map will be recovered, no need to send done
app.waitForState(mapTask1, TaskState.SUCCEEDED);
// Verify the shuffle-port after recovery
task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
Assert.assertEquals(5467, task1Attempt1.getShufflePort());
app.waitForState(mapTask2, TaskState.RUNNING);
TaskAttempt task2Attempt1 = mapTask2.getAttempts().values().iterator()
.next();
//before sending the TA_DONE, event make sure attempt has come to
//RUNNING state
app.waitForState(task2Attempt1, TaskAttemptState.RUNNING);
//send the done signal to the map
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task2Attempt1.getID(),
TaskAttemptEventType.TA_DONE));
//wait for map task to complete
app.waitForState(mapTask2, TaskState.SUCCEEDED);
// Verify the shuffle-port
Assert.assertEquals(5467, task2Attempt1.getShufflePort());
app.waitForState(reduceTask1, TaskState.RUNNING);
TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
// write output corresponding to reduce1
writeOutput(reduce1Attempt1, conf);
//send the done signal to the 1st reduce
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
reduce1Attempt1.getID(),
TaskAttemptEventType.TA_DONE));
//wait for first reduce task to complete
app.waitForState(reduceTask1, TaskState.SUCCEEDED);
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
validateOutput();
}
private void writeBadOutput(TaskAttempt attempt, Configuration conf)
throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,
TypeConverter.fromYarn(attempt.getID()));
TextOutputFormat<?, ?> theOutputFormat = new TextOutputFormat();
RecordWriter theRecordWriter = theOutputFormat
.getRecordWriter(tContext);
NullWritable nullWritable = NullWritable.get();
try {
theRecordWriter.write(key2, val2);
theRecordWriter.write(null, nullWritable);
theRecordWriter.write(null, val2);
theRecordWriter.write(nullWritable, val1);
theRecordWriter.write(key1, nullWritable);
theRecordWriter.write(key2, null);
theRecordWriter.write(null, null);
theRecordWriter.write(key1, val1);
} finally {
theRecordWriter.close(tContext);
}
OutputFormat outputFormat = ReflectionUtils.newInstance(
tContext.getOutputFormatClass(), conf);
OutputCommitter committer = outputFormat.getOutputCommitter(tContext);
committer.commitTask(tContext);
}
private void writeOutput(TaskAttempt attempt, Configuration conf)
throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,

View File

@ -28,6 +28,11 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.JobACL;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl.JobNoTasksCompletedTransition;
@ -37,6 +42,7 @@ import org.apache.hadoop.mapreduce.v2.api.records.JobState;
import org.apache.hadoop.mapreduce.v2.api.records.TaskId;
import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics;
import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.event.EventHandler;
import org.junit.Test;
import org.junit.Assert;
@ -134,4 +140,61 @@ public class TestJobImpl {
t.testCheckJobCompleteSuccess();
t.testCheckJobCompleteSuccessFailed();
}
@Test
public void testCheckAccess() {
// Create two unique users
String user1 = System.getProperty("user.name");
String user2 = user1 + "1234";
UserGroupInformation ugi1 = UserGroupInformation.createRemoteUser(user1);
UserGroupInformation ugi2 = UserGroupInformation.createRemoteUser(user2);
// Create the job
JobID jobID = JobID.forName("job_1234567890000_0001");
JobId jobId = TypeConverter.toYarn(jobID);
// Setup configuration access only to user1 (owner)
Configuration conf1 = new Configuration();
conf1.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
conf1.set(MRJobConfig.JOB_ACL_VIEW_JOB, "");
// Verify access
JobImpl job1 = new JobImpl(jobId, null, conf1, null, null, null, null, null,
null, null, null, true, null, 0, null);
Assert.assertTrue(job1.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertFalse(job1.checkAccess(ugi2, JobACL.VIEW_JOB));
// Setup configuration access to the user1 (owner) and user2
Configuration conf2 = new Configuration();
conf2.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
conf2.set(MRJobConfig.JOB_ACL_VIEW_JOB, user2);
// Verify access
JobImpl job2 = new JobImpl(jobId, null, conf2, null, null, null, null, null,
null, null, null, true, null, 0, null);
Assert.assertTrue(job2.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job2.checkAccess(ugi2, JobACL.VIEW_JOB));
// Setup configuration access with security enabled and access to all
Configuration conf3 = new Configuration();
conf3.setBoolean(MRConfig.MR_ACLS_ENABLED, true);
conf3.set(MRJobConfig.JOB_ACL_VIEW_JOB, "*");
// Verify access
JobImpl job3 = new JobImpl(jobId, null, conf3, null, null, null, null, null,
null, null, null, true, null, 0, null);
Assert.assertTrue(job3.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job3.checkAccess(ugi2, JobACL.VIEW_JOB));
// Setup configuration access without security enabled
Configuration conf4 = new Configuration();
conf4.setBoolean(MRConfig.MR_ACLS_ENABLED, false);
conf4.set(MRJobConfig.JOB_ACL_VIEW_JOB, "");
// Verify access
JobImpl job4 = new JobImpl(jobId, null, conf4, null, null, null, null, null,
null, null, null, true, null, 0, null);
Assert.assertTrue(job4.checkAccess(ugi1, JobACL.VIEW_JOB));
Assert.assertTrue(job4.checkAccess(ugi2, JobACL.VIEW_JOB));
}
}

View File

@ -119,9 +119,7 @@ import org.apache.hadoop.mapreduce.Job;
* @see org.apache.hadoop.mapred.JobConf
* @see org.apache.hadoop.mapred.JobClient
* @see org.apache.hadoop.mapreduce.Job
* @deprecated Use methods on {@link Job}.
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class DistributedCache extends

View File

@ -16,8 +16,4 @@
* limitations under the License.
*
*/
/**
* <b>Deprecated.</b> Use {@link org.apache.hadoop.mapreduce.Job} instead.
*/
@Deprecated
package org.apache.hadoop.filecache;

View File

@ -62,9 +62,7 @@ import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
* {@link JobClient#getClusterStatus()}.</p>
*
* @see JobClient
* @deprecated Use {@link ClusterMetrics} or {@link TaskTrackerInfo} instead
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class ClusterStatus implements Writable {

View File

@ -18,27 +18,24 @@
package org.apache.hadoop.mapred;
import static org.apache.hadoop.mapreduce.util.CountersStrings.parseEscapedCompactString;
import static org.apache.hadoop.mapreduce.util.CountersStrings.toEscapedCompactString;
import java.text.ParseException;
import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.mapreduce.FileSystemCounter;
import org.apache.hadoop.mapreduce.counters.AbstractCounterGroup;
import org.apache.hadoop.mapreduce.counters.AbstractCounters;
import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
import org.apache.hadoop.mapreduce.counters.CounterGroupFactory;
import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;
import org.apache.hadoop.mapreduce.counters.FileSystemCounterGroup;
import org.apache.hadoop.mapreduce.counters.FrameworkCounterGroup;
import org.apache.hadoop.mapreduce.counters.GenericCounter;
import org.apache.hadoop.mapreduce.counters.Limits;
import static org.apache.hadoop.mapreduce.util.CountersStrings.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormatCounter;
/**
* A set of named counters.
@ -49,9 +46,7 @@ import static org.apache.hadoop.mapreduce.util.CountersStrings.*;
*
* <p><code>Counters</code> are bunched into {@link Group}s, each comprising of
* counters from a particular <code>Enum</code> class.
* @deprecated Use {@link org.apache.hadoop.mapreduce.Counters} instead.
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Counters

View File

@ -27,7 +27,6 @@ import org.apache.hadoop.classification.InterfaceStability;
* Used when target file already exists for any operation and
* is not configured to be overwritten.
*/
@Deprecated // may be removed after 0.23
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FileAlreadyExistsException

View File

@ -54,10 +54,7 @@ import org.apache.hadoop.util.StringUtils;
* Subclasses of <code>FileInputFormat</code> can also override the
* {@link #isSplitable(FileSystem, Path)} method to ensure input-files are
* not split-up and are processed as a whole by {@link Mapper}s.
* @deprecated Use {@link org.apache.hadoop.mapreduce.lib.input.FileInputFormat}
* instead.
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileInputFormat<K, V> implements InputFormat<K, V> {

View File

@ -19,14 +19,12 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.net.URI;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
/** An {@link OutputCommitter} that commits files specified
@ -42,280 +40,140 @@ public class FileOutputCommitter extends OutputCommitter {
/**
* Temporary directory name
*/
public static final String TEMP_DIR_NAME = "_temporary";
public static final String SUCCEEDED_FILE_NAME = "_SUCCESS";
static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
"mapreduce.fileoutputcommitter.marksuccessfuljobs";
public static final String TEMP_DIR_NAME =
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.PENDING_DIR_NAME;
public static final String SUCCEEDED_FILE_NAME =
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCEEDED_FILE_NAME;
static final String SUCCESSFUL_JOB_OUTPUT_DIR_MARKER =
org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.SUCCESSFUL_JOB_OUTPUT_DIR_MARKER;
private static Path getOutputPath(JobContext context) {
JobConf conf = context.getJobConf();
return FileOutputFormat.getOutputPath(conf);
}
private static Path getOutputPath(TaskAttemptContext context) {
JobConf conf = context.getJobConf();
return FileOutputFormat.getOutputPath(conf);
}
private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter wrapped = null;
private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
getWrapped(JobContext context) throws IOException {
if(wrapped == null) {
wrapped = new org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter(
getOutputPath(context), context);
}
return wrapped;
}
private org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
getWrapped(TaskAttemptContext context) throws IOException {
if(wrapped == null) {
wrapped = new org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter(
getOutputPath(context), context);
}
return wrapped;
}
/**
* Compute the path where the output of a given job attempt will be placed.
* @param context the context of the job. This is used to get the
* application attempt id.
* @return the path to store job attempt data.
*/
@Private
Path getJobAttemptPath(JobContext context) {
return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
.getJobAttemptPath(context, getOutputPath(context));
}
@Private
Path getTaskAttemptPath(TaskAttemptContext context) throws IOException {
return getTaskAttemptPath(context, getOutputPath(context));
}
private Path getTaskAttemptPath(TaskAttemptContext context, Path out) throws IOException {
Path workPath = FileOutputFormat.getWorkOutputPath(context.getJobConf());
if(workPath == null) {
return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
.getTaskAttemptPath(context, out);
}
return workPath;
}
/**
* Compute the path where the output of a committed task is stored until
* the entire job is committed.
* @param context the context of the task attempt
* @return the path where the output of a committed task is stored until
* the entire job is committed.
*/
Path getCommittedTaskPath(TaskAttemptContext context) {
return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter
.getCommittedTaskPath(context, getOutputPath(context));
}
public Path getWorkPath(TaskAttemptContext context, Path outputPath)
throws IOException {
return getTaskAttemptPath(context, outputPath);
}
@Override
public void setupJob(JobContext context) throws IOException {
JobConf conf = context.getJobConf();
Path outputPath = FileOutputFormat.getOutputPath(conf);
if (outputPath != null) {
Path tmpDir =
new Path(outputPath, getJobAttemptBaseDirName(context) +
Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
FileSystem fileSys = tmpDir.getFileSystem(conf);
if (!fileSys.mkdirs(tmpDir)) {
LOG.error("Mkdirs failed to create " + tmpDir.toString());
}
}
}
// True if the job requires output.dir marked on successful job.
// Note that by default it is set to true.
private boolean shouldMarkOutputDir(JobConf conf) {
return conf.getBoolean(SUCCESSFUL_JOB_OUTPUT_DIR_MARKER, true);
getWrapped(context).setupJob(context);
}
@Override
public void commitJob(JobContext context) throws IOException {
//delete the task temp directory from the current jobtempdir
JobConf conf = context.getJobConf();
Path outputPath = FileOutputFormat.getOutputPath(conf);
if (outputPath != null) {
FileSystem outputFileSystem = outputPath.getFileSystem(conf);
Path tmpDir = new Path(outputPath, getJobAttemptBaseDirName(context) +
Path.SEPARATOR + FileOutputCommitter.TEMP_DIR_NAME);
FileSystem fileSys = tmpDir.getFileSystem(context.getConfiguration());
if (fileSys.exists(tmpDir)) {
fileSys.delete(tmpDir, true);
} else {
LOG.warn("Task temp dir could not be deleted " + tmpDir);
}
//move the job output to final place
Path jobOutputPath =
new Path(outputPath, getJobAttemptBaseDirName(context));
moveJobOutputs(outputFileSystem,
jobOutputPath, outputPath, jobOutputPath);
// delete the _temporary folder in the output folder
cleanupJob(context);
// check if the output-dir marking is required
if (shouldMarkOutputDir(context.getJobConf())) {
// create a _success file in the output folder
markOutputDirSuccessful(context);
}
}
}
// Create a _success file in the job's output folder
private void markOutputDirSuccessful(JobContext context) throws IOException {
JobConf conf = context.getJobConf();
// get the o/p path
Path outputPath = FileOutputFormat.getOutputPath(conf);
if (outputPath != null) {
// get the filesys
FileSystem fileSys = outputPath.getFileSystem(conf);
// create a file in the output folder to mark the job completion
Path filePath = new Path(outputPath, SUCCEEDED_FILE_NAME);
fileSys.create(filePath).close();
}
}
private void moveJobOutputs(FileSystem fs, final Path origJobOutputPath,
Path finalOutputDir, Path jobOutput) throws IOException {
LOG.debug("Told to move job output from " + jobOutput
+ " to " + finalOutputDir +
" and orig job output path is " + origJobOutputPath);
if (fs.isFile(jobOutput)) {
Path finalOutputPath =
getFinalPath(fs, finalOutputDir, jobOutput, origJobOutputPath);
if (!fs.rename(jobOutput, finalOutputPath)) {
if (!fs.delete(finalOutputPath, true)) {
throw new IOException("Failed to delete earlier output of job");
}
if (!fs.rename(jobOutput, finalOutputPath)) {
throw new IOException("Failed to save output of job");
}
}
LOG.debug("Moved job output file from " + jobOutput + " to " +
finalOutputPath);
} else if (fs.getFileStatus(jobOutput).isDirectory()) {
LOG.debug("Job output file " + jobOutput + " is a dir");
FileStatus[] paths = fs.listStatus(jobOutput);
Path finalOutputPath =
getFinalPath(fs, finalOutputDir, jobOutput, origJobOutputPath);
fs.mkdirs(finalOutputPath);
LOG.debug("Creating dirs along job output path " + finalOutputPath);
if (paths != null) {
for (FileStatus path : paths) {
moveJobOutputs(fs, origJobOutputPath, finalOutputDir, path.getPath());
}
}
}
getWrapped(context).commitJob(context);
}
@Override
@Deprecated
public void cleanupJob(JobContext context) throws IOException {
JobConf conf = context.getJobConf();
// do the clean up of temporary directory
Path outputPath = FileOutputFormat.getOutputPath(conf);
if (outputPath != null) {
Path tmpDir = new Path(outputPath, FileOutputCommitter.TEMP_DIR_NAME);
FileSystem fileSys = tmpDir.getFileSystem(conf);
context.getProgressible().progress();
if (fileSys.exists(tmpDir)) {
fileSys.delete(tmpDir, true);
} else {
LOG.warn("Output Path is Null in cleanup");
}
}
getWrapped(context).cleanupJob(context);
}
@Override
public void abortJob(JobContext context, int runState)
throws IOException {
// simply delete the _temporary dir from the o/p folder of the job
cleanupJob(context);
JobStatus.State state;
if(runState == JobStatus.State.RUNNING.getValue()) {
state = JobStatus.State.RUNNING;
} else if(runState == JobStatus.State.SUCCEEDED.getValue()) {
state = JobStatus.State.SUCCEEDED;
} else if(runState == JobStatus.State.FAILED.getValue()) {
state = JobStatus.State.FAILED;
} else if(runState == JobStatus.State.PREP.getValue()) {
state = JobStatus.State.PREP;
} else if(runState == JobStatus.State.KILLED.getValue()) {
state = JobStatus.State.KILLED;
} else {
throw new IllegalArgumentException(runState+" is not a valid runState.");
}
getWrapped(context).abortJob(context, state);
}
public void setupTask(TaskAttemptContext context) throws IOException {
// FileOutputCommitter's setupTask doesn't do anything. Because the
// temporary task directory is created on demand when the
// task is writing.
}
public void commitTask(TaskAttemptContext context)
throws IOException {
Path taskOutputPath = getTempTaskOutputPath(context);
TaskAttemptID attemptId = context.getTaskAttemptID();
JobConf job = context.getJobConf();
if (taskOutputPath != null) {
FileSystem fs = taskOutputPath.getFileSystem(job);
context.getProgressible().progress();
if (fs.exists(taskOutputPath)) {
// Move the task outputs to the current job attempt output dir
JobConf conf = context.getJobConf();
Path outputPath = FileOutputFormat.getOutputPath(conf);
FileSystem outputFileSystem = outputPath.getFileSystem(conf);
Path jobOutputPath = new Path(outputPath, getJobTempDirName(context));
moveTaskOutputs(context, outputFileSystem, jobOutputPath,
taskOutputPath);
// Delete the temporary task-specific output directory
if (!fs.delete(taskOutputPath, true)) {
LOG.info("Failed to delete the temporary output" +
" directory of task: " + attemptId + " - " + taskOutputPath);
}
LOG.info("Saved output of task '" + attemptId + "' to " +
jobOutputPath);
}
}
}
private void moveTaskOutputs(TaskAttemptContext context,
FileSystem fs,
Path jobOutputDir,
Path taskOutput)
throws IOException {
TaskAttemptID attemptId = context.getTaskAttemptID();
context.getProgressible().progress();
LOG.debug("Told to move taskoutput from " + taskOutput
+ " to " + jobOutputDir);
if (fs.isFile(taskOutput)) {
Path finalOutputPath = getFinalPath(fs, jobOutputDir, taskOutput,
getTempTaskOutputPath(context));
if (!fs.rename(taskOutput, finalOutputPath)) {
if (!fs.delete(finalOutputPath, true)) {
throw new IOException("Failed to delete earlier output of task: " +
attemptId);
}
if (!fs.rename(taskOutput, finalOutputPath)) {
throw new IOException("Failed to save output of task: " +
attemptId);
}
}
LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
} else if(fs.getFileStatus(taskOutput).isDirectory()) {
LOG.debug("Taskoutput " + taskOutput + " is a dir");
FileStatus[] paths = fs.listStatus(taskOutput);
Path finalOutputPath = getFinalPath(fs, jobOutputDir, taskOutput,
getTempTaskOutputPath(context));
fs.mkdirs(finalOutputPath);
LOG.debug("Creating dirs along path " + finalOutputPath);
if (paths != null) {
for (FileStatus path : paths) {
moveTaskOutputs(context, fs, jobOutputDir, path.getPath());
}
}
}
}
public void abortTask(TaskAttemptContext context) throws IOException {
Path taskOutputPath = getTempTaskOutputPath(context);
if (taskOutputPath != null) {
FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
context.getProgressible().progress();
fs.delete(taskOutputPath, true);
}
}
@SuppressWarnings("deprecation")
private Path getFinalPath(FileSystem fs, Path jobOutputDir, Path taskOutput,
Path taskOutputPath) throws IOException {
URI taskOutputUri = taskOutput.makeQualified(fs).toUri();
URI taskOutputPathUri = taskOutputPath.makeQualified(fs).toUri();
URI relativePath = taskOutputPathUri.relativize(taskOutputUri);
if (taskOutputUri == relativePath) {
//taskOutputPath is not a parent of taskOutput
throw new IOException("Can not get the relative path: base = " +
taskOutputPathUri + " child = " + taskOutputUri);
}
if (relativePath.getPath().length() > 0) {
return new Path(jobOutputDir, relativePath.getPath());
} else {
return jobOutputDir;
}
}
public boolean needsTaskCommit(TaskAttemptContext context)
throws IOException {
Path taskOutputPath = getTempTaskOutputPath(context);
if (taskOutputPath != null) {
context.getProgressible().progress();
// Get the file-system for the task output directory
FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
// since task output path is created on demand,
// if it exists, task needs a commit
if (fs.exists(taskOutputPath)) {
return true;
}
}
return false;
}
Path getTempTaskOutputPath(TaskAttemptContext taskContext)
throws IOException {
JobConf conf = taskContext.getJobConf();
Path outputPath = FileOutputFormat.getOutputPath(conf);
if (outputPath != null) {
Path p = new Path(outputPath,
(FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
"_" + taskContext.getTaskAttemptID().toString()));
FileSystem fs = p.getFileSystem(conf);
return p.makeQualified(fs);
}
return null;
getWrapped(context).setupTask(context);
}
Path getWorkPath(TaskAttemptContext taskContext, Path basePath)
@Override
public void commitTask(TaskAttemptContext context) throws IOException {
getWrapped(context).commitTask(context, getTaskAttemptPath(context));
}
@Override
public void abortTask(TaskAttemptContext context) throws IOException {
getWrapped(context).abortTask(context, getTaskAttemptPath(context));
}
@Override
public boolean needsTaskCommit(TaskAttemptContext context)
throws IOException {
// ${mapred.out.dir}/_temporary
Path jobTmpDir = new Path(basePath, FileOutputCommitter.TEMP_DIR_NAME);
FileSystem fs = jobTmpDir.getFileSystem(taskContext.getJobConf());
if (!fs.exists(jobTmpDir)) {
throw new IOException("The temporary job-output directory " +
jobTmpDir.toString() + " doesn't exist!");
}
// ${mapred.out.dir}/_temporary/_${taskid}
String taskid = taskContext.getTaskAttemptID().toString();
Path taskTmpDir = new Path(jobTmpDir, "_" + taskid);
if (!fs.mkdirs(taskTmpDir)) {
throw new IOException("Mkdirs failed to create "
+ taskTmpDir.toString());
}
return taskTmpDir;
return getWrapped(context).needsTaskCommit(context, getTaskAttemptPath(context));
}
@Override
@ -326,54 +184,6 @@ public class FileOutputCommitter extends OutputCommitter {
@Override
public void recoverTask(TaskAttemptContext context)
throws IOException {
Path outputPath = FileOutputFormat.getOutputPath(context.getJobConf());
context.progress();
Path jobOutputPath = new Path(outputPath, getJobTempDirName(context));
int previousAttempt =
context.getConfiguration().getInt(
MRConstants.APPLICATION_ATTEMPT_ID, 0) - 1;
if (previousAttempt < 0) {
LOG.warn("Cannot recover task output for first attempt...");
return;
}
FileSystem outputFileSystem =
outputPath.getFileSystem(context.getJobConf());
Path pathToRecover =
new Path(outputPath, getJobAttemptBaseDirName(previousAttempt));
if (outputFileSystem.exists(pathToRecover)) {
// Move the task outputs to their final place
LOG.debug("Trying to recover task from " + pathToRecover
+ " into " + jobOutputPath);
moveJobOutputs(outputFileSystem,
pathToRecover, jobOutputPath, pathToRecover);
LOG.info("Saved output of job to " + jobOutputPath);
}
}
protected static String getJobAttemptBaseDirName(JobContext context) {
int appAttemptId =
context.getJobConf().getInt(
MRConstants.APPLICATION_ATTEMPT_ID, 0);
return getJobAttemptBaseDirName(appAttemptId);
}
protected static String getJobTempDirName(TaskAttemptContext context) {
int appAttemptId =
context.getJobConf().getInt(
MRConstants.APPLICATION_ATTEMPT_ID, 0);
return getJobAttemptBaseDirName(appAttemptId);
}
protected static String getJobAttemptBaseDirName(int appAttemptId) {
return FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
+ appAttemptId;
}
protected static String getTaskAttemptBaseDirName(
TaskAttemptContext context) {
return getJobTempDirName(context) + Path.SEPARATOR +
FileOutputCommitter.TEMP_DIR_NAME + Path.SEPARATOR +
"_" + context.getTaskAttemptID().toString();
getWrapped(context).recoverTask(context);
}
}

View File

@ -29,10 +29,7 @@ import org.apache.hadoop.fs.Path;
/** A section of an input file. Returned by {@link
* InputFormat#getSplits(JobConf, int)} and passed to
* {@link InputFormat#getRecordReader(InputSplit,JobConf,Reporter)}.
* @deprecated Use {@link org.apache.hadoop.mapreduce.lib.input.FileSplit}
* instead.
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit

View File

@ -30,7 +30,6 @@ import org.apache.hadoop.classification.InterfaceStability;
* @see TaskID
* @see TaskAttemptID
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class ID extends org.apache.hadoop.mapreduce.ID {

View File

@ -63,9 +63,7 @@ import org.apache.hadoop.fs.FileSystem;
* @see RecordReader
* @see JobClient
* @see FileInputFormat
* @deprecated Use {@link org.apache.hadoop.mapreduce.InputFormat} instead.
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface InputFormat<K, V> {

View File

@ -34,9 +34,7 @@ import org.apache.hadoop.io.Writable;
*
* @see InputFormat
* @see RecordReader
* @deprecated Use {@link org.apache.hadoop.mapreduce.InputSplit} instead.
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface InputSplit extends Writable {

View File

@ -29,6 +29,9 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.ClusterMetrics;
@ -40,13 +43,10 @@ import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
@ -132,9 +132,7 @@ import org.apache.hadoop.util.ToolRunner;
* @see ClusterStatus
* @see Tool
* @see DistributedCache
* @deprecated Use {@link Job} and {@link Cluster} instead
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobClient extends CLI {

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
import java.net.URL;
import java.net.URLDecoder;
import java.util.Enumeration;
@ -28,24 +27,26 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapred.lib.HashPartitioner;
import org.apache.hadoop.mapred.lib.KeyFieldBasedComparator;
import org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.util.ConfigUtil;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.util.ReflectionUtils;
@ -107,9 +108,7 @@ import org.apache.log4j.Level;
* @see ClusterStatus
* @see Tool
* @see DistributedCache
* @deprecated Use {@link Configuration} instead
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobConf extends Configuration {

View File

@ -22,7 +22,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/** That what may be configured. */
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface JobConfigurable {

View File

@ -22,10 +22,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.Progressable;
/**
* @deprecated Use {@link org.apache.hadoop.mapreduce.JobContext} instead.
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface JobContext extends org.apache.hadoop.mapreduce.JobContext {

View File

@ -21,10 +21,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.util.Progressable;
/**
* @deprecated Use {@link org.apache.hadoop.mapreduce.JobContext} instead.
*/
@Deprecated
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class JobContextImpl

View File

@ -41,7 +41,6 @@ import org.apache.hadoop.classification.InterfaceStability;
* @see TaskID
* @see TaskAttemptID
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobID extends org.apache.hadoop.mapreduce.JobID {

View File

@ -22,9 +22,7 @@ import org.apache.hadoop.classification.InterfaceStability;
/**
* Used to describe the priority of the running job.
* @deprecated Use {@link org.apache.hadoop.mapreduce.JobPriority} instead
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public enum JobPriority {

View File

@ -29,9 +29,7 @@ import org.apache.hadoop.mapreduce.QueueState;
/**
* Class that contains the information regarding the Job Queues which are
* maintained by the Hadoop Map/Reduce framework.
* @deprecated Use {@link QueueInfo} instead
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobQueueInfo extends QueueInfo {

View File

@ -29,9 +29,7 @@ import org.apache.hadoop.security.authorize.AccessControlList;
* not intended to be a comprehensive piece of data.
* For that, look at JobProfile.
*************************************************
*@deprecated Use {@link org.apache.hadoop.mapreduce.JobStatus} instead
**/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class JobStatus extends org.apache.hadoop.mapreduce.JobStatus {

View File

@ -31,12 +31,7 @@ import org.apache.hadoop.io.Text;
* separator character. The separator can be specified in config file
* under the attribute name mapreduce.input.keyvaluelinerecordreader.key.value.separator. The default
* separator is the tab character ('\t').
*
* @deprecated Use
* {@link org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader}
* instead
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class KeyValueLineRecordReader implements RecordReader<Text, Text> {

View File

@ -34,12 +34,7 @@ import org.apache.hadoop.io.compress.SplittableCompressionCodec;
* Either linefeed or carriage-return are used to signal end of line. Each line
* is divided into key and value parts by a separator byte. If no such a byte
* exists, the key will be the entire line and value will be empty.
*
* @deprecated Use
* {@link org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat}
* instead
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class KeyValueTextInputFormat extends FileInputFormat<Text, Text>

View File

@ -41,10 +41,7 @@ import org.apache.commons.logging.Log;
/**
* Treats keys as offset in file and value as line.
* @deprecated Use
* {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader} instead.
*/
@Deprecated
@InterfaceAudience.LimitedPrivate({"MapReduce", "Pig"})
@InterfaceStability.Unstable
public class LineRecordReader implements RecordReader<LongWritable, Text> {

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.MRConfig;

View File

@ -36,10 +36,7 @@ import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
/** An {@link OutputFormat} that writes {@link MapFile}s.
* @deprecated Use
* {@link org.apache.hadoop.mapreduce.lib.output.MapFileOutputFormat} instead
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MapFileOutputFormat

View File

@ -23,7 +23,6 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.Closeable;
import org.apache.hadoop.mapred.JobConfigurable;
/**
* Base class for {@link Mapper} and {@link Reducer} implementations.
@ -31,7 +30,6 @@ import org.apache.hadoop.mapred.JobConfigurable;
* <p>Provides default no-op implementations for a few methods, most non-trivial
* applications need to override some of them.</p>
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MapReduceBase implements Closeable, JobConfigurable {

View File

@ -30,9 +30,7 @@ import org.apache.hadoop.classification.InterfaceStability;
* control on map processing e.g. multi-threaded, asynchronous mappers etc.</p>
*
* @see Mapper
* @deprecated Use {@link org.apache.hadoop.mapreduce.Mapper} instead.
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface MapRunnable<K1, V1, K2, V2>

View File

@ -37,7 +37,6 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystem.Statistics;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;

View File

@ -129,9 +129,7 @@ import org.apache.hadoop.io.compress.CompressionCodec;
* @see MapReduceBase
* @see MapRunnable
* @see SequenceFile
* @deprecated Use {@link org.apache.hadoop.mapreduce.Mapper} instead.
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable {

View File

@ -21,12 +21,16 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@Private
@Unstable
public class Master {
public enum State {

View File

@ -36,9 +36,7 @@ import org.apache.hadoop.fs.Path;
* Subclasses implement {@link #getRecordReader(InputSplit, JobConf, Reporter)}
* to construct <code>RecordReader</code>'s for <code>MultiFileSplit</code>'s.
* @see MultiFileSplit
* @deprecated Use {@link org.apache.hadoop.mapred.lib.CombineFileInputFormat} instead
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class MultiFileInputFormat<K, V>

View File

@ -18,20 +18,16 @@
package org.apache.hadoop.mapred;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.lib.CombineFileSplit;
/**
@ -42,9 +38,7 @@ import org.apache.hadoop.mapred.lib.CombineFileSplit;
* reading one record per file.
* @see FileSplit
* @see MultiFileInputFormat
* @deprecated Use {@link org.apache.hadoop.mapred.lib.CombineFileSplit} instead
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class MultiFileSplit extends CombineFileSplit {

View File

@ -56,9 +56,7 @@ import org.apache.hadoop.classification.InterfaceStability;
* @see FileOutputCommitter
* @see JobContext
* @see TaskAttemptContext
* @deprecated Use {@link org.apache.hadoop.mapreduce.OutputCommitter} instead.
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class OutputCommitter

View File

@ -44,9 +44,7 @@ import org.apache.hadoop.util.Progressable;
*
* @see RecordWriter
* @see JobConf
* @deprecated Use {@link org.apache.hadoop.mapreduce.OutputFormat} instead.
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface OutputFormat<K, V> {

View File

@ -29,9 +29,6 @@ import org.apache.hadoop.fs.PathFilter;
* This can be used to list paths of output directory as follows:
* Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
* new OutputLogFilter()));
* @deprecated Use
* {@link org.apache.hadoop.mapred.Utils.OutputFileUtils.OutputLogFilter}
* instead.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable

View File

@ -32,9 +32,7 @@ import org.apache.hadoop.classification.InterfaceStability;
* record) is sent for reduction.</p>
*
* @see Reducer
* @deprecated Use {@link org.apache.hadoop.mapreduce.Partitioner} instead.
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Partitioner<K2, V2> extends JobConfigurable {

View File

@ -18,6 +18,9 @@
package org.apache.hadoop.mapred;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/**
*
* This abstract class that represents a bucketed series of
@ -33,6 +36,8 @@ package org.apache.hadoop.mapred;
* bucket and how we interpret the readings by overriding
* {@code extendInternal(...)} and {@code initializeInterval()}
*/
@Private
@Unstable
public abstract class PeriodicStatsAccumulator {
// The range of progress from 0.0D through 1.0D is divided into
// count "progress segments". This object accumulates an

View File

@ -18,13 +18,16 @@
package org.apache.hadoop.mapred;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
/*
* This object gathers the [currently four] PeriodStatset's that we
* are gathering for a particular task attempt for packaging and
* handling as a single object.
*/
@Private
@Unstable
public class ProgressSplitsBlock {
final PeriodicStatsAccumulator progressWallclockTime;
final PeriodicStatsAccumulator progressCPUTime;

View File

@ -20,9 +20,7 @@ package org.apache.hadoop.mapred;
/**
* Class to encapsulate Queue ACLs for a particular
* user.
* @deprecated Use {@link org.apache.hadoop.mapreduce.QueueAclsInfo} instead
*/
@Deprecated
class QueueAclsInfo extends org.apache.hadoop.mapreduce.QueueAclsInfo {
/**

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
import java.io.DataInput;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -162,9 +162,7 @@ import org.apache.hadoop.io.Closeable;
* @see Partitioner
* @see Reporter
* @see MapReduceBase
* @deprecated Use {@link org.apache.hadoop.mapreduce.Reducer} instead.
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Reducer<K2, V2, K3, V3> extends JobConfigurable, Closeable {

View File

@ -34,9 +34,7 @@ import org.apache.hadoop.conf.Configuration;
* progress etc.</p>
*
* @see JobClient
* @deprecated Use {@link org.apache.hadoop.mapreduce.Job} instead
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface RunningJob {

View File

@ -27,20 +27,11 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
/**
* InputFormat reading keys, values from SequenceFiles in binary (raw)
* format.
* @deprecated Use
* {@link org.apache.hadoop.mapreduce.lib.input.SequenceFileAsBinaryInputFormat}
* instead
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsBinaryInputFormat

View File

@ -23,26 +23,20 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
/**
* An {@link OutputFormat} that writes keys, values to
* {@link SequenceFile}s in binary(raw) format
*
* @deprecated Use
* {@link org.apache.hadoop.mapreduce.lib.output.SequenceFileAsBinaryOutputFormat}
* instead
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsBinaryOutputFormat

View File

@ -29,12 +29,7 @@ import org.apache.hadoop.io.Text;
* except it generates SequenceFileAsTextRecordReader
* which converts the input keys and values to their
* String forms by calling toString() method.
*
* @deprecated Use
* {@link org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextInputFormat}
* instead
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsTextInputFormat

View File

@ -31,11 +31,7 @@ import org.apache.hadoop.io.WritableComparable;
* This class converts the input keys and values to their String forms by calling toString()
* method. This class to SequenceFileAsTextInputFormat class is as LineRecordReader
* class to TextInputFormat class.
* @deprecated Use
* {@link org.apache.hadoop.mapreduce.lib.input.SequenceFileAsTextRecordReader}
* instead
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileAsTextRecordReader

View File

@ -29,11 +29,7 @@ import org.apache.hadoop.util.ReflectionUtils;
/**
* A class that allows a map/red job to work on a sample of sequence files.
* The sample is decided by the filter class set by the job.
* @deprecated Use
* {@link org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFilter}
* instead
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileInputFilter<K, V>

View File

@ -29,12 +29,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.MapFile;
/** An {@link InputFormat} for {@link SequenceFile}s.
* @deprecated Use
* {@link org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat}
* instead.
/**
* An {@link InputFormat} for {@link SequenceFile}s.
*/
@Deprecated
@InterfaceAudience.Public
@InterfaceStability.Stable
public class SequenceFileInputFormat<K, V> extends FileInputFormat<K, V> {

Some files were not shown because too many files have changed in this diff Show More