Merge branch 'trunk' into HDFS-6581

This commit is contained in:
arp 2014-10-01 08:52:21 -07:00
commit e431d054cb
13 changed files with 588 additions and 2 deletions

View File

@ -780,6 +780,9 @@ Release 2.6.0 - UNRELEASED
HDFS-4227. Document dfs.namenode.resource.* (Daisuke Kobayashi via aw)
HDFS-6754. TestNamenodeCapacityReport may sometimes fail due to lack of
retry. (Mit Desai via kihwal)
BREAKDOWN OF HDFS-6134 AND HADOOP-10150 SUBTASKS AND RELATED JIRAS
HDFS-6387. HDFS CLI admin tool for creating & deleting an

View File

@ -169,8 +169,8 @@ public class TestNamenodeCapacityReport {
@Test
public void testXceiverCount() throws Exception {
Configuration conf = new HdfsConfiguration();
// don't waste time retrying if close fails
conf.setInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 0);
// retry one time, if close fails
conf.setInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY, 1);
MiniDFSCluster cluster = null;
final int nodes = 8;

View File

@ -32,6 +32,9 @@ Release 2.7.0 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
YARN-2179. [YARN-1492] Initial cache manager structure and context.
(Chris Trezzo via kasha)
IMPROVEMENTS

View File

@ -33,6 +33,7 @@ function hadoop_usage
echo " resourcemanager run the ResourceManager"
echo " resourcemanager -format-state-store deletes the RMStateStore"
echo " rmadmin admin tools"
echo " sharedcachemanager run the SharedCacheManager daemon"
echo " timelineserver run the timeline server"
echo " version print the version"
echo " or"
@ -149,6 +150,11 @@ case "${COMMAND}" in
JAVA_HEAP_MAX="-Xmx${YARN_TIMELINESERVER_HEAPSIZE}m"
fi
;;
sharedcachemanager)
daemon="true"
CLASS='org.apache.hadoop.yarn.server.sharedcachemanager.SharedCacheManager'
YARN_OPTS="$YARN_OPTS $YARN_SHAREDCACHEMANAGER_OPTS"
;;
version)
CLASS=org.apache.hadoop.util.VersionInfo
hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS"

View File

