YARN-2716. Refactor ZKRMStateStore retry code with Apache Curator. Contributed by Karthik Kambatla

(cherry picked from commit 960b8f19ca)
This commit is contained in:
Jian He 2015-06-08 14:50:58 -07:00
parent 8ee50d8ca7
commit a24ead8c6d
8 changed files with 372 additions and 762 deletions

View File

@ -246,6 +246,9 @@ Release 2.8.0 - UNRELEASED
YARN-1462. AHS API and other AHS changes to handle tags for completed MR jobs. (xgong)
YARN-2716. Refactor ZKRMStateStore retry code with Apache Curator.
(Karthik Kambatla via jianhe)
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not

View File

@ -413,7 +413,7 @@ public class YarnConfiguration extends Configuration {
public static final String RM_ZK_RETRY_INTERVAL_MS =
RM_ZK_PREFIX + "retry-interval-ms";
public static final long DEFAULT_RM_ZK_RETRY_INTERVAL_MS = 1000;
public static final int DEFAULT_RM_ZK_RETRY_INTERVAL_MS = 1000;
public static final String RM_ZK_TIMEOUT_MS = RM_ZK_PREFIX + "timeout-ms";
public static final int DEFAULT_RM_ZK_TIMEOUT_MS = 10000;

View File

@ -174,6 +174,14 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>

View File

@ -40,7 +40,6 @@ import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.DelegationKey;
@ -76,7 +75,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSec
import org.apache.hadoop.yarn.server.security.MasterKeyData;
import org.apache.hadoop.yarn.util.ConverterUtils;
public class RMStateStoreTestBase extends ClientBaseWithFixes{
public class RMStateStoreTestBase {
public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);

View File

@ -18,18 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import javax.crypto.SecretKey;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.test.TestingServer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -50,6 +42,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
@ -61,22 +54,49 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptS
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import javax.crypto.SecretKey;
public class TestZKRMStateStore extends RMStateStoreTestBase {
public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
private static final int ZK_TIMEOUT_MS = 1000;
private TestingServer curatorTestingServer;
private CuratorFramework curatorFramework;
@Before
public void setupCuratorServer() throws Exception {
curatorTestingServer = new TestingServer();
curatorTestingServer.start();
curatorFramework = CuratorFrameworkFactory.builder()
.connectString(curatorTestingServer.getConnectString())
.retryPolicy(new RetryNTimes(100, 100))
.build();
curatorFramework.start();
}
@After
public void cleanupCuratorServer() throws IOException {
curatorFramework.close();
curatorTestingServer.stop();
}
class TestZKRMStateStoreTester implements RMStateStoreHelper {
ZooKeeper client;
TestZKRMStateStoreInternal store;
String workingZnode;
class TestZKRMStateStoreInternal extends ZKRMStateStore {
public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
@ -86,11 +106,6 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
assertTrue(znodeWorkingPath.equals(workingZnode));
}
@Override
public ZooKeeper getNewZooKeeper() throws IOException {
return client;
}
public String getVersionNode() {
return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE;
}
@ -109,7 +124,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
* @throws Exception
*/
public void testRetryingCreateRootDir() throws Exception {
createRootDir(znodeWorkingPath);
create(znodeWorkingPath);
}
}
@ -117,23 +132,24 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
workingZnode = "/jira/issue/3077/rmstore";
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
curatorTestingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
this.client = createClient();
this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
return this.store;
}
@Override
public boolean isFinalStateValid() throws Exception {
List<String> nodes = client.getChildren(store.znodeWorkingPath, false);
return nodes.size() == 1;
return 1 ==
curatorFramework.getChildren().forPath(store.znodeWorkingPath).size();
}
@Override
public void writeVersion(Version version) throws Exception {
client.setData(store.getVersionNode(), ((VersionPBImpl) version)
.getProto().toByteArray(), -1);
curatorFramework.setData().withVersion(-1)
.forPath(store.getVersionNode(),
((VersionPBImpl) version).getProto().toByteArray());
}
@Override
@ -142,10 +158,8 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
}
public boolean appExists(RMApp app) throws Exception {
Stat node =
client.exists(store.getAppNode(app.getApplicationId().toString()),
false);
return node !=null;
return null != curatorFramework.checkExists()
.forPath(store.getAppNode(app.getApplicationId().toString()));
}
}
@ -178,9 +192,9 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
workingZnode = "/jira/issue/3077/rmstore";
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
curatorTestingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
this.client = createClient();
this.store = new TestZKRMStateStoreInternal(conf, workingZnode) {
Version storedVersion = null;
@ -217,7 +231,8 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
conf.set(YarnConfiguration.RM_HA_IDS, rmIds);
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
curatorTestingServer.getConnectString());
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
conf.set(YarnConfiguration.RM_HA_ID, rmId);
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");

