diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index feea5fd2005..47a7e2cb45b 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -5,6 +5,9 @@ Release 2.7.0 - UNRELEASED INCOMPATIBLE CHANGES NEW FEATURES + + YARN-2179. [YARN-1492] Initial cache manager structure and context. + (Chris Trezzo via kasha) IMPROVEMENTS diff --git a/hadoop-yarn-project/hadoop-yarn/bin/yarn b/hadoop-yarn-project/hadoop-yarn/bin/yarn index a739dac8e31..799eea081fb 100644 --- a/hadoop-yarn-project/hadoop-yarn/bin/yarn +++ b/hadoop-yarn-project/hadoop-yarn/bin/yarn @@ -66,6 +66,7 @@ function print_usage(){ echo " nodemanager run a nodemanager on each slave" echo " timelineserver run the timeline server" echo " rmadmin admin tools" + echo " sharedcachemanager run the SharedCacheManager daemon" echo " version print the version" echo " jar run a jar file" echo " application prints application(s)" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index a67273da946..9d9aac573c3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1285,6 +1285,28 @@ public class YarnConfiguration extends Configuration { public static final boolean TIMELINE_SERVICE_HTTP_CROSS_ORIGIN_ENABLED_DEFAULT = false; + // /////////////////////////////// + // Shared Cache Configs + // /////////////////////////////// + public static final String SHARED_CACHE_PREFIX = "yarn.sharedcache."; + + // common configs + /** whether the shared cache is enabled/disabled */ + public static final String SHARED_CACHE_ENABLED = + SHARED_CACHE_PREFIX + "enabled"; + public static final boolean 
DEFAULT_SHARED_CACHE_ENABLED = false; + + /** The config key for the shared cache root directory. */ + public static final String SHARED_CACHE_ROOT = + SHARED_CACHE_PREFIX + "root-dir"; + public static final String DEFAULT_SHARED_CACHE_ROOT = "/sharedcache"; + + /** The config key for the level of nested directories before getting to the + * checksum directory. */ + public static final String SHARED_CACHE_NESTED_LEVEL = + SHARED_CACHE_PREFIX + "nested-level"; + public static final int DEFAULT_SHARED_CACHE_NESTED_LEVEL = 3; + //////////////////////////////// // Other Configs //////////////////////////////// diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 2c9ca68abe7..e40e53e1a2a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1309,6 +1309,26 @@ /etc/krb5.keytab + + + Whether the shared cache is enabled + yarn.sharedcache.enabled + false + + + + The root directory for the shared cache + yarn.sharedcache.root-dir + /sharedcache + + + + The level of nested directories before getting to the checksum + directories. It must be greater than zero; otherwise the default is used. 
+ yarn.sharedcache.nested-level + 3 + + The interval that the yarn client library uses to poll the diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheStructureUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheStructureUtil.java new file mode 100644 index 00000000000..1bac75b74e2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/sharedcache/SharedCacheStructureUtil.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.sharedcache; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * A utility class that contains helper methods for dealing with the internal + * shared cache structure. + */ +@Private +@Unstable +public class SharedCacheStructureUtil { + + private static final Log LOG = LogFactory.getLog(SharedCacheStructureUtil.class); + + @Private + public static int getCacheDepth(Configuration conf) { + int cacheDepth = + conf.getInt(YarnConfiguration.SHARED_CACHE_NESTED_LEVEL, + YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL); + + if (cacheDepth <= 0) { + LOG.warn("Specified cache depth was less than or equal to zero." + + " Using default value instead. Default: " + + YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL + + ", Specified: " + cacheDepth); + cacheDepth = YarnConfiguration.DEFAULT_SHARED_CACHE_NESTED_LEVEL; + } + + return cacheDepth; + } + + @Private + public static String getCacheEntryPath(int cacheDepth, String cacheRoot, + String checksum) { + + if (cacheDepth <= 0) { + throw new IllegalArgumentException( + "The cache depth must be greater than 0. Passed value: " + cacheDepth); + } + if (checksum.length() < cacheDepth) { + throw new IllegalArgumentException("The checksum passed was too short: " + + checksum); + } + + // Build the cache entry path to the specified depth. 
For example, if the + // depth is 3 and the checksum is 3c4f, the path would be: + // SHARED_CACHE_ROOT/3/c/4/3c4f + StringBuilder sb = new StringBuilder(cacheRoot); + for (int i = 0; i < cacheDepth; i++) { + sb.append(Path.SEPARATOR_CHAR); + sb.append(checksum.charAt(i)); + } + sb.append(Path.SEPARATOR_CHAR).append(checksum); + + return sb.toString(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml new file mode 100644 index 00000000000..869298bfb4a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/pom.xml @@ -0,0 +1,90 @@ + + + + 4.0.0 + + hadoop-yarn-server + org.apache.hadoop + 3.0.0-SNAPSHOT + + org.apache.hadoop + hadoop-yarn-server-sharedcachemanager + 3.0.0-SNAPSHOT + hadoop-yarn-server-sharedcachemanager + + + + ${project.parent.parent.basedir} + + + + + org.apache.hadoop + hadoop-common + + + org.apache.hadoop + hadoop-yarn-api + + + org.apache.hadoop + hadoop-yarn-common + + + org.apache.hadoop + hadoop-yarn-client + + + junit + junit + test + + + org.mockito + mockito-all + test + + + org.apache.hadoop + hadoop-common + test-jar + test + + + org.apache.hadoop + hadoop-yarn-server-tests + test + test-jar + + + + + + + + + maven-jar-plugin + + + + test-jar + + test-compile + + + + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/AppChecker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/AppChecker.java new file mode 100644 index 00000000000..4ac94106700 --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/AppChecker.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.util.Collection; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * An interface for checking whether an app is running so that the cleaner + * service may determine if it can safely remove a cached entry. + */ +@Private +@Evolving +public abstract class AppChecker extends CompositeService { + + public AppChecker() { + super("AppChecker"); + } + + public AppChecker(String name) { + super(name); + } + + /** + * Returns whether the app is in an active state. 
+ * + * @return true if the app is found and is not in one of the completed states; + * false otherwise + * @throws YarnException if there is an error in determining the app state + */ + @Private + public abstract boolean isApplicationActive(ApplicationId id) + throws YarnException; + + /** + * Returns the list of all active apps at the given time. + * + * @return the list of active apps, or an empty list if there is none + * @throws YarnException if there is an error in obtaining the list + */ + @Private + public abstract Collection getActiveApplications() + throws YarnException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/RemoteAppChecker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/RemoteAppChecker.java new file mode 100644 index 00000000000..2bf3a896355 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/RemoteAppChecker.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.EnumSet; +import java.util.List; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * An implementation of AppChecker that queries the resource manager remotely to + * determine whether the app is running. + */ +@Private +@Unstable +public class RemoteAppChecker extends AppChecker { + + private static final EnumSet ACTIVE_STATES = EnumSet + .of(YarnApplicationState.NEW, YarnApplicationState.ACCEPTED, + YarnApplicationState.NEW_SAVING, YarnApplicationState.SUBMITTED, + YarnApplicationState.RUNNING); + + private final YarnClient client; + + public RemoteAppChecker() { + this(YarnClient.createYarnClient()); + } + + RemoteAppChecker(YarnClient client) { + super("RemoteAppChecker"); + this.client = client; + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + addService(client); + super.serviceInit(conf); + } + + @Override + @Private + public boolean isApplicationActive(ApplicationId id) throws YarnException { + ApplicationReport report = null; + try { + report = client.getApplicationReport(id); + } catch (ApplicationNotFoundException e) { + // the app does not exist + return false; + } catch (IOException e) { + throw new YarnException(e); + } + + if (report 
== null) { + // the app does not exist + return false; + } + + return ACTIVE_STATES.contains(report.getYarnApplicationState()); + } + + @Override + @Private + public Collection getActiveApplications() throws YarnException { + try { + List activeApps = new ArrayList(); + List apps = client.getApplications(ACTIVE_STATES); + for (ApplicationReport app: apps) { + activeApps.add(app.getApplicationId()); + } + return activeApps; + } catch (IOException e) { + throw new YarnException(e); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java new file mode 100644 index 00000000000..866c094e695 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/SharedCacheManager.java @@ -0,0 +1,86 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.source.JvmMetrics; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * This service maintains the shared cache meta data. It handles claiming and + * releasing of resources, all rpc calls from the client to the shared cache + * manager, and administrative commands. It also persists the shared cache meta + * data to a backend store, and cleans up stale entries on a regular basis. + */ +@Private +@Unstable +public class SharedCacheManager extends CompositeService { + /** + * Priority of the SharedCacheManager shutdown hook. 
+ */ + public static final int SHUTDOWN_HOOK_PRIORITY = 30; + + private static final Log LOG = LogFactory.getLog(SharedCacheManager.class); + + public SharedCacheManager() { + super("SharedCacheManager"); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + // init metrics + DefaultMetricsSystem.initialize("SharedCacheManager"); + JvmMetrics.initSingleton("SharedCacheManager", null); + + super.serviceInit(conf); + } + + @Override + protected void serviceStop() throws Exception { + + DefaultMetricsSystem.shutdown(); + super.serviceStop(); + } + + public static void main(String[] args) { + Thread.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler()); + StringUtils.startupShutdownMessage(SharedCacheManager.class, args, LOG); + try { + Configuration conf = new YarnConfiguration(); + SharedCacheManager sharedCacheManager = new SharedCacheManager(); + ShutdownHookManager.get().addShutdownHook( + new CompositeServiceShutdownHook(sharedCacheManager), + SHUTDOWN_HOOK_PRIORITY); + sharedCacheManager.init(conf); + sharedCacheManager.start(); + } catch (Throwable t) { + LOG.fatal("Error starting SharedCacheManager", t); + System.exit(-1); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestRemoteAppChecker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestRemoteAppChecker.java new file mode 100644 index 00000000000..46546444ad5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/TestRemoteAppChecker.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.sharedcachemanager; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl; +import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException; +import org.junit.After; +import org.junit.Test; + +public class TestRemoteAppChecker { + + private RemoteAppChecker checker; + + @After + public void cleanup() { + if (checker != null) { + checker.stop(); + } + } + + /** + * Creates/initializes/starts a RemoteAppChecker with a spied + * DummyYarnClientImpl. 
+ * + * @return the spied DummyYarnClientImpl in the created AppChecker + */ + private YarnClient createCheckerWithMockedClient() { + YarnClient client = spy(new DummyYarnClientImpl()); + checker = new RemoteAppChecker(client); + checker.init(new Configuration()); + checker.start(); + return client; + } + + @Test + public void testNonExistentApp() throws Exception { + YarnClient client = createCheckerWithMockedClient(); + ApplicationId id = ApplicationId.newInstance(1, 1); + + // test for null + doReturn(null).when(client).getApplicationReport(id); + assertFalse(checker.isApplicationActive(id)); + + // test for ApplicationNotFoundException + doThrow(new ApplicationNotFoundException("Throw!")).when(client) + .getApplicationReport(id); + assertFalse(checker.isApplicationActive(id)); + } + + @Test + public void testRunningApp() throws Exception { + YarnClient client = createCheckerWithMockedClient(); + ApplicationId id = ApplicationId.newInstance(1, 1); + + // create a report and set the state to an active one + ApplicationReport report = new ApplicationReportPBImpl(); + report.setYarnApplicationState(YarnApplicationState.ACCEPTED); + doReturn(report).when(client).getApplicationReport(id); + + assertTrue(checker.isApplicationActive(id)); + } + + class DummyYarnClientImpl extends YarnClientImpl { + @Override + protected void serviceInit(Configuration conf) throws Exception { + // do nothing + } + + @Override + protected void serviceStart() { + // do nothing + } + + @Override + protected void serviceStop() { + // do nothing + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml index ef349b4f179..13129f8f1bc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/pom.xml @@ -39,6 +39,7 @@ hadoop-yarn-server-nodemanager hadoop-yarn-server-web-proxy hadoop-yarn-server-resourcemanager + 
hadoop-yarn-server-sharedcachemanager hadoop-yarn-server-tests hadoop-yarn-server-applicationhistoryservice