YARN-1185. Fixed FileSystemRMStateStore to not leave partial files that prevent subsequent ResourceManager recovery. Contributed by Omkar Vinit Joshi.

svn merge --ignore-ancestry -c 1533803 ../../trunk/


git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1533805 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-10-19 18:15:18 +00:00
parent 75968551bd
commit 06d24042b6
6 changed files with 245 additions and 105 deletions

View File

@ -105,6 +105,9 @@ Release 2.2.1 - UNRELEASED
YARN-1295. In UnixLocalWrapperScriptBuilder, using bash -c can cause Text
file busy errors (Sandy Ryza)
YARN-1185. Fixed FileSystemRMStateStore to not leave partial files that
prevent subsequent ResourceManager recovery. (Omkar Vinit Joshi via vinodkv)
Release 2.2.0 - 2013-10-13
INCOMPATIBLE CHANGES

View File

@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
@ -118,6 +119,9 @@ public class FileSystemRMStateStore extends RMStateStore {
for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {
assert childNodeStatus.isFile();
String childNodeName = childNodeStatus.getPath().getName();
if (checkAndRemovePartialRecord(childNodeStatus.getPath())) {
continue;
}
byte[] childData =
readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
@ -178,12 +182,28 @@ public class FileSystemRMStateStore extends RMStateStore {
}
}
private boolean checkAndRemovePartialRecord(Path record) throws IOException {
// If the file ends with .tmp then it shows that it failed
// during saving state into state store. The file will be deleted as a
// part of this call
if (record.getName().endsWith(".tmp")) {
LOG.error("incomplete rm state store entry found :"
+ record);
fs.delete(record, false);
return true;
}
return false;
}
private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot);
for(FileStatus childNodeStatus : childNodes) {
assert childNodeStatus.isFile();
String childNodeName = childNodeStatus.getPath().getName();
if (checkAndRemovePartialRecord(childNodeStatus.getPath())) {
continue;
}
if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
rmState.rmSecretManagerState.dtSequenceNumber =
Integer.parseInt(childNodeName.split("_")[1]);
@ -344,10 +364,19 @@ public class FileSystemRMStateStore extends RMStateStore {
return data;
}
/*
* In order to make this write atomic as a part of write we will first write
* data to .tmp file and then rename it. Here we are assuming that rename is
* atomic for underlying file system.
*/
private void writeFile(Path outputPath, byte[] data) throws Exception {
FSDataOutputStream fsOut = fs.create(outputPath, false);
Path tempPath =
new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
FSDataOutputStream fsOut = null;
fsOut = fs.create(tempPath, false);
fsOut.write(data);
fsOut.close();
fs.rename(tempPath, outputPath);
}
private boolean renameFile(Path src, Path dst) throws Exception {

View File

@ -39,6 +39,7 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -75,9 +76,9 @@ import org.apache.zookeeper.ZooKeeper;
import org.junit.Test;
public class TestRMStateStore extends ClientBaseWithFixes{
public class RMStateStoreTestBase extends ClientBaseWithFixes{
public static final Log LOG = LogFactory.getLog(TestRMStateStore.class);
public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
static class TestDispatcher implements
Dispatcher, EventHandler<RMAppAttemptStoredEvent> {
@ -116,104 +117,6 @@ public class TestRMStateStore extends ClientBaseWithFixes{
boolean isFinalStateValid() throws Exception;
}
@Test
public void testZKRMStateStoreRealZK() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
testRMAppStateStore(zkTester);
testRMDTSecretManagerStateStore(zkTester);
}
@Test
public void testFSRMStateStore() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
try {
TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
testRMAppStateStore(fsTester);
testRMDTSecretManagerStateStore(fsTester);
} finally {
cluster.shutdown();
}
}
class TestZKRMStateStoreTester implements RMStateStoreHelper {
ZooKeeper client;
ZKRMStateStore store;
class TestZKRMStateStore extends ZKRMStateStore {
public TestZKRMStateStore(Configuration conf, String workingZnode)
throws Exception {
init(conf);
start();
assertTrue(znodeWorkingPath.equals(workingZnode));
}
@Override
public ZooKeeper getNewZooKeeper() throws IOException {
return client;
}
}
public RMStateStore getRMStateStore() throws Exception {
String workingZnode = "/Test";
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
this.client = createClient();
this.store = new TestZKRMStateStore(conf, workingZnode);
return this.store;
}
@Override
public boolean isFinalStateValid() throws Exception {
List<String> nodes = client.getChildren(store.znodeWorkingPath, false);
return nodes.size() == 1;
}
}
class TestFSRMStateStoreTester implements RMStateStoreHelper {
Path workingDirPathURI;
FileSystemRMStateStore store;
MiniDFSCluster cluster;
class TestFileSystemRMStore extends FileSystemRMStateStore {
TestFileSystemRMStore(Configuration conf) throws Exception {
init(conf);
Assert.assertNull(fs);
assertTrue(workingDirPathURI.equals(fsWorkingPath));
start();
Assert.assertNotNull(fs);
}
}
public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
Path workingDirPath = new Path("/Test");
this.cluster = cluster;
FileSystem fs = cluster.getFileSystem();
fs.mkdirs(workingDirPath);
Path clusterURI = new Path(cluster.getURI());
workingDirPathURI = new Path(clusterURI, workingDirPath);
fs.close();
}
@Override
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
workingDirPathURI.toString());
this.store = new TestFileSystemRMStore(conf);
return store;
}
@Override
public boolean isFinalStateValid() throws Exception {
FileSystem fs = cluster.getFileSystem();
FileStatus[] files = fs.listStatus(workingDirPathURI);
return files.length == 1;
}
}
void waitNotify(TestDispatcher dispatcher) {
long startTime = System.currentTimeMillis();
while(!dispatcher.notified) {

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import static org.junit.Assert.assertTrue;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.Test;
public class TestFSRMStateStore extends RMStateStoreTestBase {
public static final Log LOG = LogFactory.getLog(TestFSRMStateStore.class);
class TestFSRMStateStoreTester implements RMStateStoreHelper {
Path workingDirPathURI;
FileSystemRMStateStore store;
MiniDFSCluster cluster;
class TestFileSystemRMStore extends FileSystemRMStateStore {
TestFileSystemRMStore(Configuration conf) throws Exception {
init(conf);
Assert.assertNull(fs);
assertTrue(workingDirPathURI.equals(fsWorkingPath));
start();
Assert.assertNotNull(fs);
}
}
public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
Path workingDirPath = new Path("/Test");
this.cluster = cluster;
FileSystem fs = cluster.getFileSystem();
fs.mkdirs(workingDirPath);
Path clusterURI = new Path(cluster.getURI());
workingDirPathURI = new Path(clusterURI, workingDirPath);
fs.close();
}
@Override
public RMStateStore getRMStateStore() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
workingDirPathURI.toString());
this.store = new TestFileSystemRMStore(conf);
return store;
}
@Override
public boolean isFinalStateValid() throws Exception {
FileSystem fs = cluster.getFileSystem();
FileStatus[] files = fs.listStatus(workingDirPathURI);
return files.length == 1;
}
}
@Test
public void testFSRMStateStore() throws Exception {
HdfsConfiguration conf = new HdfsConfiguration();
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
try {
TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
// If the state store is FileSystemRMStateStore then add corrupted entry.
// It should discard the entry and remove it from file system.
FSDataOutputStream fsOut = null;
FileSystemRMStateStore fileSystemRMStateStore =
(FileSystemRMStateStore) fsTester.getRMStateStore();
String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003";
ApplicationAttemptId attemptId3 =
ConverterUtils.toApplicationAttemptId(appAttemptIdStr3);
Path rootDir =
new Path(fileSystemRMStateStore.fsWorkingPath, "FSRMStateRoot");
Path appRootDir = new Path(rootDir, "RMAppRoot");
Path appDir =
new Path(appRootDir, attemptId3.getApplicationId().toString());
Path tempAppAttemptFile =
new Path(appDir, attemptId3.toString() + ".tmp");
fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false);
fsOut.write("Some random data ".getBytes());
fsOut.close();
testRMAppStateStore(fsTester);
Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath
.getFileSystem(conf).exists(tempAppAttemptFile));
testRMDTSecretManagerStateStore(fsTester);
} finally {
cluster.shutdown();
}
}
}

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.zookeeper.ZooKeeper;
import org.junit.Test;
public class TestZKRMStateStore extends RMStateStoreTestBase {
public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
class TestZKRMStateStoreTester implements RMStateStoreHelper {
ZooKeeper client;
ZKRMStateStore store;
class TestZKRMStateStoreInternal extends ZKRMStateStore {
public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
throws Exception {
init(conf);
start();
assertTrue(znodeWorkingPath.equals(workingZnode));
}
@Override
public ZooKeeper getNewZooKeeper() throws IOException {
return client;
}
}
public RMStateStore getRMStateStore() throws Exception {
String workingZnode = "/Test";
YarnConfiguration conf = new YarnConfiguration();
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
this.client = createClient();
this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
return this.store;
}
@Override
public boolean isFinalStateValid() throws Exception {
List<String> nodes = client.getChildren(store.znodeWorkingPath, false);
return nodes.size() == 1;
}
}
@Test
public void testZKRMStateStoreRealZK() throws Exception {
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
testRMAppStateStore(zkTester);
testRMDTSecretManagerStateStore(zkTester);
}
}

