Merge trunk into auto-failover branch

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3042@1310905 13f79535-47bb-0310-9956-ffa450edef68
Author: Todd Lipcon, 2012-04-07 22:53:55 +00:00, commit ca6f0940fd
43 changed files with 1375 additions and 384 deletions


@ -331,7 +331,14 @@ public void doFilter(ServletRequest request, ServletResponse response, FilterCha
HttpServletResponse httpResponse = (HttpServletResponse) response; HttpServletResponse httpResponse = (HttpServletResponse) response;
try { try {
boolean newToken = false; boolean newToken = false;
AuthenticationToken token = getToken(httpRequest); AuthenticationToken token;
try {
token = getToken(httpRequest);
}
catch (AuthenticationException ex) {
LOG.warn("AuthenticationToken ignored: " + ex.getMessage());
token = null;
}
if (token == null) { if (token == null) {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Request [{}] triggering authentication", getRequestURL(httpRequest)); LOG.debug("Request [{}] triggering authentication", getRequestURL(httpRequest));
@ -371,6 +378,9 @@ public Principal getUserPrincipal() {
} }
filterChain.doFilter(httpRequest, httpResponse); filterChain.doFilter(httpRequest, httpResponse);
} }
else {
throw new AuthenticationException("Missing AuthenticationToken");
}
} catch (AuthenticationException ex) { } catch (AuthenticationException ex) {
if (!httpResponse.isCommitted()) { if (!httpResponse.isCommitted()) {
Cookie cookie = createCookie(""); Cookie cookie = createCookie("");
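The hunk above (HADOOP-8249) makes doFilter treat an unparsable hadoop-auth cookie the same as a missing one: getToken is wrapped in a try/catch, the AuthenticationException is logged at WARN, and the token is nulled so the normal authentication sequence runs instead of an immediate HTTP 401. Below is a minimal, self-contained sketch of that control flow; parseToken and authenticate are illustrative stand-ins, not the hadoop-auth API.

import java.util.logging.Logger;

// Illustrative sketch only: a corrupt token is discarded with a WARN and the
// request falls through to re-authentication rather than failing outright.
class TokenFallbackSketch {
    private static final Logger LOG = Logger.getLogger("TokenFallbackSketch");

    static String handleRequest(String rawCookie) {
        String token;
        try {
            token = parseToken(rawCookie);          // may reject a corrupt cookie
        } catch (IllegalArgumentException ex) {     // analogue of AuthenticationException
            LOG.warning("AuthenticationToken ignored: " + ex.getMessage());
            token = null;                           // treat as "no token present"
        }
        if (token == null) {
            return authenticate();                  // trigger the authentication handler
        }
        return token;                               // token accepted, continue the chain
    }

    private static String parseToken(String rawCookie) {
        if (rawCookie == null) return null;
        if (!rawCookie.startsWith("u=")) throw new IllegalArgumentException("corrupt cookie");
        return rawCookie;
    }

    private static String authenticate() { return "u=negotiated"; }
}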


@ -23,10 +23,11 @@
import java.util.List; import java.util.List;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
import java.lang.reflect.Method;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
@ -38,6 +39,8 @@
@InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"}) @InterfaceAudience.LimitedPrivate({"HDFS", "MapReduce"})
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class KerberosName { public class KerberosName {
private static final Logger LOG = LoggerFactory.getLogger(KerberosName.class);
/** The first component of the name */ /** The first component of the name */
private final String serviceName; private final String serviceName;
/** The second component of the name. It may be null. */ /** The second component of the name. It may be null. */
@ -81,6 +84,7 @@ public class KerberosName {
try { try {
defaultRealm = KerberosUtil.getDefaultRealm(); defaultRealm = KerberosUtil.getDefaultRealm();
} catch (Exception ke) { } catch (Exception ke) {
LOG.warn("Kerberos krb5 configuration not found, setting default realm to empty");
defaultRealm=""; defaultRealm="";
} }
} }
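HADOOP-8086, shown above, only adds a WARN so that a missing krb5 configuration is no longer swallowed silently before defaultRealm falls back to the empty string. A compact sketch of the same fail-soft static-initialization pattern with SLF4J follows; lookupDefaultRealm is a placeholder for the real KerberosUtil.getDefaultRealm() call.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch of the fallback-with-warning pattern; lookupDefaultRealm() stands in
// for the Kerberos lookup, which can throw when krb5.conf is absent.
class RealmDefaults {
    private static final Logger LOG = LoggerFactory.getLogger(RealmDefaults.class);
    private static final String DEFAULT_REALM;

    static {
        String realm;
        try {
            realm = lookupDefaultRealm();
        } catch (Exception e) {
            LOG.warn("Kerberos krb5 configuration not found, setting default realm to empty");
            realm = "";
        }
        DEFAULT_REALM = realm;
    }

    private static String lookupDefaultRealm() throws Exception {
        String realm = System.getProperty("example.kerberos.realm"); // stand-in lookup
        if (realm == null) {
            throw new IllegalStateException("no krb5 configuration");
        }
        return realm;
    }
}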


@ -349,7 +349,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
} }
} }
private void _testDoFilterAuthentication(boolean withDomainPath) throws Exception { private void _testDoFilterAuthentication(boolean withDomainPath, boolean invalidToken) throws Exception {
AuthenticationFilter filter = new AuthenticationFilter(); AuthenticationFilter filter = new AuthenticationFilter();
try { try {
FilterConfig config = Mockito.mock(FilterConfig.class); FilterConfig config = Mockito.mock(FilterConfig.class);
@ -380,6 +380,12 @@ private void _testDoFilterAuthentication(boolean withDomainPath) throws Exceptio
Mockito.when(request.getRequestURL()).thenReturn(new StringBuffer("http://foo:8080/bar")); Mockito.when(request.getRequestURL()).thenReturn(new StringBuffer("http://foo:8080/bar"));
Mockito.when(request.getQueryString()).thenReturn("authenticated=true"); Mockito.when(request.getQueryString()).thenReturn("authenticated=true");
if (invalidToken) {
Mockito.when(request.getCookies()).thenReturn(
new Cookie[] { new Cookie(AuthenticatedURL.AUTH_COOKIE, "foo")}
);
}
HttpServletResponse response = Mockito.mock(HttpServletResponse.class); HttpServletResponse response = Mockito.mock(HttpServletResponse.class);
FilterChain chain = Mockito.mock(FilterChain.class); FilterChain chain = Mockito.mock(FilterChain.class);
@ -437,11 +443,15 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
} }
public void testDoFilterAuthentication() throws Exception { public void testDoFilterAuthentication() throws Exception {
_testDoFilterAuthentication(false); _testDoFilterAuthentication(false, false);
}
public void testDoFilterAuthenticationWithInvalidToken() throws Exception {
_testDoFilterAuthentication(false, true);
} }
public void testDoFilterAuthenticationWithDomainPath() throws Exception { public void testDoFilterAuthenticationWithDomainPath() throws Exception {
_testDoFilterAuthentication(true); _testDoFilterAuthentication(true, false);
} }
public void testDoFilterAuthenticated() throws Exception { public void testDoFilterAuthenticated() throws Exception {
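The refactor above threads a new invalidToken flag through _testDoFilterAuthentication so the filter can be exercised with a bogus hadoop-auth cookie. The essential Mockito arrangement is just returning a cookie with garbage content from the mocked request, as in this hedged fragment (filter wiring and assertions omitted):

import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
import org.mockito.Mockito;

// Fragment only: shows how the mocked request presents an invalid auth cookie.
class InvalidCookieArrangement {
    static HttpServletRequest requestWithInvalidAuthCookie() {
        HttpServletRequest request = Mockito.mock(HttpServletRequest.class);
        // "foo" is not a valid signed AuthenticationToken, so the filter should
        // log a WARN, discard it, and re-run the authentication sequence
        // instead of answering 401 outright.
        Mockito.when(request.getCookies()).thenReturn(
            new Cookie[] { new Cookie(AuthenticatedURL.AUTH_COOKIE, "foo") });
        return request;
    }
}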


@ -255,6 +255,9 @@ Release 2.0.0 - UNRELEASED
HADOOP-8077. HA: fencing method should be able to be configured on HADOOP-8077. HA: fencing method should be able to be configured on
a per-NN or per-NS basis (todd) a per-NN or per-NS basis (todd)
HADOOP-8086. KerberosName silently sets defaultRealm to "" if the
Kerberos config is not found, it should log a WARN (tucu)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES
@ -329,6 +332,9 @@ Release 2.0.0 - UNRELEASED
HADOOP-8251. Fix SecurityUtil.fetchServiceTicket after HADOOP-6941 (todd) HADOOP-8251. Fix SecurityUtil.fetchServiceTicket after HADOOP-6941 (todd)
HADOOP-8249. invalid hadoop-auth cookies should trigger authentication
if info is avail before returning HTTP 401 (tucu)
BREAKDOWN OF HADOOP-7454 SUBTASKS BREAKDOWN OF HADOOP-7454 SUBTASKS
HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh) HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh)
@ -405,6 +411,9 @@ Release 0.23.3 - UNRELEASED
HADOOP-8180. Remove hsqldb since its not needed from pom.xml (Ravi Prakash HADOOP-8180. Remove hsqldb since its not needed from pom.xml (Ravi Prakash
via tgraves) via tgraves)
HADOOP-8014. ViewFileSystem does not correctly implement getDefaultBlockSize,
getDefaultReplication, getContentSummary (John George via bobby)
Release 0.23.2 - UNRELEASED Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES


@ -579,7 +579,8 @@ public BlockLocation[] getFileBlockLocations(FileStatus file,
* *
* The FileSystem will simply return an elt containing 'localhost'. * The FileSystem will simply return an elt containing 'localhost'.
* *
* @param p path of file to get locations for * @param p path is used to identify an FS since an FS could have
* another FS that it could be delegating the call to
* @param start offset into the given file * @param start offset into the given file
* @param len length for which to get locations for * @param len length for which to get locations for
*/ */
@ -602,10 +603,21 @@ public FsServerDefaults getServerDefaults() throws IOException {
return new FsServerDefaults(getDefaultBlockSize(), return new FsServerDefaults(getDefaultBlockSize(),
conf.getInt("io.bytes.per.checksum", 512), conf.getInt("io.bytes.per.checksum", 512),
64 * 1024, 64 * 1024,
getDefaultReplication(), getDefaultReplication(),
conf.getInt("io.file.buffer.size", 4096)); conf.getInt("io.file.buffer.size", 4096));
} }
/**
* Return a set of server default configuration values
* @param p path is used to identify an FS since an FS could have
* another FS that it could be delegating the call to
* @return server default configuration values
* @throws IOException
*/
public FsServerDefaults getServerDefaults(Path p) throws IOException {
return getServerDefaults();
}
/** /**
* Return the fully-qualified path of path f resolving the path * Return the fully-qualified path of path f resolving the path
* through any symlinks or mount point * through any symlinks or mount point
@ -653,8 +665,8 @@ public FSDataOutputStream create(Path f, boolean overwrite)
throws IOException { throws IOException {
return create(f, overwrite, return create(f, overwrite,
getConf().getInt("io.file.buffer.size", 4096), getConf().getInt("io.file.buffer.size", 4096),
getDefaultReplication(), getDefaultReplication(f),
getDefaultBlockSize()); getDefaultBlockSize(f));
} }
/** /**
@ -668,8 +680,8 @@ public FSDataOutputStream create(Path f, Progressable progress)
throws IOException { throws IOException {
return create(f, true, return create(f, true,
getConf().getInt("io.file.buffer.size", 4096), getConf().getInt("io.file.buffer.size", 4096),
getDefaultReplication(), getDefaultReplication(f),
getDefaultBlockSize(), progress); getDefaultBlockSize(f), progress);
} }
/** /**
@ -683,7 +695,7 @@ public FSDataOutputStream create(Path f, short replication)
return create(f, true, return create(f, true,
getConf().getInt("io.file.buffer.size", 4096), getConf().getInt("io.file.buffer.size", 4096),
replication, replication,
getDefaultBlockSize()); getDefaultBlockSize(f));
} }
/** /**
@ -699,7 +711,7 @@ public FSDataOutputStream create(Path f, short replication,
return create(f, true, return create(f, true,
getConf().getInt("io.file.buffer.size", 4096), getConf().getInt("io.file.buffer.size", 4096),
replication, replication,
getDefaultBlockSize(), progress); getDefaultBlockSize(f), progress);
} }
@ -715,8 +727,8 @@ public FSDataOutputStream create(Path f,
int bufferSize int bufferSize
) throws IOException { ) throws IOException {
return create(f, overwrite, bufferSize, return create(f, overwrite, bufferSize,
getDefaultReplication(), getDefaultReplication(f),
getDefaultBlockSize()); getDefaultBlockSize(f));
} }
/** /**
@ -733,8 +745,8 @@ public FSDataOutputStream create(Path f,
Progressable progress Progressable progress
) throws IOException { ) throws IOException {
return create(f, overwrite, bufferSize, return create(f, overwrite, bufferSize,
getDefaultReplication(), getDefaultReplication(f),
getDefaultBlockSize(), progress); getDefaultBlockSize(f), progress);
} }
@ -1916,11 +1928,31 @@ public long getDefaultBlockSize() {
return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024); return getConf().getLong("fs.local.block.size", 32 * 1024 * 1024);
} }
/** Return the number of bytes that large input files should be optimally
* be split into to minimize i/o time. The given path will be used to
* locate the actual filesystem. The full path does not have to exist.
* @param f path of file
* @return the default block size for the path's filesystem
*/
public long getDefaultBlockSize(Path f) {
return getDefaultBlockSize();
}
/** /**
* Get the default replication. * Get the default replication.
*/ */
public short getDefaultReplication() { return 1; } public short getDefaultReplication() { return 1; }
/**
* Get the default replication for a path. The given path will be used to
* locate the actual filesystem. The full path does not have to exist.
* @param path of the file
* @return default replication for the path's filesystem
*/
public short getDefaultReplication(Path path) {
return getDefaultReplication();
}
/** /**
* Return a file status object that represents the path. * Return a file status object that represents the path.
* @param f The path we want information from * @param f The path we want information from
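HADOOP-8014 adds Path-taking variants of getDefaultBlockSize, getDefaultReplication and getServerDefaults so that a filesystem which wraps or mounts other filesystems can answer for the filesystem that actually owns the path, while the base-class versions simply fall back to the old no-argument methods. The pattern, reduced to a minimal hierarchy (Path is replaced by a plain String and the numbers are illustrative, so this is not Hadoop code):

// Minimal sketch of the base-class defaults plus a wrapping filesystem that
// routes the path-aware calls to its backing store. Values are illustrative.
class BaseFs {
    public long getDefaultBlockSize() { return 32L * 1024 * 1024; }
    public short getDefaultReplication() { return 1; }

    // Path-aware variants: the default behaviour ignores the path and falls
    // back to the old answers, so existing filesystems keep working unchanged.
    public long getDefaultBlockSize(String path) { return getDefaultBlockSize(); }
    public short getDefaultReplication(String path) { return getDefaultReplication(); }
}

class WrapperFs extends BaseFs {
    private final BaseFs backing;
    WrapperFs(BaseFs backing) { this.backing = backing; }

    // A wrapper answers for whichever filesystem the path really lives on.
    @Override public long getDefaultBlockSize(String path) {
        return backing.getDefaultBlockSize(path);
    }
    @Override public short getDefaultReplication(String path) {
        return backing.getDefaultReplication(path);
    }
}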


@ -28,6 +28,7 @@
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Progressable;
@ -267,6 +268,7 @@ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
return fs.mkdirs(f, permission); return fs.mkdirs(f, permission);
} }
/** /**
* The src file is on the local disk. Add it to FS at * The src file is on the local disk. Add it to FS at
* the given dst name. * the given dst name.
@ -336,19 +338,42 @@ public long getUsed() throws IOException{
return fs.getUsed(); return fs.getUsed();
} }
/** Return the number of bytes that large input files should be optimally @Override
* be split into to minimize i/o time. */
public long getDefaultBlockSize() { public long getDefaultBlockSize() {
return fs.getDefaultBlockSize(); return fs.getDefaultBlockSize();
} }
/** @Override
* Get the default replication.
*/
public short getDefaultReplication() { public short getDefaultReplication() {
return fs.getDefaultReplication(); return fs.getDefaultReplication();
} }
@Override
public FsServerDefaults getServerDefaults() throws IOException {
return fs.getServerDefaults();
}
// path variants delegate to underlying filesystem
@Override
public ContentSummary getContentSummary(Path f) throws IOException {
return fs.getContentSummary(f);
}
@Override
public long getDefaultBlockSize(Path f) {
return fs.getDefaultBlockSize(f);
}
@Override
public short getDefaultReplication(Path f) {
return fs.getDefaultReplication(f);
}
@Override
public FsServerDefaults getServerDefaults(Path f) throws IOException {
return fs.getServerDefaults(f);
}
/** /**
* Get file status. * Get file status.
*/ */
@ -441,4 +466,4 @@ public List<Token<?>> getDelegationTokens(String renewer,
Credentials credentials) throws IOException { Credentials credentials) throws IOException {
return fs.getDelegationTokens(renewer, credentials); return fs.getDelegationTokens(renewer, credentials);
} }
} }


@ -23,6 +23,7 @@
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileChecksum;
@ -208,11 +209,6 @@ public FsStatus getStatus(Path p) throws IOException {
return super.getStatus(fullPath(p)); return super.getStatus(fullPath(p));
} }
@Override
public FsServerDefaults getServerDefaults() throws IOException {
return super.getServerDefaults();
}
@Override @Override
public FileStatus[] listStatus(final Path f) public FileStatus[] listStatus(final Path f)
throws IOException { throws IOException {
@ -273,4 +269,42 @@ public void setTimes(final Path f, final long mtime, final long atime)
public Path resolvePath(final Path p) throws IOException { public Path resolvePath(final Path p) throws IOException {
return super.resolvePath(fullPath(p)); return super.resolvePath(fullPath(p));
} }
@Override
public ContentSummary getContentSummary(Path f) throws IOException {
return super.getContentSummary(fullPath(f));
}
private static Path rootPath = new Path(Path.SEPARATOR);
@Override
public long getDefaultBlockSize() {
return getDefaultBlockSize(fullPath(rootPath));
}
@Override
public long getDefaultBlockSize(Path f) {
return super.getDefaultBlockSize(fullPath(f));
}
@Override
public short getDefaultReplication() {
return getDefaultReplication(fullPath(rootPath));
}
@Override
public short getDefaultReplication(Path f) {
return super.getDefaultReplication(fullPath(f));
}
@Override
public FsServerDefaults getServerDefaults() throws IOException {
return getServerDefaults(fullPath(rootPath));
}
@Override
public FsServerDefaults getServerDefaults(Path f) throws IOException {
return super.getServerDefaults(fullPath(f));
}
} }
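In ChRootedFileSystem the no-argument defaults are now answered relative to the chroot itself: a static rootPath of "/" is expanded through fullPath(...) and passed to the Path-taking variants, which prefix every path with the chroot before delegating. A rough sketch of that prefixing idea, with chrootPrefix and resolve as illustrative names:

// Illustrative chroot prefixing: every caller-visible path is rebased under
// the chroot before the underlying filesystem is consulted.
class ChrootSketch {
    private static final String ROOT = "/";
    private final String chrootPrefix;      // e.g. "/user/alice"

    ChrootSketch(String chrootPrefix) { this.chrootPrefix = chrootPrefix; }

    String fullPath(String p) {
        return p.equals(ROOT) ? chrootPrefix : chrootPrefix + p;
    }

    long getDefaultBlockSize() {
        // the parameterless call resolves against the chroot target, not "/" itself
        return getDefaultBlockSize(ROOT);
    }

    long getDefaultBlockSize(String p) {
        return resolve(fullPath(p));
    }

    private long resolve(String absolutePath) {
        return 64L * 1024 * 1024;            // stand-in for the backing filesystem's answer
    }
}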


@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.viewfs;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import org.apache.hadoop.fs.Path;
/**
* NotInMountpointException extends the UnsupportedOperationException.
* Exception class used in cases where the given path is not mounted
* through viewfs.
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving /*Evolving for a release,to be changed to Stable */
@SuppressWarnings("serial")
public class NotInMountpointException extends UnsupportedOperationException {
final String msg;
public NotInMountpointException(Path path, String operation) {
msg = operation + " on path `" + path + "' is not within a mount point";
}
public NotInMountpointException(String operation) {
msg = operation + " on empty path is invalid";
}
@Override
public String getMessage() {
return msg;
}
}
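Because NotInMountpointException extends UnsupportedOperationException it is unchecked, so ViewFileSystem can throw it from methods such as getDefaultBlockSize() without changing their signatures, and callers that already guard against UnsupportedOperationException keep working. A hedged usage fragment (the viewFs argument is assumed to be a mounted viewfs instance; the fallback value is the caller's own choice):

import org.apache.hadoop.fs.FileSystem;

// The catch pattern is the point: NotInMountpointException is unchecked and a
// subtype of UnsupportedOperationException, so existing defensive callers still work.
class MountpointFallback {
    static long blockSizeOrFallback(FileSystem viewFs, long fallback) {
        try {
            // On a viewfs root there is no mount point to resolve, so this throws.
            return viewFs.getDefaultBlockSize();
        } catch (UnsupportedOperationException e) {
            return fallback;
        }
    }
}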


@ -34,6 +34,7 @@
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileAlreadyExistsException;
@ -41,6 +42,7 @@
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsConstants; import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.InvalidPathException;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException; import org.apache.hadoop.fs.UnsupportedFileSystemException;
@ -470,6 +472,57 @@ public void setVerifyChecksum(final boolean verifyChecksum) {
} }
} }
@Override
public long getDefaultBlockSize() {
throw new NotInMountpointException("getDefaultBlockSize");
}
@Override
public short getDefaultReplication() {
throw new NotInMountpointException("getDefaultReplication");
}
@Override
public FsServerDefaults getServerDefaults() throws IOException {
throw new NotInMountpointException("getServerDefaults");
}
@Override
public long getDefaultBlockSize(Path f) {
try {
InodeTree.ResolveResult<FileSystem> res =
fsState.resolve(getUriPath(f), true);
return res.targetFileSystem.getDefaultBlockSize(res.remainingPath);
} catch (FileNotFoundException e) {
throw new NotInMountpointException(f, "getDefaultBlockSize");
}
}
@Override
public short getDefaultReplication(Path f) {
try {
InodeTree.ResolveResult<FileSystem> res =
fsState.resolve(getUriPath(f), true);
return res.targetFileSystem.getDefaultReplication(res.remainingPath);
} catch (FileNotFoundException e) {
throw new NotInMountpointException(f, "getDefaultReplication");
}
}
@Override
public FsServerDefaults getServerDefaults(Path f) throws IOException {
InodeTree.ResolveResult<FileSystem> res =
fsState.resolve(getUriPath(f), true);
return res.targetFileSystem.getServerDefaults(res.remainingPath);
}
@Override
public ContentSummary getContentSummary(Path f) throws IOException {
InodeTree.ResolveResult<FileSystem> res =
fsState.resolve(getUriPath(f), true);
return res.targetFileSystem.getContentSummary(res.remainingPath);
}
@Override @Override
public void setWriteChecksum(final boolean writeChecksum) { public void setWriteChecksum(final boolean writeChecksum) {
List<InodeTree.MountPoint<FileSystem>> mountPoints = List<InodeTree.MountPoint<FileSystem>> mountPoints =
@ -742,5 +795,20 @@ public void setTimes(Path f, long mtime, long atime)
public void setVerifyChecksum(boolean verifyChecksum) { public void setVerifyChecksum(boolean verifyChecksum) {
// Noop for viewfs // Noop for viewfs
} }
@Override
public FsServerDefaults getServerDefaults(Path f) throws IOException {
throw new NotInMountpointException(f, "getServerDefaults");
}
@Override
public long getDefaultBlockSize(Path f) {
throw new NotInMountpointException(f, "getDefaultBlockSize");
}
@Override
public short getDefaultReplication(Path f) {
throw new NotInMountpointException(f, "getDefaultReplication");
}
} }
} }
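The ViewFileSystem overrides above all follow one shape: resolve the given path through the mount table, call the same operation on the target filesystem with the remaining path, and turn a failed resolution into NotInMountpointException. The skeleton below reduces that to stand-in types; MountTable and Resolved are illustrative and not the InodeTree API.

import java.io.FileNotFoundException;
import java.util.Map;
import java.util.TreeMap;

// Skeleton of resolve-then-delegate. MountTable maps mount prefixes to target
// "filesystems" (here just per-target block sizes) purely for illustration.
class MountTable {
    private final TreeMap<String, Long> targets = new TreeMap<>();

    void addMount(String prefix, long targetBlockSize) { targets.put(prefix, targetBlockSize); }

    long getDefaultBlockSize(String path) {
        try {
            Resolved r = resolve(path);
            return r.targetBlockSize;            // same call, forwarded to the target FS
        } catch (FileNotFoundException e) {
            throw new UnsupportedOperationException(
                "getDefaultBlockSize on path " + path + " is not within a mount point");
        }
    }

    private Resolved resolve(String path) throws FileNotFoundException {
        // descending order tests "/a/b" before "/a", i.e. the longest matching prefix wins
        for (Map.Entry<String, Long> e : targets.descendingMap().entrySet()) {
            if (path.startsWith(e.getKey())) {
                return new Resolved(e.getValue(), path.substring(e.getKey().length()));
            }
        }
        throw new FileNotFoundException(path);
    }

    private static final class Resolved {
        final long targetBlockSize;
        final String remainingPath;
        Resolved(long targetBlockSize, String remainingPath) {
            this.targetBlockSize = targetBlockSize;
            this.remainingPath = remainingPath;
        }
    }
}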


@ -36,6 +36,7 @@ public final class FileSystemTestHelper {
System.getProperty("test.build.data", "target/test/data") + "/test"; System.getProperty("test.build.data", "target/test/data") + "/test";
private static final int DEFAULT_BLOCK_SIZE = 1024; private static final int DEFAULT_BLOCK_SIZE = 1024;
private static final int DEFAULT_NUM_BLOCKS = 2; private static final int DEFAULT_NUM_BLOCKS = 2;
private static final short DEFAULT_NUM_REPL = 1;
private static String absTestRootDir = null; private static String absTestRootDir = null;
/** Hidden constructor */ /** Hidden constructor */
@ -99,9 +100,9 @@ public static Path getDefaultWorkingDirectory(FileSystem fSys)
* Create files with numBlocks blocks each with block size blockSize. * Create files with numBlocks blocks each with block size blockSize.
*/ */
public static long createFile(FileSystem fSys, Path path, int numBlocks, public static long createFile(FileSystem fSys, Path path, int numBlocks,
int blockSize, boolean createParent) throws IOException { int blockSize, short numRepl, boolean createParent) throws IOException {
FSDataOutputStream out = FSDataOutputStream out =
fSys.create(path, false, 4096, fSys.getDefaultReplication(), blockSize ); fSys.create(path, false, 4096, numRepl, blockSize );
byte[] data = getFileData(numBlocks, blockSize); byte[] data = getFileData(numBlocks, blockSize);
out.write(data, 0, data.length); out.write(data, 0, data.length);
@ -109,13 +110,19 @@ public static long createFile(FileSystem fSys, Path path, int numBlocks,
return data.length; return data.length;
} }
public static long createFile(FileSystem fSys, Path path, int numBlocks,
int blockSize, boolean createParent) throws IOException {
return createFile(fSys, path, numBlocks, blockSize, fSys.getDefaultReplication(), true);
}
public static long createFile(FileSystem fSys, Path path, int numBlocks, public static long createFile(FileSystem fSys, Path path, int numBlocks,
int blockSize) throws IOException { int blockSize) throws IOException {
return createFile(fSys, path, numBlocks, blockSize, true); return createFile(fSys, path, numBlocks, blockSize, true);
} }
public static long createFile(FileSystem fSys, Path path) throws IOException { public static long createFile(FileSystem fSys, Path path) throws IOException {
return createFile(fSys, path, DEFAULT_NUM_BLOCKS, DEFAULT_BLOCK_SIZE, true); return createFile(fSys, path, DEFAULT_NUM_BLOCKS, DEFAULT_BLOCK_SIZE, DEFAULT_NUM_REPL, true);
} }
public static long createFile(FileSystem fSys, String name) throws IOException { public static long createFile(FileSystem fSys, String name) throws IOException {


@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.FsConstants; import org.apache.hadoop.fs.FsConstants;
@ -170,7 +171,15 @@ public void testRename() throws IOException {
Assert.assertTrue(fSys.isDirectory(FileSystemTestHelper.getTestRootPath(fSys,"/newDir/dirFooBar"))); Assert.assertTrue(fSys.isDirectory(FileSystemTestHelper.getTestRootPath(fSys,"/newDir/dirFooBar")));
Assert.assertTrue(fSysTarget.isDirectory(new Path(chrootedTo,"newDir/dirFooBar"))); Assert.assertTrue(fSysTarget.isDirectory(new Path(chrootedTo,"newDir/dirFooBar")));
} }
@Test
public void testGetContentSummary() throws IOException {
// GetContentSummary of a dir
fSys.mkdirs(new Path("/newDir/dirFoo"));
ContentSummary cs = fSys.getContentSummary(new Path("/newDir/dirFoo"));
Assert.assertEquals(-1L, cs.getQuota());
Assert.assertEquals(-1L, cs.getSpaceQuota());
}
/** /**
* We would have liked renames across file system to fail but * We would have liked renames across file system to fail but


@ -62,14 +62,14 @@ Trunk (unreleased changes)
HDFS-3178. Add states and state handler for journal synchronization in HDFS-3178. Add states and state handler for journal synchronization in
JournalService. (szetszwo) JournalService. (szetszwo)
HDFS-3204. Minor modification to JournalProtocol.proto to make
it generic. (suresh)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream. HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream.
(Henry Robinson via todd) (Henry Robinson via todd)
HDFS-3110. Use directRead API to reduce the number of buffer copies in
libhdfs (Henry Robinson via todd)
BUG FIXES BUG FIXES
HDFS-2299. TestOfflineEditsViewer is failing on trunk. (Uma Maheswara Rao G HDFS-2299. TestOfflineEditsViewer is failing on trunk. (Uma Maheswara Rao G
@ -114,6 +114,9 @@ Trunk (unreleased changes)
HDFS-3126. Journal stream from Namenode to BackupNode needs to have HDFS-3126. Journal stream from Namenode to BackupNode needs to have
timeout. (Hari Mankude via suresh) timeout. (Hari Mankude via suresh)
HDFS-3121. Add HDFS tests for HADOOP-8014 change. (John George via
suresh)
Release 2.0.0 - UNRELEASED Release 2.0.0 - UNRELEASED
@ -327,6 +330,17 @@ Release 2.0.0 - UNRELEASED
HDFS-3050. rework OEV to share more code with the NameNode. HDFS-3050. rework OEV to share more code with the NameNode.
(Colin Patrick McCabe via eli) (Colin Patrick McCabe via eli)
HDFS-3226. Allow GetConf tool to print arbitrary keys (todd)
HDFS-3204. Minor modification to JournalProtocol.proto to make
it generic. (suresh)
HDFS-2505. Add a test to verify getFileChecksum(..) with ViewFS. (Ravi
Prakash via szetszwo)
HDFS-3211. Add fence(..) and replace NamenodeRegistration with JournalInfo
and epoch in JournalProtocol. (suresh via szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-3024. Improve performance of stringification in addStoredBlock (todd) HDFS-3024. Improve performance of stringification in addStoredBlock (todd)
@ -436,6 +450,9 @@ Release 2.0.0 - UNRELEASED
HDFS-3208. Bogus entries in hosts files are incorrectly displayed HDFS-3208. Bogus entries in hosts files are incorrectly displayed
in the report. (eli) in the report. (eli)
HDFS-3136. Remove SLF4J dependency as HDFS does not need it to fix
unnecessary warnings. (Jason Lowe via suresh)
BREAKDOWN OF HDFS-1623 SUBTASKS BREAKDOWN OF HDFS-1623 SUBTASKS
HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd) HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)


@ -90,16 +90,6 @@
<artifactId>junit</artifactId> <artifactId>junit</artifactId>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<scope>compile</scope>
</dependency>
<dependency> <dependency>
<groupId>org.mockito</groupId> <groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId> <artifactId>mockito-all</artifactId>


@ -22,6 +22,7 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
/** /**
@ -33,6 +34,10 @@
public class UnregisteredNodeException extends IOException { public class UnregisteredNodeException extends IOException {
private static final long serialVersionUID = -5620209396945970810L; private static final long serialVersionUID = -5620209396945970810L;
public UnregisteredNodeException(JournalInfo info) {
super("Unregistered server: " + info.toString());
}
public UnregisteredNodeException(NodeRegistration nodeReg) { public UnregisteredNodeException(NodeRegistration nodeReg) {
super("Unregistered server: " + nodeReg.toString()); super("Unregistered server: " + nodeReg.toString());
} }


@ -20,10 +20,13 @@
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalResponseProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentResponseProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentResponseProto;
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
@ -48,9 +51,8 @@ public JournalProtocolServerSideTranslatorPB(JournalProtocol impl) {
public JournalResponseProto journal(RpcController unused, public JournalResponseProto journal(RpcController unused,
JournalRequestProto req) throws ServiceException { JournalRequestProto req) throws ServiceException {
try { try {
impl.journal(PBHelper.convert(req.getJournalInfo()), impl.journal(PBHelper.convert(req.getJournalInfo()), req.getEpoch(),
req.getFirstTxnId(), req.getNumTxns(), req.getRecords() req.getFirstTxnId(), req.getNumTxns(), req.getRecords().toByteArray());
.toByteArray());
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }
@ -63,10 +65,24 @@ public StartLogSegmentResponseProto startLogSegment(RpcController controller,
StartLogSegmentRequestProto req) throws ServiceException { StartLogSegmentRequestProto req) throws ServiceException {
try { try {
impl.startLogSegment(PBHelper.convert(req.getJournalInfo()), impl.startLogSegment(PBHelper.convert(req.getJournalInfo()),
req.getTxid()); req.getEpoch(), req.getTxid());
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }
return StartLogSegmentResponseProto.newBuilder().build(); return StartLogSegmentResponseProto.newBuilder().build();
} }
@Override
public FenceResponseProto fence(RpcController controller,
FenceRequestProto req) throws ServiceException {
try {
FenceResponse resp = impl.fence(PBHelper.convert(req.getJournalInfo()), req.getEpoch(),
req.getFencerInfo());
return FenceResponseProto.newBuilder().setInSync(resp.isInSync())
.setLastTransactionId(resp.getLastTransactionId())
.setPreviousEpoch(resp.getPreviousEpoch()).build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
} }


@ -22,10 +22,13 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
@ -58,10 +61,11 @@ public void close() {
} }
@Override @Override
public void journal(NamenodeRegistration reg, long firstTxnId, public void journal(JournalInfo journalInfo, long epoch, long firstTxnId,
int numTxns, byte[] records) throws IOException { int numTxns, byte[] records) throws IOException {
JournalRequestProto req = JournalRequestProto.newBuilder() JournalRequestProto req = JournalRequestProto.newBuilder()
.setJournalInfo(PBHelper.convertToJournalInfo(reg)) .setJournalInfo(PBHelper.convert(journalInfo))
.setEpoch(epoch)
.setFirstTxnId(firstTxnId) .setFirstTxnId(firstTxnId)
.setNumTxns(numTxns) .setNumTxns(numTxns)
.setRecords(PBHelper.getByteString(records)) .setRecords(PBHelper.getByteString(records))
@ -74,10 +78,11 @@ public void journal(NamenodeRegistration reg, long firstTxnId,
} }
@Override @Override
public void startLogSegment(NamenodeRegistration registration, long txid) public void startLogSegment(JournalInfo journalInfo, long epoch, long txid)
throws IOException { throws IOException {
StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder() StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder()
.setJournalInfo(PBHelper.convertToJournalInfo(registration)) .setJournalInfo(PBHelper.convert(journalInfo))
.setEpoch(epoch)
.setTxid(txid) .setTxid(txid)
.build(); .build();
try { try {
@ -86,6 +91,20 @@ public void startLogSegment(NamenodeRegistration registration, long txid)
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);
} }
} }
@Override
public FenceResponse fence(JournalInfo journalInfo, long epoch,
String fencerInfo) throws IOException {
FenceRequestProto req = FenceRequestProto.newBuilder().setEpoch(epoch)
.setJournalInfo(PBHelper.convert(journalInfo)).build();
try {
FenceResponseProto resp = rpcProxy.fence(NULL_CONTROLLER, req);
return new FenceResponse(resp.getPreviousEpoch(),
resp.getLastTransactionId(), resp.getInSync());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override @Override
public boolean isMethodSupported(String methodName) throws IOException { public boolean isMethodSupported(String methodName) throws IOException {
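Taken together, the two translator changes imply the reworked JournalProtocol surface: every call now carries a JournalInfo (layout version, cluster id, namespace id) plus the caller's epoch, and a new fence operation returns a FenceResponse. The interface below is reconstructed from the @Override methods in this commit rather than copied from the Hadoop source, so treat it as a sketch.

import java.io.IOException;
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;

// Reconstructed from the implementations shown in this commit; the real
// interface lives in org.apache.hadoop.hdfs.server.protocol.
interface JournalProtocolSketch {
    void journal(JournalInfo journalInfo, long epoch, long firstTxnId,
                 int numTxns, byte[] records) throws IOException;

    void startLogSegment(JournalInfo journalInfo, long epoch, long txid)
        throws IOException;

    FenceResponse fence(JournalInfo journalInfo, long epoch, String fencerInfo)
        throws IOException;
}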


@ -110,6 +110,7 @@
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand; import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@ -117,6 +118,7 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand; import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@ -127,7 +129,6 @@
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand; import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
@ -1347,25 +1348,19 @@ public static StorageReportProto convert(StorageReport r) {
.setStorageID(r.getStorageID()).build(); .setStorageID(r.getStorageID()).build();
} }
public static NamenodeRegistration convert(JournalInfoProto info) { public static JournalInfo convert(JournalInfoProto info) {
int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0; int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0; int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;
StorageInfo storage = new StorageInfo(lv, nsID, info.getClusterID(), 0); return new JournalInfo(lv, info.getClusterID(), nsID);
// Note that the role is always {@link NamenodeRole#NAMENODE} as this
// conversion happens for messages from Namenode to Journal receivers.
// Addresses in the registration are unused.
return new NamenodeRegistration("", "", storage, NamenodeRole.NAMENODE);
} }
/** /**
* Method used for converting {@link JournalInfoProto} sent from Namenode * Method used for converting {@link JournalInfoProto} sent from Namenode
* to Journal receivers to {@link NamenodeRegistration}. * to Journal receivers to {@link NamenodeRegistration}.
*/ */
public static JournalInfoProto convertToJournalInfo( public static JournalInfoProto convert(JournalInfo j) {
NamenodeRegistration reg) { return JournalInfoProto.newBuilder().setClusterID(j.getClusterId())
return JournalInfoProto.newBuilder().setClusterID(reg.getClusterID()) .setLayoutVersion(j.getLayoutVersion())
.setLayoutVersion(reg.getLayoutVersion()) .setNamespaceID(j.getNamespaceId()).build();
.setNamespaceID(reg.getNamespaceID()).build();
} }
} }


@ -31,6 +31,9 @@
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
import org.apache.hadoop.hdfs.server.protocol.FencedException;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@ -40,6 +43,7 @@
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService; import com.google.protobuf.BlockingService;
/** /**
@ -66,6 +70,8 @@ public class JournalService implements JournalProtocol {
private final NamenodeProtocol namenode; private final NamenodeProtocol namenode;
private final StateHandler stateHandler = new StateHandler(); private final StateHandler stateHandler = new StateHandler();
private final RPC.Server rpcServer; private final RPC.Server rpcServer;
private long epoch = 0;
private String fencerInfo;
enum State { enum State {
/** The service is initialized and ready to start. */ /** The service is initialized and ready to start. */
@ -115,7 +121,7 @@ synchronized void waitForRoll() {
current = State.WAITING_FOR_ROLL; current = State.WAITING_FOR_ROLL;
} }
synchronized void startLogSegment() throws IOException { synchronized void startLogSegment() {
if (current == State.WAITING_FOR_ROLL) { if (current == State.WAITING_FOR_ROLL) {
current = State.SYNCING; current = State.SYNCING;
} }
@ -232,28 +238,42 @@ public void stop() {
} }
@Override @Override
public void journal(NamenodeRegistration registration, long firstTxnId, public void journal(JournalInfo journalInfo, long epoch, long firstTxnId,
int numTxns, byte[] records) throws IOException { int numTxns, byte[] records) throws IOException {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Received journal " + firstTxnId + " " + numTxns); LOG.trace("Received journal " + firstTxnId + " " + numTxns);
} }
stateHandler.isJournalAllowed(); stateHandler.isJournalAllowed();
verify(registration); verify(epoch, journalInfo);
listener.journal(this, firstTxnId, numTxns, records); listener.journal(this, firstTxnId, numTxns, records);
} }
@Override @Override
public void startLogSegment(NamenodeRegistration registration, long txid) public void startLogSegment(JournalInfo journalInfo, long epoch, long txid)
throws IOException { throws IOException {
if (LOG.isTraceEnabled()) { if (LOG.isTraceEnabled()) {
LOG.trace("Received startLogSegment " + txid); LOG.trace("Received startLogSegment " + txid);
} }
stateHandler.isStartLogSegmentAllowed(); stateHandler.isStartLogSegmentAllowed();
verify(registration); verify(epoch, journalInfo);
listener.rollLogs(this, txid); listener.rollLogs(this, txid);
stateHandler.startLogSegment(); stateHandler.startLogSegment();
} }
@Override
public FenceResponse fence(JournalInfo journalInfo, long epoch,
String fencerInfo) throws IOException {
LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch);
verifyFence(epoch, fencerInfo);
verify(journalInfo);
long previousEpoch = epoch;
this.epoch = epoch;
this.fencerInfo = fencerInfo;
// TODO:HDFS-3092 set lastTransId and inSync
return new FenceResponse(previousEpoch, 0, false);
}
/** Create an RPC server. */ /** Create an RPC server. */
private static RPC.Server createRpcServer(Configuration conf, private static RPC.Server createRpcServer(Configuration conf,
InetSocketAddress address, JournalProtocol impl) throws IOException { InetSocketAddress address, JournalProtocol impl) throws IOException {
@ -267,15 +287,54 @@ private static RPC.Server createRpcServer(Configuration conf,
address.getHostName(), address.getPort(), 1, false, conf, null); address.getHostName(), address.getPort(), 1, false, conf, null);
} }
private void verify(NamenodeRegistration reg) throws IOException { private void verifyEpoch(long e) throws FencedException {
if (!registration.getRegistrationID().equals(reg.getRegistrationID())) { if (epoch != e) {
LOG.warn("Invalid registrationID - expected: " String errorMsg = "Epoch " + e + " is not valid. "
+ registration.getRegistrationID() + " received: " + "Resource has already been fenced by " + fencerInfo
+ reg.getRegistrationID()); + " with epoch " + epoch;
throw new UnregisteredNodeException(reg); LOG.warn(errorMsg);
throw new FencedException(errorMsg);
} }
} }
private void verifyFence(long e, String fencer) throws FencedException {
if (e <= epoch) {
String errorMsg = "Epoch " + e + " from fencer " + fencer
+ " is not valid. " + "Resource has already been fenced by "
+ fencerInfo + " with epoch " + epoch;
LOG.warn(errorMsg);
throw new FencedException(errorMsg);
}
}
/**
* Verifies a journal request
*/
private void verify(JournalInfo journalInfo) throws IOException {
String errorMsg = null;
int expectedNamespaceID = registration.getNamespaceID();
if (journalInfo.getNamespaceId() != expectedNamespaceID) {
errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID
+ " actual " + journalInfo.getNamespaceId();
LOG.warn(errorMsg);
throw new UnregisteredNodeException(journalInfo);
}
if (!journalInfo.getClusterId().equals(registration.getClusterID())) {
errorMsg = "Invalid clusterId in journal request - expected "
+ journalInfo.getClusterId() + " actual " + registration.getClusterID();
LOG.warn(errorMsg);
throw new UnregisteredNodeException(journalInfo);
}
}
/**
* Verifies a journal request
*/
private void verify(long e, JournalInfo journalInfo) throws IOException {
verifyEpoch(e);
verify(journalInfo);
}
/** /**
* Register this service with the active namenode. * Register this service with the active namenode.
*/ */
@ -298,4 +357,9 @@ private void handshake() throws IOException {
listener.verifyVersion(this, nsInfo); listener.verifyVersion(this, nsInfo);
registration.setStorageInfo(nsInfo); registration.setStorageInfo(nsInfo);
} }
}
@VisibleForTesting
long getEpoch() {
return epoch;
}
}
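The fencing logic added to JournalService is a monotonic-epoch check: a fence request must present an epoch strictly greater than the one currently held, after which the service records the new epoch and fencer and reports the previous epoch back, and later journal or startLogSegment calls must quote the current epoch or be rejected. A self-contained sketch of just that bookkeeping; the nested FencedException is a local stand-in for the new HDFS class.

// Epoch bookkeeping only; transaction state and RPC plumbing are omitted.
class EpochFencer {
    static class FencedException extends Exception {           // stand-in for the HDFS class
        FencedException(String msg) { super(msg); }
    }

    private long epoch = 0;
    private String fencerInfo;

    synchronized long fence(long newEpoch, String fencer) throws FencedException {
        if (newEpoch <= epoch) {
            throw new FencedException("Epoch " + newEpoch + " from fencer " + fencer
                + " is not valid. Resource has already been fenced by " + fencerInfo
                + " with epoch " + epoch);
        }
        long previousEpoch = epoch;
        epoch = newEpoch;
        fencerInfo = fencer;
        return previousEpoch;                                    // reported in FenceResponse
    }

    synchronized void verifyEpoch(long e) throws FencedException {
        if (e != epoch) {
            throw new FencedException("Epoch " + e + " is not valid. "
                + "Resource has already been fenced by " + fencerInfo + " with epoch " + epoch);
        }
    }
}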


@ -19,6 +19,7 @@
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
/** /**
@ -26,19 +27,20 @@
* to a BackupNode. * to a BackupNode.
*/ */
class BackupJournalManager implements JournalManager { class BackupJournalManager implements JournalManager {
private final NamenodeRegistration nnReg;
private final NamenodeRegistration bnReg; private final NamenodeRegistration bnReg;
private final JournalInfo journalInfo;
BackupJournalManager(NamenodeRegistration bnReg, BackupJournalManager(NamenodeRegistration bnReg,
NamenodeRegistration nnReg) { NamenodeRegistration nnReg) {
journalInfo = new JournalInfo(nnReg.getLayoutVersion(),
nnReg.getClusterID(), nnReg.getNamespaceID());
this.bnReg = bnReg; this.bnReg = bnReg;
this.nnReg = nnReg;
} }
@Override @Override
public EditLogOutputStream startLogSegment(long txId) throws IOException { public EditLogOutputStream startLogSegment(long txId) throws IOException {
EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg, nnReg); EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg,
journalInfo);
stm.startLogSegment(txId); stm.startLogSegment(txId);
return stm; return stm;
} }


@ -35,6 +35,8 @@
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@ -217,7 +219,8 @@ void stop(boolean reportError) {
} }
/* @Override */// NameNode /* @Override */// NameNode
public boolean setSafeMode(SafeModeAction action) throws IOException { public boolean setSafeMode(@SuppressWarnings("unused") SafeModeAction action)
throws IOException {
throw new UnsupportedActionException("setSafeMode"); throw new UnsupportedActionException("setSafeMode");
} }
@ -236,51 +239,56 @@ private BackupNodeRpcServer(Configuration conf, BackupNode nn)
/** /**
* Verifies a journal request * Verifies a journal request
* @param nodeReg node registration
* @throws UnregisteredNodeException if the registration is invalid
*/ */
void verifyJournalRequest(NamenodeRegistration reg) throws IOException { private void verifyJournalRequest(JournalInfo journalInfo)
verifyVersion(reg.getLayoutVersion()); throws IOException {
verifyVersion(journalInfo.getLayoutVersion());
String errorMsg = null; String errorMsg = null;
int expectedNamespaceID = namesystem.getNamespaceInfo().getNamespaceID(); int expectedNamespaceID = namesystem.getNamespaceInfo().getNamespaceID();
if (reg.getNamespaceID() != expectedNamespaceID) { if (journalInfo.getNamespaceId() != expectedNamespaceID) {
errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID
+ " actual " + reg.getNamespaceID(); + " actual " + journalInfo.getNamespaceId();
LOG.warn(errorMsg); LOG.warn(errorMsg);
throw new UnregisteredNodeException(reg); throw new UnregisteredNodeException(journalInfo);
} }
if (!reg.getClusterID().equals(namesystem.getClusterId())) { if (!journalInfo.getClusterId().equals(namesystem.getClusterId())) {
errorMsg = "Invalid clusterId in journal request - expected " errorMsg = "Invalid clusterId in journal request - expected "
+ reg.getClusterID() + " actual " + namesystem.getClusterId(); + journalInfo.getClusterId() + " actual " + namesystem.getClusterId();
LOG.warn(errorMsg); LOG.warn(errorMsg);
throw new UnregisteredNodeException(reg); throw new UnregisteredNodeException(journalInfo);
} }
} }
///////////////////////////////////////////////////// /////////////////////////////////////////////////////
// BackupNodeProtocol implementation for backup node. // BackupNodeProtocol implementation for backup node.
///////////////////////////////////////////////////// /////////////////////////////////////////////////////
@Override @Override
public void startLogSegment(NamenodeRegistration registration, long txid) public void startLogSegment(JournalInfo journalInfo, long epoch,
throws IOException { long txid) throws IOException {
namesystem.checkOperation(OperationCategory.JOURNAL); namesystem.checkOperation(OperationCategory.JOURNAL);
verifyJournalRequest(registration); verifyJournalRequest(journalInfo);
getBNImage().namenodeStartedLogSegment(txid); getBNImage().namenodeStartedLogSegment(txid);
} }
@Override @Override
public void journal(NamenodeRegistration nnReg, public void journal(JournalInfo journalInfo, long epoch, long firstTxId,
long firstTxId, int numTxns, int numTxns, byte[] records) throws IOException {
byte[] records) throws IOException {
namesystem.checkOperation(OperationCategory.JOURNAL); namesystem.checkOperation(OperationCategory.JOURNAL);
verifyJournalRequest(nnReg); verifyJournalRequest(journalInfo);
getBNImage().journal(firstTxId, numTxns, records); getBNImage().journal(firstTxId, numTxns, records);
} }
private BackupImage getBNImage() { private BackupImage getBNImage() {
return (BackupImage)nn.getFSImage(); return (BackupImage)nn.getFSImage();
} }
@Override
public FenceResponse fence(JournalInfo journalInfo, long epoch,
String fencerInfo) throws IOException {
LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch);
throw new UnsupportedOperationException(
"BackupNode does not support fence");
}
} }
////////////////////////////////////////////////////// //////////////////////////////////////////////////////


@ -24,6 +24,7 @@
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.DataOutputBuffer;
@ -42,18 +43,18 @@
class EditLogBackupOutputStream extends EditLogOutputStream { class EditLogBackupOutputStream extends EditLogOutputStream {
static int DEFAULT_BUFFER_SIZE = 256; static int DEFAULT_BUFFER_SIZE = 256;
private JournalProtocol backupNode; // RPC proxy to backup node private final JournalProtocol backupNode; // RPC proxy to backup node
private NamenodeRegistration bnRegistration; // backup node registration private final NamenodeRegistration bnRegistration; // backup node registration
private NamenodeRegistration nnRegistration; // active node registration private final JournalInfo journalInfo; // active node registration
private final DataOutputBuffer out; // serialized output sent to backup node
private EditsDoubleBuffer doubleBuf; private EditsDoubleBuffer doubleBuf;
private DataOutputBuffer out; // serialized output sent to backup node
EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node
NamenodeRegistration nnReg) // active name-node JournalInfo journalInfo) // active name-node
throws IOException { throws IOException {
super(); super();
this.bnRegistration = bnReg; this.bnRegistration = bnReg;
this.nnRegistration = nnReg; this.journalInfo = journalInfo;
InetSocketAddress bnAddress = InetSocketAddress bnAddress =
NetUtils.createSocketAddr(bnRegistration.getAddress()); NetUtils.createSocketAddr(bnRegistration.getAddress());
try { try {
@ -127,8 +128,7 @@ protected void flushAndSync() throws IOException {
out.reset(); out.reset();
assert out.getLength() == 0 : "Output buffer is not empty"; assert out.getLength() == 0 : "Output buffer is not empty";
backupNode.journal(nnRegistration, backupNode.journal(journalInfo, 0, firstTxToFlush, numReadyTxns, data);
firstTxToFlush, numReadyTxns, data);
} }
} }
@ -140,6 +140,6 @@ NamenodeRegistration getRegistration() {
} }
void startLogSegment(long txId) throws IOException { void startLogSegment(long txId) throws IOException {
backupNode.startLogSegment(nnRegistration, txId); backupNode.startLogSegment(journalInfo, 0, txId);
} }
} }


@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Response to a journal fence request. See {@link JournalProtocol#fence}
*/
@InterfaceAudience.Private
public class FenceResponse {
private final long previousEpoch;
private final long lastTransactionId;
private final boolean isInSync;
public FenceResponse(long previousEpoch, long lastTransId, boolean inSync) {
this.previousEpoch = previousEpoch;
this.lastTransactionId = lastTransId;
this.isInSync = inSync;
}
public boolean isInSync() {
return isInSync;
}
public long getLastTransactionId() {
return lastTransactionId;
}
public long getPreviousEpoch() {
return previousEpoch;
}
}
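
To make the three fields above concrete, here is a minimal, hedged sketch of a receiver constructing a FenceResponse and a caller reading it back; the epoch and transaction-id values are invented purely for illustration.

import org.apache.hadoop.hdfs.server.protocol.FenceResponse;

public class FenceResponseSketch {
  public static void main(String[] args) {
    // Illustrative values only: epoch of the writer that was just fenced,
    // highest transaction id durably recorded, and whether all journal
    // segments are present and consistent.
    FenceResponse resp = new FenceResponse(3L, 12345L, true);
    System.out.println("previous epoch = " + resp.getPreviousEpoch());
    System.out.println("last txid      = " + resp.getLastTransactionId());
    System.out.println("in sync        = " + resp.isInSync());
  }
}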

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
import java.io.IOException;
/**
 * Thrown when a previous user of a shared resource attempts to use the
 * resource after being fenced by another user.
*/
public class FencedException extends IOException {
private static final long serialVersionUID = 1L;
public FencedException(String errorMsg) {
super(errorMsg);
}
}

View File

@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Information that describes a journal
*/
@InterfaceAudience.Private
public class JournalInfo {
private final int layoutVersion;
private final String clusterId;
private final int namespaceId;
public JournalInfo(int lv, String clusterId, int nsId) {
this.layoutVersion = lv;
this.clusterId = clusterId;
this.namespaceId = nsId;
}
public int getLayoutVersion() {
return layoutVersion;
}
public String getClusterId() {
return clusterId;
}
public int getNamespaceId() {
return namespaceId;
}
}

View File

@ -21,7 +21,6 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.KerberosInfo;
/** /**
@ -53,12 +52,15 @@ public interface JournalProtocol {
* via {@code EditLogBackupOutputStream} in order to synchronize meta-data * via {@code EditLogBackupOutputStream} in order to synchronize meta-data
* changes with the backup namespace image. * changes with the backup namespace image.
* *
* @param registration active node registration * @param journalInfo journal information
* @param epoch marks beginning a new journal writer
* @param firstTxnId the first transaction of this batch * @param firstTxnId the first transaction of this batch
* @param numTxns number of transactions * @param numTxns number of transactions
* @param records byte array containing serialized journal records * @param records byte array containing serialized journal records
* @throws FencedException if the resource has been fenced
*/ */
public void journal(NamenodeRegistration registration, public void journal(JournalInfo journalInfo,
long epoch,
long firstTxnId, long firstTxnId,
int numTxns, int numTxns,
byte[] records) throws IOException; byte[] records) throws IOException;
@ -66,9 +68,24 @@ public void journal(NamenodeRegistration registration,
/** /**
* Notify the BackupNode that the NameNode has rolled its edit logs * Notify the BackupNode that the NameNode has rolled its edit logs
* and is now writing a new log segment. * and is now writing a new log segment.
* @param registration the registration of the active NameNode * @param journalInfo journal information
* @param epoch marks beginning a new journal writer
* @param txid the first txid in the new log * @param txid the first txid in the new log
* @throws FencedException if the resource has been fenced
*/ */
public void startLogSegment(NamenodeRegistration registration, public void startLogSegment(JournalInfo journalInfo, long epoch,
long txid) throws IOException; long txid) throws IOException;
/**
* Request to fence any other journal writers.
 * Older writers at a previous epoch will be fenced and can no longer
 * perform journal operations.
*
* @param journalInfo journal information
* @param epoch marks beginning a new journal writer
* @param fencerInfo info about fencer for debugging purposes
* @throws FencedException if the resource has been fenced
*/
public FenceResponse fence(JournalInfo journalInfo, long epoch,
String fencerInfo) throws IOException;
} }
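
As a rough illustration of the fencing contract described above (a writer whose epoch is not strictly greater than the receiver's current epoch is rejected with FencedException), the following hedged sketch shows a caller taking over a journal. The proxy construction and the "example-fencer" label are assumptions for the example, not part of this patch.

import java.io.IOException;
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
import org.apache.hadoop.hdfs.server.protocol.FencedException;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;

public class FenceCallerSketch {
  // Sketch only: "proxy" is assumed to be an already-built RPC proxy to the
  // journal receiver, and "myEpoch" a freshly allocated epoch number.
  static FenceResponse takeOver(JournalProtocol proxy, JournalInfo info,
      long myEpoch) throws IOException {
    try {
      // Succeeds only if myEpoch is greater than the receiver's current epoch;
      // older writers are fenced off from journal() and startLogSegment().
      return proxy.fence(info, myEpoch, "example-fencer");
    } catch (FencedException fe) {
      // Another writer already holds a newer epoch; back off rather than
      // risk a split-brain write.
      throw fe;
    }
  }
}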

View File

@ -21,10 +21,12 @@
import java.io.PrintStream; import java.io.PrintStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
@ -70,7 +72,8 @@ enum Command {
EXCLUDE_FILE("-excludeFile", EXCLUDE_FILE("-excludeFile",
"gets the exclude file path that defines the datanodes " + "gets the exclude file path that defines the datanodes " +
"that need to decommissioned."), "that need to decommissioned."),
NNRPCADDRESSES("-nnRpcAddresses", "gets the namenode rpc addresses"); NNRPCADDRESSES("-nnRpcAddresses", "gets the namenode rpc addresses"),
CONFKEY("-confKey [key]", "gets a specific key from the configuration");
private static Map<String, CommandHandler> map; private static Map<String, CommandHandler> map;
static { static {
@ -87,6 +90,8 @@ enum Command {
new CommandHandler("DFSConfigKeys.DFS_HOSTS_EXCLUDE")); new CommandHandler("DFSConfigKeys.DFS_HOSTS_EXCLUDE"));
map.put(NNRPCADDRESSES.getName().toLowerCase(), map.put(NNRPCADDRESSES.getName().toLowerCase(),
new NNRpcAddressesCommandHandler()); new NNRpcAddressesCommandHandler());
map.put(CONFKEY.getName().toLowerCase(),
new PrintConfKeyCommandHandler());
} }
private final String cmd; private final String cmd;
@ -98,6 +103,10 @@ enum Command {
} }
public String getName() { public String getName() {
return cmd.split(" ")[0];
}
public String getUsage() {
return cmd; return cmd;
} }
@ -105,8 +114,8 @@ public String getDescription() {
return description; return description;
} }
public static CommandHandler getHandler(String name) { public static CommandHandler getHandler(String cmd) {
return map.get(name.toLowerCase()); return map.get(cmd.toLowerCase());
} }
} }
@ -118,7 +127,7 @@ public static CommandHandler getHandler(String name) {
StringBuilder usage = new StringBuilder(DESCRIPTION); StringBuilder usage = new StringBuilder(DESCRIPTION);
usage.append("\nhadoop getconf \n"); usage.append("\nhadoop getconf \n");
for (Command cmd : Command.values()) { for (Command cmd : Command.values()) {
usage.append("\t[" + cmd.getName() + "]\t\t\t" + cmd.getDescription() usage.append("\t[" + cmd.getUsage() + "]\t\t\t" + cmd.getDescription()
+ "\n"); + "\n");
} }
USAGE = usage.toString(); USAGE = usage.toString();
@ -128,7 +137,7 @@ public static CommandHandler getHandler(String name) {
* Handler to return value for key corresponding to the {@link Command} * Handler to return value for key corresponding to the {@link Command}
*/ */
static class CommandHandler { static class CommandHandler {
final String key; // Configuration key to lookup String key; // Configuration key to lookup
CommandHandler() { CommandHandler() {
this(null); this(null);
@ -138,18 +147,30 @@ static class CommandHandler {
this.key = key; this.key = key;
} }
final int doWork(GetConf tool) { final int doWork(GetConf tool, String[] args) {
try { try {
return doWorkInternal(tool); checkArgs(args);
return doWorkInternal(tool, args);
} catch (Exception e) { } catch (Exception e) {
tool.printError(e.getMessage()); tool.printError(e.getMessage());
} }
return -1; return -1;
} }
protected void checkArgs(String args[]) {
if (args.length > 0) {
throw new HadoopIllegalArgumentException(
"Did not expect argument: " + args[0]);
}
}
/** Method to be overridden by sub classes for specific behavior */ /** Method to be overridden by sub classes for specific behavior
int doWorkInternal(GetConf tool) throws Exception { * @param args */
String value = tool.getConf().get(key); int doWorkInternal(GetConf tool, String[] args) throws Exception {
String value = tool.getConf().getTrimmed(key);
if (value != null) { if (value != null) {
tool.printOut(value); tool.printOut(value);
return 0; return 0;
@ -164,7 +185,7 @@ int doWorkInternal(GetConf tool) throws Exception {
*/ */
static class NameNodesCommandHandler extends CommandHandler { static class NameNodesCommandHandler extends CommandHandler {
@Override @Override
int doWorkInternal(GetConf tool) throws IOException { int doWorkInternal(GetConf tool, String []args) throws IOException {
tool.printMap(DFSUtil.getNNServiceRpcAddresses(tool.getConf())); tool.printMap(DFSUtil.getNNServiceRpcAddresses(tool.getConf()));
return 0; return 0;
} }
@ -175,7 +196,7 @@ int doWorkInternal(GetConf tool) throws IOException {
*/ */
static class BackupNodesCommandHandler extends CommandHandler { static class BackupNodesCommandHandler extends CommandHandler {
@Override @Override
public int doWorkInternal(GetConf tool) throws IOException { public int doWorkInternal(GetConf tool, String []args) throws IOException {
tool.printMap(DFSUtil.getBackupNodeAddresses(tool.getConf())); tool.printMap(DFSUtil.getBackupNodeAddresses(tool.getConf()));
return 0; return 0;
} }
@ -186,7 +207,7 @@ public int doWorkInternal(GetConf tool) throws IOException {
*/ */
static class SecondaryNameNodesCommandHandler extends CommandHandler { static class SecondaryNameNodesCommandHandler extends CommandHandler {
@Override @Override
public int doWorkInternal(GetConf tool) throws IOException { public int doWorkInternal(GetConf tool, String []args) throws IOException {
tool.printMap(DFSUtil.getSecondaryNameNodeAddresses(tool.getConf())); tool.printMap(DFSUtil.getSecondaryNameNodeAddresses(tool.getConf()));
return 0; return 0;
} }
@ -199,7 +220,7 @@ public int doWorkInternal(GetConf tool) throws IOException {
*/ */
static class NNRpcAddressesCommandHandler extends CommandHandler { static class NNRpcAddressesCommandHandler extends CommandHandler {
@Override @Override
public int doWorkInternal(GetConf tool) throws IOException { public int doWorkInternal(GetConf tool, String []args) throws IOException {
Configuration config = tool.getConf(); Configuration config = tool.getConf();
List<ConfiguredNNAddress> cnnlist = DFSUtil.flattenAddressMap( List<ConfiguredNNAddress> cnnlist = DFSUtil.flattenAddressMap(
DFSUtil.getNNServiceRpcAddresses(config)); DFSUtil.getNNServiceRpcAddresses(config));
@ -215,6 +236,23 @@ public int doWorkInternal(GetConf tool) throws IOException {
} }
} }
static class PrintConfKeyCommandHandler extends CommandHandler {
@Override
protected void checkArgs(String[] args) {
if (args.length != 1) {
throw new HadoopIllegalArgumentException(
"usage: " + Command.CONFKEY.getUsage());
}
}
@Override
int doWorkInternal(GetConf tool, String[] args) throws Exception {
this.key = args[0];
System.err.println("key: " + key);
return super.doWorkInternal(tool, args);
}
}
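
A hedged sketch of exercising the new -confKey command programmatically, mirroring the way the tests later in this patch drive GetConf through ToolRunner. The key name is arbitrary, and the sketch assumes it lives in the same package as GetConf because the three-argument constructor used by the tests appears to be package-visible. From a shell the equivalent would be roughly: hadoop getconf -confKey dfs.replication.

package org.apache.hadoop.hdfs.tools;  // assumed, for constructor visibility

import java.io.ByteArrayOutputStream;
import java.io.PrintStream;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.util.ToolRunner;

public class ConfKeySketch {
  public static void main(String[] args) throws Exception {
    HdfsConfiguration conf = new HdfsConfiguration();
    conf.set("dfs.replication", " 3 ");  // surrounding spaces are trimmed on output
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    PrintStream out = new PrintStream(bytes, true);
    // Same invocation style as the tests: arguments after the command name are
    // handed to the handler, which looks the key up with getTrimmed().
    int ret = ToolRunner.run(new GetConf(conf, out, out),
        new String[] {"-confKey", "dfs.replication"});
    System.out.println("exit=" + ret + " value=" + bytes.toString().trim());
  }
}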
private final PrintStream out; // Stream for printing command output private final PrintStream out; // Stream for printing command output
private final PrintStream err; // Stream for printing error private final PrintStream err; // Stream for printing error
@ -260,10 +298,11 @@ private void printUsage() {
* @return return status of the command * @return return status of the command
*/ */
private int doWork(String[] args) { private int doWork(String[] args) {
if (args.length == 1) { if (args.length >= 1) {
CommandHandler handler = Command.getHandler(args[0]); CommandHandler handler = Command.getHandler(args[0]);
if (handler != null) { if (handler != null) {
return handler.doWork(this); return handler.doWork(this,
Arrays.copyOfRange(args, 1, args.length));
} }
} }
printUsage(); printUsage();

View File

@ -123,6 +123,11 @@ static int errnoFromException(jthrowable exc, JNIEnv *env,
goto done; goto done;
} }
if (!strcmp(excClass, "java.lang.UnsupportedOperationException")) {
errnum = ENOTSUP;
goto done;
}
if (!strcmp(excClass, "org.apache.hadoop.security." if (!strcmp(excClass, "org.apache.hadoop.security."
"AccessControlException")) { "AccessControlException")) {
errnum = EACCES; errnum = EACCES;
@ -614,8 +619,29 @@ hdfsFile hdfsOpenFile(hdfsFS fs, const char* path, int flags,
} else { } else {
file->file = (*env)->NewGlobalRef(env, jVal.l); file->file = (*env)->NewGlobalRef(env, jVal.l);
file->type = (((flags & O_WRONLY) == 0) ? INPUT : OUTPUT); file->type = (((flags & O_WRONLY) == 0) ? INPUT : OUTPUT);
file->flags = 0;
destroyLocalReference(env, jVal.l); destroyLocalReference(env, jVal.l);
if ((flags & O_WRONLY) == 0) {
// Try a test read to see if we can do direct reads
errno = 0;
char buf;
if (readDirect(fs, file, &buf, 0) == 0) {
// Success - 0-byte read should return 0
file->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
} else {
if (errno != ENOTSUP) {
// Unexpected error. Clear it, don't set the direct flag.
fprintf(stderr,
"WARN: Unexpected error %d when testing "
"for direct read compatibility\n", errno);
errno = 0;
goto done;
}
}
errno = 0;
}
} }
done: done:
@ -706,10 +732,57 @@ int hdfsExists(hdfsFS fs, const char *path)
return jVal.z ? 0 : -1; return jVal.z ? 0 : -1;
} }
// Checks that the input file is ready for reading.
static int readPrepare(JNIEnv* env, hdfsFS fs, hdfsFile f,
jobject* jInputStream)
{
*jInputStream = (jobject)(f ? f->file : NULL);
//Sanity check
if (!f || f->type == UNINITIALIZED) {
errno = EBADF;
return -1;
}
//Error checking... make sure that this file is 'readable'
if (f->type != INPUT) {
fprintf(stderr, "Cannot read from a non-InputStream object!\n");
errno = EINVAL;
return -1;
}
return 0;
}
// Common error-handling code between read paths.
static int handleReadResult(int success, jvalue jVal, jthrowable jExc,
JNIEnv* env)
{
int noReadBytes;
if (success != 0) {
if ((*env)->ExceptionCheck(env)) {
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs."
"FSDataInputStream::read");
}
noReadBytes = -1;
} else {
noReadBytes = jVal.i;
if (noReadBytes < 0) {
// -1 from Java is EOF, which is 0 here
noReadBytes = 0;
}
errno = 0;
}
return noReadBytes;
}
tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length) tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
{ {
if (f->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) {
return readDirect(fs, f, buffer, length);
}
// JAVA EQUIVALENT: // JAVA EQUIVALENT:
// byte [] bR = new byte[length]; // byte [] bR = new byte[length];
// fis.read(bR); // fis.read(bR);
@ -722,46 +795,26 @@ tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
} }
//Parameters //Parameters
jobject jInputStream = (jobject)(f ? f->file : NULL); jobject jInputStream;
if (readPrepare(env, fs, f, &jInputStream) == -1) {
return -1;
}
jbyteArray jbRarray; jbyteArray jbRarray;
jint noReadBytes = 0; jint noReadBytes = 0;
jvalue jVal; jvalue jVal;
jthrowable jExc = NULL; jthrowable jExc = NULL;
//Sanity check
if (!f || f->type == UNINITIALIZED) {
errno = EBADF;
return -1;
}
//Error checking... make sure that this file is 'readable'
if (f->type != INPUT) {
fprintf(stderr, "Cannot read from a non-InputStream object!\n");
errno = EINVAL;
return -1;
}
//Read the requisite bytes //Read the requisite bytes
jbRarray = (*env)->NewByteArray(env, length); jbRarray = (*env)->NewByteArray(env, length);
if (invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM,
"read", "([B)I", jbRarray) != 0) { int success = invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream, HADOOP_ISTRM,
errno = errnoFromException(jExc, env, "org.apache.hadoop.fs." "read", "([B)I", jbRarray);
"FSDataInputStream::read");
noReadBytes = -1; noReadBytes = handleReadResult(success, jVal, jExc, env);
}
else { if (noReadBytes > 0) {
noReadBytes = jVal.i; (*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer);
if (noReadBytes > 0) {
(*env)->GetByteArrayRegion(env, jbRarray, 0, noReadBytes, buffer);
} else {
//This is a valid case: there aren't any bytes left to read!
if (noReadBytes == 0 || noReadBytes < -1) {
fprintf(stderr, "WARN: FSDataInputStream.read returned invalid return code - libhdfs returning EOF, i.e., 0: %d\n", noReadBytes);
}
noReadBytes = 0;
}
errno = 0;
} }
destroyLocalReference(env, jbRarray); destroyLocalReference(env, jbRarray);
@ -769,6 +822,52 @@ tSize hdfsRead(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
return noReadBytes; return noReadBytes;
} }
// Reads using the read(ByteBuffer) API, which does fewer copies
tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length)
{
// JAVA EQUIVALENT:
// ByteBuffer bbuffer = ByteBuffer.allocateDirect(length) // wraps C buffer
// fis.read(bbuffer);
//Get the JNIEnv* corresponding to current thread
JNIEnv* env = getJNIEnv();
if (env == NULL) {
errno = EINTERNAL;
return -1;
}
jobject jInputStream;
if (readPrepare(env, fs, f, &jInputStream) == -1) {
return -1;
}
jint noReadBytes = 0;
jvalue jVal;
jthrowable jExc = NULL;
//Read the requisite bytes
jobject bb = (*env)->NewDirectByteBuffer(env, buffer, length);
if (bb == NULL) {
fprintf(stderr, "Could not allocate ByteBuffer");
if ((*env)->ExceptionCheck(env)) {
errno = errnoFromException(NULL, env, "JNIEnv::NewDirectByteBuffer");
} else {
errno = ENOMEM; // Best guess if there's no exception waiting
}
return -1;
}
int success = invokeMethod(env, &jVal, &jExc, INSTANCE, jInputStream,
HADOOP_ISTRM, "read", "(Ljava/nio/ByteBuffer;)I",
bb);
noReadBytes = handleReadResult(success, jVal, jExc, env);
destroyLocalReference(env, bb);
return noReadBytes;
}
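
The JNI call above lands on the ByteBuffer read method of FSDataInputStream; the following hedged Java-side sketch shows the same fast path, matching the JAVA EQUIVALENT comment in readDirect. The file path and buffer size are placeholders, and a stream without ByteBuffer read support throws UnsupportedOperationException, which the C code maps to ENOTSUP.

import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DirectReadSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FSDataInputStream in = fs.open(new Path("/tmp/testfile.txt")); // placeholder path
    ByteBuffer buf = ByteBuffer.allocateDirect(32);
    try {
      // Streams backed by HDFS fill the caller's buffer without the extra
      // byte[] copy that the plain read(byte[]) path requires.
      int n = in.read(buf);
      System.out.println("direct read returned " + n + " bytes");
    } catch (UnsupportedOperationException e) {
      // Filesystems without ByteBuffer read support end up here; libhdfs
      // detects this at open time and falls back to the slow path.
      System.out.println("direct read not supported by this stream");
    } finally {
      in.close();
    }
  }
}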
tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position, tSize hdfsPread(hdfsFS fs, hdfsFile f, tOffset position,

View File

@ -81,12 +81,16 @@ extern "C" {
}; };
// Bit fields for hdfsFile_internal flags
#define HDFS_FILE_SUPPORTS_DIRECT_READ (1<<0)
/** /**
* The 'file-handle' to a file in hdfs. * The 'file-handle' to a file in hdfs.
*/ */
struct hdfsFile_internal { struct hdfsFile_internal {
void* file; void* file;
enum hdfsStreamType type; enum hdfsStreamType type;
uint32_t flags;
}; };
typedef struct hdfsFile_internal* hdfsFile; typedef struct hdfsFile_internal* hdfsFile;
@ -203,7 +207,6 @@ extern "C" {
*/ */
tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length); tSize hdfsRead(hdfsFS fs, hdfsFile file, void* buffer, tSize length);
/** /**
* hdfsPread - Positional read of data from an open file. * hdfsPread - Positional read of data from an open file.
* @param fs The configured filesystem handle. * @param fs The configured filesystem handle.

View File

@ -18,6 +18,8 @@
#include "hdfs.h" #include "hdfs.h"
tSize readDirect(hdfsFS fs, hdfsFile f, void* buffer, tSize length);
void permission_disp(short permissions, char *rtr) { void permission_disp(short permissions, char *rtr) {
rtr[9] = '\0'; rtr[9] = '\0';
int i; int i;
@ -51,7 +53,6 @@ void permission_disp(short permissions, char *rtr) {
} }
int main(int argc, char **argv) { int main(int argc, char **argv) {
hdfsFS fs = hdfsConnectNewInstance("default", 0); hdfsFS fs = hdfsConnectNewInstance("default", 0);
if(!fs) { if(!fs) {
fprintf(stderr, "Oops! Failed to connect to hdfs!\n"); fprintf(stderr, "Oops! Failed to connect to hdfs!\n");
@ -64,20 +65,25 @@ int main(int argc, char **argv) {
exit(-1); exit(-1);
} }
const char* writePath = "/tmp/testfile.txt"; const char* writePath = "/tmp/testfile.txt";
const char* fileContents = "Hello, World!";
{ {
//Write tests //Write tests
hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0); hdfsFile writeFile = hdfsOpenFile(fs, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
if(!writeFile) { if(!writeFile) {
fprintf(stderr, "Failed to open %s for writing!\n", writePath); fprintf(stderr, "Failed to open %s for writing!\n", writePath);
exit(-1); exit(-1);
} }
fprintf(stderr, "Opened %s for writing successfully...\n", writePath); fprintf(stderr, "Opened %s for writing successfully...\n", writePath);
tSize num_written_bytes =
char* buffer = "Hello, World!"; hdfsWrite(fs, writeFile, (void*)fileContents, strlen(fileContents)+1);
tSize num_written_bytes = hdfsWrite(fs, writeFile, (void*)buffer, strlen(buffer)+1); if (num_written_bytes != strlen(fileContents) + 1) {
fprintf(stderr, "Failed to write correct number of bytes - expected %d, got %d\n",
(int)(strlen(fileContents) + 1), (int)num_written_bytes);
exit(-1);
}
fprintf(stderr, "Wrote %d bytes\n", num_written_bytes); fprintf(stderr, "Wrote %d bytes\n", num_written_bytes);
tOffset currentPos = -1; tOffset currentPos = -1;
@ -138,18 +144,86 @@ int main(int argc, char **argv) {
} }
fprintf(stderr, "Current position: %ld\n", currentPos); fprintf(stderr, "Current position: %ld\n", currentPos);
if ((readFile->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) == 0) {
fprintf(stderr, "Direct read support incorrectly not detected "
"for HDFS filesystem\n");
exit(-1);
}
fprintf(stderr, "Direct read support detected for HDFS\n");
// Clear flags so that we really go through slow read path
readFile->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ;
static char buffer[32]; static char buffer[32];
tSize num_read_bytes = hdfsRead(fs, readFile, (void*)buffer, tSize num_read_bytes = hdfsRead(fs, readFile, (void*)buffer,
sizeof(buffer)); sizeof(buffer));
fprintf(stderr, "Read following %d bytes:\n%s\n", fprintf(stderr, "Read following %d bytes:\n%s\n",
num_read_bytes, buffer); num_read_bytes, buffer);
memset(buffer, 0, strlen(fileContents) + 1);
num_read_bytes = hdfsPread(fs, readFile, 0, (void*)buffer, num_read_bytes = hdfsPread(fs, readFile, 0, (void*)buffer,
sizeof(buffer)); sizeof(buffer));
fprintf(stderr, "Read following %d bytes:\n%s\n", fprintf(stderr, "Read following %d bytes:\n%s\n",
num_read_bytes, buffer); num_read_bytes, buffer);
if (hdfsSeek(fs, readFile, 0L)) {
fprintf(stderr,
"Failed to seek to file start for direct read test!\n");
exit(-1);
}
readFile->flags |= HDFS_FILE_SUPPORTS_DIRECT_READ;
memset(buffer, 0, strlen(fileContents) + 1);
num_read_bytes = hdfsRead(fs, readFile, (void*)buffer,
sizeof(buffer));
if (strncmp(fileContents, buffer, strlen(fileContents)) != 0) {
fprintf(stderr, "Failed to read (direct). Expected %s but got %s (%d bytes)\n",
fileContents, buffer, num_read_bytes);
exit(-1);
}
fprintf(stderr, "Read (direct) following %d bytes:\n%s\n",
num_read_bytes, buffer);
hdfsCloseFile(fs, readFile); hdfsCloseFile(fs, readFile);
// Test correct behaviour for unsupported filesystems
hdfsFile localFile = hdfsOpenFile(lfs, writePath, O_WRONLY|O_CREAT, 0, 0, 0);
if(!localFile) {
fprintf(stderr, "Failed to open %s for writing!\n", writePath);
exit(-1);
}
tSize num_written_bytes = hdfsWrite(lfs, localFile,
(void*)fileContents,
strlen(fileContents) + 1);
hdfsCloseFile(lfs, localFile);
localFile = hdfsOpenFile(lfs, writePath, O_RDONLY, 0, 0, 0);
if (localFile->flags & HDFS_FILE_SUPPORTS_DIRECT_READ) {
fprintf(stderr, "Direct read support incorrectly detected for local "
"filesystem\n");
exit(-1);
}
memset(buffer, 0, strlen(fileContents) + 1);
int result = readDirect(lfs, localFile, (void*)buffer, sizeof(buffer));
if (result != -1) {
fprintf(stderr, "Expected error from local direct read not seen!\n");
exit(-1);
}
if (errno != ENOTSUP) {
fprintf(stderr, "Error code not correctly set to ENOTSUP, was %d!\n",
errno);
exit(-1);
}
fprintf(stderr, "Expected exception thrown for unsupported direct read\n");
hdfsCloseFile(lfs, localFile);
} }
int totalResult = 0; int totalResult = 0;
@ -446,4 +520,3 @@ int main(int argc, char **argv) {
/** /**
* vim: ts=4: sw=4: et: * vim: ts=4: sw=4: et:
*/ */

View File

@ -17,126 +17,64 @@
# #
# #
# Note: This script depends on 8 environment variables to function correctly: # Note: This script depends on 5 environment variables to function correctly:
# a) CLASSPATH # a) HADOOP_HOME - must be set
# b) HADOOP_PREFIX # b) HDFS_TEST_CONF_DIR - optional; the directory to read and write
# c) HADOOP_CONF_DIR # core-site.xml to. Defaults to /tmp
# d) HADOOP_LOG_DIR # c) LIBHDFS_BUILD_DIR - optional; the location of the hdfs_test
# e) LIBHDFS_BUILD_DIR # executable. Defaults to the parent directory.
# f) LIBHDFS_INSTALL_DIR # d) OS_NAME - used to choose how to locate libjvm.so
# g) OS_NAME # e) CLOVER_JAR - optional; the location of the Clover code coverage tool's jar.
# h) CLOVER_JAR
# i} HADOOP_VERSION
# j) HADOOP_HDFS_HOME
# All these are passed by build.xml.
# #
if [ "x$HADOOP_HOME" == "x" ]; then
echo "HADOOP_HOME is unset!"
exit 1
fi
if [ "x$LIBHDFS_BUILD_DIR" == "x" ]; then
LIBHDFS_BUILD_DIR=`pwd`/../
fi
if [ "x$HDFS_TEST_CONF_DIR" == "x" ]; then
HDFS_TEST_CONF_DIR=/tmp
fi
# LIBHDFS_INSTALL_DIR is the directory containing libhdfs.so
LIBHDFS_INSTALL_DIR=$HADOOP_HOME/lib/native/
HDFS_TEST=hdfs_test HDFS_TEST=hdfs_test
HADOOP_LIB_DIR=$HADOOP_PREFIX/lib
HADOOP_BIN_DIR=$HADOOP_PREFIX/bin
COMMON_BUILD_DIR=$HADOOP_PREFIX/build/ivy/lib/hadoop-hdfs/common HDFS_TEST_JAR=`find $HADOOP_HOME/share/hadoop/hdfs/ \
COMMON_JAR=$COMMON_BUILD_DIR/hadoop-common-$HADOOP_VERSION.jar -name "hadoop-hdfs-*-tests.jar" | head -n 1`
cat > $HADOOP_CONF_DIR/core-site.xml <<EOF if [ "x$HDFS_TEST_JAR" == "x" ]; then
<?xml version="1.0"?> echo "HDFS test jar not found! Tried looking in all subdirectories \
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> of $HADOOP_HOME/share/hadoop/hdfs/"
<configuration> exit 1
<property>
<name>hadoop.tmp.dir</name>
<value>file:///$LIBHDFS_TEST_DIR</value>
</property>
<property>
<name>fs.default.name</name>
<value>hdfs://localhost:23000/</value>
</property>
</configuration>
EOF
cat > $HADOOP_CONF_DIR/hdfs-site.xml <<EOF
<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.support.append</name>
<value>true</value>
</property>
<property>
<name>dfs.namenode.logging.level</name>
<value>DEBUG</value>
</property>
</configuration>
EOF
cat > $HADOOP_CONF_DIR/slaves <<EOF
localhost
EOF
# If we are running from the hdfs repo we need to make sure
# HADOOP_BIN_DIR contains the common scripts.
# If the bin directory does not and we've got a common jar extract its
# bin directory to HADOOP_PREFIX/bin. The bin scripts hdfs-config.sh and
# hadoop-config.sh assume the bin directory is named "bin" and that it
# is located in HADOOP_PREFIX.
unpacked_common_bin_dir=0
if [ ! -f $HADOOP_BIN_DIR/hadoop-config.sh ]; then
if [ -f $COMMON_JAR ]; then
jar xf $COMMON_JAR bin.tgz
tar xfz bin.tgz -C $HADOOP_BIN_DIR
unpacked_common_bin_dir=1
fi
fi fi
# Manipulate HADOOP_CONF_DIR too echo "Found HDFS test jar at $HDFS_TEST_JAR"
# which is necessary to circumvent bin/hadoop
HADOOP_CONF_DIR=$HADOOP_CONF_DIR:$HADOOP_PREFIX/conf
# set pid file dir so they are not written to /tmp # CLASSPATH initially contains $HDFS_TEST_CONF_DIR
export HADOOP_PID_DIR=$HADOOP_LOG_DIR CLASSPATH="${HDFS_TEST_CONF_DIR}"
# CLASSPATH initially contains $HADOOP_CONF_DIR
CLASSPATH="${HADOOP_CONF_DIR}"
CLASSPATH=${CLASSPATH}:$JAVA_HOME/lib/tools.jar CLASSPATH=${CLASSPATH}:$JAVA_HOME/lib/tools.jar
# for developers, add Hadoop classes to CLASSPATH
if [ -d "$HADOOP_PREFIX/build/classes" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/build/classes
fi
if [ -d "$HADOOP_PREFIX/build/web/webapps" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/build/web
fi
if [ -d "$HADOOP_PREFIX/build/test/classes" ]; then
CLASSPATH=${CLASSPATH}:$HADOOP_PREFIX/build/test/classes
fi
# add Clover jar file needed for code coverage runs # add Clover jar file needed for code coverage runs
CLASSPATH=${CLASSPATH}:${CLOVER_JAR}; CLASSPATH=${CLASSPATH}:${CLOVER_JAR};
# so that filenames w/ spaces are handled correctly in loops below # so that filenames w/ spaces are handled correctly in loops below
IFS= IFS=$'\n'
# add libs to CLASSPATH JAR_DIRS="$HADOOP_HOME/share/hadoop/common/lib/
for f in $HADOOP_PREFIX/lib/*.jar; do $HADOOP_HOME/share/hadoop/common/
CLASSPATH=${CLASSPATH}:$f; $HADOOP_HOME/share/hadoop/hdfs
done $HADOOP_HOME/share/hadoop/hdfs/lib/"
for f in $HADOOP_PREFIX/*.jar; do for d in $JAR_DIRS; do
CLASSPATH=${CLASSPATH}:$f for j in $d/*.jar; do
done CLASSPATH=${CLASSPATH}:$j
for f in $HADOOP_PREFIX/lib/jsp-2.1/*.jar; do done;
CLASSPATH=${CLASSPATH}:$f; done;
done
if [ -d "$COMMON_BUILD_DIR" ]; then
CLASSPATH=$CLASSPATH:$COMMON_JAR
for f in $COMMON_BUILD_DIR/*.jar; do
CLASSPATH=${CLASSPATH}:$f;
done
fi
# restore ordinary behaviour # restore ordinary behaviour
unset IFS unset IFS
@ -178,21 +116,37 @@ echo LIB_JVM_DIR = $LIB_JVM_DIR
echo "++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++" echo "++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++"
# Put delays to ensure hdfs is up and running and also shuts down # Put delays to ensure hdfs is up and running and also shuts down
# after the tests are complete # after the tests are complete
cd $HADOOP_PREFIX rm $HDFS_TEST_CONF_DIR/core-site.xml
echo Y | $HADOOP_BIN_DIR/hdfs namenode -format &&
$HADOOP_BIN_DIR/hadoop-daemon.sh --script $HADOOP_BIN_DIR/hdfs start namenode && sleep 2
$HADOOP_BIN_DIR/hadoop-daemon.sh --script $HADOOP_BIN_DIR/hdfs start datanode && sleep 2
echo "Wait 30s for the datanode to start up..."
sleep 30
CLASSPATH=$CLASSPATH LD_PRELOAD="$LIB_JVM_DIR/libjvm.so:$LIBHDFS_INSTALL_DIR/libhdfs.so:" $LIBHDFS_BUILD_DIR/$HDFS_TEST
BUILD_STATUS=$?
sleep 3
$HADOOP_BIN_DIR/hadoop-daemon.sh --script $HADOOP_BIN_DIR/hdfs stop datanode && sleep 2
$HADOOP_BIN_DIR/hadoop-daemon.sh --script $HADOOP_BIN_DIR/hdfs stop namenode && sleep 2
if [ $unpacked_common_bin_dir -eq 1 ]; then $HADOOP_HOME/bin/hadoop jar $HDFS_TEST_JAR \
rm -rf bin.tgz org.apache.hadoop.test.MiniDFSClusterManager \
-format -nnport 20300 -writeConfig $HDFS_TEST_CONF_DIR/core-site.xml \
> /tmp/libhdfs-test-cluster.out 2>&1 &
MINI_CLUSTER_PID=$!
for i in {1..15}; do
echo "Waiting for DFS cluster, attempt $i of 15"
[ -f $HDFS_TEST_CONF_DIR/core-site.xml ] && break;
sleep 2
done
if [ ! -f $HDFS_TEST_CONF_DIR/core-site.xml ]; then
echo "Cluster did not come up in 30s"
kill -9 $MINI_CLUSTER_PID
exit 1
fi fi
echo exiting with $BUILD_STATUS echo "Cluster up, running tests"
# Disable error checking to make sure we get to cluster cleanup
set +e
CLASSPATH=$CLASSPATH \
LD_PRELOAD="$LIB_JVM_DIR/libjvm.so:$LIBHDFS_INSTALL_DIR/libhdfs.so:" \
$LIBHDFS_BUILD_DIR/$HDFS_TEST
BUILD_STATUS=$?
echo "Tearing cluster down"
kill -9 $MINI_CLUSTER_PID
echo "Exiting with $BUILD_STATUS"
exit $BUILD_STATUS exit $BUILD_STATUS

View File

@ -36,16 +36,18 @@ message JournalInfoProto {
} }
/** /**
* JournalInfo - the information about the journal * journalInfo - the information about the journal
* firstTxnId - the first txid in the journal records * firstTxnId - the first txid in the journal records
* numTxns - Number of transactions in editlog * numTxns - Number of transactions in editlog
* records - bytes containing serialized journal records * records - bytes containing serialized journal records
* epoch - change to this represents change of journal writer
*/ */
message JournalRequestProto { message JournalRequestProto {
required JournalInfoProto journalInfo = 1; required JournalInfoProto journalInfo = 1;
required uint64 firstTxnId = 2; required uint64 firstTxnId = 2;
required uint32 numTxns = 3; required uint32 numTxns = 3;
required bytes records = 4; required bytes records = 4;
required uint64 epoch = 5;
} }
/** /**
@ -55,12 +57,13 @@ message JournalResponseProto {
} }
/** /**
* JournalInfo - the information about the journal * journalInfo - the information about the journal
* txid - first txid in the new log * txid - first txid in the new log
*/ */
message StartLogSegmentRequestProto { message StartLogSegmentRequestProto {
required JournalInfoProto journalInfo = 1; required JournalInfoProto journalInfo = 1; // Info about the journal
required uint64 txid = 2; required uint64 txid = 2; // Transaction ID
required uint64 epoch = 3;
} }
/** /**
@ -69,6 +72,27 @@ message StartLogSegmentRequestProto {
message StartLogSegmentResponseProto { message StartLogSegmentResponseProto {
} }
/**
* journalInfo - the information about the journal
* txid - first txid in the new log
*/
message FenceRequestProto {
required JournalInfoProto journalInfo = 1; // Info about the journal
required uint64 epoch = 2; // Epoch - change indicates change in writer
optional string fencerInfo = 3; // Info about fencer for debugging
}
/**
* previousEpoch - previous epoch if any or zero
* lastTransactionId - last valid transaction Id in the journal
* inSync - if all journal segments are available and in sync
*/
message FenceResponseProto {
optional uint64 previousEpoch = 1;
optional uint64 lastTransactionId = 2;
optional bool inSync = 3;
}
/** /**
* Protocol used to journal edits to a remote node. Currently, * Protocol used to journal edits to a remote node. Currently,
* this is used to publish edits from the NameNode to a BackupNode. * this is used to publish edits from the NameNode to a BackupNode.
@ -89,4 +113,10 @@ service JournalProtocolService {
*/ */
rpc startLogSegment(StartLogSegmentRequestProto) rpc startLogSegment(StartLogSegmentRequestProto)
returns (StartLogSegmentResponseProto); returns (StartLogSegmentResponseProto);
/**
* Request to fence a journal receiver.
*/
rpc fence(FenceRequestProto)
returns (FenceResponseProto);
} }

View File

@ -0,0 +1,176 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.viewfs;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import javax.security.auth.login.LoginException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MIN_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import static org.junit.Assert.*;
/**
* Tests for viewfs implementation of default fs level values.
* This tests for both passing in a path (based on mount point)
* to obtain the default value of the fs that the path is mounted on
* or just passing in no arguments.
*/
public class TestViewFsDefaultValue {
static final String testFileDir = "/tmp/test/";
static final String testFileName = testFileDir + "testFileStatusSerialziation";
private static MiniDFSCluster cluster;
private static Configuration CONF = new Configuration();
private static FileSystem fHdfs;
private static FileSystem vfs;
private static Path testFilePath;
private static Path testFileDirPath;
@BeforeClass
public static void clusterSetupAtBegining() throws IOException,
LoginException, URISyntaxException {
CONF.setLong(DFS_BLOCK_SIZE_KEY, DFS_BLOCK_SIZE_DEFAULT);
CONF.setInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT);
CONF.setInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
CONF.setInt(DFS_REPLICATION_KEY, DFS_REPLICATION_DEFAULT + 1);
CONF.setInt(IO_FILE_BUFFER_SIZE_KEY, IO_FILE_BUFFER_SIZE_DEFAULT);
cluster = new MiniDFSCluster.Builder(CONF).numDataNodes(DFS_REPLICATION_DEFAULT + 1).build();
cluster.waitClusterUp();
fHdfs = cluster.getFileSystem();
FileSystemTestHelper.createFile(fHdfs, testFileName);
Configuration conf = ViewFileSystemTestSetup.createConfig();
ConfigUtil.addLink(conf, "/tmp", new URI(fHdfs.getUri().toString() +
"/tmp"));
vfs = FileSystem.get(FsConstants.VIEWFS_URI, conf);
testFileDirPath = new Path (testFileDir);
testFilePath = new Path (testFileName);
}
/**
* Test that default blocksize values can be retrieved on the client side.
*/
@Test
public void testGetDefaultBlockSize()
throws IOException, URISyntaxException {
// createFile does not use defaultBlockSize to create the file,
// but we are only looking at the defaultBlockSize, so this
// test should still pass
try {
vfs.getDefaultBlockSize();
fail("getServerDefaults on viewFs did not throw excetion!");
} catch (NotInMountpointException e) {
assertEquals(vfs.getDefaultBlockSize(testFilePath),
DFS_BLOCK_SIZE_DEFAULT);
}
}
/**
* Test that default replication values can be retrieved on the client side.
*/
@Test
public void testGetDefaultReplication()
throws IOException, URISyntaxException {
try {
vfs.getDefaultReplication();
fail("getDefaultReplication on viewFs did not throw excetion!");
} catch (NotInMountpointException e) {
assertEquals(vfs.getDefaultReplication(testFilePath),
DFS_REPLICATION_DEFAULT+1);
}
}
/**
* Test that server default values can be retrieved on the client side.
*/
@Test
public void testServerDefaults() throws IOException {
try {
FsServerDefaults serverDefaults = vfs.getServerDefaults();
fail("getServerDefaults on viewFs did not throw excetion!");
} catch (NotInMountpointException e) {
FsServerDefaults serverDefaults = vfs.getServerDefaults(testFilePath);
assertEquals(DFS_BLOCK_SIZE_DEFAULT, serverDefaults.getBlockSize());
assertEquals(DFS_BYTES_PER_CHECKSUM_DEFAULT,
serverDefaults.getBytesPerChecksum());
assertEquals(DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT,
serverDefaults.getWritePacketSize());
assertEquals(IO_FILE_BUFFER_SIZE_DEFAULT,
serverDefaults.getFileBufferSize());
assertEquals(DFS_REPLICATION_DEFAULT + 1,
serverDefaults.getReplication());
}
}
/**
* Test that getContentSummary can be retrieved on the client side.
*/
@Test
public void testGetContentSummary() throws IOException {
FileSystem hFs = cluster.getFileSystem(0);
final DistributedFileSystem dfs = (DistributedFileSystem)hFs;
dfs.setQuota(testFileDirPath, 100, 500);
ContentSummary cs = vfs.getContentSummary(testFileDirPath);
assertEquals(100, cs.getQuota());
assertEquals(500, cs.getSpaceQuota());
}
@AfterClass
public static void cleanup() throws IOException {
fHdfs.delete(new Path(testFileName), true);
}
}
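
A hedged, standalone sketch of the behaviour these tests exercise: viewfs can only answer default-value queries relative to a path it can resolve to a mount point. The namenode URI and file name below are placeholders, not values from the patch.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsConstants;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.viewfs.ConfigUtil;

public class ViewFsDefaultsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder mount: /tmp in the view maps to /tmp on a local namenode.
    ConfigUtil.addLink(conf, "/tmp", new URI("hdfs://localhost:8020/tmp"));
    FileSystem vfs = FileSystem.get(FsConstants.VIEWFS_URI, conf);
    // The path-based forms resolve to the mounted filesystem and return its
    // defaults; the no-argument forms throw NotInMountpointException because
    // there is no single filesystem to delegate to.
    Path p = new Path("/tmp/somefile");
    System.out.println("block size  = " + vfs.getDefaultBlockSize(p));
    System.out.println("replication = " + vfs.getDefaultReplication(p));
  }
}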

View File

@ -30,6 +30,7 @@
import javax.security.auth.login.LoginException; import javax.security.auth.login.LoginException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileChecksum;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.FileSystemTestHelper;
@ -48,13 +49,13 @@
public class TestViewFsFileStatusHdfs { public class TestViewFsFileStatusHdfs {
static final String testfilename = "/tmp/testFileStatusSerialziation"; static final String testfilename = "/tmp/testFileStatusSerialziation";
static final String someFile = "/hdfstmp/someFileForTestGetFileChecksum";
private static MiniDFSCluster cluster; private static MiniDFSCluster cluster;
private static Path defaultWorkingDirectory; private static Path defaultWorkingDirectory;
private static Configuration CONF = new Configuration(); private static Configuration CONF = new Configuration();
private static FileSystem fHdfs; private static FileSystem fHdfs;
private static FileSystem vfs;
@BeforeClass @BeforeClass
public static void clusterSetupAtBegining() throws IOException, public static void clusterSetupAtBegining() throws IOException,
@ -65,18 +66,19 @@ public static void clusterSetupAtBegining() throws IOException,
defaultWorkingDirectory = fHdfs.makeQualified( new Path("/user/" + defaultWorkingDirectory = fHdfs.makeQualified( new Path("/user/" +
UserGroupInformation.getCurrentUser().getShortUserName())); UserGroupInformation.getCurrentUser().getShortUserName()));
fHdfs.mkdirs(defaultWorkingDirectory); fHdfs.mkdirs(defaultWorkingDirectory);
// Setup the ViewFS to be used for all tests.
Configuration conf = ViewFileSystemTestSetup.createConfig();
ConfigUtil.addLink(conf, "/vfstmp", new URI(fHdfs.getUri() + "/hdfstmp"));
ConfigUtil.addLink(conf, "/tmp", new URI(fHdfs.getUri() + "/tmp"));
vfs = FileSystem.get(FsConstants.VIEWFS_URI, conf);
assertEquals(ViewFileSystem.class, vfs.getClass());
} }
@Test @Test
public void testFileStatusSerialziation() public void testFileStatusSerialziation()
throws IOException, URISyntaxException { throws IOException, URISyntaxException {
long len = FileSystemTestHelper.createFile(fHdfs, testfilename); long len = FileSystemTestHelper.createFile(fHdfs, testfilename);
Configuration conf = ViewFileSystemTestSetup.createConfig();
ConfigUtil.addLink(conf, "/tmp", new URI(fHdfs.getUri().toString() + "/tmp"));
FileSystem vfs = FileSystem.get(FsConstants.VIEWFS_URI, conf);
assertEquals(ViewFileSystem.class, vfs.getClass());
FileStatus stat = vfs.getFileStatus(new Path(testfilename)); FileStatus stat = vfs.getFileStatus(new Path(testfilename));
assertEquals(len, stat.getLen()); assertEquals(len, stat.getLen());
// check serialization/deserialization // check serialization/deserialization
@ -89,9 +91,34 @@ public void testFileStatusSerialziation()
assertEquals(len, deSer.getLen()); assertEquals(len, deSer.getLen());
} }
@Test
public void testGetFileChecksum() throws IOException, URISyntaxException {
// Create two different files in HDFS
FileSystemTestHelper.createFile(fHdfs, someFile);
FileSystemTestHelper.createFile(fHdfs, FileSystemTestHelper
.getTestRootPath(fHdfs, someFile + "other"), 1, 512);
// Get checksum through ViewFS
FileChecksum viewFSCheckSum = vfs.getFileChecksum(
new Path("/vfstmp/someFileForTestGetFileChecksum"));
// Get checksum through HDFS.
FileChecksum hdfsCheckSum = fHdfs.getFileChecksum(
new Path(someFile));
// Get checksum of different file in HDFS
FileChecksum otherHdfsFileCheckSum = fHdfs.getFileChecksum(
new Path(someFile+"other"));
// Checksums of the same file (obtained through HDFS and ViewFS) should match
assertEquals("HDFS and ViewFS checksums were not the same", viewFSCheckSum,
hdfsCheckSum);
// Checksum of different files should be different.
assertFalse("Some other HDFS file which should not have had the same " +
"checksum as viewFS did!", viewFSCheckSum.equals(otherHdfsFileCheckSum));
}
@AfterClass @AfterClass
public static void cleanup() throws IOException { public static void cleanup() throws IOException {
fHdfs.delete(new Path(testfilename), true); fHdfs.delete(new Path(testfilename), true);
fHdfs.delete(new Path(someFile), true);
fHdfs.delete(new Path(someFile + "other"), true);
} }
} }

View File

@ -20,12 +20,18 @@
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper; import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
import org.apache.hadoop.hdfs.server.protocol.FencedException;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -42,7 +48,7 @@ public class TestJournalService {
* called. * called.
*/ */
@Test @Test
public void testCallBacks() throws IOException { public void testCallBacks() throws Exception {
JournalListener listener = Mockito.mock(JournalListener.class); JournalListener listener = Mockito.mock(JournalListener.class);
JournalService service = null; JournalService service = null;
try { try {
@ -51,6 +57,7 @@ public void testCallBacks() throws IOException {
service = startJournalService(listener); service = startJournalService(listener);
verifyRollLogsCallback(service, listener); verifyRollLogsCallback(service, listener);
verifyJournalCallback(service, listener); verifyJournalCallback(service, listener);
verifyFence(service, cluster.getNameNode(0));
} finally { } finally {
if (service != null) { if (service != null) {
service.stop(); service.stop();
@ -93,4 +100,28 @@ private void verifyJournalCallback(JournalService s, JournalListener l) throws I
Mockito.verify(l, Mockito.atLeastOnce()).journal(Mockito.eq(s), Mockito.verify(l, Mockito.atLeastOnce()).journal(Mockito.eq(s),
Mockito.anyLong(), Mockito.anyInt(), (byte[]) Mockito.any()); Mockito.anyLong(), Mockito.anyInt(), (byte[]) Mockito.any());
} }
public void verifyFence(JournalService s, NameNode nn) throws Exception {
String cid = nn.getNamesystem().getClusterId();
int nsId = nn.getNamesystem().getFSImage().getNamespaceID();
int lv = nn.getNamesystem().getFSImage().getLayoutVersion();
// Fence the journal service
JournalInfo info = new JournalInfo(lv, cid, nsId);
long currentEpoch = s.getEpoch();
// New epoch lower than the current epoch is rejected
try {
s.fence(info, (currentEpoch - 1), "fencer");
} catch (FencedException ignore) { /* Ignored */ }
// New epoch equal to the current epoch is rejected
try {
s.fence(info, currentEpoch, "fencer");
} catch (FencedException ignore) { /* Ignored */ }
// New epoch higher than the current epoch is successful
FenceResponse resp = s.fence(info, currentEpoch+1, "fencer");
Assert.assertNotNull(resp);
}
} }

View File

@ -42,6 +42,8 @@
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Joiner;
/** /**
* Test for {@link GetConf} * Test for {@link GetConf}
*/ */
@ -117,7 +119,12 @@ private String runTool(HdfsConfiguration conf, String[] args, boolean success)
PrintStream out = new PrintStream(o, true); PrintStream out = new PrintStream(o, true);
try { try {
int ret = ToolRunner.run(new GetConf(conf, out, out), args); int ret = ToolRunner.run(new GetConf(conf, out, out), args);
assertEquals(success, ret == 0); out.flush();
System.err.println("Output: " + o.toString());
assertEquals("Expected " + (success?"success":"failure") +
" for args: " + Joiner.on(" ").join(args) + "\n" +
"Output: " + o.toString(),
success, ret == 0);
return o.toString(); return o.toString();
} finally { } finally {
o.close(); o.close();
@ -222,7 +229,9 @@ public void testEmptyConf() throws Exception {
getAddressListFromTool(TestType.SECONDARY, conf, false); getAddressListFromTool(TestType.SECONDARY, conf, false);
getAddressListFromTool(TestType.NNRPCADDRESSES, conf, false); getAddressListFromTool(TestType.NNRPCADDRESSES, conf, false);
for (Command cmd : Command.values()) { for (Command cmd : Command.values()) {
CommandHandler handler = Command.getHandler(cmd.getName()); String arg = cmd.getName();
CommandHandler handler = Command.getHandler(arg);
assertNotNull("missing handler: " + cmd, handler);
if (handler.key != null) { if (handler.key != null) {
// First test with configuration missing the required key // First test with configuration missing the required key
String[] args = {handler.key}; String[] args = {handler.key};
@ -319,18 +328,36 @@ public void testFederation() throws Exception {
verifyAddresses(conf, TestType.SECONDARY, false, secondaryAddresses); verifyAddresses(conf, TestType.SECONDARY, false, secondaryAddresses);
verifyAddresses(conf, TestType.NNRPCADDRESSES, true, nnAddresses); verifyAddresses(conf, TestType.NNRPCADDRESSES, true, nnAddresses);
} }
@Test
public void testGetSpecificKey() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
conf.set("mykey", " myval ");
String[] args = {"-confKey", "mykey"};
assertTrue(runTool(conf, args, true).equals("myval\n"));
}
@Test
public void testExtraArgsThrowsError() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
conf.set("mykey", "myval");
String[] args = {"-namenodes", "unexpected-arg"};
assertTrue(runTool(conf, args, false).contains(
"Did not expect argument: unexpected-arg"));
}
/** /**
* Tests commands other than {@link Command#NAMENODE}, {@link Command#BACKUP}, * Tests commands other than {@link Command#NAMENODE}, {@link Command#BACKUP},
* {@link Command#SECONDARY} and {@link Command#NNRPCADDRESSES} * {@link Command#SECONDARY} and {@link Command#NNRPCADDRESSES}
*/ */
@Test
public void testTool() throws Exception { public void testTool() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration(false); HdfsConfiguration conf = new HdfsConfiguration(false);
for (Command cmd : Command.values()) { for (Command cmd : Command.values()) {
CommandHandler handler = Command.getHandler(cmd.getName()); CommandHandler handler = Command.getHandler(cmd.getName());
if (handler.key != null) { if (handler.key != null && !"-confKey".equals(cmd.getName())) {
// Add the key to the conf and ensure tool returns the right value // Add the key to the conf and ensure tool returns the right value
String[] args = {handler.key}; String[] args = {cmd.getName()};
conf.set(handler.key, "value"); conf.set(handler.key, "value");
assertTrue(runTool(conf, args, true).contains("value")); assertTrue(runTool(conf, args, true).contains("value"));
} }

View File

@ -204,7 +204,20 @@ Release 2.0.0 - UNRELEASED
MAPREDUCE-4097. tools testcases fail because missing mrapp-generated-classpath MAPREDUCE-4097. tools testcases fail because missing mrapp-generated-classpath
file in classpath (rvs via tucu) file in classpath (rvs via tucu)
MAPREDUCE-4113. Fix tests org.apache.hadoop.mapred.TestClusterMRNotification
(Devaraj K via bobby)
MAPREDUCE-4112. Fix tests org.apache.hadoop.mapred.TestClusterMapReduceTestCase
(Devaraj K via bobby)
MAPREDUCE-4111. Fix tests in org.apache.hadoop.mapred.TestJobName (Devaraj
K via bobby)
MAPREDUCE-4110. Fix tests in org.apache.hadoop.mapred.TestMiniMRClasspath &
org.apache.hadoop.mapred.TestMiniMRWithDFSWithDistinctUsers (Devaraj K via
bobby)
Release 0.23.3 - UNRELEASED Release 0.23.3 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
@ -264,6 +277,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4073. CS assigns multiple off-switch containers when using MAPREDUCE-4073. CS assigns multiple off-switch containers when using
multi-level-queues (Siddharth Seth via bobby) multi-level-queues (Siddharth Seth via bobby)
MAPREDUCE-4051. Remove the empty hadoop-mapreduce-project/assembly/all.xml
file (Ravi Prakash via bobby)
Release 0.23.2 - UNRELEASED Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -20,12 +20,9 @@
import java.io.IOException; import java.io.IOException;
import org.junit.Ignore;
/** /**
* Tests Job end notification in cluster mode. * Tests Job end notification in cluster mode.
*/ */
@Ignore
public class TestClusterMRNotification extends NotificationTestCase { public class TestClusterMRNotification extends NotificationTestCase {
public TestClusterMRNotification() throws IOException { public TestClusterMRNotification() throws IOException {

View File

@ -17,15 +17,18 @@
*/ */
package org.apache.hadoop.mapred; package org.apache.hadoop.mapred;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.util.Properties;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
import org.junit.Ignore;
import java.io.*;
import java.util.Properties;
@Ignore
public class TestClusterMapReduceTestCase extends ClusterMapReduceTestCase { public class TestClusterMapReduceTestCase extends ClusterMapReduceTestCase {
public void _testMapReduce(boolean restart) throws Exception { public void _testMapReduce(boolean restart) throws Exception {
OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt")); OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));

View File

@@ -18,23 +18,17 @@
 package org.apache.hadoop.mapred;
 
 import java.io.BufferedReader;
-import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Writer;
-import java.util.Iterator;
-import java.util.StringTokenizer;
 
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.serializer.JavaSerializationComparator;
 import org.apache.hadoop.mapred.lib.IdentityMapper;
-import org.junit.Ignore;
 
-@Ignore
 public class TestJobName extends ClusterMapReduceTestCase {
 
   public void testComplexName() throws Exception {

View File

@@ -18,47 +18,43 @@
 package org.apache.hadoop.mapred;
 
-import java.io.*;
+import java.io.BufferedReader;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.net.URI;
 import junit.framework.TestCase;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.junit.Ignore;
+import org.junit.Assert;
+import org.junit.Test;
 
 /**
  * A JUnit test to test Mini Map-Reduce Cluster with multiple directories
  * and check for correct classpath
  */
-@Ignore
-public class TestMiniMRClasspath extends TestCase {
+public class TestMiniMRClasspath {
 
-  static void configureWordCount(FileSystem fs,
-                                 String jobTracker,
-                                 JobConf conf,
-                                 String input,
-                                 int numMaps,
-                                 int numReduces,
-                                 Path inDir, Path outDir) throws IOException {
+  static void configureWordCount(FileSystem fs, JobConf conf, String input,
+      int numMaps, int numReduces, Path inDir, Path outDir) throws IOException {
     fs.delete(outDir, true);
     if (!fs.mkdirs(inDir)) {
       throw new IOException("Mkdirs failed to create " + inDir.toString());
     }
-    {
-      DataOutputStream file = fs.create(new Path(inDir, "part-0"));
-      file.writeBytes(input);
-      file.close();
-    }
+    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
+    file.writeBytes(input);
+    file.close();
     FileSystem.setDefaultUri(conf, fs.getUri());
-    conf.set(JTConfig.FRAMEWORK_NAME, JTConfig.CLASSIC_FRAMEWORK_NAME);
-    conf.set(JTConfig.JT_IPC_ADDRESS, jobTracker);
+    conf.set(JTConfig.FRAMEWORK_NAME, JTConfig.YARN_FRAMEWORK_NAME);
     conf.setJobName("wordcount");
     conf.setInputFormat(TextInputFormat.class);
@@ -74,18 +70,17 @@ static void configureWordCount(FileSystem fs,
     FileOutputFormat.setOutputPath(conf, outDir);
     conf.setNumMapTasks(numMaps);
     conf.setNumReduceTasks(numReduces);
-    //pass a job.jar already included in the hadoop build
-    conf.setJar("build/test/mapred/testjar/testjob.jar");
+    //set the tests jar file
+    conf.setJarByClass(TestMiniMRClasspath.class);
   }
 
-  static String launchWordCount(URI fileSys, String jobTracker, JobConf conf,
-                                String input, int numMaps, int numReduces)
+  static String launchWordCount(URI fileSys, JobConf conf, String input,
+                                int numMaps, int numReduces)
   throws IOException {
     final Path inDir = new Path("/testing/wc/input");
     final Path outDir = new Path("/testing/wc/output");
     FileSystem fs = FileSystem.get(fileSys, conf);
-    configureWordCount(fs, jobTracker, conf, input, numMaps, numReduces, inDir,
-                       outDir);
+    configureWordCount(fs, conf, input, numMaps, numReduces, inDir, outDir);
     JobClient.runJob(conf);
     StringBuffer result = new StringBuffer();
     {
@@ -107,8 +102,8 @@ static String launchWordCount(URI fileSys, String jobTracker, JobConf conf,
     return result.toString();
   }
 
-  static String launchExternal(URI uri, String jobTracker, JobConf conf,
-                               String input, int numMaps, int numReduces)
+  static String launchExternal(URI uri, JobConf conf, String input,
+                               int numMaps, int numReduces)
     throws IOException {
     final Path inDir = new Path("/testing/ext/input");
@@ -124,8 +119,7 @@ static String launchExternal(URI uri, String jobTracker, JobConf conf,
       file.close();
     }
     FileSystem.setDefaultUri(conf, uri);
-    conf.set(JTConfig.FRAMEWORK_NAME, JTConfig.CLASSIC_FRAMEWORK_NAME);
-    conf.set(JTConfig.JT_IPC_ADDRESS, jobTracker);
+    conf.set(JTConfig.FRAMEWORK_NAME, JTConfig.YARN_FRAMEWORK_NAME);
     conf.setJobName("wordcount");
     conf.setInputFormat(TextInputFormat.class);
@@ -142,8 +136,8 @@ static String launchExternal(URI uri, String jobTracker, JobConf conf,
     conf.set("mapred.mapper.class", "testjar.ExternalMapperReducer");
     conf.set("mapred.reducer.class", "testjar.ExternalMapperReducer");
-    //pass a job.jar already included in the hadoop build
-    conf.setJar("build/test/mapred/testjar/testjob.jar");
+    // set the tests jar file
+    conf.setJarByClass(TestMiniMRClasspath.class);
     JobClient.runJob(conf);
     StringBuffer result = new StringBuffer();
     Path[] fileList = FileUtil.stat2Paths(fs.listStatus(outDir,
@@ -164,6 +158,7 @@ static String launchExternal(URI uri, String jobTracker, JobConf conf,
     return result.toString();
   }
 
+  @Test
   public void testClassPath() throws IOException {
     String namenode = null;
     MiniDFSCluster dfs = null;
@@ -180,13 +175,10 @@ public void testClassPath() throws IOException {
       mr = new MiniMRCluster(taskTrackers, namenode, 3);
       JobConf jobConf = new JobConf();
       String result;
-      final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
-      result = launchWordCount(fileSys.getUri(), jobTrackerName, jobConf,
-                               "The quick brown fox\nhas many silly\n" +
-                               "red fox sox\n",
-                               3, 1);
-      assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
-                   "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result);
+      result = launchWordCount(fileSys.getUri(), jobConf,
+          "The quick brown fox\nhas many silly\n" + "red fox sox\n", 3, 1);
+      Assert.assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n"
+          + "quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result);
     } finally {
       if (dfs != null) { dfs.shutdown(); }
@@ -195,6 +187,7 @@ public void testClassPath() throws IOException {
     }
   }
 
+  @Test
   public void testExternalWritable()
   throws IOException {
@@ -214,12 +207,10 @@ public void testExternalWritable()
       mr = new MiniMRCluster(taskTrackers, namenode, 3);
       JobConf jobConf = new JobConf();
       String result;
-      final String jobTrackerName = "localhost:" + mr.getJobTrackerPort();
-      result = launchExternal(fileSys.getUri(), jobTrackerName, jobConf,
-                              "Dennis was here!\nDennis again!",
-                              3, 1);
-      assertEquals("Dennis again!\t1\nDennis was here!\t1\n", result);
+      result = launchExternal(fileSys.getUri(), jobConf,
+          "Dennis was here!\nDennis again!", 3, 1);
+      Assert.assertEquals("Dennis again!\t1\nDennis was here!\t1\n", result);
     }
     finally {
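Editor's note: both helpers in TestMiniMRClasspath now configure the job the same way -- target the YARN framework instead of a classic JobTracker address, and ship the test's own jar via setJarByClass. A minimal sketch of that pattern, assuming a FileSystem handle from the mini DFS cluster; the MiniMRJobSetupSketch class and newYarnJobConf method names are illustrative and not part of the patch:

    package org.apache.hadoop.mapred;   // sketch only; same package as the tests so JobConf resolves

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;

    class MiniMRJobSetupSketch {
      // Builds a JobConf the way the updated configureWordCount/launchExternal now do.
      static JobConf newYarnJobConf(FileSystem fs) {
        JobConf conf = new JobConf();
        // Point the job at the mini DFS cluster under test.
        FileSystem.setDefaultUri(conf, fs.getUri());
        // Run against the YARN framework; the classic-framework setup that also
        // required a JobTracker address (JTConfig.JT_IPC_ADDRESS) is gone.
        conf.set(JTConfig.FRAMEWORK_NAME, JTConfig.YARN_FRAMEWORK_NAME);
        // Ship the test's own jar instead of the pre-built testjob.jar.
        conf.setJarByClass(TestMiniMRClasspath.class);
        return conf;
      }
    }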

View File

@@ -17,26 +17,26 @@
  */
 package org.apache.hadoop.mapred;
 
-import java.io.*;
+import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
-import junit.framework.TestCase;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
-import org.apache.hadoop.security.*;
-import org.junit.Ignore;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
 
 /**
  * A JUnit test to test Mini Map-Reduce Cluster with Mini-DFS.
  */
-@Ignore
-public class TestMiniMRWithDFSWithDistinctUsers extends TestCase {
+public class TestMiniMRWithDFSWithDistinctUsers {
   static final UserGroupInformation DFS_UGI = createUGI("dfs", true);
   static final UserGroupInformation ALICE_UGI = createUGI("alice", false);
   static final UserGroupInformation BOB_UGI = createUGI("bob", false);
@@ -45,7 +45,6 @@ public class TestMiniMRWithDFSWithDistinctUsers extends TestCase {
   MiniDFSCluster dfs = null;
   FileSystem fs = null;
   Configuration conf = new Configuration();
-  String jobTrackerName;
 
   static UserGroupInformation createUGI(String name, boolean issuper) {
     String group = issuper? "supergroup": name;
@@ -71,9 +70,10 @@ public RunningJob run() throws IOException {
       });
     rj.waitForCompletion();
-    assertEquals("SUCCEEDED", JobStatus.getJobRunState(rj.getJobState()));
+    Assert.assertEquals("SUCCEEDED", JobStatus.getJobRunState(rj.getJobState()));
   }
 
+  @Before
   public void setUp() throws Exception {
     dfs = new MiniDFSCluster(conf, 4, true, null);
@@ -96,29 +96,30 @@ public FileSystem run() throws IOException {
     mr = new MiniMRCluster(0, 0, 4, dfs.getFileSystem().getUri().toString(),
          1, null, null, MR_UGI, mrConf);
-    jobTrackerName = "localhost:" + mr.getJobTrackerPort();
   }
 
+  @After
   public void tearDown() throws Exception {
     if (mr != null) { mr.shutdown();}
     if (dfs != null) { dfs.shutdown(); }
   }
 
+  @Test
   public void testDistinctUsers() throws Exception {
     JobConf job1 = mr.createJobConf();
     String input = "The quick brown fox\nhas many silly\n"
       + "red fox sox\n";
     Path inDir = new Path("/testing/distinct/input");
     Path outDir = new Path("/user/alice/output");
-    TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job1,
-                                           input, 2, 1, inDir, outDir);
+    TestMiniMRClasspath
+        .configureWordCount(fs, job1, input, 2, 1, inDir, outDir);
     runJobAsUser(job1, ALICE_UGI);
 
     JobConf job2 = mr.createJobConf();
     Path inDir2 = new Path("/testing/distinct/input2");
     Path outDir2 = new Path("/user/bob/output2");
-    TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job2,
-                                           input, 2, 1, inDir2, outDir2);
+    TestMiniMRClasspath.configureWordCount(fs, job2, input, 2, 1, inDir2,
+        outDir2);
     runJobAsUser(job2, BOB_UGI);
   }
@@ -127,6 +128,7 @@ public void testDistinctUsers() throws Exception {
    * task makes lots of spills (more than fit in the spill index cache)
    * that it will succeed.
    */
+  @Test
   public void testMultipleSpills() throws Exception {
     JobConf job1 = mr.createJobConf();
@@ -141,8 +143,8 @@ public void testMultipleSpills() throws Exception {
       + "red fox sox\n";
     Path inDir = new Path("/testing/distinct/input");
     Path outDir = new Path("/user/alice/output");
-    TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job1,
-                                           input, 2, 1, inDir, outDir);
+    TestMiniMRClasspath
+        .configureWordCount(fs, job1, input, 2, 1, inDir, outDir);
     runJobAsUser(job1, ALICE_UGI);
   }
 }
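Editor's note: the test files above all follow the same JUnit 3 to JUnit 4 conversion -- drop the TestCase superclass and @Ignore, mark fixtures and tests with annotations, and call org.junit.Assert explicitly. A minimal sketch of the resulting shape, using a hypothetical ExampleMiniClusterTest rather than any class in this patch:

    import org.junit.After;
    import org.junit.Assert;
    import org.junit.Before;
    import org.junit.Test;

    public class ExampleMiniClusterTest {

      @Before
      public void setUp() throws Exception {
        // bring up mini DFS / MR clusters here
      }

      @After
      public void tearDown() throws Exception {
        // shut the clusters down even when a test fails
      }

      @Test
      public void testSomething() throws Exception {
        // JUnit 4 locates this method by the annotation, not by a test* name
        Assert.assertEquals("SUCCEEDED", "SUCCEEDED");
      }
    }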

View File

@@ -155,4 +155,12 @@ public synchronized void setService(String service) {
     builder.setService((service));
   }
 
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("ContainerToken { ");
+    sb.append("kind: ").append(getKind()).append(", ");
+    sb.append("service: ").append(getService()).append(" }");
+    return sb.toString();
+  }
 }
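Editor's note: with the added toString(), logging a ContainerToken now prints a readable line of the form ContainerToken { kind: <kind>, service: <service> } instead of the default Object identity string.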