YARN-2179. [YARN-1492] Initial cache manager structure and context. (Chris Trezzo via kasha)

This commit is contained in:
Karthik Kambatla 2014-09-30 19:01:47 -07:00
parent 9e9e9cf711
commit 17d1202c35
11 changed files with 583 additions and 0 deletions

View File

@ -32,6 +32,9 @@ Release 2.7.0 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
YARN-2179. [YARN-1492] Initial cache manager structure and context.
(Chris Trezzo via kasha)
IMPROVEMENTS

View File

@ -33,6 +33,7 @@ function hadoop_usage
echo " resourcemanager run the ResourceManager"
echo " resourcemanager -format-state-store deletes the RMStateStore"
echo " rmadmin admin tools"
echo " sharedcachemanager run the SharedCacheManager daemon"
echo " timelineserver run the timeline server"
echo " version print the version"
echo " or"
@ -149,6 +150,11 @@ case "${COMMAND}" in
JAVA_HEAP_MAX="-Xmx${YARN_TIMELINESERVER_HEAPSIZE}m"
fi
;;
sharedcachemanager)
# SharedCacheManager subcommand: set the daemon flag, select the
# SharedCacheManager main class, and append any options supplied through
# YARN_SHAREDCACHEMANAGER_OPTS to YARN_OPTS.
daemon="true"
CLASS='org.apache.hadoop.yarn.server.sharedcachemanager.SharedCacheManager'
YARN_OPTS="$YARN_OPTS $YARN_SHAREDCACHEMANAGER_OPTS"
;;
version)
CLASS=org.apache.hadoop.util.VersionInfo
hadoop_debug "Append YARN_CLIENT_OPTS onto YARN_OPTS"

View File

