YARN-2045. Data persisted in NM should be versioned. Contributed by Junping Du

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1612285 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2014-07-21 14:43:59 +00:00
parent 7c71a3b876
commit 8a87085820
6 changed files with 278 additions and 20 deletions

View File

@ -54,6 +54,8 @@ Release 2.6.0 - UNRELEASED
YARN-2323. FairShareComparator creates too many Resource objects (Hong Zhiguo YARN-2323. FairShareComparator creates too many Resource objects (Hong Zhiguo
via Sandy Ryza) via Sandy Ryza)
YARN-2045. Data persisted in NM should be versioned (Junping Du via jlowe)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -42,8 +42,11 @@ import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;
import org.apache.hadoop.yarn.server.api.records.MasterKey; import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.fusesource.leveldbjni.JniDBFactory; import org.fusesource.leveldbjni.JniDBFactory;
@ -54,14 +57,18 @@ import org.iq80.leveldb.Logger;
import org.iq80.leveldb.Options; import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch; import org.iq80.leveldb.WriteBatch;
import com.google.common.annotations.VisibleForTesting;
public class NMLeveldbStateStoreService extends NMStateStoreService { public class NMLeveldbStateStoreService extends NMStateStoreService {
public static final Log LOG = public static final Log LOG =
LogFactory.getLog(NMLeveldbStateStoreService.class); LogFactory.getLog(NMLeveldbStateStoreService.class);
private static final String DB_NAME = "yarn-nm-state"; private static final String DB_NAME = "yarn-nm-state";
private static final String DB_SCHEMA_VERSION_KEY = "schema-version"; private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version";
private static final String DB_SCHEMA_VERSION = "1.0";
private static final NMDBSchemaVersion CURRENT_VERSION_INFO = NMDBSchemaVersion
.newInstance(1, 0);
private static final String DELETION_TASK_KEY_PREFIX = private static final String DELETION_TASK_KEY_PREFIX =
"DeletionService/deltask_"; "DeletionService/deltask_";
@ -475,22 +482,16 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
options.logger(new LeveldbLogger()); options.logger(new LeveldbLogger());
LOG.info("Using state database at " + storeRoot + " for recovery"); LOG.info("Using state database at " + storeRoot + " for recovery");
File dbfile = new File(storeRoot.toString()); File dbfile = new File(storeRoot.toString());
byte[] schemaVersionData = null;
try { try {
db = JniDBFactory.factory.open(dbfile, options); db = JniDBFactory.factory.open(dbfile, options);
try {
schemaVersionData = db.get(bytes(DB_SCHEMA_VERSION_KEY));
} catch (DBException e) {
throw new IOException(e.getMessage(), e);
}
} catch (NativeDB.DBException e) { } catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) { if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
LOG.info("Creating state database at " + dbfile); LOG.info("Creating state database at " + dbfile);
options.createIfMissing(true); options.createIfMissing(true);
try { try {
db = JniDBFactory.factory.open(dbfile, options); db = JniDBFactory.factory.open(dbfile, options);
schemaVersionData = bytes(DB_SCHEMA_VERSION); // store version
db.put(bytes(DB_SCHEMA_VERSION_KEY), schemaVersionData); storeVersion();
} catch (DBException dbErr) { } catch (DBException dbErr) {
throw new IOException(dbErr.getMessage(), dbErr); throw new IOException(dbErr.getMessage(), dbErr);
} }
@ -498,16 +499,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
throw e; throw e;
} }
} }
if (schemaVersionData != null) { checkVersion();
String schemaVersion = asString(schemaVersionData);
// only support exact schema matches for now
if (!DB_SCHEMA_VERSION.equals(schemaVersion)) {
throw new IOException("Incompatible state database schema, found "
+ schemaVersion + " expected " + DB_SCHEMA_VERSION);
}
} else {
throw new IOException("State database schema version not found");
}
} }
private Path createStorageDir(Configuration conf) throws IOException { private Path createStorageDir(Configuration conf) throws IOException {
@ -532,4 +524,68 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
LOG.info(message); LOG.info(message);
} }
} }
NMDBSchemaVersion loadVersion() throws IOException {
byte[] data = db.get(bytes(DB_SCHEMA_VERSION_KEY));
// if version is not stored previously, treat it as 1.0.
if (data == null || data.length == 0) {
return NMDBSchemaVersion.newInstance(1, 0);
}
NMDBSchemaVersion version =
new NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto.parseFrom(data));
return version;
}
private void storeVersion() throws IOException {
dbStoreVersion(CURRENT_VERSION_INFO);
}
// Only used for test
@VisibleForTesting
void storeVersion(NMDBSchemaVersion state) throws IOException {
dbStoreVersion(state);
}
private void dbStoreVersion(NMDBSchemaVersion state) throws IOException {
String key = DB_SCHEMA_VERSION_KEY;
byte[] data =
((NMDBSchemaVersionPBImpl) state).getProto().toByteArray();
try {
db.put(bytes(key), data);
} catch (DBException e) {
throw new IOException(e.getMessage(), e);
}
}
NMDBSchemaVersion getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
/**
* 1) Versioning scheme: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
* 2) Any incompatible change of state-store is a major upgrade, and any
* compatible change of state-store is a minor upgrade.
* 3) Within a minor upgrade, say 1.1 to 1.2:
* overwrite the version info and proceed as normal.
* 4) Within a major upgrade, say 1.2 to 2.0:
* throw exception and indicate user to use a separate upgrade tool to
* upgrade NM state or remove incompatible old state.
*/
private void checkVersion() throws IOException {
NMDBSchemaVersion loadedVersion = loadVersion();
LOG.info("Loaded NM state version info " + loadedVersion);
if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
return;
}
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
LOG.info("Storing NM state version info " + getCurrentVersion());
storeVersion();
} else {
throw new IOException(
"Incompatible version for NM state: expecting NM state version "
+ getCurrentVersion() + ", but loading version " + loadedVersion);
}
}
} }

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.recovery.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
* The version information of DB Schema for NM.
*/
@Private
@Unstable
public abstract class NMDBSchemaVersion {
public static NMDBSchemaVersion newInstance(int majorVersion, int minorVersion) {
NMDBSchemaVersion version = Records.newRecord(NMDBSchemaVersion.class);
version.setMajorVersion(majorVersion);
version.setMinorVersion(minorVersion);
return version;
}
public abstract int getMajorVersion();
public abstract void setMajorVersion(int majorVersion);
public abstract int getMinorVersion();
public abstract void setMinorVersion(int minorVersion);
public String toString() {
return getMajorVersion() + "." + getMinorVersion();
}
public boolean isCompatibleTo(NMDBSchemaVersion version) {
return getMajorVersion() == version.getMajorVersion();
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + getMajorVersion();
result = prime * result + getMinorVersion();
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
NMDBSchemaVersion other = (NMDBSchemaVersion) obj;
if (this.getMajorVersion() == other.getMajorVersion()
&& this.getMinorVersion() == other.getMinorVersion()) {
return true;
} else {
return false;
}
}
}