@ -1285,6 +1285,28 @@ public class YarnConfiguration extends Configuration {
public static final boolean
TIMELINE_SERVICE_HTTP_CROSS_ORIGIN_ENABLED_DEFAULT = false;
// ///////////////////////////////
// Shared Cache Configs
// ///////////////////////////////
public static final String SHARED_CACHE_PREFIX = "yarn.sharedcache.";
// common configs
/** whether the shared cache is enabled/disabled */
public static final String SHARED_CACHE_ENABLED =
SHARED_CACHE_PREFIX + "enabled";
public static final boolean DEFAULT_SHARED_CACHE_ENABLED = false;
/** The config key for the shared cache root directory. */
public static final String SHARED_CACHE_ROOT =
SHARED_CACHE_PREFIX + "root-dir";
public static final String DEFAULT_SHARED_CACHE_ROOT = "/sharedcache";
/** The config key for the level of nested directories before getting to the
* checksum directory. */
public static final String SHARED_CACHE_NESTED_LEVEL =
SHARED_CACHE_PREFIX + "nested-level";
public static final int DEFAULT_SHARED_CACHE_NESTED_LEVEL = 3;
////////////////////////////////
// Other Configs
////////////////////////////////

View File

@ -1310,6 +1310,26 @@
<value>/etc/krb5.keytab</value>
</property>
<!-- Shared Cache Configuration -->
<property>
<description>Whether the shared cache is enabled</description>
<name>yarn.sharedcache.enabled</name>
<value>false</value>
</property>
<property>
<description>The root directory for the shared cache</description>
<name>yarn.sharedcache.root-dir</name>
<value>/sharedcache</value>
</property>
<property>
<description>The level of nested directories before getting to the checksum
directories. It must be non-negative.</description>
<name>yarn.sharedcache.nested-level</name>
<value>3</value>
</property>
<!-- Other configuration -->
<property>
<description>The interval that the yarn client library uses to poll the

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.sharedcache;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* A utility class that contains helper methods for dealing with the internal
* shared cache structure.
*/
@Private
@Unstable
public class SharedCacheStructureUtil {
private static final Log LOG = LogFactory.getLog(SharedCacheStructureUtil.class);
@Private
public static int getCacheDepth(Configuration conf) {
int cacheDepth =
conf.getInt(YarnConfiguration.SHARED_CACHE_NESTED_LEVEL,
YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL);
if (cacheDepth <= 0) {
LOG.warn("Specified cache depth was less than or equal to zero."
+ " Using default value instead. Default: "
+ YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL
+ ", Specified: " + cacheDepth);
cacheDepth = YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL;
}
return cacheDepth;
}
@Private
public static String getCacheEntryPath(int cacheDepth, String cacheRoot,
String checksum) {
if (cacheDepth <= 0) {
throw new IllegalArgumentException(
"The cache depth must be greater than 0. Passed value: " + cacheDepth);
}
if (checksum.length() < cacheDepth) {
throw new IllegalArgumentException("The checksum passed was too short: "
+ checksum);
}
// Build the cache entry path to the specified depth. For example, if the
// depth is 3 and the checksum is 3c4f, the path would be:
// SHARED_CACHE_ROOT/3/c/4/3c4f
StringBuilder sb = new StringBuilder(cacheRoot);
for (int i = 0; i < cacheDepth; i++) {
sb.append(Path.SEPARATOR_CHAR);
sb.append(checksum.charAt(i));
}
sb.append(Path.SEPARATOR_CHAR).append(checksum);
return sb.toString();
}
}

View File

@ -0,0 +1,90 @@
<?xml version="1.0"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns:pom="http://maven.apache.org/POM/4.0.0">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>hadoop-yarn-server</artifactId>
<groupId>org.apache.hadoop</groupId>
<version>3.0.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-sharedcachemanager</artifactId>
<version>3.0.0-SNAPSHOT</version>
<name>hadoop-yarn-server-sharedcachemanager</name>
<properties>
<!-- Needed for generating FindBugs warnings using parent pom -->
<yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Publish tests jar -->
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
<phase>test-compile</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.sharedcachemanager;
import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* An interface for checking whether an app is running so that the cleaner
* service may determine if it can safely remove a cached entry.
*/
@Private
@Evolving
public abstract class AppChecker extends CompositeService {
public AppChecker() {
super("AppChecker");
}
public AppChecker(String name) {
super(name);
}
/**
* Returns whether the app is in an active state.
*
* @return true if the app is found and is not in one of the completed states;
* false otherwise
* @throws YarnException if there is an error in determining the app state
*/
@Private
public abstract boolean isApplicationActive(ApplicationId id)
throws YarnException;
/**
* Returns the list of all active apps at the given time.
*
* @return the list of active apps, or an empty list if there is none
* @throws YarnException if there is an error in obtaining the list
*/
@Private
public abstract Collection<ApplicationId> getActiveApplications()
throws YarnException;
}

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.sharedcachemanager;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* An implementation of AppChecker that queries the resource manager remotely to
* determine whether the app is running.
*/
@Private
@Unstable
public class RemoteAppChecker extends AppChecker {
private static final EnumSet<YarnApplicationState> ACTIVE_STATES = EnumSet
.of(YarnApplicationState.NEW, YarnApplicationState.ACCEPTED,
YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED,
YarnApplicationState.RUNNING);
private final YarnClient client;
public RemoteAppChecker() {
this(YarnClient.createYarnClient());
}
RemoteAppChecker(YarnClient client) {
super("RemoteAppChecker");
this.client = client;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
addService(client);
super.serviceInit(conf);
}
@Override
@Private
public boolean isApplicationActive(ApplicationId id) throws YarnException {
ApplicationReport report = null;
try {
report = client.getApplicationReport(id);
} catch (ApplicationNotFoundException e) {
// the app does not exist
return false;
} catch (IOException e) {
throw new YarnException(e);
}
if (report == null) {
// the app does not exist
return false;
}
return ACTIVE_STATES.contains(report.getYarnApplicationState());
}
@Override
@Private
public Collection<ApplicationId> getActiveApplications() throws YarnException {
try {
List<ApplicationId> activeApps = new ArrayList<ApplicationId>();
List<ApplicationReport> apps = client.getApplications(ACTIVE_STATES);
for (ApplicationReport app: apps) {
activeApps.add(app.getApplicationId());
}
return activeApps;
} catch (IOException e) {
throw new YarnException(e);
}
}
}

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.sharedcachemanager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
* This service maintains the shared cache meta data. It handles claiming and
* releasing of resources, all rpc calls from the client to the shared cache
* manager, and administrative commands. It also persists the shared cache meta
* data to a backend store, and cleans up stale entries on a regular basis.
*/
@Private
@Unstable
public class SharedCacheManager extends CompositeService {
/**
* Priority of the SharedCacheManager shutdown hook.
*/
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
private static final Log LOG = LogFactory.getLog(SharedCacheManager.class);
public SharedCacheManager() {
super("SharedCacheManager");
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
// init metrics
DefaultMetricsSystem.initialize("SharedCacheManager");
JvmMetrics.initSingleton("SharedCacheManager", null);
super.serviceInit(conf);
}
@Override
protected void serviceStop() throws Exception {
DefaultMetricsSystem.shutdown();
super.serviceStop();
}
public static void main(String[] args) {
Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
StringUtils.startupShutdownMessage(SharedCacheManager.class, args, LOG);
try {
Configuration conf = new YarnConfiguration();
SharedCacheManager sharedCacheManager = new SharedCacheManager();
ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(sharedCacheManager),
SHUTDOWN_HOOK_PRIORITY);
sharedCacheManager.init(conf);
sharedCacheManager.start();
} catch (Throwable t) {
LOG.fatal("Error starting SharedCacheManager", t);
System.exit(-1);
}
}
}

