Merge r1410998 through r1412282 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2802@1412297 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
commit
3dd84fcef7
|
@ -132,6 +132,12 @@ Trunk (Unreleased)
|
|||
HADOOP-9004. Allow security unit tests to use external KDC. (Stephen Chu
|
||||
via suresh)
|
||||
|
||||
HADOOP-6616. Improve documentation for rack awareness. (Adam Faris via
|
||||
jghoman)
|
||||
|
||||
HADOOP-9075. FileContext#FSLinkResolver should be made static.
|
||||
(Arpit Agarwal via suresh)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
HADOOP-8177. MBeans shouldn't try to register when it fails to create MBeanName.
|
||||
|
@ -371,6 +377,9 @@ Release 2.0.3-alpha - Unreleased
|
|||
|
||||
HADOOP-9035. Generalize setup of LoginContext (daryn via bobby)
|
||||
|
||||
HADOOP-9042. Add a test for umask in FileSystemContractBaseTest.
|
||||
(Colin McCabe via eli)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HADOOP-8866. SampleQuantiles#query is O(N^2) instead of O(N). (Andrew Wang
|
||||
|
@ -435,6 +444,9 @@ Release 2.0.3-alpha - Unreleased
|
|||
|
||||
HADOOP-6607. Add different variants of non caching HTTP headers. (tucu)
|
||||
|
||||
HADOOP-9049. DelegationTokenRenewer needs to be Singleton and FileSystems
|
||||
should register/deregister to/from. (Karthik Kambatla via tomwhite)
|
||||
|
||||
Release 2.0.2-alpha - 2012-09-07
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -1137,6 +1149,9 @@ Release 0.23.6 - UNRELEASED
|
|||
|
||||
BUG FIXES
|
||||
|
||||
HADOOP-9072. Hadoop-Common-0.23-Build Fails to build in Jenkins
|
||||
(Robert Parker via tgraves)
|
||||
|
||||
Release 0.23.5 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -1292,23 +1292,139 @@
|
|||
|
||||
<section>
|
||||
<title>Hadoop Rack Awareness</title>
|
||||
<p>The HDFS and the Map/Reduce components are rack-aware.</p>
|
||||
<p>The <code>NameNode</code> and the <code>JobTracker</code> obtains the
|
||||
<code>rack id</code> of the slaves in the cluster by invoking an API
|
||||
<a href="ext:api/org/apache/hadoop/net/dnstoswitchmapping/resolve
|
||||
">resolve</a> in an administrator configured
|
||||
module. The API resolves the slave's DNS name (also IP address) to a
|
||||
rack id. What module to use can be configured using the configuration
|
||||
item <code>net.topology.node.switch.mapping.impl</code>. The default
|
||||
implementation of the same runs a script/command configured using
|
||||
<code>net.topology.script.file.name</code>. If topology.script.file.name is
|
||||
not set, the rack id <code>/default-rack</code> is returned for any
|
||||
passed IP address. The additional configuration in the Map/Reduce
|
||||
part is <code>mapred.cache.task.levels</code> which determines the number
|
||||
of levels (in the network topology) of caches. So, for example, if it is
|
||||
the default value of 2, two levels of caches will be constructed -
|
||||
one for hosts (host -> task mapping) and another for racks
|
||||
(rack -> task mapping).
|
||||
<p>
|
||||
Both HDFS and Map/Reduce components are rack-aware. HDFS block placement will use rack
|
||||
awareness for fault tolerance by placing one block replica on a different rack. This provides
|
||||
data availability in the event of a network switch failure within the cluster. The jobtracker uses rack
|
||||
awareness to reduce network transfers of HDFS data blocks by attempting to schedule tasks on datanodes with a local
|
||||
copy of needed HDFS blocks. If the tasks cannot be scheduled on the datanodes
|
||||
containing the needed HDFS blocks, then the tasks will be scheduled on the same rack to reduce network transfers if possible.
|
||||
</p>
|
||||
<p>The NameNode and the JobTracker obtain the rack id of the cluster slaves by invoking either
|
||||
an external script or java class as specified by configuration files. Using either the
|
||||
java class or external script for topology, output must adhere to the java
|
||||
<a href="ext:api/org/apache/hadoop/net/dnstoswitchmapping/resolve">DNSToSwitchMapping</a>
|
||||
interface. The interface expects a one-to-one correspondence to be maintained
|
||||
and the topology information in the format of '/myrack/myhost', where '/' is the topology
|
||||
delimiter, 'myrack' is the rack identifier, and 'myhost' is the individual host. Assuming
|
||||
a single /24 subnet per rack, one could use the format of '/192.168.100.0/192.168.100.5' as a
|
||||
unique rack-host topology mapping.
|
||||
</p>
|
||||
<p>
|
||||
To use the java class for topology mapping, the class name is specified by the
|
||||
<code>'topology.node.switch.mapping.impl'</code> parameter in the configuration file.
|
||||
An example, NetworkTopology.java, is included with the hadoop distribution and can be customized
|
||||
by the hadoop administrator. If not included with your distribution, NetworkTopology.java can also be found in the Hadoop
|
||||
<a href="http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/NetworkTopology.java?view=markup">
|
||||
subversion tree</a>. Using a java class instead of an external script has a slight performance benefit in
|
||||
that it doesn't need to fork an external process when a new slave node registers itself with the jobtracker or namenode.
|
||||
As this class is only used during slave node registration, the performance benefit is limited.
|
||||
</p>
|
||||
<p>
|
||||
If implementing an external script, it will be specified with the
|
||||
<code>topology.script.file.name</code> parameter in the configuration files. Unlike the java
|
||||
class, the external topology script is not included with the Hadoop distribution and is provided by the
|
||||
administrator. Hadoop will send multiple IP addresses to ARGV when forking the topology script. The
|
||||
number of IP addresses sent to the topology script is controlled with <code>net.topology.script.number.args</code>
|
||||
and defaults to 100. If <code>net.topology.script.number.args</code> was changed to 1, a topology script would
|
||||
get forked for each IP submitted by datanodes and/or tasktrackers. Below are example topology scripts.
|
||||
</p>
|
||||
<section>
|
||||
<title>Python example</title>
|
||||
<source>
|
||||
<code>
|
||||
#!/usr/bin/python
|
||||
|
||||
# this script makes assumptions about the physical environment.
|
||||
# 1) each rack is its own layer 3 network with a /24 subnet, which could be typical where each rack has its own
|
||||
# switch with uplinks to a central core router.
|
||||
#
|
||||
# +-----------+
|
||||
# |core router|
|
||||
# +-----------+
|
||||
# / \
|
||||
# +-----------+ +-----------+
|
||||
# |rack switch| |rack switch|
|
||||
# +-----------+ +-----------+
|
||||
# | data node | | data node |
|
||||
# +-----------+ +-----------+
|
||||
# | data node | | data node |
|
||||
# +-----------+ +-----------+
|
||||
#
|
||||
# 2) topology script gets list of IP's as input, calculates network address, and prints '/network_address/ip'.
|
||||
|
||||
import netaddr
|
||||
import sys
|
||||
sys.argv.pop(0) # discard name of topology script from argv list as we just want IP addresses
|
||||
|
||||
netmask = '255.255.255.0' # set netmask to what's being used in your environment. The example uses a /24
|
||||
|
||||
for ip in sys.argv: # loop over list of datanode IP's
|
||||
address = '{0}/{1}'.format(ip, netmask) # format address string so it looks like 'ip/netmask' to make netaddr work
|
||||
try:
|
||||
network_address = netaddr.IPNetwork(address).network # calculate and print network address
|
||||
print "/{0}".format(network_address)
|
||||
except:
|
||||
print "/rack-unknown" # print catch-all value if unable to calculate network address
|
||||
|
||||
</code>
|
||||
</source>
|
||||
</section>
|
||||
|
||||
<section>
|
||||
<title>Bash example</title>
|
||||
<source>
|
||||
<code>
|
||||
#!/bin/bash
|
||||
# Here's a bash example to show just how simple these scripts can be
|
||||
|
||||
# Assuming we have flat network with everything on a single switch, we can fake a rack topology.
|
||||
# This could occur in a lab environment where we have limited nodes,like 2-8 physical machines on a unmanaged switch.
|
||||
# This may also apply to multiple virtual machines running on the same physical hardware.
|
||||
# The number of machines isn't important, but that we are trying to fake a network topology when there isn't one.
|
||||
#
|
||||
# +----------+ +--------+
|
||||
# |jobtracker| |datanode|
|
||||
# +----------+ +--------+
|
||||
# \ /
|
||||
# +--------+ +--------+ +--------+
|
||||
# |datanode|--| switch |--|datanode|
|
||||
# +--------+ +--------+ +--------+
|
||||
# / \
|
||||
# +--------+ +--------+
|
||||
# |datanode| |namenode|
|
||||
# +--------+ +--------+
|
||||
#
|
||||
# With this network topology, we are treating each host as a rack. This is being done by taking the last octet
|
||||
# in the datanode's IP and prepending it with the word '/rack-'. The advantage for doing this is so HDFS
|
||||
# can create its 'off-rack' block copy.
|
||||
|
||||
# 1) 'echo $@' will echo all ARGV values to xargs.
|
||||
# 2) 'xargs' will enforce that we print a single argv value per line
|
||||
# 3) 'awk' will split fields on dots and append the last field to the string '/rack-'. If awk
|
||||
# fails to split on four dots, it will still print '/rack-' last field value
|
||||
|
||||
echo $@ | xargs -n 1 | awk -F '.' '{print "/rack-"$NF}'
|
||||
|
||||
|
||||
</code>
|
||||
</source>
|
||||
</section>
|
||||
|
||||
|
||||
<p>
|
||||
If <code>topology.script.file.name</code> or <code>topology.node.switch.mapping.impl</code> is
|
||||
not set, the rack id '/default-rack' is returned for any passed IP address.
|
||||
While this behavior appears desirable, it can cause issues with HDFS block replication as
|
||||
default behavior is to write one replicated block off rack and is unable to do so as there is
|
||||
only a single rack named '/default-rack'.
|
||||
</p>
|
||||
<p>
|
||||
An additional configuration setting is <code>mapred.cache.task.levels</code> which determines
|
||||
the number of levels (in the network topology) of caches. So, for example, if it is the
|
||||
default value of 2, two levels of caches will be constructed - one for hosts
|
||||
(host -> task mapping) and another for racks (rack -> task mapping). Giving us our one-to-one
|
||||
mapping of '/myrack/myhost'
|
||||
</p>
|
||||
</section>
|
||||
|
||||
|
|
|
@ -33,7 +33,7 @@ import org.apache.hadoop.util.Time;
|
|||
* A daemon thread that waits for the next file system to renew.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class DelegationTokenRenewer<T extends FileSystem & DelegationTokenRenewer.Renewable>
|
||||
public class DelegationTokenRenewer
|
||||
extends Thread {
|
||||
/** The renewable interface used by the renewer. */
|
||||
public interface Renewable {
|
||||
|
@ -93,7 +93,7 @@ public class DelegationTokenRenewer<T extends FileSystem & DelegationTokenRenewe
|
|||
* @param newTime the new time
|
||||
*/
|
||||
private void updateRenewalTime() {
|
||||
renewalTime = RENEW_CYCLE + Time.now();
|
||||
renewalTime = renewCycle + Time.now();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -134,34 +134,69 @@ public class DelegationTokenRenewer<T extends FileSystem & DelegationTokenRenewe
|
|||
}
|
||||
|
||||
/** Wait for 95% of a day between renewals */
|
||||
private static final int RENEW_CYCLE = 24 * 60 * 60 * 950;
|
||||
private static final int RENEW_CYCLE = 24 * 60 * 60 * 950;
|
||||
|
||||
private DelayQueue<RenewAction<T>> queue = new DelayQueue<RenewAction<T>>();
|
||||
@InterfaceAudience.Private
|
||||
protected static int renewCycle = RENEW_CYCLE;
|
||||
|
||||
public DelegationTokenRenewer(final Class<T> clazz) {
|
||||
/** Queue to maintain the RenewActions to be processed by the {@link #run()} */
|
||||
private volatile DelayQueue<RenewAction<?>> queue = new DelayQueue<RenewAction<?>>();
|
||||
|
||||
/**
|
||||
* Create the singleton instance. However, the thread can be started lazily in
|
||||
* {@link #addRenewAction(FileSystem)}
|
||||
*/
|
||||
private static DelegationTokenRenewer INSTANCE = null;
|
||||
|
||||
private DelegationTokenRenewer(final Class<? extends FileSystem> clazz) {
|
||||
super(clazz.getSimpleName() + "-" + DelegationTokenRenewer.class.getSimpleName());
|
||||
setDaemon(true);
|
||||
}
|
||||
|
||||
/** Add a renew action to the queue. */
|
||||
public void addRenewAction(final T fs) {
|
||||
queue.add(new RenewAction<T>(fs));
|
||||
public static synchronized DelegationTokenRenewer getInstance() {
|
||||
if (INSTANCE == null) {
|
||||
INSTANCE = new DelegationTokenRenewer(FileSystem.class);
|
||||
}
|
||||
return INSTANCE;
|
||||
}
|
||||
|
||||
/** Add a renew action to the queue. */
|
||||
public synchronized <T extends FileSystem & Renewable> void addRenewAction(final T fs) {
|
||||
queue.add(new RenewAction<T>(fs));
|
||||
if (!isAlive()) {
|
||||
start();
|
||||
}
|
||||
}
|
||||
|
||||
/** Remove the associated renew action from the queue */
|
||||
public synchronized <T extends FileSystem & Renewable> void removeRenewAction(
|
||||
final T fs) {
|
||||
for (RenewAction<?> action : queue) {
|
||||
if (action.weakFs.get() == fs) {
|
||||
queue.remove(action);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("static-access")
|
||||
@Override
|
||||
public void run() {
|
||||
for(;;) {
|
||||
RenewAction<T> action = null;
|
||||
RenewAction<?> action = null;
|
||||
try {
|
||||
action = queue.take();
|
||||
if (action.renew()) {
|
||||
action.updateRenewalTime();
|
||||
queue.add(action);
|
||||
synchronized (this) {
|
||||
action = queue.take();
|
||||
if (action.renew()) {
|
||||
action.updateRenewalTime();
|
||||
queue.add(action);
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException ie) {
|
||||
return;
|
||||
} catch (Exception ie) {
|
||||
T.LOG.warn("Failed to renew token, action=" + action, ie);
|
||||
action.weakFs.get().LOG.warn("Failed to renew token, action=" + action,
|
||||
ie);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1119,7 +1119,7 @@ public final class FileContext {
|
|||
* @param target The symlink's absolute target
|
||||
* @return Fully qualified version of the target.
|
||||
*/
|
||||
private Path qualifySymlinkTarget(final AbstractFileSystem pathFS,
|
||||
private static Path qualifySymlinkTarget(final AbstractFileSystem pathFS,
|
||||
Path pathWithLink, Path target) {
|
||||
// NB: makeQualified uses the target's scheme and authority, if
|
||||
// specified, and the scheme and authority of pathFS, if not.
|
||||
|
@ -2321,7 +2321,7 @@ public final class FileContext {
|
|||
* Class used to perform an operation on and resolve symlinks in a
|
||||
* path. The operation may potentially span multiple file systems.
|
||||
*/
|
||||
protected abstract class FSLinkResolver<T> {
|
||||
protected static abstract class FSLinkResolver<T> {
|
||||
// The maximum number of symbolic link components in a path
|
||||
private static final int MAX_PATH_LINKS = 32;
|
||||
|
||||
|
|
|
@ -23,11 +23,13 @@ import java.io.IOException;
|
|||
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataInputStream;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
|
@ -43,7 +45,7 @@ import org.apache.hadoop.fs.Path;
|
|||
* </p>
|
||||
*/
|
||||
public abstract class FileSystemContractBaseTest extends TestCase {
|
||||
|
||||
protected final static String TEST_UMASK = "062";
|
||||
protected FileSystem fs;
|
||||
protected byte[] data = new byte[getBlockSize() * 2]; // two blocks of data
|
||||
{
|
||||
|
@ -151,7 +153,26 @@ public abstract class FileSystemContractBaseTest extends TestCase {
|
|||
assertFalse(fs.exists(testDeepSubDir));
|
||||
|
||||
}
|
||||
|
||||
|
||||
public void testMkdirsWithUmask() throws Exception {
|
||||
if (fs.getScheme().equals("s3") || fs.getScheme().equals("s3n")) {
|
||||
// skip permission tests for S3FileSystem until HDFS-1333 is fixed.
|
||||
return;
|
||||
}
|
||||
Configuration conf = fs.getConf();
|
||||
String oldUmask = conf.get(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY);
|
||||
try {
|
||||
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, TEST_UMASK);
|
||||
final Path dir = new Path("/test/newDir");
|
||||
assertTrue(fs.mkdirs(dir, new FsPermission((short)0777)));
|
||||
FileStatus status = fs.getFileStatus(dir);
|
||||
assertTrue(status.isDirectory());
|
||||
assertEquals((short)0715, status.getPermission().toShort());
|
||||
} finally {
|
||||
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY, oldUmask);
|
||||
}
|
||||
}
|
||||
|
||||
public void testGetFileStatusThrowsExceptionForNonExistentFile()
|
||||
throws Exception {
|
||||
try {
|
||||
|
|
|
@ -0,0 +1,159 @@
|
|||
package org.apache.hadoop.fs;
|
||||
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.util.Progressable;
|
||||
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestDelegationTokenRenewer {
|
||||
private static final int RENEW_CYCLE = 1000;
|
||||
private static final int MAX_RENEWALS = 100;
|
||||
|
||||
@SuppressWarnings("rawtypes")
|
||||
static class TestToken extends Token {
|
||||
public volatile int renewCount = 0;
|
||||
|
||||
@Override
|
||||
public long renew(Configuration conf) {
|
||||
if (renewCount == MAX_RENEWALS) {
|
||||
Thread.currentThread().interrupt();
|
||||
} else {
|
||||
renewCount++;
|
||||
}
|
||||
return renewCount;
|
||||
}
|
||||
}
|
||||
|
||||
static class TestFileSystem extends FileSystem implements
|
||||
DelegationTokenRenewer.Renewable {
|
||||
private Configuration mockConf = mock(Configuration.class);;
|
||||
private TestToken testToken = new TestToken();
|
||||
|
||||
@Override
|
||||
public Configuration getConf() {
|
||||
return mockConf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Token<?> getRenewToken() {
|
||||
return testToken;
|
||||
}
|
||||
|
||||
@Override
|
||||
public URI getUri() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataOutputStream create(Path f, FsPermission permission,
|
||||
boolean overwrite, int bufferSize, short replication, long blockSize,
|
||||
Progressable progress) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FSDataOutputStream append(Path f, int bufferSize,
|
||||
Progressable progress) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean rename(Path src, Path dst) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean delete(Path f, boolean recursive) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus[] listStatus(Path f) throws FileNotFoundException,
|
||||
IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setWorkingDirectory(Path new_dir) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public Path getWorkingDirectory() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean mkdirs(Path f, FsPermission permission) throws IOException {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public FileStatus getFileStatus(Path f) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
private DelegationTokenRenewer renewer;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
DelegationTokenRenewer.renewCycle = RENEW_CYCLE;
|
||||
renewer = DelegationTokenRenewer.getInstance();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddRenewAction() throws IOException, InterruptedException {
|
||||
TestFileSystem tfs = new TestFileSystem();
|
||||
renewer.addRenewAction(tfs);
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
Thread.sleep(RENEW_CYCLE);
|
||||
if (tfs.testToken.renewCount > 0) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue("Token not renewed even after 10 seconds",
|
||||
(tfs.testToken.renewCount > 0));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemoveRenewAction() throws IOException, InterruptedException {
|
||||
TestFileSystem tfs = new TestFileSystem();
|
||||
renewer.addRenewAction(tfs);
|
||||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
Thread.sleep(RENEW_CYCLE);
|
||||
if (tfs.testToken.renewCount > 0) {
|
||||
renewer.removeRenewAction(tfs);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
assertTrue("Token not renewed even once",
|
||||
(tfs.testToken.renewCount > 0));
|
||||
assertTrue("Token not removed",
|
||||
(tfs.testToken.renewCount < MAX_RENEWALS));
|
||||
}
|
||||
}
|
|
@ -164,6 +164,9 @@ Trunk (Unreleased)
|
|||
HDFS-4206. Change the fields in INode and its subclasses to private.
|
||||
(szetszwo)
|
||||
|
||||
HDFS-4215. Remove locking from addToParent(..) since it is used in image
|
||||
loading, and add INode.isFile(). (szetszwo)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -617,6 +620,13 @@ Release 2.0.3-alpha - Unreleased
|
|||
HDFS-4171. WebHDFS and HttpFs should accept only valid Unix user
|
||||
names. (tucu)
|
||||
|
||||
HDFS-4178. Shell scripts should not close stderr (Andy Isaacson via daryn)
|
||||
|
||||
HDFS-4179. BackupNode: allow reads, fix checkpointing, safeMode. (shv)
|
||||
|
||||
HDFS-4216. Do not ignore QuotaExceededException when adding symlinks.
|
||||
(szetszwo)
|
||||
|
||||
Release 2.0.2-alpha - 2012-09-07
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -74,7 +74,7 @@ fi
|
|||
#---------------------------------------------------------
|
||||
# secondary namenodes (if any)
|
||||
|
||||
SECONDARY_NAMENODES=$($HADOOP_PREFIX/bin/hdfs getconf -secondarynamenodes 2>&-)
|
||||
SECONDARY_NAMENODES=$($HADOOP_PREFIX/bin/hdfs getconf -secondarynamenodes 2>/dev/null)
|
||||
|
||||
if [ -n "$SECONDARY_NAMENODES" ]; then
|
||||
echo "Starting secondary namenodes [$SECONDARY_NAMENODES]"
|
||||
|
|
|
@ -50,7 +50,7 @@ fi
|
|||
#---------------------------------------------------------
|
||||
# secondary namenodes (if any)
|
||||
|
||||
SECONDARY_NAMENODES=$($HADOOP_PREFIX/bin/hdfs getconf -secondarynamenodes 2>&-)
|
||||
SECONDARY_NAMENODES=$($HADOOP_PREFIX/bin/hdfs getconf -secondarynamenodes 2>/dev/null)
|
||||
|
||||
if [ -n "$SECONDARY_NAMENODES" ]; then
|
||||
echo "Stopping secondary namenodes [$SECONDARY_NAMENODES]"
|
||||
|
|
|
@ -82,12 +82,8 @@ import org.xml.sax.helpers.XMLReaderFactory;
|
|||
@InterfaceStability.Evolving
|
||||
public class HftpFileSystem extends FileSystem
|
||||
implements DelegationTokenRenewer.Renewable {
|
||||
private static final DelegationTokenRenewer<HftpFileSystem> dtRenewer
|
||||
= new DelegationTokenRenewer<HftpFileSystem>(HftpFileSystem.class);
|
||||
|
||||
static {
|
||||
HttpURLConnection.setFollowRedirects(true);
|
||||
dtRenewer.start();
|
||||
}
|
||||
|
||||
public static final Text TOKEN_KIND = new Text("HFTP delegation");
|
||||
|
@ -106,6 +102,16 @@ public class HftpFileSystem extends FileSystem
|
|||
private static final HftpDelegationTokenSelector hftpTokenSelector =
|
||||
new HftpDelegationTokenSelector();
|
||||
|
||||
private DelegationTokenRenewer dtRenewer = null;
|
||||
|
||||
private synchronized void addRenewAction(final HftpFileSystem hftpFs) {
|
||||
if (dtRenewer == null) {
|
||||
dtRenewer = DelegationTokenRenewer.getInstance();
|
||||
}
|
||||
|
||||
dtRenewer.addRenewAction(hftpFs);
|
||||
}
|
||||
|
||||
public static final SimpleDateFormat getDateFormat() {
|
||||
final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT);
|
||||
df.setTimeZone(TimeZone.getTimeZone(HFTP_TIMEZONE));
|
||||
|
@ -202,7 +208,7 @@ public class HftpFileSystem extends FileSystem
|
|||
if (token != null) {
|
||||
setDelegationToken(token);
|
||||
if (createdToken) {
|
||||
dtRenewer.addRenewAction(this);
|
||||
addRenewAction(this);
|
||||
LOG.debug("Created new DT for " + token.getService());
|
||||
} else {
|
||||
LOG.debug("Found existing DT for " + token.getService());
|
||||
|
@ -395,6 +401,14 @@ public class HftpFileSystem extends FileSystem
|
|||
return new FSDataInputStream(new RangeHeaderInputStream(u));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
super.close();
|
||||
if (dtRenewer != null) {
|
||||
dtRenewer.removeRenewAction(this); // blocks
|
||||
}
|
||||
}
|
||||
|
||||
/** Class to parse and store a listing reply from the server. */
|
||||
class LsParser extends DefaultHandler {
|
||||
|
||||
|
|
|
@ -69,6 +69,8 @@ public class BackupNode extends NameNode {
|
|||
private static final String BN_HTTP_ADDRESS_NAME_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
|
||||
private static final String BN_HTTP_ADDRESS_DEFAULT = DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT;
|
||||
private static final String BN_SERVICE_RPC_ADDRESS_KEY = DFSConfigKeys.DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY;
|
||||
private static final float BN_SAFEMODE_THRESHOLD_PCT_DEFAULT = 1.5f;
|
||||
private static final int BN_SAFEMODE_EXTENSION_DEFAULT = Integer.MAX_VALUE;
|
||||
|
||||
/** Name-node proxy */
|
||||
NamenodeProtocol namenode;
|
||||
|
@ -127,6 +129,10 @@ public class BackupNode extends NameNode {
|
|||
|
||||
@Override // NameNode
|
||||
protected void loadNamesystem(Configuration conf) throws IOException {
|
||||
conf.setFloat(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY,
|
||||
BN_SAFEMODE_THRESHOLD_PCT_DEFAULT);
|
||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_EXTENSION_KEY,
|
||||
BN_SAFEMODE_EXTENSION_DEFAULT);
|
||||
BackupImage bnImage = new BackupImage(conf);
|
||||
this.namesystem = new FSNamesystem(conf, bnImage);
|
||||
bnImage.setNamesystem(namesystem);
|
||||
|
@ -423,9 +429,9 @@ public class BackupNode extends NameNode {
|
|||
return;
|
||||
}
|
||||
if (OperationCategory.JOURNAL != op &&
|
||||
!(OperationCategory.READ == op && allowStaleStandbyReads)) {
|
||||
!(OperationCategory.READ == op && !isRole(NamenodeRole.CHECKPOINT))) {
|
||||
String msg = "Operation category " + op
|
||||
+ " is not supported at the BackupNode";
|
||||
+ " is not supported at " + getRole();
|
||||
throw new StandbyException(msg);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -206,6 +206,7 @@ class Checkpointer extends Daemon {
|
|||
RemoteEditLogManifest manifest =
|
||||
getRemoteNamenodeProxy().getEditLogManifest(bnImage.getLastAppliedTxId() + 1);
|
||||
|
||||
boolean needReloadImage = false;
|
||||
if (!manifest.getLogs().isEmpty()) {
|
||||
RemoteEditLog firstRemoteLog = manifest.getLogs().get(0);
|
||||
// we don't have enough logs to roll forward using only logs. Need
|
||||
|
@ -218,13 +219,10 @@ class Checkpointer extends Daemon {
|
|||
bnStorage, true);
|
||||
bnImage.saveDigestAndRenameCheckpointImage(
|
||||
sig.mostRecentCheckpointTxId, downloadedHash);
|
||||
|
||||
LOG.info("Loading image with txid " + sig.mostRecentCheckpointTxId);
|
||||
File file = bnStorage.findImageFile(sig.mostRecentCheckpointTxId);
|
||||
bnImage.reloadFromImageFile(file, backupNode.getNamesystem());
|
||||
lastApplied = sig.mostRecentCheckpointTxId;
|
||||
needReloadImage = true;
|
||||
}
|
||||
|
||||
lastApplied = bnImage.getLastAppliedTxId();
|
||||
|
||||
if (firstRemoteLog.getStartTxId() > lastApplied + 1) {
|
||||
throw new IOException("No logs to roll forward from " + lastApplied);
|
||||
}
|
||||
|
@ -234,7 +232,12 @@ class Checkpointer extends Daemon {
|
|||
TransferFsImage.downloadEditsToStorage(
|
||||
backupNode.nnHttpAddress, log, bnStorage);
|
||||
}
|
||||
|
||||
|
||||
if(needReloadImage) {
|
||||
LOG.info("Loading image with txid " + sig.mostRecentCheckpointTxId);
|
||||
File file = bnStorage.findImageFile(sig.mostRecentCheckpointTxId);
|
||||
bnImage.reloadFromImageFile(file, backupNode.getNamesystem());
|
||||
}
|
||||
rollForwardByApplyingLogs(manifest, bnImage, backupNode.getNamesystem());
|
||||
}
|
||||
|
||||
|
@ -243,8 +246,9 @@ class Checkpointer extends Daemon {
|
|||
backupNode.namesystem.writeLock();
|
||||
try {
|
||||
backupNode.namesystem.dir.setReady();
|
||||
backupNode.namesystem.setBlockTotal();
|
||||
|
||||
if(backupNode.namesystem.getBlocksTotal() > 0) {
|
||||
backupNode.namesystem.setBlockTotal();
|
||||
}
|
||||
bnImage.saveFSImageInAllDirs(backupNode.getNamesystem(), txid);
|
||||
bnStorage.writeAll();
|
||||
} finally {
|
||||
|
@ -284,12 +288,12 @@ class Checkpointer extends Daemon {
|
|||
|
||||
List<EditLogInputStream> editsStreams = Lists.newArrayList();
|
||||
for (RemoteEditLog log : manifest.getLogs()) {
|
||||
File f = dstStorage.findFinalizedEditsFile(
|
||||
log.getStartTxId(), log.getEndTxId());
|
||||
if (log.getStartTxId() > dstImage.getLastAppliedTxId()) {
|
||||
if (log.getEndTxId() > dstImage.getLastAppliedTxId()) {
|
||||
File f = dstStorage.findFinalizedEditsFile(
|
||||
log.getStartTxId(), log.getEndTxId());
|
||||
editsStreams.add(new EditLogFileInputStream(f, log.getStartTxId(),
|
||||
log.getEndTxId(), true));
|
||||
}
|
||||
}
|
||||
}
|
||||
LOG.info("Checkpointer about to load edits from " +
|
||||
editsStreams.size() + " stream(s).");
|
||||
|
|
|
@ -77,6 +77,12 @@ import com.google.common.base.Preconditions;
|
|||
*
|
||||
*************************************************/
|
||||
public class FSDirectory implements Closeable {
|
||||
private static INodeDirectoryWithQuota createRoot(FSNamesystem namesystem) {
|
||||
final INodeDirectoryWithQuota r = new INodeDirectoryWithQuota(
|
||||
INodeDirectory.ROOT_NAME,
|
||||
namesystem.createFsOwnerPermissions(new FsPermission((short)0755)));
|
||||
return INodeDirectorySnapshottable.newInstance(r, 0);
|
||||
}
|
||||
|
||||
INodeDirectoryWithQuota rootDir;
|
||||
FSImage fsImage;
|
||||
|
@ -125,16 +131,7 @@ public class FSDirectory implements Closeable {
|
|||
FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {
|
||||
this.dirLock = new ReentrantReadWriteLock(true); // fair
|
||||
this.cond = dirLock.writeLock().newCondition();
|
||||
|
||||
this.namesystem = ns;
|
||||
int threshold = conf.getInt(
|
||||
DFSConfigKeys.DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT);
|
||||
NameNode.LOG.info("Caching file names occuring more than " + threshold
|
||||
+ " times");
|
||||
this.nameCache = new NameCache<ByteArray>(threshold);
|
||||
reset();
|
||||
|
||||
rootDir = createRoot(ns);
|
||||
this.fsImage = fsImage;
|
||||
int configuredLimit = conf.getInt(
|
||||
DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
|
||||
|
@ -148,6 +145,14 @@ public class FSDirectory implements Closeable {
|
|||
this.maxDirItems = conf.getInt(
|
||||
DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_MAX_DIRECTORY_ITEMS_DEFAULT);
|
||||
|
||||
int threshold = conf.getInt(
|
||||
DFSConfigKeys.DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY,
|
||||
DFSConfigKeys.DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT);
|
||||
NameNode.LOG.info("Caching file names occuring more than " + threshold
|
||||
+ " times");
|
||||
nameCache = new NameCache<ByteArray>(threshold);
|
||||
namesystem = ns;
|
||||
}
|
||||
|
||||
private FSNamesystem getFSNamesystem() {
|
||||
|
@ -309,35 +314,6 @@ public class FSDirectory implements Closeable {
|
|||
return newNode;
|
||||
}
|
||||
|
||||
INodeDirectory addToParent(INodeDirectory parentINode,
|
||||
INode newNode, boolean propagateModTime) {
|
||||
// NOTE: This does not update space counts for parents
|
||||
INodeDirectory newParent = null;
|
||||
writeLock();
|
||||
try {
|
||||
try {
|
||||
newParent = rootDir.addToParent(newNode, parentINode,
|
||||
propagateModTime);
|
||||
cacheName(newNode);
|
||||
} catch (FileNotFoundException e) {
|
||||
return null;
|
||||
}
|
||||
if(newParent == null)
|
||||
return null;
|
||||
if(!newNode.isDirectory() && !newNode.isSymlink()) {
|
||||
// Add file->block mapping
|
||||
INodeFile newF = (INodeFile)newNode;
|
||||
BlockInfo[] blocks = newF.getBlocks();
|
||||
for (int i = 0; i < blocks.length; i++) {
|
||||
newF.setBlock(i, getBlockManager().addBlockCollection(blocks[i], newF));
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
writeUnlock();
|
||||
}
|
||||
return newParent;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a block to the file. Returns a reference to the added block.
|
||||
*/
|
||||
|
@ -845,11 +821,7 @@ public class FSDirectory implements Closeable {
|
|||
final INodesInPath inodesInPath = rootDir.getMutableINodesInPath(src, true);
|
||||
final INode[] inodes = inodesInPath.getINodes();
|
||||
INode inode = inodes[inodes.length - 1];
|
||||
if (inode == null) {
|
||||
return null;
|
||||
}
|
||||
assert !inode.isSymlink();
|
||||
if (inode.isDirectory()) {
|
||||
if (inode == null || !inode.isFile()) {
|
||||
return null;
|
||||
}
|
||||
INodeFile fileNode = (INodeFile)inode;
|
||||
|
@ -868,22 +840,15 @@ public class FSDirectory implements Closeable {
|
|||
}
|
||||
|
||||
/**
|
||||
* Get the blocksize of a file
|
||||
* @param filename the filename
|
||||
* @return the number of bytes
|
||||
* @param path the file path
|
||||
* @return the block size of the file.
|
||||
*/
|
||||
long getPreferredBlockSize(String filename) throws UnresolvedLinkException,
|
||||
long getPreferredBlockSize(String path) throws UnresolvedLinkException,
|
||||
FileNotFoundException, IOException {
|
||||
readLock();
|
||||
try {
|
||||
INode inode = rootDir.getNode(filename, false);
|
||||
if (inode == null) {
|
||||
throw new FileNotFoundException("File does not exist: " + filename);
|
||||
}
|
||||
if (inode.isDirectory() || inode.isSymlink()) {
|
||||
throw new IOException("Getting block size of non-file: "+ filename);
|
||||
}
|
||||
return ((INodeFile)inode).getPreferredBlockSize();
|
||||
return INodeFile.valueOf(rootDir.getNode(path, false), path
|
||||
).getPreferredBlockSize();
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
|
@ -897,9 +862,7 @@ public class FSDirectory implements Closeable {
|
|||
if (inode == null) {
|
||||
return false;
|
||||
}
|
||||
return inode.isDirectory() || inode.isSymlink()
|
||||
? true
|
||||
: ((INodeFile)inode).getBlocks() != null;
|
||||
return !inode.isFile() || ((INodeFile)inode).getBlocks() != null;
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
|
@ -1336,14 +1299,8 @@ public class FSDirectory implements Closeable {
|
|||
waitForReady();
|
||||
readLock();
|
||||
try {
|
||||
INode targetNode = rootDir.getNode(src, false);
|
||||
if (targetNode == null)
|
||||
return null;
|
||||
if (targetNode.isDirectory())
|
||||
return null;
|
||||
if (targetNode.isSymlink())
|
||||
return null;
|
||||
return ((INodeFile)targetNode).getBlocks();
|
||||
final INode i = rootDir.getNode(src, false);
|
||||
return i != null && i.isFile()? ((INodeFile)i).getBlocks(): null;
|
||||
} finally {
|
||||
readUnlock();
|
||||
}
|
||||
|
@ -2151,11 +2108,7 @@ public class FSDirectory implements Closeable {
|
|||
writeLock();
|
||||
try {
|
||||
setReady(false);
|
||||
final INodeDirectoryWithQuota r = new INodeDirectoryWithQuota(
|
||||
INodeDirectory.ROOT_NAME,
|
||||
getFSNamesystem().createFsOwnerPermissions(new FsPermission((short)0755)),
|
||||
Long.MAX_VALUE, UNKNOWN_DISK_SPACE);
|
||||
rootDir = INodeDirectorySnapshottable.newInstance(r, 0);
|
||||
rootDir = createRoot(getFSNamesystem());
|
||||
nameCache.reset();
|
||||
} finally {
|
||||
writeUnlock();
|
||||
|
@ -2250,7 +2203,7 @@ public class FSDirectory implements Closeable {
|
|||
INodeSymlink addSymlink(String path, String target,
|
||||
PermissionStatus dirPerms, boolean createParent)
|
||||
throws UnresolvedLinkException, FileAlreadyExistsException,
|
||||
QuotaExceededException, IOException {
|
||||
QuotaExceededException, SnapshotAccessControlException {
|
||||
waitForReady();
|
||||
|
||||
final long modTime = now();
|
||||
|
@ -2264,7 +2217,7 @@ public class FSDirectory implements Closeable {
|
|||
INodeSymlink newNode = null;
|
||||
writeLock();
|
||||
try {
|
||||
newNode = unprotectedSymlink(path, target, modTime, modTime,
|
||||
newNode = unprotectedAddSymlink(path, target, modTime, modTime,
|
||||
new PermissionStatus(userName, null, FsPermission.getDefault()));
|
||||
} finally {
|
||||
writeUnlock();
|
||||
|
@ -2284,23 +2237,12 @@ public class FSDirectory implements Closeable {
|
|||
/**
|
||||
* Add the specified path into the namespace. Invoked from edit log processing.
|
||||
*/
|
||||
INodeSymlink unprotectedSymlink(String path, String target, long modTime,
|
||||
INodeSymlink unprotectedAddSymlink(String path, String target, long mtime,
|
||||
long atime, PermissionStatus perm)
|
||||
throws UnresolvedLinkException {
|
||||
throws UnresolvedLinkException, QuotaExceededException {
|
||||
assert hasWriteLock();
|
||||
INodeSymlink newNode = new INodeSymlink(target, modTime, atime, perm);
|
||||
try {
|
||||
newNode = addNode(path, newNode, UNKNOWN_DISK_SPACE);
|
||||
} catch (UnresolvedLinkException e) {
|
||||
/* All UnresolvedLinkExceptions should have been resolved by now, but we
|
||||
* should re-throw them in case that changes so they are not swallowed
|
||||
* by catching IOException below.
|
||||
*/
|
||||
throw e;
|
||||
} catch (IOException e) {
|
||||
return null;
|
||||
}
|
||||
return newNode;
|
||||
final INodeSymlink symlink = new INodeSymlink(target, mtime, atime, perm);
|
||||
return addNode(path, symlink, UNKNOWN_DISK_SPACE);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -2309,7 +2251,7 @@ public class FSDirectory implements Closeable {
|
|||
*/
|
||||
void cacheName(INode inode) {
|
||||
// Name is cached only for files
|
||||
if (inode.isDirectory() || inode.isSymlink()) {
|
||||
if (!inode.isFile()) {
|
||||
return;
|
||||
}
|
||||
ByteArray name = new ByteArray(inode.getLocalNameBytes());
|
||||
|
|
|
@ -426,7 +426,7 @@ public class FSEditLogLoader {
|
|||
}
|
||||
case OP_SYMLINK: {
|
||||
SymlinkOp symlinkOp = (SymlinkOp)op;
|
||||
fsDir.unprotectedSymlink(symlinkOp.path, symlinkOp.value,
|
||||
fsDir.unprotectedAddSymlink(symlinkOp.path, symlinkOp.value,
|
||||
symlinkOp.mtime, symlinkOp.atime,
|
||||
symlinkOp.permissionStatus);
|
||||
break;
|
||||
|
|
|
@ -38,7 +38,6 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
||||
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
|
||||
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
||||
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
|
||||
|
@ -1020,6 +1019,7 @@ public class FSImage implements Closeable {
|
|||
NamenodeCommand startCheckpoint(NamenodeRegistration bnReg, // backup node
|
||||
NamenodeRegistration nnReg) // active name-node
|
||||
throws IOException {
|
||||
LOG.info("Start checkpoint at txid " + getEditLog().getLastWrittenTxId());
|
||||
String msg = null;
|
||||
// Verify that checkpoint is allowed
|
||||
if(bnReg.getNamespaceID() != storage.getNamespaceID())
|
||||
|
@ -1059,6 +1059,7 @@ public class FSImage implements Closeable {
|
|||
* @throws IOException if the checkpoint fields are inconsistent
|
||||
*/
|
||||
void endCheckpoint(CheckpointSignature sig) throws IOException {
|
||||
LOG.info("End checkpoint at txid " + getEditLog().getLastWrittenTxId());
|
||||
sig.validateStorageInfo(this);
|
||||
}
|
||||
|
||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|||
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
||||
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
|
||||
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
|
||||
import org.apache.hadoop.hdfs.util.ReadOnlyList;
|
||||
import org.apache.hadoop.io.MD5Hash;
|
||||
|
@ -202,7 +203,7 @@ class FSImageFormat {
|
|||
fsDir.rootDir.setQuota(nsQuota, dsQuota);
|
||||
}
|
||||
fsDir.rootDir.setModificationTime(root.getModificationTime());
|
||||
fsDir.rootDir.setPermissionStatus(root.getPermissionStatus());
|
||||
fsDir.rootDir.clonePermissionStatus(root);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -258,7 +259,7 @@ class FSImageFormat {
|
|||
|
||||
// add to parent
|
||||
newNode.setLocalName(localName);
|
||||
namesystem.dir.addToParent(parent, newNode, false);
|
||||
addToParent(parent, newNode);
|
||||
}
|
||||
return numChildren;
|
||||
}
|
||||
|
@ -293,7 +294,30 @@ class FSImageFormat {
|
|||
|
||||
// add new inode
|
||||
newNode.setLocalName(pathComponents[pathComponents.length-1]);
|
||||
parentINode = fsDir.addToParent(parentINode, newNode, false);
|
||||
addToParent(parentINode, newNode);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add the child node to parent and, if child is a file, update block map.
|
||||
* This method is only used for image loading so that synchronization,
|
||||
* modification time update and space count update are not needed.
|
||||
*/
|
||||
void addToParent(INodeDirectory parent, INode child) {
|
||||
// NOTE: This does not update space counts for parents
|
||||
if (parent.addChild(child, false) == null) {
|
||||
return;
|
||||
}
|
||||
namesystem.dir.cacheName(child);
|
||||
|
||||
if (child.isFile()) {
|
||||
// Add file->block mapping
|
||||
final INodeFile file = (INodeFile)child;
|
||||
final BlockInfo[] blocks = file.getBlocks();
|
||||
final BlockManager bm = namesystem.getBlockManager();
|
||||
for (int i = 0; i < blocks.length; i++) {
|
||||
file.setBlock(i, bm.addBlockCollection(blocks[i], file));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -4048,7 +4048,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
// of the number of total blocks in the system.
|
||||
this.shouldIncrementallyTrackBlocks = true;
|
||||
}
|
||||
|
||||
if(blockSafe < 0)
|
||||
this.blockSafe = 0;
|
||||
checkMode();
|
||||
}
|
||||
|
||||
|
|
|
@ -90,17 +90,17 @@ public abstract class INode implements Comparable<byte[]> {
|
|||
return (record & ~MASK) | (bits << OFFSET);
|
||||
}
|
||||
|
||||
/** Set the {@link PermissionStatus} */
|
||||
/** Encode the {@link PermissionStatus} to a long. */
|
||||
static long toLong(PermissionStatus ps) {
|
||||
long permission = 0L;
|
||||
final int user = SerialNumberManager.INSTANCE.getUserSerialNumber(
|
||||
ps.getUserName());
|
||||
permission = PermissionStatusFormat.USER.combine(user, permission);
|
||||
permission = USER.combine(user, permission);
|
||||
final int group = SerialNumberManager.INSTANCE.getGroupSerialNumber(
|
||||
ps.getGroupName());
|
||||
permission = PermissionStatusFormat.GROUP.combine(group, permission);
|
||||
permission = GROUP.combine(group, permission);
|
||||
final int mode = ps.getPermission().toShort();
|
||||
permission = PermissionStatusFormat.MODE.combine(mode, permission);
|
||||
permission = MODE.combine(mode, permission);
|
||||
return permission;
|
||||
}
|
||||
}
|
||||
|
@ -114,8 +114,9 @@ public abstract class INode implements Comparable<byte[]> {
|
|||
*/
|
||||
private byte[] name = null;
|
||||
/**
|
||||
* Permission encoded using PermissionStatusFormat.
|
||||
* Codes other than {@link #updatePermissionStatus(PermissionStatusFormat, long)}.
|
||||
* Permission encoded using {@link PermissionStatusFormat}.
|
||||
* Codes other than {@link #clonePermissionStatus(INode)}
|
||||
* and {@link #updatePermissionStatus(PermissionStatusFormat, long)}
|
||||
* should not modify it.
|
||||
*/
|
||||
private long permission = 0L;
|
||||
|
@ -159,11 +160,9 @@ public abstract class INode implements Comparable<byte[]> {
|
|||
return name.length == 0;
|
||||
}
|
||||
|
||||
/** Set the {@link PermissionStatus} */
|
||||
protected void setPermissionStatus(PermissionStatus ps) {
|
||||
setUser(ps.getUserName());
|
||||
setGroup(ps.getGroupName());
|
||||
setPermission(ps.getPermission());
|
||||
/** Clone the {@link PermissionStatus}. */
|
||||
void clonePermissionStatus(INode that) {
|
||||
this.permission = that.permission;
|
||||
}
|
||||
/** Get the {@link PermissionStatus} */
|
||||
public PermissionStatus getPermissionStatus() {
|
||||
|
@ -205,6 +204,13 @@ public abstract class INode implements Comparable<byte[]> {
|
|||
updatePermissionStatus(PermissionStatusFormat.MODE, permission.toShort());
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether it's a file.
|
||||
*/
|
||||
public boolean isFile() {
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether it's a directory
|
||||
*/
|
||||
|
|
|
@ -395,23 +395,6 @@ public class INodeDirectory extends INode {
|
|||
return addToParent(pathComponents, newNode, true) == null? null: newNode;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add new inode to the parent if specified.
|
||||
* Optimized version of addNode() if parent is not null.
|
||||
*
|
||||
* @return parent INode if new inode is inserted
|
||||
* or null if it already exists.
|
||||
* @throws FileNotFoundException if parent does not exist or
|
||||
* is not a directory.
|
||||
*/
|
||||
INodeDirectory addToParent(INode newNode, INodeDirectory parent,
|
||||
boolean propagateModTime) throws FileNotFoundException {
|
||||
// insert into the parent children list
|
||||
if(parent.addChild(newNode, propagateModTime) == null)
|
||||
return null;
|
||||
return parent;
|
||||
}
|
||||
|
||||
INodeDirectory getParent(byte[][] pathComponents
|
||||
) throws FileNotFoundException, UnresolvedLinkException {
|
||||
if (pathComponents.length < 2) // add root
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
|
|||
|
||||
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
|
||||
|
@ -26,9 +27,13 @@ import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
|||
* Directory INode class that has a quota restriction
|
||||
*/
|
||||
public class INodeDirectoryWithQuota extends INodeDirectory {
|
||||
private long nsQuota; /// NameSpace quota
|
||||
/** Name space quota */
|
||||
private long nsQuota = Long.MAX_VALUE;
|
||||
/** Name space count */
|
||||
private long nsCount = 1L;
|
||||
private long dsQuota; /// disk space quota
|
||||
/** Disk space quota */
|
||||
private long dsQuota = HdfsConstants.QUOTA_RESET;
|
||||
/** Disk space count */
|
||||
private long diskspace = 0L;
|
||||
|
||||
/** Convert an existing directory inode to one with the given quota
|
||||
|
@ -57,11 +62,8 @@ public class INodeDirectoryWithQuota extends INodeDirectory {
|
|||
}
|
||||
|
||||
/** constructor with no quota verification */
|
||||
INodeDirectoryWithQuota(String name, PermissionStatus permissions,
|
||||
long nsQuota, long dsQuota) {
|
||||
INodeDirectoryWithQuota(String name, PermissionStatus permissions) {
|
||||
super(name, permissions);
|
||||
this.nsQuota = nsQuota;
|
||||
this.dsQuota = dsQuota;
|
||||
}
|
||||
|
||||
/** Get this directory's namespace quota
|
||||
|
|
|
@ -100,6 +100,12 @@ public class INodeFile extends INode implements BlockCollection {
|
|||
this.setLocalName(f.getLocalNameBytes());
|
||||
}
|
||||
|
||||
/** @return true unconditionally. */
|
||||
@Override
|
||||
public final boolean isFile() {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the {@link FsPermission} of this {@link INodeFile}.
|
||||
* Since this is a file,
|
||||
|
|
|
@ -124,15 +124,14 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
public static final WebHdfsDelegationTokenSelector DT_SELECTOR
|
||||
= new WebHdfsDelegationTokenSelector();
|
||||
|
||||
private static DelegationTokenRenewer<WebHdfsFileSystem> DT_RENEWER = null;
|
||||
private DelegationTokenRenewer dtRenewer = null;
|
||||
|
||||
private static synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
|
||||
if (DT_RENEWER == null) {
|
||||
DT_RENEWER = new DelegationTokenRenewer<WebHdfsFileSystem>(WebHdfsFileSystem.class);
|
||||
DT_RENEWER.start();
|
||||
private synchronized void addRenewAction(final WebHdfsFileSystem webhdfs) {
|
||||
if (dtRenewer == null) {
|
||||
dtRenewer = DelegationTokenRenewer.getInstance();
|
||||
}
|
||||
|
||||
DT_RENEWER.addRenewAction(webhdfs);
|
||||
dtRenewer.addRenewAction(webhdfs);
|
||||
}
|
||||
|
||||
/** Is WebHDFS enabled in conf? */
|
||||
|
@ -766,6 +765,14 @@ public class WebHdfsFileSystem extends FileSystem
|
|||
new OffsetUrlOpener(url), new OffsetUrlOpener(null)));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
super.close();
|
||||
if (dtRenewer != null) {
|
||||
dtRenewer.removeRenewAction(this); // blocks
|
||||
}
|
||||
}
|
||||
|
||||
class OffsetUrlOpener extends ByteRangeInputStream.URLOpener {
|
||||
OffsetUrlOpener(final URL url) {
|
||||
super(url);
|
||||
|
|
|
@ -28,9 +28,11 @@ import org.apache.commons.logging.impl.Log4JLogger;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
|
||||
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
|
||||
|
@ -51,6 +53,7 @@ public class TestFcHdfsSymlink extends FileContextSymlinkBaseTest {
|
|||
|
||||
private static MiniDFSCluster cluster;
|
||||
private static WebHdfsFileSystem webhdfs;
|
||||
private static DistributedFileSystem dfs;
|
||||
|
||||
|
||||
@Override
|
||||
|
@ -89,6 +92,7 @@ public class TestFcHdfsSymlink extends FileContextSymlinkBaseTest {
|
|||
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||
fc = FileContext.getFileContext(cluster.getURI(0));
|
||||
webhdfs = WebHdfsTestUtil.getWebHdfsFileSystem(conf);
|
||||
dfs = cluster.getFileSystem();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -317,4 +321,27 @@ public class TestFcHdfsSymlink extends FileContextSymlinkBaseTest {
|
|||
assertEquals(2, fc.getFileStatus(link).getReplication());
|
||||
assertEquals(2, fc.getFileStatus(file).getReplication());
|
||||
}
|
||||
|
||||
@Test
|
||||
/** Test craeteSymlink(..) with quota. */
|
||||
public void testQuota() throws IOException {
|
||||
final Path dir = new Path(testBaseDir1());
|
||||
dfs.setQuota(dir, 3, HdfsConstants.QUOTA_DONT_SET);
|
||||
|
||||
final Path file = new Path(dir, "file");
|
||||
createAndWriteFile(file);
|
||||
|
||||
//creating the first link should succeed
|
||||
final Path link1 = new Path(dir, "link1");
|
||||
fc.createSymlink(file, link1, false);
|
||||
|
||||
try {
|
||||
//creating the second link should fail with QuotaExceededException.
|
||||
final Path link2 = new Path(dir, "link2");
|
||||
fc.createSymlink(file, link2, false);
|
||||
fail("Created symlink despite quota violation");
|
||||
} catch(QuotaExceededException qee) {
|
||||
//expected
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs;
|
|||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.fs.FileSystemContractBaseTest;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
@ -33,6 +34,8 @@ public class TestHDFSFileSystemContract extends FileSystemContractBaseTest {
|
|||
@Override
|
||||
protected void setUp() throws Exception {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,
|
||||
FileSystemContractBaseTest.TEST_UMASK);
|
||||
cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
|
||||
fs = cluster.getFileSystem();
|
||||
defaultWorkingDirectory = "/user/" +
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
|
|||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
|
||||
|
@ -99,7 +100,10 @@ public class TestBackupNode {
|
|||
c.set(DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY,
|
||||
"127.0.0.1:0");
|
||||
|
||||
return (BackupNode)NameNode.createNameNode(new String[]{startupOpt.getName()}, c);
|
||||
BackupNode bn = (BackupNode)NameNode.createNameNode(
|
||||
new String[]{startupOpt.getName()}, c);
|
||||
assertTrue(bn.getRole() + " must be in SafeMode.", bn.isInSafeMode());
|
||||
return bn;
|
||||
}
|
||||
|
||||
void waitCheckpointDone(MiniDFSCluster cluster, long txid) {
|
||||
|
@ -358,11 +362,22 @@ public class TestBackupNode {
|
|||
DFSTestUtil.createFile(bnFS, file3, fileSize, fileSize, blockSize,
|
||||
replication, seed);
|
||||
} catch (IOException eio) {
|
||||
LOG.info("Write to BN failed as expected: ", eio);
|
||||
LOG.info("Write to " + backup.getRole() + " failed as expected: ", eio);
|
||||
canWrite = false;
|
||||
}
|
||||
assertFalse("Write to BackupNode must be prohibited.", canWrite);
|
||||
|
||||
// Reads are allowed for BackupNode, but not for CheckpointNode
|
||||
boolean canRead = true;
|
||||
try {
|
||||
bnFS.exists(file2);
|
||||
} catch (IOException eio) {
|
||||
LOG.info("Read from " + backup.getRole() + " failed: ", eio);
|
||||
canRead = false;
|
||||
}
|
||||
assertEquals("Reads to BackupNode are allowed, but not CheckpointNode.",
|
||||
canRead, backup.isRole(NamenodeRole.BACKUP));
|
||||
|
||||
DFSTestUtil.createFile(fileSys, file3, fileSize, fileSize, blockSize,
|
||||
replication, seed);
|
||||
|
||||
|
|
|
@ -73,7 +73,7 @@ public class TestFsLimits {
|
|||
fileAsURI(new File(MiniDFSCluster.getBaseDirectory(),
|
||||
"namenode")).toString());
|
||||
|
||||
rootInode = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME, perms, 0L, 0L);
|
||||
rootInode = new INodeDirectoryWithQuota(INodeDirectory.ROOT_NAME, perms);
|
||||
inodes = new INode[]{ rootInode, null };
|
||||
fs = null;
|
||||
fsIsReady = true;
|
||||
|
|
|
@ -89,11 +89,6 @@
|
|||
<phase>test-compile</phase>
|
||||
</execution>
|
||||
</executions>
|
||||
<configuration>
|
||||
<excludes>
|
||||
<exclude>mrapp-generated-classpath</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-antrun-plugin</artifactId>
|
||||
|
|
|
@ -601,7 +601,7 @@ public class JobHistoryEventHandler extends AbstractService
|
|||
setSummarySlotSeconds(summary, context.getJob(jobId).getAllCounters());
|
||||
break;
|
||||
default:
|
||||
throw new YarnException("Invalid event type");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -18,13 +18,8 @@
|
|||
|
||||
package org.apache.hadoop.mapreduce.v2.util;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
@ -134,62 +129,24 @@ public class MRApps extends Apps {
|
|||
|
||||
private static void setMRFrameworkClasspath(
|
||||
Map<String, String> environment, Configuration conf) throws IOException {
|
||||
InputStream classpathFileStream = null;
|
||||
BufferedReader reader = null;
|
||||
try {
|
||||
// Get yarn mapreduce-app classpath from generated classpath
|
||||
// Works if compile time env is same as runtime. Mainly tests.
|
||||
ClassLoader thisClassLoader =
|
||||
Thread.currentThread().getContextClassLoader();
|
||||
String mrAppGeneratedClasspathFile = "mrapp-generated-classpath";
|
||||
classpathFileStream =
|
||||
thisClassLoader.getResourceAsStream(mrAppGeneratedClasspathFile);
|
||||
// Propagate the system classpath when using the mini cluster
|
||||
if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
|
||||
Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
|
||||
System.getProperty("java.class.path"));
|
||||
}
|
||||
|
||||
// Put the file itself on classpath for tasks.
|
||||
URL classpathResource = thisClassLoader
|
||||
.getResource(mrAppGeneratedClasspathFile);
|
||||
if (classpathResource != null) {
|
||||
String classpathElement = classpathResource.getFile();
|
||||
if (classpathElement.contains("!")) {
|
||||
classpathElement = classpathElement.substring(0,
|
||||
classpathElement.indexOf("!"));
|
||||
} else {
|
||||
classpathElement = new File(classpathElement).getParent();
|
||||
}
|
||||
Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
|
||||
classpathElement);
|
||||
}
|
||||
|
||||
if (classpathFileStream != null) {
|
||||
reader = new BufferedReader(new InputStreamReader(classpathFileStream,
|
||||
Charsets.UTF_8));
|
||||
String cp = reader.readLine();
|
||||
if (cp != null) {
|
||||
Apps.addToEnvironment(environment, Environment.CLASSPATH.name(),
|
||||
cp.trim());
|
||||
}
|
||||
}
|
||||
|
||||
// Add standard Hadoop classes
|
||||
for (String c : conf.getStrings(
|
||||
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
|
||||
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
|
||||
Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c
|
||||
.trim());
|
||||
}
|
||||
for (String c : conf.getStrings(
|
||||
MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
|
||||
MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH)) {
|
||||
Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c
|
||||
.trim());
|
||||
}
|
||||
} finally {
|
||||
if (classpathFileStream != null) {
|
||||
classpathFileStream.close();
|
||||
}
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
// Add standard Hadoop classes
|
||||
for (String c : conf.getStrings(
|
||||
YarnConfiguration.YARN_APPLICATION_CLASSPATH,
|
||||
YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH)) {
|
||||
Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c
|
||||
.trim());
|
||||
}
|
||||
for (String c : conf.getStrings(
|
||||
MRJobConfig.MAPREDUCE_APPLICATION_CLASSPATH,
|
||||
MRJobConfig.DEFAULT_MAPREDUCE_APPLICATION_CLASSPATH)) {
|
||||
Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), c
|
||||
.trim());
|
||||
}
|
||||
// TODO: Remove duplicates.
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.RunningJob;
|
|||
|
||||
import org.apache.hadoop.mapreduce.MRConfig;
|
||||
import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
|
@ -52,6 +53,8 @@ public class TestEncryptedShuffle {
|
|||
private static final String BASEDIR =
|
||||
System.getProperty("test.build.dir", "target/test-dir") + "/" +
|
||||
TestEncryptedShuffle.class.getSimpleName();
|
||||
|
||||
private String classpathDir;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUp() throws Exception {
|
||||
|
@ -62,27 +65,12 @@ public class TestEncryptedShuffle {
|
|||
|
||||
@Before
|
||||
public void createCustomYarnClasspath() throws Exception {
|
||||
String classpathDir =
|
||||
KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class);
|
||||
|
||||
URL url = Thread.currentThread().getContextClassLoader().
|
||||
getResource("mrapp-generated-classpath");
|
||||
File f = new File(url.getPath());
|
||||
BufferedReader reader = new BufferedReader(new FileReader(f));
|
||||
String cp = reader.readLine();
|
||||
cp = cp + ":" + classpathDir;
|
||||
f = new File(classpathDir, "mrapp-generated-classpath");
|
||||
Writer writer = new FileWriter(f);
|
||||
writer.write(cp);
|
||||
writer.close();
|
||||
classpathDir = KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class);
|
||||
new File(classpathDir, "core-site.xml").delete();
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanUpMiniClusterSpecialConfig() throws Exception {
|
||||
String classpathDir =
|
||||
KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class);
|
||||
new File(classpathDir, "mrapp-generated-classpath").delete();
|
||||
new File(classpathDir, "core-site.xml").delete();
|
||||
String keystoresDir = new File(BASEDIR).getAbsolutePath();
|
||||
KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, classpathDir);
|
||||
|
@ -98,6 +86,9 @@ public class TestEncryptedShuffle {
|
|||
conf.set("dfs.block.access.token.enable", "false");
|
||||
conf.set("dfs.permissions", "true");
|
||||
conf.set("hadoop.security.authentication", "simple");
|
||||
String cp = conf.get(YarnConfiguration.YARN_APPLICATION_CLASSPATH) +
|
||||
File.pathSeparator + classpathDir;
|
||||
conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, cp);
|
||||
dfsCluster = new MiniDFSCluster(conf, 1, true, null);
|
||||
FileSystem fileSystem = dfsCluster.getFileSystem();
|
||||
fileSystem.mkdirs(new Path("/tmp"));
|
||||
|
@ -113,8 +104,6 @@ public class TestEncryptedShuffle {
|
|||
mrCluster = MiniMRClientClusterFactory.create(this.getClass(), 1, conf);
|
||||
|
||||
// so the minicluster conf is avail to the containers.
|
||||
String classpathDir =
|
||||
KeyStoreTestUtil.getClasspathDir(TestEncryptedShuffle.class);
|
||||
Writer writer = new FileWriter(classpathDir + "/core-site.xml");
|
||||
mrCluster.getConfig().writeXml(writer);
|
||||
writer.close();
|
||||
|
|
|
@ -703,11 +703,6 @@
|
|||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-jar-plugin</artifactId>
|
||||
<version>2.3.1</version>
|
||||
<configuration>
|
||||
<excludes>
|
||||
<exclude>mrapp-generated-classpath</exclude>
|
||||
</excludes>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
|
@ -802,21 +797,6 @@
|
|||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>build-classpath</id>
|
||||
<phase>generate-sources</phase>
|
||||
<goals>
|
||||
<goal>build-classpath</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<outputFile>target/classes/mrapp-generated-classpath</outputFile>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
|
|
|
@ -71,6 +71,8 @@ Release 2.0.3-alpha - Unreleased
|
|||
|
||||
YARN-183. Clean up fair scheduler code. (Sandy Ryza via tomwhite)
|
||||
|
||||
YARN-129. Simplify classpath construction for mini YARN tests. (tomwhite)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
@ -245,6 +247,8 @@ Release 0.23.5 - UNRELEASED
|
|||
YARN-212. NM state machine ignores an APPLICATION_CONTAINER_FINISHED event
|
||||
when it shouldn't (Nathan Roberts via jlowe)
|
||||
|
||||
YARN-219. NM should aggregate logs when application finishes. (bobby)
|
||||
|
||||
Release 0.23.4
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -90,23 +90,6 @@
|
|||
</archive>
|
||||
</configuration>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>build-classpath</id>
|
||||
<phase>generate-sources</phase>
|
||||
<goals>
|
||||
<goal>build-classpath</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<!-- needed to run the unit test for DS to generate the required classpath
|
||||
that is required in the env of the launch container in the mini yarn cluster -->
|
||||
<outputFile>target/classes/yarn-apps-ds-generated-classpath</outputFile>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
|
|
|
@ -494,9 +494,10 @@ public class Client extends YarnClientImpl {
|
|||
classPathEnv.append(":./log4j.properties");
|
||||
|
||||
// add the runtime classpath needed for tests to work
|
||||
String testRuntimeClassPath = Client.getTestRuntimeClasspath();
|
||||
classPathEnv.append(':');
|
||||
classPathEnv.append(testRuntimeClassPath);
|
||||
if (conf.getBoolean(YarnConfiguration.IS_MINI_YARN_CLUSTER, false)) {
|
||||
classPathEnv.append(':');
|
||||
classPathEnv.append(System.getProperty("java.class.path"));
|
||||
}
|
||||
|
||||
env.put("CLASSPATH", classPathEnv.toString());
|
||||
|
||||
|
@ -663,50 +664,4 @@ public class Client extends YarnClientImpl {
|
|||
super.killApplication(appId);
|
||||
}
|
||||
|
||||
private static String getTestRuntimeClasspath() {
|
||||
|
||||
InputStream classpathFileStream = null;
|
||||
BufferedReader reader = null;
|
||||
String envClassPath = "";
|
||||
|
||||
LOG.info("Trying to generate classpath for app master from current thread's classpath");
|
||||
try {
|
||||
|
||||
// Create classpath from generated classpath
|
||||
// Check maven ppom.xml for generated classpath info
|
||||
// Works if compile time env is same as runtime. Mainly tests.
|
||||
ClassLoader thisClassLoader =
|
||||
Thread.currentThread().getContextClassLoader();
|
||||
String generatedClasspathFile = "yarn-apps-ds-generated-classpath";
|
||||
classpathFileStream =
|
||||
thisClassLoader.getResourceAsStream(generatedClasspathFile);
|
||||
if (classpathFileStream == null) {
|
||||
LOG.info("Could not classpath resource from class loader");
|
||||
return envClassPath;
|
||||
}
|
||||
LOG.info("Readable bytes from stream=" + classpathFileStream.available());
|
||||
reader = new BufferedReader(new InputStreamReader(classpathFileStream));
|
||||
String cp = reader.readLine();
|
||||
if (cp != null) {
|
||||
envClassPath += cp.trim() + ":";
|
||||
}
|
||||
// Put the file itself on classpath for tasks.
|
||||
envClassPath += thisClassLoader.getResource(generatedClasspathFile).getFile();
|
||||
} catch (IOException e) {
|
||||
LOG.info("Could not find the necessary resource to generate class path for tests. Error=" + e.getMessage());
|
||||
}
|
||||
|
||||
try {
|
||||
if (classpathFileStream != null) {
|
||||
classpathFileStream.close();
|
||||
}
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.info("Failed to close class path file stream or reader. Error=" + e.getMessage());
|
||||
}
|
||||
return envClassPath;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -86,23 +86,6 @@
|
|||
|
||||
<build>
|
||||
<plugins>
|
||||
<plugin>
|
||||
<artifactId>maven-dependency-plugin</artifactId>
|
||||
<executions>
|
||||
<execution>
|
||||
<id>build-classpath</id>
|
||||
<phase>generate-sources</phase>
|
||||
<goals>
|
||||
<goal>build-classpath</goal>
|
||||
</goals>
|
||||
<configuration>
|
||||
<!-- needed to run the unit test for DS to generate the required classpath
|
||||
that is required in the env of the launch container in the mini yarn cluster -->
|
||||
<outputFile>target/classes/yarn-apps-am-generated-classpath</outputFile>
|
||||
</configuration>
|
||||
</execution>
|
||||
</executions>
|
||||
</plugin>
|
||||
<plugin>
|
||||
<groupId>org.apache.maven.plugins</groupId>
|
||||
<artifactId>maven-surefire-plugin</artifactId>
|
||||
|
|
|
@ -18,12 +18,9 @@
|
|||
|
||||
package org.apache.hadoop.yarn.applications.unmanagedamlauncher;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.io.InputStreamReader;
|
||||
import java.io.OutputStream;
|
||||
import java.net.URL;
|
||||
|
||||
|
@ -80,51 +77,17 @@ public class TestUnmanagedAMLauncher {
|
|||
}
|
||||
|
||||
private static String getTestRuntimeClasspath() {
|
||||
|
||||
InputStream classpathFileStream = null;
|
||||
BufferedReader reader = null;
|
||||
String envClassPath = "";
|
||||
|
||||
LOG.info("Trying to generate classpath for app master from current thread's classpath");
|
||||
try {
|
||||
|
||||
// Create classpath from generated classpath
|
||||
// Check maven pom.xml for generated classpath info
|
||||
// Works if compile time env is same as runtime. Mainly tests.
|
||||
ClassLoader thisClassLoader = Thread.currentThread()
|
||||
.getContextClassLoader();
|
||||
String generatedClasspathFile = "yarn-apps-am-generated-classpath";
|
||||
classpathFileStream = thisClassLoader
|
||||
.getResourceAsStream(generatedClasspathFile);
|
||||
if (classpathFileStream == null) {
|
||||
LOG.info("Could not classpath resource from class loader");
|
||||
return envClassPath;
|
||||
}
|
||||
LOG.info("Readable bytes from stream=" + classpathFileStream.available());
|
||||
reader = new BufferedReader(new InputStreamReader(classpathFileStream));
|
||||
String cp = reader.readLine();
|
||||
if (cp != null) {
|
||||
envClassPath += cp.trim() + File.pathSeparator;
|
||||
}
|
||||
// yarn-site.xml at this location contains proper config for mini cluster
|
||||
URL url = thisClassLoader.getResource("yarn-site.xml");
|
||||
envClassPath += new File(url.getFile()).getParent();
|
||||
} catch (IOException e) {
|
||||
LOG.info("Could not find the necessary resource to generate class path for tests. Error="
|
||||
+ e.getMessage());
|
||||
}
|
||||
|
||||
try {
|
||||
if (classpathFileStream != null) {
|
||||
classpathFileStream.close();
|
||||
}
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.info("Failed to close class path file stream or reader. Error="
|
||||
+ e.getMessage());
|
||||
String envClassPath = "";
|
||||
String cp = System.getProperty("java.class.path");
|
||||
if (cp != null) {
|
||||
envClassPath += cp.trim() + File.pathSeparator;
|
||||
}
|
||||
// yarn-site.xml at this location contains proper config for mini cluster
|
||||
ClassLoader thisClassLoader = Thread.currentThread()
|
||||
.getContextClassLoader();
|
||||
URL url = thisClassLoader.getResource("yarn-site.xml");
|
||||
envClassPath += new File(url.getFile()).getParent();
|
||||
return envClassPath;
|
||||
}
|
||||
|
||||
|
|
|
@ -149,16 +149,13 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
|||
ContainerId containerId;
|
||||
|
||||
while (!this.appFinishing.get()) {
|
||||
try {
|
||||
containerId = this.pendingContainers.poll();
|
||||
if (containerId == null) {
|
||||
Thread.sleep(THREAD_SLEEP_TIME);
|
||||
} else {
|
||||
uploadLogsForContainer(containerId);
|
||||
synchronized(this) {
|
||||
try {
|
||||
wait(THREAD_SLEEP_TIME);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("PendingContainers queue is interrupted");
|
||||
this.appFinishing.set(true);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("PendingContainers queue is interrupted");
|
||||
this.appFinishing.set(true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -251,8 +248,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void finishLogAggregation() {
|
||||
public synchronized void finishLogAggregation() {
|
||||
LOG.info("Application just finished : " + this.applicationId);
|
||||
this.appFinishing.set(true);
|
||||
this.notifyAll();
|
||||
}
|
||||
}
|
||||
|
|
2
pom.xml
2
pom.xml
|
@ -517,7 +517,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
|
|||
<groupId>com.atlassian.maven.plugins</groupId>
|
||||
<artifactId>maven-clover2-plugin</artifactId>
|
||||
<configuration>
|
||||
<includesAllSourceRoots>true</includesAllSourceRoots>
|
||||
<includesAllSourceRoots>false</includesAllSourceRoots>
|
||||
<includesTestSourceRoots>true</includesTestSourceRoots>
|
||||
<licenseLocation>${cloverLicenseLocation}</licenseLocation>
|
||||
<cloverDatabase>${cloverDatabase}</cloverDatabase>
|
||||
|
|
Loading…
Reference in New Issue