YARN-1514. Utility to benchmark ZKRMStateStore#loadState for RM HA. Contributed by Tsuyoshi OZAWA

(cherry picked from commit 1c03376300)
This commit is contained in:
Jian He 2015-02-18 16:06:55 -08:00
parent 3605928bd0
commit b8fe1a747e
6 changed files with 359 additions and 9 deletions

View File

@ -827,7 +827,6 @@
<artifactId>zookeeper</artifactId>
<version>${zookeeper.version}</version>
<type>test-jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.jboss.netty</groupId>

View File

@ -265,6 +265,9 @@ Release 2.7.0 - UNRELEASED
YARN-1299. Improve a log message in AppSchedulingInfo by adding application
id. (Ashutosh Jindal and Devaraj K via ozawa)
YARN-1514. Utility to benchmark ZKRMStateStore#loadState for RM HA.
(Tsuyoshi OZAWA via jianhe)
OPTIMIZATIONS
YARN-2990. FairScheduler's delay-scheduling always waits for node-local and

View File

@ -186,7 +186,6 @@
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<!-- 'mvn dependency:analyze' fails to detect use of this dependency -->
<dependency>
@ -245,6 +244,13 @@
<goal>test-jar</goal>
</goals>
<phase>test-compile</phase>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.hadoop.test.YarnTestDriver</mainClass>
</manifest>
</archive>
</configuration>
</execution>
</executions>
</plugin>

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.test;
import org.apache.hadoop.util.ProgramDriver;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestZKRMStateStorePerf;
/**
* Driver for Yarn tests.
*
*/
public class YarnTestDriver {
private ProgramDriver pgd;
public YarnTestDriver() {
this(new ProgramDriver());
}
public YarnTestDriver(ProgramDriver pgd) {
this.pgd = pgd;
try {
pgd.addClass(TestZKRMStateStorePerf.class.getSimpleName(),
TestZKRMStateStorePerf.class,
"ZKRMStateStore i/o benchmark.");
} catch(Throwable e) {
e.printStackTrace();
}
}
public void run(String argv[]) {
int exitCode = -1;
try {
exitCode = pgd.run(argv);
} catch(Throwable e) {
e.printStackTrace();
}
System.exit(exitCode);
}
public static void main(String argv[]){
new YarnTestDriver().run(argv);
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.yarn.event.Event;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@ -79,8 +80,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
static class TestDispatcher implements
Dispatcher, EventHandler<RMAppAttemptEvent> {
static class TestDispatcher implements Dispatcher, EventHandler<Event> {
ApplicationAttemptId attemptId;
@ -93,8 +93,11 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
}
@Override
public void handle(RMAppAttemptEvent event) {
assertEquals(attemptId, event.getApplicationAttemptId());
public void handle(Event event) {
if (event instanceof RMAppAttemptEvent) {
RMAppAttemptEvent rmAppAttemptEvent = (RMAppAttemptEvent) event;
assertEquals(attemptId, rmAppAttemptEvent.getApplicationAttemptId());
}
notified = true;
synchronized (this) {
notifyAll();
@ -134,7 +137,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
dispatcher.notified = false;
}
RMApp storeApp(RMStateStore store, ApplicationId appId, long submitTime,
protected RMApp storeApp(RMStateStore store, ApplicationId appId,
long submitTime,
long startTime) throws Exception {
ApplicationSubmissionContext context =
new ApplicationSubmissionContextPBImpl();
@ -150,7 +154,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
return mockApp;
}
ContainerId storeAttempt(RMStateStore store, ApplicationAttemptId attemptId,
protected ContainerId storeAttempt(RMStateStore store,
ApplicationAttemptId attemptId,
String containerIdStr, Token<AMRMTokenIdentifier> appToken,
SecretKey clientTokenMasterKey, TestDispatcher dispatcher)
throws Exception {
@ -474,7 +479,7 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
}
private Token<AMRMTokenIdentifier> generateAMRMToken(
protected Token<AMRMTokenIdentifier> generateAMRMToken(
ApplicationAttemptId attemptId,
AMRMTokenSecretManager appTokenMgr) {
Token<AMRMTokenIdentifier> appToken =

View File

@ -0,0 +1,277 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import com.google.common.base.Optional;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import javax.crypto.SecretKey;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.security.AMRMTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Before;
import org.junit.After;
import org.junit.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestZKRMStateStorePerf extends RMStateStoreTestBase
implements Tool {
public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
final String version = "0.1";
// Configurable variables for performance test
private int ZK_PERF_NUM_APP_DEFAULT = 1000;
private int ZK_PERF_NUM_APPATTEMPT_PER_APP = 10;
private final long clusterTimeStamp = 1352994193343L;
private static final String USAGE =
"Usage: " + TestZKRMStateStorePerf.class.getSimpleName() +
" -appSize numberOfApplications" +
" -appAttemptSize numberOfApplicationAttempts" +
" [-hostPort Host:Port]" +
" [-workingZnode rootZnodeForTesting]\n";
private YarnConfiguration conf = null;
private String workingZnode = "/Test";
private ZKRMStateStore store;
private AMRMTokenSecretManager appTokenMgr;
private ClientToAMTokenSecretManagerInRM clientToAMTokenMgr;
@Before
public void setUpZKServer() throws Exception {
super.setUp();
}
@After
public void tearDown() throws Exception {
if (store != null) {
store.stop();
}
if (appTokenMgr != null) {
appTokenMgr.stop();
}
super.tearDown();
}
private void initStore(String hostPort) {
Optional<String> optHostPort = Optional.fromNullable(hostPort);
RMContext rmContext = mock(RMContext.class);
conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_ZK_ADDRESS, optHostPort.or(this.hostPort));
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
store = new ZKRMStateStore();
store.init(conf);
store.start();
when(rmContext.getStateStore()).thenReturn(store);
appTokenMgr = new AMRMTokenSecretManager(conf, rmContext);
appTokenMgr.start();
clientToAMTokenMgr = new ClientToAMTokenSecretManagerInRM();
}
@SuppressWarnings("unchecked")
@Override
public int run(String[] args) {
LOG.info("Starting ZKRMStateStorePerf ver." + version);
int numApp = ZK_PERF_NUM_APP_DEFAULT;
int numAppAttemptPerApp = ZK_PERF_NUM_APPATTEMPT_PER_APP;
String hostPort = null;
boolean launchLocalZK= true;
if (args.length == 0) {
System.err.println("Missing arguments.");
return -1;
}
for (int i = 0; i < args.length; i++) { // parse command line
if (args[i].equalsIgnoreCase("-appsize")) {
numApp = Integer.parseInt(args[++i]);
} else if (args[i].equalsIgnoreCase("-appattemptsize")) {
numAppAttemptPerApp = Integer.parseInt(args[++i]);
} else if (args[i].equalsIgnoreCase("-hostPort")) {
hostPort = args[++i];
launchLocalZK = false;
} else if (args[i].equalsIgnoreCase("-workingZnode")) {
workingZnode = args[++i];
} else {
System.err.println("Illegal argument: " + args[i]);
return -1;
}
}
if (launchLocalZK) {
try {
setUp();
} catch (Exception e) {
System.err.println("failed to setup. : " + e.getMessage());
return -1;
}
}
initStore(hostPort);
long submitTime = System.currentTimeMillis();
long startTime = System.currentTimeMillis() + 1234;
ArrayList<ApplicationId> applicationIds = new ArrayList<>();
ArrayList<RMApp> rmApps = new ArrayList<>();
ArrayList<ApplicationAttemptId> attemptIds = new ArrayList<>();
HashMap<ApplicationId, Set<ApplicationAttemptId>> appIdsToAttemptId =
new HashMap<>();
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
for (int i = 0; i < numApp; i++) {
ApplicationId appId = ApplicationId.newInstance(clusterTimeStamp, i);
applicationIds.add(appId);
ArrayList<ApplicationAttemptId> attemptIdsForThisApp =
new ArrayList<>();
for (int j = 0; j < numAppAttemptPerApp; j++) {
ApplicationAttemptId attemptId =
ApplicationAttemptId.newInstance(appId, j);
attemptIdsForThisApp.add(attemptId);
}
appIdsToAttemptId.put(appId, new LinkedHashSet(attemptIdsForThisApp));
attemptIds.addAll(attemptIdsForThisApp);
}
for (ApplicationId appId : applicationIds) {
RMApp app = null;
try {
app = storeApp(store, appId, submitTime, startTime);
} catch (Exception e) {
System.err.println("failed to create Application Znode. : "
+ e.getMessage());
return -1;
}
waitNotify(dispatcher);
rmApps.add(app);
}
for (ApplicationAttemptId attemptId : attemptIds) {
Token<AMRMTokenIdentifier> tokenId =
generateAMRMToken(attemptId, appTokenMgr);
SecretKey clientTokenKey =
clientToAMTokenMgr.createMasterKey(attemptId);
try {
storeAttempt(store, attemptId,
ContainerId.newContainerId(attemptId, 0L).toString(),
tokenId, clientTokenKey, dispatcher);
} catch (Exception e) {
System.err.println("failed to create AppAttempt Znode. : "
+ e.getMessage());
return -1;
}
}
long storeStart = System.currentTimeMillis();
try {
store.loadState();
} catch (Exception e) {
System.err.println("failed to locaState from ZKRMStateStore. : "
+ e.getMessage());
return -1;
}
long storeEnd = System.currentTimeMillis();
long loadTime = storeEnd - storeStart;
String resultMsg = "ZKRMStateStore takes " + loadTime + " msec to loadState.";
LOG.info(resultMsg);
System.out.println(resultMsg);
// cleanup
try {
for (RMApp app : rmApps) {
ApplicationStateData appState =
ApplicationStateData.newInstance(app.getSubmitTime(),
app.getStartTime(), app.getApplicationSubmissionContext(),
app.getUser());
ApplicationId appId = app.getApplicationId();
Map m = mock(Map.class);
when(m.keySet()).thenReturn(appIdsToAttemptId.get(appId));
appState.attempts = m;
store.removeApplicationStateInternal(appState);
}
} catch (Exception e) {
System.err.println("failed to cleanup. : " + e.getMessage());
return -1;
}
return 0;
}
@Override
public void setConf(Configuration conf) {
// currently this function is just ignored
}
@Override
public Configuration getConf() {
return conf;
}
@Test
public void perfZKRMStateStore() throws Exception {
String[] args = {
"-appSize", String.valueOf(ZK_PERF_NUM_APP_DEFAULT),
"-appAttemptSize", String.valueOf(ZK_PERF_NUM_APPATTEMPT_PER_APP)
};
run(args);
}
static public void main(String[] args) throws Exception {
TestZKRMStateStorePerf perf = new TestZKRMStateStorePerf();
int res = -1;
try {
res = ToolRunner.run(perf, args);
} catch(Exception e) {
System.err.print(StringUtils.stringifyException(e));
res = -2;
}
if(res == -1) {
System.err.print(USAGE);
}
System.exit(res);
}
}