@ -1285,6 +1285,28 @@ public class YarnConfiguration extends Configuration {
public static final boolean
TIMELINE_SERVICE_HTTP_CROSS_ORIGIN_ENABLED_DEFAULT = false;
// ///////////////////////////////
// Shared Cache Configs
// ///////////////////////////////
/** Common prefix shared by every shared cache configuration key. */
public static final String SHARED_CACHE_PREFIX = "yarn.sharedcache.";
// common configs
/** The config key that controls whether the shared cache is enabled. */
public static final String SHARED_CACHE_ENABLED =
SHARED_CACHE_PREFIX + "enabled";
/** Default for {@link #SHARED_CACHE_ENABLED}: the shared cache is disabled. */
public static final boolean DEFAULT_SHARED_CACHE_ENABLED = false;
/** The config key for the shared cache root directory. */
public static final String SHARED_CACHE_ROOT =
SHARED_CACHE_PREFIX + "root-dir";
/** Default value for {@link #SHARED_CACHE_ROOT}. */
public static final String DEFAULT_SHARED_CACHE_ROOT = "/sharedcache";
/**
* The config key for the number of nested directory levels that precede the
* checksum directory of a cache entry.
*/
public static final String SHARED_CACHE_NESTED_LEVEL =
SHARED_CACHE_PREFIX + "nested-level";
/** Default value for {@link #SHARED_CACHE_NESTED_LEVEL}. */
public static final int DEFAULT_SHARED_CACHE_NESTED_LEVEL = 3;
////////////////////////////////
// Other Configs
////////////////////////////////

View File

@ -1310,6 +1310,26 @@
<value>/etc/krb5.keytab</value>
</property>
<!-- Shared Cache Configuration -->
<property>
<description>Whether the shared cache is enabled</description>
<name>yarn.sharedcache.enabled</name>
<value>false</value>
</property>
<property>
<description>The root directory for the shared cache</description>
<name>yarn.sharedcache.root-dir</name>
<value>/sharedcache</value>
</property>
<property>
<description>The level of nested directories before getting to the checksum
directories. It must be a positive integer; if zero or a negative value is
specified, the default value is used instead.</description>
<name>yarn.sharedcache.nested-level</name>
<value>3</value>
</property>
<!-- Other configuration -->
<property>
<description>The interval that the yarn client library uses to poll the

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.sharedcache;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
 * Static helpers for the internal directory layout of the shared cache:
 * resolving the configured nesting depth and deriving the path of a cache
 * entry from its checksum.
 */
@Private
@Unstable
public class SharedCacheStructureUtil {

  private static final Log LOG =
      LogFactory.getLog(SharedCacheStructureUtil.class);

  /**
   * Resolves the nesting depth of the shared cache from the configuration.
   *
   * @param conf the configuration to read
   *          {@link YarnConfiguration#SHARED_CACHE_NESTED_LEVEL} from
   * @return the configured depth when it is greater than zero; otherwise
   *         {@link YarnConfiguration#DEFAULT_SHARED_CACHE_NESTED_LEVEL}
   *         (a warning is logged in that case)
   */
  @Private
  public static int getCacheDepth(Configuration conf) {
    final int fallbackDepth =
        YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL;
    final int configuredDepth =
        conf.getInt(YarnConfiguration.SHARED_CACHE_NESTED_LEVEL, fallbackDepth);

    if (configuredDepth > 0) {
      return configuredDepth;
    }

    // A depth of zero or less cannot be used to build entry paths, so the
    // default is substituted and the rejected value is reported.
    LOG.warn("Specified cache depth was less than or equal to zero."
        + " Using default value instead. Default: "
        + fallbackDepth
        + ", Specified: " + configuredDepth);
    return fallbackDepth;
  }

  /**
   * Builds the path of a cache entry. Each of the first {@code cacheDepth}
   * characters of the checksum becomes one directory level below the root,
   * and the full checksum is the final path component. For a depth of 3 and
   * the checksum {@code 3c4f} the result is {@code <cacheRoot>/3/c/4/3c4f}.
   *
   * @param cacheDepth the number of single-character directory levels
   * @param cacheRoot the root directory of the shared cache
   * @param checksum the checksum identifying the cache entry
   * @return the path of the cache entry as a string
   * @throws IllegalArgumentException if {@code cacheDepth} is not greater
   *           than zero, or if the checksum has fewer characters than
   *           {@code cacheDepth}
   */
  @Private
  public static String getCacheEntryPath(int cacheDepth, String cacheRoot,
      String checksum) {
    if (cacheDepth < 1) {
      throw new IllegalArgumentException(
          "The cache depth must be greater than 0. Passed value: " + cacheDepth);
    }
    if (cacheDepth > checksum.length()) {
      throw new IllegalArgumentException("The checksum passed was too short: "
          + checksum);
    }

    final StringBuilder entryPath = new StringBuilder(cacheRoot);
    // One directory level per leading checksum character.
    for (char prefixChar : checksum.substring(0, cacheDepth).toCharArray()) {
      entryPath.append(Path.SEPARATOR_CHAR).append(prefixChar);
    }
    // The leaf directory is named after the complete checksum.
    return entryPath.append(Path.SEPARATOR_CHAR).append(checksum).toString();
  }
}

View File

@ -0,0 +1,90 @@
<?xml version="1.0"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project xmlns:pom="http://maven.apache.org/POM/4.0.0">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>hadoop-yarn-server</artifactId>
<groupId>org.apache.hadoop</groupId>
<version>3.0.0-SNAPSHOT</version>
</parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-sharedcachemanager</artifactId>
<version>3.0.0-SNAPSHOT</version>
<name>hadoop-yarn-server-sharedcachemanager</name>
<properties>
<!-- Needed for generating FindBugs warnings using parent pom -->
<yarn.basedir>${project.parent.parent.basedir}</yarn.basedir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Publish tests jar -->
<plugin>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
<phase>test-compile</phase>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.sharedcachemanager;
import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
 * Base class for services that report whether applications are still active,
 * so that the cleaner service may determine if it can safely remove a cached
 * entry.
 */
@Private
@Evolving
public abstract class AppChecker extends CompositeService {

  /** Creates the service under the default name {@code "AppChecker"}. */
  public AppChecker() {
    this("AppChecker");
  }

  /**
   * Creates the service under the given name.
   *
   * @param name the service name passed on to {@link CompositeService}
   */
  public AppChecker(String name) {
    super(name);
  }

  /**
   * Returns whether the app is in an active state.
   *
   * @param id the id of the application to check
   * @return true if the app is found and is not in one of the completed
   *         states; false otherwise
   * @throws YarnException if there is an error in determining the app state
   */
  @Private
  public abstract boolean isApplicationActive(ApplicationId id)
      throws YarnException;

  /**
   * Returns all apps that are active at the time of the call.
   *
   * @return the active apps, or an empty collection if there are none
   * @throws YarnException if there is an error in obtaining the apps
   */
  @Private
  public abstract Collection<ApplicationId> getActiveApplications()
      throws YarnException;
}

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.sharedcachemanager;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
 * An {@link AppChecker} that asks the resource manager, through a
 * {@link YarnClient}, whether applications are still active.
 */
@Private
@Unstable
public class RemoteAppChecker extends AppChecker {

  /** Application states that are treated as active. */
  private static final EnumSet<YarnApplicationState> ACTIVE_STATES =
      EnumSet.of(
          YarnApplicationState.NEW,
          YarnApplicationState.NEW_SAVING,
          YarnApplicationState.SUBMITTED,
          YarnApplicationState.ACCEPTED,
          YarnApplicationState.RUNNING);

  private final YarnClient client;

  /** Creates a checker backed by a newly created {@link YarnClient}. */
  public RemoteAppChecker() {
    this(YarnClient.createYarnClient());
  }

  /**
   * Creates a checker backed by the supplied client.
   *
   * @param client the client used for all application queries
   */
  RemoteAppChecker(YarnClient client) {
    super("RemoteAppChecker");
    this.client = client;
  }

  /**
   * Registers the client as a child service before delegating to the parent
   * initialization.
   */
  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    addService(client);
    super.serviceInit(conf);
  }

  /**
   * Returns whether the app is in one of the active states.
   *
   * @param id the id of the application to check
   * @return true if a report is returned for the app and its state is
   *         active; false if the app is not found, no report is returned, or
   *         its state is not active
   * @throws YarnException if the client call fails, including with an
   *           {@link IOException} (wrapped)
   */
  @Override
  @Private
  public boolean isApplicationActive(ApplicationId id) throws YarnException {
    final ApplicationReport appReport;
    try {
      appReport = client.getApplicationReport(id);
    } catch (ApplicationNotFoundException notFound) {
      // the app does not exist
      return false;
    } catch (IOException ioe) {
      throw new YarnException(ioe);
    }
    // a missing report also means the app does not exist
    return appReport != null
        && ACTIVE_STATES.contains(appReport.getYarnApplicationState());
  }

  /**
   * Returns the ids of all apps that the client reports in an active state.
   *
   * @return the ids of the active apps, or an empty list if there are none
   * @throws YarnException if the client call fails, including with an
   *           {@link IOException} (wrapped)
   */
  @Override
  @Private
  public Collection<ApplicationId> getActiveApplications() throws YarnException {
    final List<ApplicationReport> activeReports;
    try {
      activeReports = client.getApplications(ACTIVE_STATES);
    } catch (IOException ioe) {
      throw new YarnException(ioe);
    }

    final List<ApplicationId> activeIds = new ArrayList<ApplicationId>();
    for (ApplicationReport activeReport : activeReports) {
      activeIds.add(activeReport.getApplicationId());
    }
    return activeIds;
  }
}

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.sharedcachemanager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
/**
 * This service maintains the shared cache meta data. It handles claiming and
 * releasing of resources, all rpc calls from the client to the shared cache
 * manager, and administrative commands. It also persists the shared cache meta
 * data to a backend store, and cleans up stale entries on a regular basis.
 */