View File

@ -25,6 +25,8 @@ import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import javax.crypto.SecretKey;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
@ -73,10 +75,11 @@ public class TestZKRMStateStorePerf extends RMStateStoreTestBase
private ZKRMStateStore store;
private AMRMTokenSecretManager appTokenMgr;
private ClientToAMTokenSecretManagerInRM clientToAMTokenMgr;
private TestingServer curatorTestingServer;
@Before
public void setUpZKServer() throws Exception {
super.setUp();
curatorTestingServer = new TestingServer();
}
@After
@ -87,7 +90,7 @@ public class TestZKRMStateStorePerf extends RMStateStoreTestBase
if (appTokenMgr != null) {
appTokenMgr.stop();
}
super.tearDown();
curatorTestingServer.stop();
}
private void initStore(String hostPort) {
@ -95,7 +98,8 @@ public class TestZKRMStateStorePerf extends RMStateStoreTestBase
RMContext rmContext = mock(RMContext.class);
conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_ZK_ADDRESS, optHostPort.or(this.hostPort));
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
optHostPort.or(curatorTestingServer.getConnectString()));
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
store = new ZKRMStateStore();
@ -140,7 +144,7 @@ public class TestZKRMStateStorePerf extends RMStateStoreTestBase
if (launchLocalZK) {
try {
setUp();
setUpZKServer();
} catch (Exception e) {
System.err.println("failed to setup. : " + e.getMessage());
return -1;

View File

@ -20,39 +20,32 @@ package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.test.TestingServer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher;
import org.apache.hadoop.util.ZKUtil;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public class TestZKRMStateStoreZKClientConnections extends
ClientBaseWithFixes {
private static final int ZK_OP_WAIT_TIME = 3000;
private static final int ZK_TIMEOUT_MS = 1000;
public class TestZKRMStateStoreZKClientConnections {
private Log LOG =
LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class);
private static final int ZK_TIMEOUT_MS = 1000;
private static final String DIGEST_USER_PASS="test-user:test-password";
private static final String TEST_AUTH_GOOD = "digest:" + DIGEST_USER_PASS;
private static final String DIGEST_USER_HASH;
@ -66,14 +59,22 @@ public class TestZKRMStateStoreZKClientConnections extends
}
private static final String TEST_ACL = "digest:" + DIGEST_USER_HASH + ":rwcda";
private TestingServer testingServer;
@Before
public void setupZKServer() throws Exception {
testingServer = new TestingServer();
testingServer.start();
}
@After
public void cleanupZKServer() throws Exception {
testingServer.stop();
}
class TestZKClient {
ZKRMStateStore store;
boolean forExpire = false;
TestForwardingWatcher oldWatcher;
TestForwardingWatcher watcher;
CyclicBarrier syncBarrier = new CyclicBarrier(2);
protected class TestZKRMStateStore extends ZKRMStateStore {
@ -83,51 +84,12 @@ public class TestZKRMStateStoreZKClientConnections extends
start();
assertTrue(znodeWorkingPath.equals(workingZnode));
}
@Override
public ZooKeeper getNewZooKeeper()
throws IOException, InterruptedException {
oldWatcher = watcher;
watcher = new TestForwardingWatcher();
return createClient(watcher, hostPort, ZK_TIMEOUT_MS);
}
@Override
public synchronized void processWatchEvent(ZooKeeper zk,
WatchedEvent event) throws Exception {
if (forExpire) {
// a hack... couldn't find a way to trigger expired event.
WatchedEvent expriredEvent = new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null);
super.processWatchEvent(zk, expriredEvent);
forExpire = false;
syncBarrier.await();
} else {
super.processWatchEvent(zk, event);
}
}
}
private class TestForwardingWatcher extends
ClientBaseWithFixes.CountdownWatcher {
public void process(WatchedEvent event) {
super.process(event);
try {
if (store != null) {
store.processWatchEvent(client, event);
}
} catch (Throwable t) {
LOG.error("Failed to process watcher event " + event + ": "
+ StringUtils.stringifyException(t));
}
}
}
public RMStateStore getRMStateStore(Configuration conf) throws Exception {
String workingZnode = "/Test";
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
testingServer.getConnectString());
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
this.store = new TestZKRMStateStore(conf, workingZnode);
return this.store;
@ -147,12 +109,12 @@ public class TestZKRMStateStoreZKClientConnections extends
store.setRMDispatcher(dispatcher);
final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
stopServer();
testingServer.stop();
Thread clientThread = new Thread() {
@Override
public void run() {
try {
store.getDataWithRetries(path, true);
store.getData(path);
} catch (Exception e) {
e.printStackTrace();
assertionFailedInThread.set(true);
@ -160,114 +122,19 @@ public class TestZKRMStateStoreZKClientConnections extends
}
};
Thread.sleep(2000);
startServer();
testingServer.start();
clientThread.join();
Assert.assertFalse(assertionFailedInThread.get());
}
@Test(timeout = 20000)
public void testZKClientDisconnectAndReconnect()
throws Exception {
TestZKClient zkClientTester = new TestZKClient();
String path = "/test";
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
ZKRMStateStore store =
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
// trigger watch
store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
store.getDataWithRetries(path, true);
store.setDataWithRetries(path, "newBytes".getBytes(), 0);
stopServer();
zkClientTester.watcher.waitForDisconnected(ZK_OP_WAIT_TIME);
try {
store.getDataWithRetries(path, true);
fail("Expected ZKClient time out exception");
} catch (Exception e) {
assertTrue(e.getMessage().contains(
"Wait for ZKClient creation timed out"));
}
// ZKRMStateStore Session restored
startServer();
zkClientTester.watcher.waitForConnected(ZK_OP_WAIT_TIME);
byte[] ret = null;
try {
ret = store.getDataWithRetries(path, true);
} catch (Exception e) {
String error = "ZKRMStateStore Session restore failed";
LOG.error(error, e);
fail(error);
}
assertEquals("newBytes", new String(ret));
}
@Test(timeout = 20000)
public void testZKSessionTimeout() throws Exception {
TestZKClient zkClientTester = new TestZKClient();
String path = "/test";
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
ZKRMStateStore store =
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
TestDispatcher dispatcher = new TestDispatcher();
store.setRMDispatcher(dispatcher);
// a hack to trigger expired event
zkClientTester.forExpire = true;
// trigger watch
store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
store.getDataWithRetries(path, true);
store.setDataWithRetries(path, "bytes".getBytes(), 0);
zkClientTester.syncBarrier.await();
// after this point, expired event has already been processed.
try {
byte[] ret = store.getDataWithRetries(path, false);
assertEquals("bytes", new String(ret));
} catch (Exception e) {
String error = "New session creation failed";
LOG.error(error, e);
fail(error);
}
// send Disconnected event from old client session to ZKRMStateStore
// check the current client session is not affected.
Assert.assertTrue(zkClientTester.oldWatcher != null);
WatchedEvent disconnectedEvent = new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Disconnected, null);
zkClientTester.oldWatcher.process(disconnectedEvent);
Assert.assertTrue(store.zkClient != null);
zkClientTester.watcher.process(disconnectedEvent);
Assert.assertTrue(store.zkClient == null);
WatchedEvent connectedEvent = new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.SyncConnected, null);
zkClientTester.watcher.process(connectedEvent);
Assert.assertTrue(store.zkClient != null);
Assert.assertTrue(store.zkClient == store.activeZkClient);
}
@Test(timeout = 20000)
public void testSetZKAcl() {
TestZKClient zkClientTester = new TestZKClient();
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.RM_ZK_ACL, "world:anyone:rwca");
try {
zkClientTester.store.zkClient.delete(zkClientTester.store
.znodeWorkingPath, -1);
zkClientTester.store.delete(zkClientTester.store
.znodeWorkingPath);
fail("Shouldn't be able to delete path");
} catch (Exception e) {/* expected behavior */
}