View File

@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ClientBaseWithFixes;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestRMStateStore.TestDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher;
import org.apache.hadoop.util.ZKUtil;
import org.apache.zookeeper.CreateMode;
@ -43,17 +43,20 @@ import static org.junit.Assert.fail;
public class TestZKRMStateStoreZKClientConnections extends
ClientBaseWithFixes {
private static final int ZK_OP_WAIT_TIME = 3000;
private Log LOG =
LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class);
class TestZKClient {
ZKRMStateStore store;
boolean forExpire = false;
TestForwardingWatcher watcher;
CyclicBarrier syncBarrier = new CyclicBarrier(2);
protected class TestZKRMStateStore extends ZKRMStateStore {
public TestZKRMStateStore(Configuration conf, String workingZnode)
throws Exception {
init(conf);
@ -87,6 +90,7 @@ public class TestZKRMStateStoreZKClientConnections extends
private class TestForwardingWatcher extends
ClientBaseWithFixes.CountdownWatcher {
public void process(WatchedEvent event) {
super.process(event);
try {
@ -187,7 +191,7 @@ public class TestZKRMStateStoreZKClientConnections extends
}
}
@Test (timeout = 20000)
@Test(timeout = 20000)
public void testSetZKAcl() {
TestZKClient zkClientTester = new TestZKClient();
YarnConfiguration conf = new YarnConfiguration();
@ -196,10 +200,11 @@ public class TestZKRMStateStoreZKClientConnections extends
zkClientTester.store.zkClient.delete(zkClientTester.store
.znodeWorkingPath, -1);
fail("Shouldn't be able to delete path");
} catch (Exception e) {/* expected behavior */}
} catch (Exception e) {/* expected behavior */
}
}
@Test (timeout = 20000)
@Test(timeout = 20000)
public void testInvalidZKAclConfiguration() {
TestZKClient zkClientTester = new TestZKClient();
YarnConfiguration conf = new YarnConfiguration();