@Private
@Unstable
public class SharedCacheManager extends CompositeService {

  /**
   * Priority of the SharedCacheManager shutdown hook.
   */
  public static final int SHUTDOWN_HOOK_PRIORITY = 30;

  private static final Log LOG = LogFactory.getLog(SharedCacheManager.class);

  /** Creates the service under the name {@code "SharedCacheManager"}. */
  public SharedCacheManager() {
    super("SharedCacheManager");
  }

  /**
   * Initializes the metrics system and the JVM metrics singleton, then
   * delegates to the parent initialization.
   */
  @Override
  protected void serviceInit(Configuration conf) throws Exception {
    // init metrics
    DefaultMetricsSystem.initialize("SharedCacheManager");
    JvmMetrics.initSingleton("SharedCacheManager", null);

    super.serviceInit(conf);
  }

  /** Shuts down the metrics system, then delegates to the parent stop. */
  @Override
  protected void serviceStop() throws Exception {
    DefaultMetricsSystem.shutdown();
    super.serviceStop();
  }

  /**
   * Entry point of the SharedCacheManager daemon. Registers a shutdown hook
   * for the service, then initializes and starts it. Any failure during
   * startup is logged as fatal and the JVM exits with status -1.
   *
   * @param args command line arguments; only passed to the startup message
   */
  public static void main(String[] args) {
    Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
    StringUtils.startupShutdownMessage(SharedCacheManager.class, args, LOG);
    try {
      final Configuration yarnConf = new YarnConfiguration();
      final SharedCacheManager manager = new SharedCacheManager();

      // Register the hook before init/start.
      final ShutdownHookManager hookManager = ShutdownHookManager.get();
      hookManager.addShutdownHook(
          new CompositeServiceShutdownHook(manager), SHUTDOWN_HOOK_PRIORITY);

      manager.init(yarnConf);
      manager.start();
    } catch (Throwable startupFailure) {
      LOG.fatal("Error starting SharedCacheManager", startupFailure);
      System.exit(-1);
    }
  }
}

