YARN-1185. Fixed FileSystemRMStateStore to not leave partial files that prevent subsequent ResourceManager recovery. Contributed by Omkar Vinit Joshi.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1533803 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
e86f4a2e25
commit
7f97fd1319
|
@ -120,6 +120,9 @@ Release 2.2.1 - UNRELEASED
|
|||
YARN-1295. In UnixLocalWrapperScriptBuilder, using bash -c can cause Text
|
||||
file busy errors (Sandy Ryza)
|
||||
|
||||
YARN-1185. Fixed FileSystemRMStateStore to not leave partial files that
|
||||
prevent subsequent ResourceManager recovery. (Omkar Vinit Joshi via vinodkv)
|
||||
|
||||
Release 2.2.0 - 2013-10-13
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -22,6 +22,7 @@ import java.io.ByteArrayInputStream;
|
|||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.DataOutputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -118,6 +119,9 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|||
for (FileStatus childNodeStatus : fs.listStatus(appDir.getPath())) {
|
||||
assert childNodeStatus.isFile();
|
||||
String childNodeName = childNodeStatus.getPath().getName();
|
||||
if (checkAndRemovePartialRecord(childNodeStatus.getPath())) {
|
||||
continue;
|
||||
}
|
||||
byte[] childData =
|
||||
readFile(childNodeStatus.getPath(), childNodeStatus.getLen());
|
||||
if (childNodeName.startsWith(ApplicationId.appIdStrPrefix)) {
|
||||
|
@ -178,12 +182,28 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|||
}
|
||||
}
|
||||
|
||||
private boolean checkAndRemovePartialRecord(Path record) throws IOException {
|
||||
// If the file ends with .tmp then it shows that it failed
|
||||
// during saving state into state store. The file will be deleted as a
|
||||
// part of this call
|
||||
if (record.getName().endsWith(".tmp")) {
|
||||
LOG.error("incomplete rm state store entry found :"
|
||||
+ record);
|
||||
fs.delete(record, false);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private void loadRMDTSecretManagerState(RMState rmState) throws Exception {
|
||||
FileStatus[] childNodes = fs.listStatus(rmDTSecretManagerRoot);
|
||||
|
||||
for(FileStatus childNodeStatus : childNodes) {
|
||||
assert childNodeStatus.isFile();
|
||||
String childNodeName = childNodeStatus.getPath().getName();
|
||||
if (checkAndRemovePartialRecord(childNodeStatus.getPath())) {
|
||||
continue;
|
||||
}
|
||||
if(childNodeName.startsWith(DELEGATION_TOKEN_SEQUENCE_NUMBER_PREFIX)) {
|
||||
rmState.rmSecretManagerState.dtSequenceNumber =
|
||||
Integer.parseInt(childNodeName.split("_")[1]);
|
||||
|
@ -344,10 +364,19 @@ public class FileSystemRMStateStore extends RMStateStore {
|
|||
return data;
|
||||
}
|
||||
|
||||
/*
|
||||
* In order to make this write atomic as a part of write we will first write
|
||||
* data to .tmp file and then rename it. Here we are assuming that rename is
|
||||
* atomic for underlying file system.
|
||||
*/
|
||||
private void writeFile(Path outputPath, byte[] data) throws Exception {
|
||||
FSDataOutputStream fsOut = fs.create(outputPath, false);
|
||||
Path tempPath =
|
||||
new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
|
||||
FSDataOutputStream fsOut = null;
|
||||
fsOut = fs.create(tempPath, false);
|
||||
fsOut.write(data);
|
||||
fsOut.close();
|
||||
fs.rename(tempPath, outputPath);
|
||||
}
|
||||
|
||||
private boolean renameFile(Path src, Path dst) throws Exception {
|
||||
|
|
|
@ -39,6 +39,7 @@ import junit.framework.Assert;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -75,9 +76,9 @@ import org.apache.zookeeper.ZooKeeper;
|
|||
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestRMStateStore extends ClientBaseWithFixes{
|
||||
public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(TestRMStateStore.class);
|
||||
public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
|
||||
|
||||
static class TestDispatcher implements
|
||||
Dispatcher, EventHandler<RMAppAttemptStoredEvent> {
|
||||
|
@ -116,104 +117,6 @@ public class TestRMStateStore extends ClientBaseWithFixes{
|
|||
boolean isFinalStateValid() throws Exception;
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testZKRMStateStoreRealZK() throws Exception {
|
||||
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
|
||||
testRMAppStateStore(zkTester);
|
||||
testRMDTSecretManagerStateStore(zkTester);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFSRMStateStore() throws Exception {
|
||||
HdfsConfiguration conf = new HdfsConfiguration();
|
||||
MiniDFSCluster cluster =
|
||||
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
try {
|
||||
TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
|
||||
testRMAppStateStore(fsTester);
|
||||
testRMDTSecretManagerStateStore(fsTester);
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
class TestZKRMStateStoreTester implements RMStateStoreHelper {
|
||||
ZooKeeper client;
|
||||
ZKRMStateStore store;
|
||||
|
||||
class TestZKRMStateStore extends ZKRMStateStore {
|
||||
public TestZKRMStateStore(Configuration conf, String workingZnode)
|
||||
throws Exception {
|
||||
init(conf);
|
||||
start();
|
||||
assertTrue(znodeWorkingPath.equals(workingZnode));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ZooKeeper getNewZooKeeper() throws IOException {
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
||||
public RMStateStore getRMStateStore() throws Exception {
|
||||
String workingZnode = "/Test";
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
|
||||
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
||||
this.client = createClient();
|
||||
this.store = new TestZKRMStateStore(conf, workingZnode);
|
||||
return this.store;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFinalStateValid() throws Exception {
|
||||
List<String> nodes = client.getChildren(store.znodeWorkingPath, false);
|
||||
return nodes.size() == 1;
|
||||
}
|
||||
}
|
||||
|
||||
class TestFSRMStateStoreTester implements RMStateStoreHelper {
|
||||
Path workingDirPathURI;
|
||||
FileSystemRMStateStore store;
|
||||
MiniDFSCluster cluster;
|
||||
|
||||
class TestFileSystemRMStore extends FileSystemRMStateStore {
|
||||
TestFileSystemRMStore(Configuration conf) throws Exception {
|
||||
init(conf);
|
||||
Assert.assertNull(fs);
|
||||
assertTrue(workingDirPathURI.equals(fsWorkingPath));
|
||||
start();
|
||||
Assert.assertNotNull(fs);
|
||||
}
|
||||
}
|
||||
|
||||
public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
|
||||
Path workingDirPath = new Path("/Test");
|
||||
this.cluster = cluster;
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
fs.mkdirs(workingDirPath);
|
||||
Path clusterURI = new Path(cluster.getURI());
|
||||
workingDirPathURI = new Path(clusterURI, workingDirPath);
|
||||
fs.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RMStateStore getRMStateStore() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
|
||||
workingDirPathURI.toString());
|
||||
this.store = new TestFileSystemRMStore(conf);
|
||||
return store;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFinalStateValid() throws Exception {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
FileStatus[] files = fs.listStatus(workingDirPathURI);
|
||||
return files.length == 1;
|
||||
}
|
||||
}
|
||||
|
||||
void waitNotify(TestDispatcher dispatcher) {
|
||||
long startTime = System.currentTimeMillis();
|
||||
while(!dispatcher.notified) {
|
|
@ -0,0 +1,120 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestFSRMStateStore extends RMStateStoreTestBase {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(TestFSRMStateStore.class);
|
||||
|
||||
class TestFSRMStateStoreTester implements RMStateStoreHelper {
|
||||
|
||||
Path workingDirPathURI;
|
||||
FileSystemRMStateStore store;
|
||||
MiniDFSCluster cluster;
|
||||
|
||||
class TestFileSystemRMStore extends FileSystemRMStateStore {
|
||||
|
||||
TestFileSystemRMStore(Configuration conf) throws Exception {
|
||||
init(conf);
|
||||
Assert.assertNull(fs);
|
||||
assertTrue(workingDirPathURI.equals(fsWorkingPath));
|
||||
start();
|
||||
Assert.assertNotNull(fs);
|
||||
}
|
||||
}
|
||||
|
||||
public TestFSRMStateStoreTester(MiniDFSCluster cluster) throws Exception {
|
||||
Path workingDirPath = new Path("/Test");
|
||||
this.cluster = cluster;
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
fs.mkdirs(workingDirPath);
|
||||
Path clusterURI = new Path(cluster.getURI());
|
||||
workingDirPathURI = new Path(clusterURI, workingDirPath);
|
||||
fs.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public RMStateStore getRMStateStore() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.set(YarnConfiguration.FS_RM_STATE_STORE_URI,
|
||||
workingDirPathURI.toString());
|
||||
this.store = new TestFileSystemRMStore(conf);
|
||||
return store;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFinalStateValid() throws Exception {
|
||||
FileSystem fs = cluster.getFileSystem();
|
||||
FileStatus[] files = fs.listStatus(workingDirPathURI);
|
||||
return files.length == 1;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testFSRMStateStore() throws Exception {
|
||||
HdfsConfiguration conf = new HdfsConfiguration();
|
||||
MiniDFSCluster cluster =
|
||||
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
|
||||
try {
|
||||
TestFSRMStateStoreTester fsTester = new TestFSRMStateStoreTester(cluster);
|
||||
// If the state store is FileSystemRMStateStore then add corrupted entry.
|
||||
// It should discard the entry and remove it from file system.
|
||||
FSDataOutputStream fsOut = null;
|
||||
FileSystemRMStateStore fileSystemRMStateStore =
|
||||
(FileSystemRMStateStore) fsTester.getRMStateStore();
|
||||
String appAttemptIdStr3 = "appattempt_1352994193343_0001_000003";
|
||||
ApplicationAttemptId attemptId3 =
|
||||
ConverterUtils.toApplicationAttemptId(appAttemptIdStr3);
|
||||
Path rootDir =
|
||||
new Path(fileSystemRMStateStore.fsWorkingPath, "FSRMStateRoot");
|
||||
Path appRootDir = new Path(rootDir, "RMAppRoot");
|
||||
Path appDir =
|
||||
new Path(appRootDir, attemptId3.getApplicationId().toString());
|
||||
Path tempAppAttemptFile =
|
||||
new Path(appDir, attemptId3.toString() + ".tmp");
|
||||
fsOut = fileSystemRMStateStore.fs.create(tempAppAttemptFile, false);
|
||||
fsOut.write("Some random data ".getBytes());
|
||||
fsOut.close();
|
||||
|
||||
testRMAppStateStore(fsTester);
|
||||
Assert.assertFalse(fileSystemRMStateStore.fsWorkingPath
|
||||
.getFileSystem(conf).exists(tempAppAttemptFile));
|
||||
testRMDTSecretManagerStateStore(fsTester);
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,80 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
|
||||
|
||||
class TestZKRMStateStoreTester implements RMStateStoreHelper {
|
||||
|
||||
ZooKeeper client;
|
||||
ZKRMStateStore store;
|
||||
|
||||
class TestZKRMStateStoreInternal extends ZKRMStateStore {
|
||||
|
||||
public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
|
||||
throws Exception {
|
||||
init(conf);
|
||||
start();
|
||||
assertTrue(znodeWorkingPath.equals(workingZnode));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ZooKeeper getNewZooKeeper() throws IOException {
|
||||
return client;
|
||||
}
|
||||
}
|
||||
|
||||
public RMStateStore getRMStateStore() throws Exception {
|
||||
String workingZnode = "/Test";
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_ADDRESS, hostPort);
|
||||
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
||||
this.client = createClient();
|
||||
this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
|
||||
return this.store;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFinalStateValid() throws Exception {
|
||||
List<String> nodes = client.getChildren(store.znodeWorkingPath, false);
|
||||
return nodes.size() == 1;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testZKRMStateStoreRealZK() throws Exception {
|
||||
TestZKRMStateStoreTester zkTester = new TestZKRMStateStoreTester();
|
||||
testRMAppStateStore(zkTester);
|
||||
testRMDTSecretManagerStateStore(zkTester);
|
||||
}
|
||||
}
|
|
@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.ha.ClientBaseWithFixes;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.TestRMStateStore.TestDispatcher;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher;
|
||||
import org.apache.hadoop.util.ZKUtil;
|
||||
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
|
@ -43,17 +43,20 @@ import static org.junit.Assert.fail;
|
|||
|
||||
public class TestZKRMStateStoreZKClientConnections extends
|
||||
ClientBaseWithFixes {
|
||||
|
||||
private static final int ZK_OP_WAIT_TIME = 3000;
|
||||
private Log LOG =
|
||||
LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class);
|
||||
|
||||
class TestZKClient {
|
||||
|
||||
ZKRMStateStore store;
|
||||
boolean forExpire = false;
|
||||
TestForwardingWatcher watcher;
|
||||
CyclicBarrier syncBarrier = new CyclicBarrier(2);
|
||||
|
||||
protected class TestZKRMStateStore extends ZKRMStateStore {
|
||||
|
||||
public TestZKRMStateStore(Configuration conf, String workingZnode)
|
||||
throws Exception {
|
||||
init(conf);
|
||||
|
@ -87,6 +90,7 @@ public class TestZKRMStateStoreZKClientConnections extends
|
|||
|
||||
private class TestForwardingWatcher extends
|
||||
ClientBaseWithFixes.CountdownWatcher {
|
||||
|
||||
public void process(WatchedEvent event) {
|
||||
super.process(event);
|
||||
try {
|
||||
|
@ -196,7 +200,8 @@ public class TestZKRMStateStoreZKClientConnections extends
|
|||
zkClientTester.store.zkClient.delete(zkClientTester.store
|
||||
.znodeWorkingPath, -1);
|
||||
fail("Shouldn't be able to delete path");
|
||||
} catch (Exception e) {/* expected behavior */}
|
||||
} catch (Exception e) {/* expected behavior */
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 20000)
|
||||
|
|
Loading…
Reference in New Issue