YARN-10003. YarnConfigurationStore#checkVersion throws exception that belongs to RMStateStore. Contributed by Benjamin Teke
This commit is contained in:
parent
4c63a81b00
commit
d2853d1bb0
|
@ -339,11 +339,15 @@ public class LeveldbConfigurationStore extends YarnConfigurationStore {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void storeVersion() throws Exception {
|
public void storeVersion() throws Exception {
|
||||||
String key = VERSION_KEY;
|
storeVersion(CURRENT_VERSION_INFO);
|
||||||
byte[] data = ((VersionPBImpl) CURRENT_VERSION_INFO).getProto()
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected void storeVersion(Version version) throws Exception {
|
||||||
|
byte[] data = ((VersionPBImpl) version).getProto()
|
||||||
.toByteArray();
|
.toByteArray();
|
||||||
try {
|
try {
|
||||||
db.put(bytes(key), data);
|
db.put(bytes(VERSION_KEY), data);
|
||||||
} catch (DBException e) {
|
} catch (DBException e) {
|
||||||
throw new IOException(e);
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,43 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This exception is thrown by {@link YarnConfigurationStore} if it's loading
|
||||||
|
* an incompatible persisted schema version.
|
||||||
|
*/
|
||||||
|
public class YarnConfStoreVersionIncompatibleException extends
|
||||||
|
YarnException {
|
||||||
|
private static final long serialVersionUID = -2829858253579013629L;
|
||||||
|
|
||||||
|
public YarnConfStoreVersionIncompatibleException(Throwable cause) {
|
||||||
|
super(cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
public YarnConfStoreVersionIncompatibleException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public YarnConfStoreVersionIncompatibleException(
|
||||||
|
String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
}
|
||||||
|
}
|
|
@ -23,7 +23,6 @@ import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.yarn.server.records.Version;
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateVersionIncompatibleException;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -103,6 +102,7 @@ public abstract class YarnConfigurationStore {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Logs the configuration change to backing store.
|
* Logs the configuration change to backing store.
|
||||||
|
*
|
||||||
* @param logMutation configuration change to be persisted in write ahead log
|
* @param logMutation configuration change to be persisted in write ahead log
|
||||||
* @throws IOException if logging fails
|
* @throws IOException if logging fails
|
||||||
*/
|
*/
|
||||||
|
@ -167,23 +167,22 @@ public abstract class YarnConfigurationStore {
|
||||||
protected abstract Version getCurrentVersion();
|
protected abstract Version getCurrentVersion();
|
||||||
|
|
||||||
public void checkVersion() throws Exception {
|
public void checkVersion() throws Exception {
|
||||||
// TODO this was taken from RMStateStore. Should probably refactor
|
|
||||||
Version loadedVersion = getConfStoreVersion();
|
Version loadedVersion = getConfStoreVersion();
|
||||||
LOG.info("Loaded configuration store version info " + loadedVersion);
|
Version currentVersion = getCurrentVersion();
|
||||||
if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
|
LOG.info("Loaded configuration store version info {}", loadedVersion);
|
||||||
|
|
||||||
|
// when hard-coded schema version (currentVersion) is null the version check
|
||||||
|
// is unnecessary
|
||||||
|
if (currentVersion == null || currentVersion.equals(loadedVersion)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// if there is no version info, treat it as CURRENT_VERSION_INFO;
|
// if there is no version info, treat it as CURRENT_VERSION_INFO;
|
||||||
if (loadedVersion == null) {
|
if (loadedVersion == null || loadedVersion.isCompatibleTo(currentVersion)) {
|
||||||
loadedVersion = getCurrentVersion();
|
LOG.info("Storing configuration store version info {}", currentVersion);
|
||||||
}
|
|
||||||
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
|
|
||||||
LOG.info("Storing configuration store version info "
|
|
||||||
+ getCurrentVersion());
|
|
||||||
storeVersion();
|
storeVersion();
|
||||||
} else {
|
} else {
|
||||||
throw new RMStateVersionIncompatibleException(
|
throw new YarnConfStoreVersionIncompatibleException(
|
||||||
"Expecting configuration store version " + getCurrentVersion()
|
"Expecting configuration store version " + currentVersion
|
||||||
+ ", but loading version " + loadedVersion);
|
+ ", but loading version " + loadedVersion);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,10 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
||||||
|
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests {@link InMemoryConfigurationStore}.
|
* Tests {@link InMemoryConfigurationStore}.
|
||||||
*/
|
*/
|
||||||
|
@ -27,4 +31,13 @@ public class TestInMemoryConfigurationStore extends ConfigurationStoreBaseTest {
|
||||||
protected YarnConfigurationStore createConfStore() {
|
protected YarnConfigurationStore createConfStore() {
|
||||||
return new InMemoryConfigurationStore();
|
return new InMemoryConfigurationStore();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void checkVersion() {
|
||||||
|
try {
|
||||||
|
confStore.checkVersion();
|
||||||
|
} catch (Exception e) {
|
||||||
|
fail("checkVersion threw exception");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
|
@ -73,6 +74,23 @@ public class TestLeveldbConfigurationStore extends ConfigurationStoreBaseTest {
|
||||||
confStore.close();
|
confStore.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(expected = YarnConfStoreVersionIncompatibleException.class)
|
||||||
|
public void testIncompatibleVersion() throws Exception {
|
||||||
|
try {
|
||||||
|
confStore.initialize(conf, schedConf, rmContext);
|
||||||
|
|
||||||
|
Version otherVersion = Version.newInstance(1, 1);
|
||||||
|
((LeveldbConfigurationStore) confStore).storeVersion(otherVersion);
|
||||||
|
|
||||||
|
assertEquals("The configuration store should have stored the new" +
|
||||||
|
"version.", otherVersion,
|
||||||
|
confStore.getConfStoreVersion());
|
||||||
|
confStore.checkVersion();
|
||||||
|
} finally {
|
||||||
|
confStore.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPersistConfiguration() throws Exception {
|
public void testPersistConfiguration() throws Exception {
|
||||||
schedConf.set("key", "val");
|
schedConf.set("key", "val");
|
||||||
|
|
|
@ -18,6 +18,11 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
||||||
|
|
||||||
|
import org.apache.hadoop.util.curator.ZKCuratorManager;
|
||||||
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
|
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
||||||
|
import org.apache.zookeeper.CreateMode;
|
||||||
|
import org.apache.zookeeper.data.ACL;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.curator.framework.CuratorFramework;
|
import org.apache.curator.framework.CuratorFramework;
|
||||||
|
@ -47,6 +52,7 @@ import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
@ -111,6 +117,29 @@ public class TestZKConfigurationStore extends ConfigurationStoreBaseTest {
|
||||||
confStore.getConfStoreVersion());
|
confStore.getConfStoreVersion());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(expected = YarnConfStoreVersionIncompatibleException.class)
|
||||||
|
public void testIncompatibleVersion() throws Exception {
|
||||||
|
confStore.initialize(conf, schedConf, rmContext);
|
||||||
|
|
||||||
|
Version otherVersion = Version.newInstance(1, 1);
|
||||||
|
String znodeParentPath = conf.get(YarnConfiguration.
|
||||||
|
RM_SCHEDCONF_STORE_ZK_PARENT_PATH,
|
||||||
|
YarnConfiguration.DEFAULT_RM_SCHEDCONF_STORE_ZK_PARENT_PATH);
|
||||||
|
String zkVersionPath = ZKCuratorManager.getNodePath(znodeParentPath,
|
||||||
|
"VERSION");
|
||||||
|
String fencingNodePath = ZKCuratorManager.getNodePath(znodeParentPath,
|
||||||
|
"FENCING");
|
||||||
|
byte[] versionData =
|
||||||
|
((VersionPBImpl) otherVersion).getProto().toByteArray();
|
||||||
|
List<ACL> zkAcl = ZKCuratorManager.getZKAcls(conf);
|
||||||
|
((ZKConfigurationStore) confStore).zkManager.safeCreate(zkVersionPath,
|
||||||
|
versionData, zkAcl, CreateMode.PERSISTENT, zkAcl, fencingNodePath);
|
||||||
|
|
||||||
|
assertEquals("The configuration store should have stored the new" +
|
||||||
|
"version.", otherVersion, confStore.getConfStoreVersion());
|
||||||
|
confStore.checkVersion();
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPersistConfiguration() throws Exception {
|
public void testPersistConfiguration() throws Exception {
|
||||||
schedConf.set("key", "val");
|
schedConf.set("key", "val");
|
||||||
|
|
Loading…
Reference in New Issue