View File

@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.sharedcachemanager;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.junit.After;
import org.junit.Test;
public class TestRemoteAppChecker {
private RemoteAppChecker checker;
@After
public void cleanup() {
if (checker != null) {
checker.stop();
}
}
/**
* Creates/initializes/starts a RemoteAppChecker with a spied
* DummyYarnClientImpl.
*
* @return the spied DummyYarnClientImpl in the created AppChecker
*/
private YarnClient createCheckerWithMockedClient() {
YarnClient client = spy(new DummyYarnClientImpl());
checker = new RemoteAppChecker(client);
checker.init(new Configuration());
checker.start();
return client;
}
@Test
public void testNonExistentApp() throws Exception {
YarnClient client = createCheckerWithMockedClient();
ApplicationId id = ApplicationId.newInstance(1, 1);
// test for null
doReturn(null).when(client).getApplicationReport(id);
assertFalse(checker.isApplicationActive(id));
// test for ApplicationNotFoundException
doThrow(new ApplicationNotFoundException("Throw!")).when(client)
.getApplicationReport(id);
assertFalse(checker.isApplicationActive(id));
}
@Test
public void testRunningApp() throws Exception {
YarnClient client = createCheckerWithMockedClient();
ApplicationId id = ApplicationId.newInstance(1, 1);
// create a report and set the state to an active one
ApplicationReport report = new ApplicationReportPBImpl();
report.setYarnApplicationState(YarnApplicationState.ACCEPTED);
doReturn(report).when(client).getApplicationReport(id);
assertTrue(checker.isApplicationActive(id));
}
class DummyYarnClientImpl extends YarnClientImpl {
@Override
protected void serviceInit(Configuration conf) throws Exception {
// do nothing
}
@Override
protected void serviceStart() {
// do nothing
}
@Override
protected void serviceStop() {
// do nothing
}
}
}

View File

@ -39,6 +39,7 @@
<module>hadoop-yarn-server-nodemanager</module>
<module>hadoop-yarn-server-web-proxy</module>
<module>hadoop-yarn-server-resourcemanager</module>
<module>hadoop-yarn-server-sharedcachemanager</module>
<module>hadoop-yarn-server-tests</module>
<module>hadoop-yarn-server-applicationhistoryservice</module>
</modules>