View File

@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProtoOrBuilder;
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
@Private
@Evolving
public class NMDBSchemaVersionPBImpl extends NMDBSchemaVersion {
NMDBSchemaVersionProto proto = NMDBSchemaVersionProto.getDefaultInstance();
NMDBSchemaVersionProto.Builder builder = null;
boolean viaProto = false;
public NMDBSchemaVersionPBImpl() {
builder = NMDBSchemaVersionProto.newBuilder();
}
public NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto proto) {
this.proto = proto;
viaProto = true;
}
public NMDBSchemaVersionProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = NMDBSchemaVersionProto.newBuilder(proto);
}
viaProto = false;
}
@Override
public int getMajorVersion() {
NMDBSchemaVersionProtoOrBuilder p = viaProto ? proto : builder;
return p.getMajorVersion();
}
@Override
public void setMajorVersion(int majorVersion) {
maybeInitBuilder();
builder.setMajorVersion(majorVersion);
}
@Override
public int getMinorVersion() {
NMDBSchemaVersionProtoOrBuilder p = viaProto ? proto : builder;
return p.getMinorVersion();
}
@Override
public void setMinorVersion(int minorVersion) {
maybeInitBuilder();
builder.setMinorVersion(minorVersion);
}
}

View File

@ -38,3 +38,9 @@ message LocalizedResourceProto {
optional string localPath = 2; optional string localPath = 2;
optional int64 size = 3; optional int64 size = 3;
} }
message NMDBSchemaVersionProto {
optional int32 majorVersion = 1;
optional int32 minorVersion = 2;
}

View File

@ -29,6 +29,7 @@ import java.util.Map;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource; import org.apache.hadoop.yarn.api.records.LocalResource;
@ -45,9 +46,11 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokenState; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokenState;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources; import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager; import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -102,6 +105,36 @@ public class TestNMLeveldbStateStoreService {
assertTrue(stateStore.canRecover()); assertTrue(stateStore.canRecover());
verifyEmptyState(); verifyEmptyState();
} }
@Test
public void testCheckVersion() throws IOException {
// default version
NMDBSchemaVersion defaultVersion = stateStore.getCurrentVersion();
Assert.assertEquals(defaultVersion, stateStore.loadVersion());
// compatible version
NMDBSchemaVersion compatibleVersion =
NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion(),
defaultVersion.getMinorVersion() + 2);
stateStore.storeVersion(compatibleVersion);
Assert.assertEquals(compatibleVersion, stateStore.loadVersion());
restartStateStore();
// overwrite the compatible version
Assert.assertEquals(defaultVersion, stateStore.loadVersion());
// incompatible version
NMDBSchemaVersion incompatibleVersion =
NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion() + 1,
defaultVersion.getMinorVersion());
stateStore.storeVersion(incompatibleVersion);
try {
restartStateStore();
Assert.fail("Incompatible version, should expect fail here.");
} catch (ServiceStateException e) {
Assert.assertTrue("Exception message mismatch",
e.getMessage().contains("Incompatible version for NM state:"));
}
}
@Test @Test
public void testStartResourceLocalization() throws IOException { public void testStartResourceLocalization() throws IOException {