From 1c03376300a46722d4147f5b8f37242f68dba0a2 Mon Sep 17 00:00:00 2001 From: Jian He Date: Wed, 18 Feb 2015 16:06:55 -0800 Subject: [PATCH] YARN-1514. Utility to benchmark ZKRMStateStore#loadState for RM HA. Contributed by Tsuyoshi OZAWA --- hadoop-project/pom.xml | 1 - hadoop-yarn-project/CHANGES.txt | 3 + .../pom.xml | 8 +- .../apache/hadoop/test/YarnTestDriver.java | 60 ++++ .../recovery/RMStateStoreTestBase.java | 19 +- .../recovery/TestZKRMStateStorePerf.java | 277 ++++++++++++++++++ 6 files changed, 359 insertions(+), 9 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/test/YarnTestDriver.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 0c7cfc864d5..2c0f03a5f12 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -828,7 +828,6 @@ zookeeper ${zookeeper.version} test-jar - test org.jboss.netty diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 884b506d86a..91ce11f7896 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -301,6 +301,9 @@ Release 2.7.0 - UNRELEASED YARN-1299. Improve a log message in AppSchedulingInfo by adding application id. (Ashutosh Jindal and Devaraj K via ozawa) + YARN-1514. Utility to benchmark ZKRMStateStore#loadState for RM HA. + (Tsuyoshi OZAWA via jianhe) + OPTIMIZATIONS YARN-2990. FairScheduler's delay-scheduling always waits for node-local and diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml index 9bcc7c879da..ff429ccd3ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/pom.xml @@ -186,7 +186,6 @@ org.apache.zookeeper zookeeper test-jar - test @@ -245,6 +244,13 @@ test-jar test-compile + + + + org.apache.hadoop.test.YarnTestDriver + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/test/YarnTestDriver.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/test/YarnTestDriver.java new file mode 100644 index 00000000000..8874ed822da --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/test/YarnTestDriver.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.test; + +import org.apache.hadoop.util.ProgramDriver; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestZKRMStateStorePerf; + +/** + * Driver for Yarn tests. + * + */ +public class YarnTestDriver { + + private ProgramDriver pgd; + + public YarnTestDriver() { + this(new ProgramDriver()); + } + + public YarnTestDriver(ProgramDriver pgd) { + this.pgd = pgd; + try { + pgd.addClass(TestZKRMStateStorePerf.class.getSimpleName(), + TestZKRMStateStorePerf.class, + "ZKRMStateStore i/o benchmark."); + } catch(Throwable e) { + e.printStackTrace(); + } + } + + public void run(String argv[]) { + int exitCode = -1; + try { + exitCode = pgd.run(argv); + } catch(Throwable e) { + e.printStackTrace(); + } + System.exit(exitCode); + } + + public static void main(String argv[]){ + new YarnTestDriver().run(argv); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java index b01969bd085..5b53a0235af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/RMStateStoreTestBase.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery; +import org.apache.hadoop.yarn.event.Event; import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -79,8 +80,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class); - static class TestDispatcher implements - Dispatcher, EventHandler { + static class TestDispatcher implements Dispatcher, EventHandler { ApplicationAttemptId attemptId; @@ -93,8 +93,11 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ } @Override - public void handle(RMAppAttemptEvent event) { - assertEquals(attemptId, event.getApplicationAttemptId()); + public void handle(Event event) { + if (event instanceof RMAppAttemptEvent) { + RMAppAttemptEvent rmAppAttemptEvent = (RMAppAttemptEvent) event; + assertEquals(attemptId, rmAppAttemptEvent.getApplicationAttemptId()); + } notified = true; synchronized (this) { notifyAll(); @@ -134,7 +137,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ dispatcher.notified = false; } - RMApp storeApp(RMStateStore store, ApplicationId appId, long submitTime, + protected RMApp storeApp(RMStateStore store, ApplicationId appId, + long submitTime, long startTime) throws Exception { ApplicationSubmissionContext context = new ApplicationSubmissionContextPBImpl(); @@ -150,7 +154,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ return mockApp; } - ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId, + protected ContainerId storeAttempt(RMStateStore store, + ApplicationAttemptId attemptId, String containerIdStr, Token appToken, SecretKey clientTokenMasterKey, TestDispatcher dispatcher) throws Exception { @@ -474,7 +479,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{ } - private Token generateAMRMToken( + protected Token generateAMRMToken( ApplicationAttemptId attemptId, AMRMTokenSecretManager appTokenMgr) { Token appToken = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java new file mode 100644 index 00000000000..654b3572982 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/recovery/TestZKRMStateStorePerf.java @@ -0,0 +1,277 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.recovery; + +import com.google.common.base.Optional; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import javax.crypto.SecretKey; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Before; +import org.junit.After; +import org.junit.Test; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestZKRMStateStorePerf extends RMStateStoreTestBase + implements Tool { + public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class); + + final String version = "0.1"; + + // Configurable variables for performance test + private int ZK_PERF_NUM_APP_DEFAULT = 1000; + private int ZK_PERF_NUM_APPATTEMPT_PER_APP = 10; + + private final long clusterTimeStamp = 1352994193343L; + + private static final String USAGE = + "Usage: " + TestZKRMStateStorePerf.class.getSimpleName() + + " -appSize numberOfApplications" + + " -appAttemptSize numberOfApplicationAttempts" + + " [-hostPort Host:Port]" + + " [-workingZnode rootZnodeForTesting]\n"; + + private YarnConfiguration conf = null; + private String workingZnode = "/Test"; + private ZKRMStateStore store; + private AMRMTokenSecretManager appTokenMgr; + private ClientToAMTokenSecretManagerInRM clientToAMTokenMgr; + + @Before + public void setUpZKServer() throws Exception { + super.setUp(); + } + + @After + public void tearDown() throws Exception { + if (store != null) { + store.stop(); + } + if (appTokenMgr != null) { + appTokenMgr.stop(); + } + super.tearDown(); + } + + private void initStore(String hostPort) { + Optional optHostPort = Optional.fromNullable(hostPort); + RMContext rmContext = mock(RMContext.class); + + conf = new YarnConfiguration(); + conf.set(YarnConfiguration.RM_ZK_ADDRESS, optHostPort.or(this.hostPort)); + conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); + + store = new ZKRMStateStore(); + store.init(conf); + store.start(); + when(rmContext.getStateStore()).thenReturn(store); + appTokenMgr = new AMRMTokenSecretManager(conf, rmContext); + appTokenMgr.start(); + clientToAMTokenMgr = new ClientToAMTokenSecretManagerInRM(); + } + + @SuppressWarnings("unchecked") + @Override + public int run(String[] args) { + LOG.info("Starting ZKRMStateStorePerf ver." + version); + + int numApp = ZK_PERF_NUM_APP_DEFAULT; + int numAppAttemptPerApp = ZK_PERF_NUM_APPATTEMPT_PER_APP; + String hostPort = null; + boolean launchLocalZK= true; + + if (args.length == 0) { + System.err.println("Missing arguments."); + return -1; + } + + for (int i = 0; i < args.length; i++) { // parse command line + if (args[i].equalsIgnoreCase("-appsize")) { + numApp = Integer.parseInt(args[++i]); + } else if (args[i].equalsIgnoreCase("-appattemptsize")) { + numAppAttemptPerApp = Integer.parseInt(args[++i]); + } else if (args[i].equalsIgnoreCase("-hostPort")) { + hostPort = args[++i]; + launchLocalZK = false; + } else if (args[i].equalsIgnoreCase("-workingZnode")) { + workingZnode = args[++i]; + } else { + System.err.println("Illegal argument: " + args[i]); + return -1; + } + } + + if (launchLocalZK) { + try { + setUp(); + } catch (Exception e) { + System.err.println("failed to setup. : " + e.getMessage()); + return -1; + } + } + + initStore(hostPort); + + long submitTime = System.currentTimeMillis(); + long startTime = System.currentTimeMillis() + 1234; + + ArrayList applicationIds = new ArrayList<>(); + ArrayList rmApps = new ArrayList<>(); + ArrayList attemptIds = new ArrayList<>(); + HashMap> appIdsToAttemptId = + new HashMap<>(); + TestDispatcher dispatcher = new TestDispatcher(); + store.setRMDispatcher(dispatcher); + + for (int i = 0; i < numApp; i++) { + ApplicationId appId = ApplicationId.newInstance(clusterTimeStamp, i); + applicationIds.add(appId); + ArrayList attemptIdsForThisApp = + new ArrayList<>(); + for (int j = 0; j < numAppAttemptPerApp; j++) { + ApplicationAttemptId attemptId = + ApplicationAttemptId.newInstance(appId, j); + attemptIdsForThisApp.add(attemptId); + } + appIdsToAttemptId.put(appId, new LinkedHashSet(attemptIdsForThisApp)); + attemptIds.addAll(attemptIdsForThisApp); + } + + for (ApplicationId appId : applicationIds) { + RMApp app = null; + try { + app = storeApp(store, appId, submitTime, startTime); + } catch (Exception e) { + System.err.println("failed to create Application Znode. : " + + e.getMessage()); + return -1; + } + waitNotify(dispatcher); + rmApps.add(app); + } + + for (ApplicationAttemptId attemptId : attemptIds) { + Token tokenId = + generateAMRMToken(attemptId, appTokenMgr); + SecretKey clientTokenKey = + clientToAMTokenMgr.createMasterKey(attemptId); + try { + storeAttempt(store, attemptId, + ContainerId.newContainerId(attemptId, 0L).toString(), + tokenId, clientTokenKey, dispatcher); + } catch (Exception e) { + System.err.println("failed to create AppAttempt Znode. : " + + e.getMessage()); + return -1; + } + } + + long storeStart = System.currentTimeMillis(); + try { + store.loadState(); + } catch (Exception e) { + System.err.println("failed to locaState from ZKRMStateStore. : " + + e.getMessage()); + return -1; + } + long storeEnd = System.currentTimeMillis(); + + long loadTime = storeEnd - storeStart; + + String resultMsg = "ZKRMStateStore takes " + loadTime + " msec to loadState."; + LOG.info(resultMsg); + System.out.println(resultMsg); + + // cleanup + try { + for (RMApp app : rmApps) { + ApplicationStateData appState = + ApplicationStateData.newInstance(app.getSubmitTime(), + app.getStartTime(), app.getApplicationSubmissionContext(), + app.getUser()); + ApplicationId appId = app.getApplicationId(); + Map m = mock(Map.class); + when(m.keySet()).thenReturn(appIdsToAttemptId.get(appId)); + appState.attempts = m; + store.removeApplicationStateInternal(appState); + } + } catch (Exception e) { + System.err.println("failed to cleanup. : " + e.getMessage()); + return -1; + } + + return 0; + } + + @Override + public void setConf(Configuration conf) { + // currently this function is just ignored + } + + @Override + public Configuration getConf() { + return conf; + } + + @Test + public void perfZKRMStateStore() throws Exception { + String[] args = { + "-appSize", String.valueOf(ZK_PERF_NUM_APP_DEFAULT), + "-appAttemptSize", String.valueOf(ZK_PERF_NUM_APPATTEMPT_PER_APP) + }; + run(args); + } + + static public void main(String[] args) throws Exception { + TestZKRMStateStorePerf perf = new TestZKRMStateStorePerf(); + + int res = -1; + try { + res = ToolRunner.run(perf, args); + } catch(Exception e) { + System.err.print(StringUtils.stringifyException(e)); + res = -2; + } + if(res == -1) { + System.err.print(USAGE); + } + System.exit(res); + } +}