View File

@ -0,0 +1,107 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.sharedcachemanager;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
import org.junit.After;
import org.junit.Test;
/**
 * Unit tests for {@link RemoteAppChecker#isApplicationActive}, run against a
 * spied {@link YarnClient} whose lifecycle methods do nothing.
 */
public class TestRemoteAppChecker {

  private RemoteAppChecker checker;

  /** Stops the checker created by the test, if there is one. */
  @After
  public void cleanup() {
    if (checker != null) {
      checker.stop();
    }
  }

  /**
   * Creates/initializes/starts a RemoteAppChecker with a spied
   * DummyYarnClientImpl.
   *
   * @return the spied DummyYarnClientImpl in the created AppChecker
   */
  private YarnClient createCheckerWithMockedClient() {
    YarnClient client = spy(new DummyYarnClientImpl());
    checker = new RemoteAppChecker(client);
    checker.init(new Configuration());
    checker.start();
    return client;
  }

  /**
   * An app is reported as inactive both when the client returns no report
   * and when it throws {@link ApplicationNotFoundException}.
   */
  @Test
  public void testNonExistentApp() throws Exception {
    YarnClient client = createCheckerWithMockedClient();
    ApplicationId id = ApplicationId.newInstance(1, 1);

    // test for null
    doReturn(null).when(client).getApplicationReport(id);
    assertFalse(checker.isApplicationActive(id));

    // test for ApplicationNotFoundException
    doThrow(new ApplicationNotFoundException("Throw!")).when(client)
        .getApplicationReport(id);
    assertFalse(checker.isApplicationActive(id));
  }

  /** An app whose report carries an active state is reported as active. */
  @Test
  public void testRunningApp() throws Exception {
    YarnClient client = createCheckerWithMockedClient();
    ApplicationId id = ApplicationId.newInstance(1, 1);

    // create a report and set the state to an active one
    ApplicationReport report = new ApplicationReportPBImpl();
    report.setYarnApplicationState(YarnApplicationState.ACCEPTED);
    doReturn(report).when(client).getApplicationReport(id);

    assertTrue(checker.isApplicationActive(id));
  }

  /**
   * A YarnClientImpl whose lifecycle methods are overridden to do nothing.
   * Declared static because it never uses the enclosing test instance, so it
   * should not carry a hidden reference to it.
   */
  static class DummyYarnClientImpl extends YarnClientImpl {
    @Override
    protected void serviceInit(Configuration conf) throws Exception {
      // do nothing
    }

    @Override
    protected void serviceStart() {
      // do nothing
    }

    @Override
    protected void serviceStop() {
      // do nothing
    }
  }
}

View File

@ -39,6 +39,7 @@
<module>hadoop-yarn-server-nodemanager</module>
<module>hadoop-yarn-server-web-proxy</module>
<module>hadoop-yarn-server-resourcemanager</module>
<module>hadoop-yarn-server-sharedcachemanager</module>
<module>hadoop-yarn-server-tests</module>
<module>hadoop-yarn-server-applicationhistoryservice</module>
</modules>