YARN-2347. Consolidated RMStateVersion and NMDBSchemaVersion into Version in yarn-server-common. Contributed by Junping Du.
svn merge --ignore-ancestry -c 1614838 ../../trunk/ git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1614839 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
93cfcaa9aa
commit
f52092be46
|
@ -82,13 +82,13 @@ import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
|
||||||
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
|
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
|
||||||
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
|
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
|
||||||
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
|
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl;
|
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
|
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.fusesource.leveldbjni.JniDBFactory;
|
import org.fusesource.leveldbjni.JniDBFactory;
|
||||||
|
@ -151,8 +151,8 @@ public class ShuffleHandler extends AuxiliaryService {
|
||||||
|
|
||||||
private static final String STATE_DB_NAME = "mapreduce_shuffle_state";
|
private static final String STATE_DB_NAME = "mapreduce_shuffle_state";
|
||||||
private static final String STATE_DB_SCHEMA_VERSION_KEY = "shuffle-schema-version";
|
private static final String STATE_DB_SCHEMA_VERSION_KEY = "shuffle-schema-version";
|
||||||
protected static final NMDBSchemaVersion CURRENT_VERSION_INFO =
|
protected static final Version CURRENT_VERSION_INFO =
|
||||||
NMDBSchemaVersion.newInstance(1, 0);
|
Version.newInstance(1, 0);
|
||||||
|
|
||||||
private int port;
|
private int port;
|
||||||
private ChannelFactory selector;
|
private ChannelFactory selector;
|
||||||
|
@ -491,21 +491,21 @@ public class ShuffleHandler extends AuxiliaryService {
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
NMDBSchemaVersion loadVersion() throws IOException {
|
Version loadVersion() throws IOException {
|
||||||
byte[] data = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
|
byte[] data = stateDb.get(bytes(STATE_DB_SCHEMA_VERSION_KEY));
|
||||||
// if version is not stored previously, treat it as 1.0.
|
// if version is not stored previously, treat it as 1.0.
|
||||||
if (data == null || data.length == 0) {
|
if (data == null || data.length == 0) {
|
||||||
return NMDBSchemaVersion.newInstance(1, 0);
|
return Version.newInstance(1, 0);
|
||||||
}
|
}
|
||||||
NMDBSchemaVersion version =
|
Version version =
|
||||||
new NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto.parseFrom(data));
|
new VersionPBImpl(VersionProto.parseFrom(data));
|
||||||
return version;
|
return version;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void storeSchemaVersion(NMDBSchemaVersion version) throws IOException {
|
private void storeSchemaVersion(Version version) throws IOException {
|
||||||
String key = STATE_DB_SCHEMA_VERSION_KEY;
|
String key = STATE_DB_SCHEMA_VERSION_KEY;
|
||||||
byte[] data =
|
byte[] data =
|
||||||
((NMDBSchemaVersionPBImpl) version).getProto().toByteArray();
|
((VersionPBImpl) version).getProto().toByteArray();
|
||||||
try {
|
try {
|
||||||
stateDb.put(bytes(key), data);
|
stateDb.put(bytes(key), data);
|
||||||
} catch (DBException e) {
|
} catch (DBException e) {
|
||||||
|
@ -519,11 +519,11 @@ public class ShuffleHandler extends AuxiliaryService {
|
||||||
|
|
||||||
// Only used for test
|
// Only used for test
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void storeVersion(NMDBSchemaVersion version) throws IOException {
|
void storeVersion(Version version) throws IOException {
|
||||||
storeSchemaVersion(version);
|
storeSchemaVersion(version);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected NMDBSchemaVersion getCurrentVersion() {
|
protected Version getCurrentVersion() {
|
||||||
return CURRENT_VERSION_INFO;
|
return CURRENT_VERSION_INFO;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -538,7 +538,7 @@ public class ShuffleHandler extends AuxiliaryService {
|
||||||
* upgrade shuffle info or remove incompatible old state.
|
* upgrade shuffle info or remove incompatible old state.
|
||||||
*/
|
*/
|
||||||
private void checkVersion() throws IOException {
|
private void checkVersion() throws IOException {
|
||||||
NMDBSchemaVersion loadedVersion = loadVersion();
|
Version loadedVersion = loadVersion();
|
||||||
LOG.info("Loaded state DB schema version info " + loadedVersion);
|
LOG.info("Loaded state DB schema version info " + loadedVersion);
|
||||||
if (loadedVersion.equals(getCurrentVersion())) {
|
if (loadedVersion.equals(getCurrentVersion())) {
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -75,7 +75,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
|
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
|
||||||
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
|
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.jboss.netty.channel.Channel;
|
import org.jboss.netty.channel.Channel;
|
||||||
import org.jboss.netty.channel.ChannelFuture;
|
import org.jboss.netty.channel.ChannelFuture;
|
||||||
import org.jboss.netty.channel.ChannelHandlerContext;
|
import org.jboss.netty.channel.ChannelHandlerContext;
|
||||||
|
@ -764,11 +764,11 @@ public class TestShuffleHandler {
|
||||||
// verify we are still authorized to shuffle to the old application
|
// verify we are still authorized to shuffle to the old application
|
||||||
rc = getShuffleResponseCode(shuffle, jt);
|
rc = getShuffleResponseCode(shuffle, jt);
|
||||||
Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
|
Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
|
||||||
NMDBSchemaVersion version = NMDBSchemaVersion.newInstance(1, 0);
|
Version version = Version.newInstance(1, 0);
|
||||||
Assert.assertEquals(version, shuffle.getCurrentVersion());
|
Assert.assertEquals(version, shuffle.getCurrentVersion());
|
||||||
|
|
||||||
// emulate shuffle handler restart with compatible version
|
// emulate shuffle handler restart with compatible version
|
||||||
NMDBSchemaVersion version11 = NMDBSchemaVersion.newInstance(1, 1);
|
Version version11 = Version.newInstance(1, 1);
|
||||||
// update version info before close shuffle
|
// update version info before close shuffle
|
||||||
shuffle.storeVersion(version11);
|
shuffle.storeVersion(version11);
|
||||||
Assert.assertEquals(version11, shuffle.loadVersion());
|
Assert.assertEquals(version11, shuffle.loadVersion());
|
||||||
|
@ -785,7 +785,7 @@ public class TestShuffleHandler {
|
||||||
Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
|
Assert.assertEquals(HttpURLConnection.HTTP_OK, rc);
|
||||||
|
|
||||||
// emulate shuffle handler restart with incompatible version
|
// emulate shuffle handler restart with incompatible version
|
||||||
NMDBSchemaVersion version21 = NMDBSchemaVersion.newInstance(2, 1);
|
Version version21 = Version.newInstance(2, 1);
|
||||||
shuffle.storeVersion(version21);
|
shuffle.storeVersion(version21);
|
||||||
Assert.assertEquals(version21, shuffle.loadVersion());
|
Assert.assertEquals(version21, shuffle.loadVersion());
|
||||||
shuffle.close();
|
shuffle.close();
|
||||||
|
|
|
@ -56,6 +56,9 @@ Release 2.6.0 - UNRELEASED
|
||||||
YARN-2328. FairScheduler: Verify update and continuous scheduling threads are
|
YARN-2328. FairScheduler: Verify update and continuous scheduling threads are
|
||||||
stopped when the scheduler is stopped. (kasha)
|
stopped when the scheduler is stopped. (kasha)
|
||||||
|
|
||||||
|
YARN-2347. Consolidated RMStateVersion and NMDBSchemaVersion into Version in
|
||||||
|
yarn-server-common. (Junping Du via zjshen)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -130,11 +130,6 @@ message ApplicationAttemptStateDataProto {
|
||||||
optional int32 am_container_exit_status = 9 [default = -1000];
|
optional int32 am_container_exit_status = 9 [default = -1000];
|
||||||
}
|
}
|
||||||
|
|
||||||
message RMStateVersionProto {
|
|
||||||
optional int32 major_version = 1;
|
|
||||||
optional int32 minor_version = 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
message EpochProto {
|
message EpochProto {
|
||||||
optional int64 epoch = 1;
|
optional int64 epoch = 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,21 +15,26 @@
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.recovery.records;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
package org.apache.hadoop.yarn.server.records;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The version information of DB Schema for NM.
|
* The version information for state get stored in YARN components,
|
||||||
|
* i.e. RMState, NMState, etc., which include: majorVersion and
|
||||||
|
* minorVersion.
|
||||||
|
* The major version update means incompatible changes happen while
|
||||||
|
* minor version update indicates compatible changes.
|
||||||
*/
|
*/
|
||||||
@Private
|
@LimitedPrivate({"YARN", "MapReduce"})
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract class NMDBSchemaVersion {
|
public abstract class Version {
|
||||||
|
|
||||||
public static NMDBSchemaVersion newInstance(int majorVersion, int minorVersion) {
|
public static Version newInstance(int majorVersion, int minorVersion) {
|
||||||
NMDBSchemaVersion version = Records.newRecord(NMDBSchemaVersion.class);
|
Version version = Records.newRecord(Version.class);
|
||||||
version.setMajorVersion(majorVersion);
|
version.setMajorVersion(majorVersion);
|
||||||
version.setMinorVersion(minorVersion);
|
version.setMinorVersion(minorVersion);
|
||||||
return version;
|
return version;
|
||||||
|
@ -47,7 +52,7 @@ public abstract class NMDBSchemaVersion {
|
||||||
return getMajorVersion() + "." + getMinorVersion();
|
return getMajorVersion() + "." + getMinorVersion();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isCompatibleTo(NMDBSchemaVersion version) {
|
public boolean isCompatibleTo(Version version) {
|
||||||
return getMajorVersion() == version.getMajorVersion();
|
return getMajorVersion() == version.getMajorVersion();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -68,7 +73,7 @@ public abstract class NMDBSchemaVersion {
|
||||||
return false;
|
return false;
|
||||||
if (getClass() != obj.getClass())
|
if (getClass() != obj.getClass())
|
||||||
return false;
|
return false;
|
||||||
NMDBSchemaVersion other = (NMDBSchemaVersion) obj;
|
Version other = (Version) obj;
|
||||||
if (this.getMajorVersion() == other.getMajorVersion()
|
if (this.getMajorVersion() == other.getMajorVersion()
|
||||||
&& this.getMinorVersion() == other.getMinorVersion()) {
|
&& this.getMinorVersion() == other.getMinorVersion()) {
|
||||||
return true;
|
return true;
|
||||||
|
@ -76,5 +81,4 @@ public abstract class NMDBSchemaVersion {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
|
@ -16,28 +16,29 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb;
|
package org.apache.hadoop.yarn.server.records.impl.pb;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProtoOrBuilder;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
|
||||||
|
|
||||||
public class RMStateVersionPBImpl extends RMStateVersion {
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
|
|
||||||
RMStateVersionProto proto = RMStateVersionProto.getDefaultInstance();
|
public class VersionPBImpl extends Version {
|
||||||
RMStateVersionProto.Builder builder = null;
|
|
||||||
|
VersionProto proto = VersionProto.getDefaultInstance();
|
||||||
|
VersionProto.Builder builder = null;
|
||||||
boolean viaProto = false;
|
boolean viaProto = false;
|
||||||
|
|
||||||
public RMStateVersionPBImpl() {
|
public VersionPBImpl() {
|
||||||
builder = RMStateVersionProto.newBuilder();
|
builder = VersionProto.newBuilder();
|
||||||
}
|
}
|
||||||
|
|
||||||
public RMStateVersionPBImpl(RMStateVersionProto proto) {
|
public VersionPBImpl(VersionProto proto) {
|
||||||
this.proto = proto;
|
this.proto = proto;
|
||||||
viaProto = true;
|
viaProto = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
public RMStateVersionProto getProto() {
|
public VersionProto getProto() {
|
||||||
proto = viaProto ? proto : builder.build();
|
proto = viaProto ? proto : builder.build();
|
||||||
viaProto = true;
|
viaProto = true;
|
||||||
return proto;
|
return proto;
|
||||||
|
@ -45,14 +46,14 @@ public class RMStateVersionPBImpl extends RMStateVersion {
|
||||||
|
|
||||||
private void maybeInitBuilder() {
|
private void maybeInitBuilder() {
|
||||||
if (viaProto || builder == null) {
|
if (viaProto || builder == null) {
|
||||||
builder = RMStateVersionProto.newBuilder(proto);
|
builder = VersionProto.newBuilder(proto);
|
||||||
}
|
}
|
||||||
viaProto = false;
|
viaProto = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getMajorVersion() {
|
public int getMajorVersion() {
|
||||||
RMStateVersionProtoOrBuilder p = viaProto ? proto : builder;
|
VersionProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
return p.getMajorVersion();
|
return p.getMajorVersion();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,7 +65,7 @@ public class RMStateVersionPBImpl extends RMStateVersion {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int getMinorVersion() {
|
public int getMinorVersion() {
|
||||||
RMStateVersionProtoOrBuilder p = viaProto ? proto : builder;
|
VersionProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
return p.getMinorVersion();
|
return p.getMinorVersion();
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,4 +47,10 @@ message NodeHealthStatusProto {
|
||||||
optional bool is_node_healthy = 1;
|
optional bool is_node_healthy = 1;
|
||||||
optional string health_report = 2;
|
optional string health_report = 2;
|
||||||
optional int64 last_health_report_time = 3;
|
optional int64 last_health_report_time = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message VersionProto {
|
||||||
|
optional int32 major_version = 1;
|
||||||
|
optional int32 minor_version = 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,13 +41,13 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;
|
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb.NMDBSchemaVersionPBImpl;
|
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
|
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.fusesource.leveldbjni.JniDBFactory;
|
import org.fusesource.leveldbjni.JniDBFactory;
|
||||||
|
@ -68,7 +68,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||||
private static final String DB_NAME = "yarn-nm-state";
|
private static final String DB_NAME = "yarn-nm-state";
|
||||||
private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version";
|
private static final String DB_SCHEMA_VERSION_KEY = "nm-schema-version";
|
||||||
|
|
||||||
private static final NMDBSchemaVersion CURRENT_VERSION_INFO = NMDBSchemaVersion
|
private static final Version CURRENT_VERSION_INFO = Version
|
||||||
.newInstance(1, 0);
|
.newInstance(1, 0);
|
||||||
|
|
||||||
private static final String DELETION_TASK_KEY_PREFIX =
|
private static final String DELETION_TASK_KEY_PREFIX =
|
||||||
|
@ -617,14 +617,14 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
NMDBSchemaVersion loadVersion() throws IOException {
|
Version loadVersion() throws IOException {
|
||||||
byte[] data = db.get(bytes(DB_SCHEMA_VERSION_KEY));
|
byte[] data = db.get(bytes(DB_SCHEMA_VERSION_KEY));
|
||||||
// if version is not stored previously, treat it as 1.0.
|
// if version is not stored previously, treat it as 1.0.
|
||||||
if (data == null || data.length == 0) {
|
if (data == null || data.length == 0) {
|
||||||
return NMDBSchemaVersion.newInstance(1, 0);
|
return Version.newInstance(1, 0);
|
||||||
}
|
}
|
||||||
NMDBSchemaVersion version =
|
Version version =
|
||||||
new NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto.parseFrom(data));
|
new VersionPBImpl(VersionProto.parseFrom(data));
|
||||||
return version;
|
return version;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -634,14 +634,14 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||||
|
|
||||||
// Only used for test
|
// Only used for test
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
void storeVersion(NMDBSchemaVersion state) throws IOException {
|
void storeVersion(Version state) throws IOException {
|
||||||
dbStoreVersion(state);
|
dbStoreVersion(state);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void dbStoreVersion(NMDBSchemaVersion state) throws IOException {
|
private void dbStoreVersion(Version state) throws IOException {
|
||||||
String key = DB_SCHEMA_VERSION_KEY;
|
String key = DB_SCHEMA_VERSION_KEY;
|
||||||
byte[] data =
|
byte[] data =
|
||||||
((NMDBSchemaVersionPBImpl) state).getProto().toByteArray();
|
((VersionPBImpl) state).getProto().toByteArray();
|
||||||
try {
|
try {
|
||||||
db.put(bytes(key), data);
|
db.put(bytes(key), data);
|
||||||
} catch (DBException e) {
|
} catch (DBException e) {
|
||||||
|
@ -649,7 +649,7 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
NMDBSchemaVersion getCurrentVersion() {
|
Version getCurrentVersion() {
|
||||||
return CURRENT_VERSION_INFO;
|
return CURRENT_VERSION_INFO;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -664,9 +664,9 @@ public class NMLeveldbStateStoreService extends NMStateStoreService {
|
||||||
* upgrade NM state or remove incompatible old state.
|
* upgrade NM state or remove incompatible old state.
|
||||||
*/
|
*/
|
||||||
private void checkVersion() throws IOException {
|
private void checkVersion() throws IOException {
|
||||||
NMDBSchemaVersion loadedVersion = loadVersion();
|
Version loadedVersion = loadVersion();
|
||||||
LOG.info("Loaded NM state version info " + loadedVersion);
|
LOG.info("Loaded NM state version info " + loadedVersion);
|
||||||
if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
|
if (loadedVersion.equals(getCurrentVersion())) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
|
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
|
||||||
|
|
|
@ -1,81 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
package org.apache.hadoop.yarn.server.nodemanager.recovery.records.impl.pb;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProto;
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.NMDBSchemaVersionProtoOrBuilder;
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
|
|
||||||
|
|
||||||
@Private
|
|
||||||
@Evolving
|
|
||||||
public class NMDBSchemaVersionPBImpl extends NMDBSchemaVersion {
|
|
||||||
|
|
||||||
NMDBSchemaVersionProto proto = NMDBSchemaVersionProto.getDefaultInstance();
|
|
||||||
NMDBSchemaVersionProto.Builder builder = null;
|
|
||||||
boolean viaProto = false;
|
|
||||||
|
|
||||||
public NMDBSchemaVersionPBImpl() {
|
|
||||||
builder = NMDBSchemaVersionProto.newBuilder();
|
|
||||||
}
|
|
||||||
|
|
||||||
public NMDBSchemaVersionPBImpl(NMDBSchemaVersionProto proto) {
|
|
||||||
this.proto = proto;
|
|
||||||
viaProto = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
public NMDBSchemaVersionProto getProto() {
|
|
||||||
proto = viaProto ? proto : builder.build();
|
|
||||||
viaProto = true;
|
|
||||||
return proto;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void maybeInitBuilder() {
|
|
||||||
if (viaProto || builder == null) {
|
|
||||||
builder = NMDBSchemaVersionProto.newBuilder(proto);
|
|
||||||
}
|
|
||||||
viaProto = false;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getMajorVersion() {
|
|
||||||
NMDBSchemaVersionProtoOrBuilder p = viaProto ? proto : builder;
|
|
||||||
return p.getMajorVersion();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setMajorVersion(int majorVersion) {
|
|
||||||
maybeInitBuilder();
|
|
||||||
builder.setMajorVersion(majorVersion);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getMinorVersion() {
|
|
||||||
NMDBSchemaVersionProtoOrBuilder p = viaProto ? proto : builder;
|
|
||||||
return p.getMinorVersion();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setMinorVersion(int minorVersion) {
|
|
||||||
maybeInitBuilder();
|
|
||||||
builder.setMinorVersion(minorVersion);
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
|
@ -39,8 +39,3 @@ message LocalizedResourceProto {
|
||||||
optional int64 size = 3;
|
optional int64 size = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
message NMDBSchemaVersionProto {
|
|
||||||
optional int32 majorVersion = 1;
|
|
||||||
optional int32 minorVersion = 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
|
@ -49,7 +49,7 @@ import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.Re
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLocalizationState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredNMTokensState;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
|
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredUserResources;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.recovery.records.NMDBSchemaVersion;
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
|
import org.apache.hadoop.yarn.server.security.BaseContainerTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
|
import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||||
|
@ -114,12 +114,12 @@ public class TestNMLeveldbStateStoreService {
|
||||||
@Test
|
@Test
|
||||||
public void testCheckVersion() throws IOException {
|
public void testCheckVersion() throws IOException {
|
||||||
// default version
|
// default version
|
||||||
NMDBSchemaVersion defaultVersion = stateStore.getCurrentVersion();
|
Version defaultVersion = stateStore.getCurrentVersion();
|
||||||
Assert.assertEquals(defaultVersion, stateStore.loadVersion());
|
Assert.assertEquals(defaultVersion, stateStore.loadVersion());
|
||||||
|
|
||||||
// compatible version
|
// compatible version
|
||||||
NMDBSchemaVersion compatibleVersion =
|
Version compatibleVersion =
|
||||||
NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion(),
|
Version.newInstance(defaultVersion.getMajorVersion(),
|
||||||
defaultVersion.getMinorVersion() + 2);
|
defaultVersion.getMinorVersion() + 2);
|
||||||
stateStore.storeVersion(compatibleVersion);
|
stateStore.storeVersion(compatibleVersion);
|
||||||
Assert.assertEquals(compatibleVersion, stateStore.loadVersion());
|
Assert.assertEquals(compatibleVersion, stateStore.loadVersion());
|
||||||
|
@ -128,8 +128,8 @@ public class TestNMLeveldbStateStoreService {
|
||||||
Assert.assertEquals(defaultVersion, stateStore.loadVersion());
|
Assert.assertEquals(defaultVersion, stateStore.loadVersion());
|
||||||
|
|
||||||
// incompatible version
|
// incompatible version
|
||||||
NMDBSchemaVersion incompatibleVersion =
|
Version incompatibleVersion =
|
||||||
NMDBSchemaVersion.newInstance(defaultVersion.getMajorVersion() + 1,
|
Version.newInstance(defaultVersion.getMajorVersion() + 1,
|
||||||
defaultVersion.getMinorVersion());
|
defaultVersion.getMinorVersion());
|
||||||
stateStore.storeVersion(incompatibleVersion);
|
stateStore.storeVersion(incompatibleVersion);
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -44,22 +44,22 @@ import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
|
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
|
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
|
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
@ -77,7 +77,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
|
public static final Log LOG = LogFactory.getLog(FileSystemRMStateStore.class);
|
||||||
|
|
||||||
protected static final String ROOT_DIR_NAME = "FSRMStateRoot";
|
protected static final String ROOT_DIR_NAME = "FSRMStateRoot";
|
||||||
protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
|
protected static final Version CURRENT_VERSION_INFO = Version
|
||||||
.newInstance(1, 1);
|
.newInstance(1, 1);
|
||||||
protected static final String AMRMTOKEN_SECRET_MANAGER_NODE =
|
protected static final String AMRMTOKEN_SECRET_MANAGER_NODE =
|
||||||
"AMRMTokenSecretManagerNode";
|
"AMRMTokenSecretManagerNode";
|
||||||
|
@ -130,18 +130,18 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected RMStateVersion getCurrentVersion() {
|
protected Version getCurrentVersion() {
|
||||||
return CURRENT_VERSION_INFO;
|
return CURRENT_VERSION_INFO;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized RMStateVersion loadVersion() throws Exception {
|
protected synchronized Version loadVersion() throws Exception {
|
||||||
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
|
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
|
||||||
if (fs.exists(versionNodePath)) {
|
if (fs.exists(versionNodePath)) {
|
||||||
FileStatus status = fs.getFileStatus(versionNodePath);
|
FileStatus status = fs.getFileStatus(versionNodePath);
|
||||||
byte[] data = readFile(versionNodePath, status.getLen());
|
byte[] data = readFile(versionNodePath, status.getLen());
|
||||||
RMStateVersion version =
|
Version version =
|
||||||
new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data));
|
new VersionPBImpl(VersionProto.parseFrom(data));
|
||||||
return version;
|
return version;
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
@ -151,7 +151,7 @@ public class FileSystemRMStateStore extends RMStateStore {
|
||||||
protected synchronized void storeVersion() throws Exception {
|
protected synchronized void storeVersion() throws Exception {
|
||||||
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
|
Path versionNodePath = getNodePath(rootDirPath, VERSION_NODE);
|
||||||
byte[] data =
|
byte[] data =
|
||||||
((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
|
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
|
||||||
if (fs.exists(versionNodePath)) {
|
if (fs.exists(versionNodePath)) {
|
||||||
updateFile(versionNodePath, data);
|
updateFile(versionNodePath, data);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -32,10 +32,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
|
@ -259,7 +259,7 @@ public class MemoryRMStateStore extends RMStateStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected RMStateVersion loadVersion() throws Exception {
|
protected Version loadVersion() throws Exception {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -268,7 +268,7 @@ public class MemoryRMStateStore extends RMStateStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected RMStateVersion getCurrentVersion() {
|
protected Version getCurrentVersion() {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,10 +25,10 @@ import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
|
||||||
|
|
||||||
@Unstable
|
@Unstable
|
||||||
public class NullRMStateStore extends RMStateStore {
|
public class NullRMStateStore extends RMStateStore {
|
||||||
|
@ -123,7 +123,7 @@ public class NullRMStateStore extends RMStateStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected RMStateVersion loadVersion() throws Exception {
|
protected Version loadVersion() throws Exception {
|
||||||
// Do nothing
|
// Do nothing
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -134,7 +134,7 @@ public class NullRMStateStore extends RMStateStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected RMStateVersion getCurrentVersion() {
|
protected Version getCurrentVersion() {
|
||||||
// Do nothing
|
// Do nothing
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,12 +47,12 @@ import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMFatalEventType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppNewSavedEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
|
@ -493,14 +493,14 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
* upgrade RM state.
|
* upgrade RM state.
|
||||||
*/
|
*/
|
||||||
public void checkVersion() throws Exception {
|
public void checkVersion() throws Exception {
|
||||||
RMStateVersion loadedVersion = loadVersion();
|
Version loadedVersion = loadVersion();
|
||||||
LOG.info("Loaded RM state version info " + loadedVersion);
|
LOG.info("Loaded RM state version info " + loadedVersion);
|
||||||
if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
|
if (loadedVersion != null && loadedVersion.equals(getCurrentVersion())) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// if there is no version info, treat it as 1.0;
|
// if there is no version info, treat it as 1.0;
|
||||||
if (loadedVersion == null) {
|
if (loadedVersion == null) {
|
||||||
loadedVersion = RMStateVersion.newInstance(1, 0);
|
loadedVersion = Version.newInstance(1, 0);
|
||||||
}
|
}
|
||||||
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
|
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
|
||||||
LOG.info("Storing RM state version info " + getCurrentVersion());
|
LOG.info("Storing RM state version info " + getCurrentVersion());
|
||||||
|
@ -516,7 +516,7 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
* Derived class use this method to load the version information from state
|
* Derived class use this method to load the version information from state
|
||||||
* store.
|
* store.
|
||||||
*/
|
*/
|
||||||
protected abstract RMStateVersion loadVersion() throws Exception;
|
protected abstract Version loadVersion() throws Exception;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Derived class use this method to store the version information.
|
* Derived class use this method to store the version information.
|
||||||
|
@ -526,7 +526,7 @@ public abstract class RMStateStore extends AbstractService {
|
||||||
/**
|
/**
|
||||||
* Get the current version of the underlying state store.
|
* Get the current version of the underlying state store.
|
||||||
*/
|
*/
|
||||||
protected abstract RMStateVersion getCurrentVersion();
|
protected abstract Version getCurrentVersion();
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -44,23 +44,23 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationAttemptStateDataProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.ApplicationStateDataProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.RMStateVersionProto;
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerServiceProtos.EpochProto;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMZKUtils;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.EpochPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
|
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
import org.apache.zookeeper.CreateMode;
|
import org.apache.zookeeper.CreateMode;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
@ -86,7 +86,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
private final SecureRandom random = new SecureRandom();
|
private final SecureRandom random = new SecureRandom();
|
||||||
|
|
||||||
protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
|
protected static final String ROOT_ZNODE_NAME = "ZKRMStateRoot";
|
||||||
protected static final RMStateVersion CURRENT_VERSION_INFO = RMStateVersion
|
protected static final Version CURRENT_VERSION_INFO = Version
|
||||||
.newInstance(1, 1);
|
.newInstance(1, 1);
|
||||||
private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
|
private static final String RM_DELEGATION_TOKENS_ROOT_ZNODE_NAME =
|
||||||
"RMDelegationTokensRoot";
|
"RMDelegationTokensRoot";
|
||||||
|
@ -377,7 +377,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected RMStateVersion getCurrentVersion() {
|
protected Version getCurrentVersion() {
|
||||||
return CURRENT_VERSION_INFO;
|
return CURRENT_VERSION_INFO;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -385,7 +385,7 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
protected synchronized void storeVersion() throws Exception {
|
protected synchronized void storeVersion() throws Exception {
|
||||||
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
|
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
|
||||||
byte[] data =
|
byte[] data =
|
||||||
((RMStateVersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
|
((VersionPBImpl) CURRENT_VERSION_INFO).getProto().toByteArray();
|
||||||
if (existsWithRetries(versionNodePath, true) != null) {
|
if (existsWithRetries(versionNodePath, true) != null) {
|
||||||
setDataWithRetries(versionNodePath, data, -1);
|
setDataWithRetries(versionNodePath, data, -1);
|
||||||
} else {
|
} else {
|
||||||
|
@ -394,13 +394,13 @@ public class ZKRMStateStore extends RMStateStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected synchronized RMStateVersion loadVersion() throws Exception {
|
protected synchronized Version loadVersion() throws Exception {
|
||||||
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
|
String versionNodePath = getNodePath(zkRootNodePath, VERSION_NODE);
|
||||||
|
|
||||||
if (existsWithRetries(versionNodePath, true) != null) {
|
if (existsWithRetries(versionNodePath, true) != null) {
|
||||||
byte[] data = getDataWithRetries(versionNodePath, true);
|
byte[] data = getDataWithRetries(versionNodePath, true);
|
||||||
RMStateVersion version =
|
Version version =
|
||||||
new RMStateVersionPBImpl(RMStateVersionProto.parseFrom(data));
|
new VersionPBImpl(VersionProto.parseFrom(data));
|
||||||
return version;
|
return version;
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -1,80 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.recovery.records;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* The version information of RM state.
|
|
||||||
*/
|
|
||||||
@Private
|
|
||||||
@Unstable
|
|
||||||
public abstract class RMStateVersion {
|
|
||||||
|
|
||||||
public static RMStateVersion newInstance(int majorVersion, int minorVersion) {
|
|
||||||
RMStateVersion version = Records.newRecord(RMStateVersion.class);
|
|
||||||
version.setMajorVersion(majorVersion);
|
|
||||||
version.setMinorVersion(minorVersion);
|
|
||||||
return version;
|
|
||||||
}
|
|
||||||
|
|
||||||
public abstract int getMajorVersion();
|
|
||||||
|
|
||||||
public abstract void setMajorVersion(int majorVersion);
|
|
||||||
|
|
||||||
public abstract int getMinorVersion();
|
|
||||||
|
|
||||||
public abstract void setMinorVersion(int minorVersion);
|
|
||||||
|
|
||||||
public String toString() {
|
|
||||||
return getMajorVersion() + "." + getMinorVersion();
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isCompatibleTo(RMStateVersion version) {
|
|
||||||
return getMajorVersion() == version.getMajorVersion();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public int hashCode() {
|
|
||||||
final int prime = 31;
|
|
||||||
int result = 1;
|
|
||||||
result = prime * result + getMajorVersion();
|
|
||||||
result = prime * result + getMinorVersion();
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean equals(Object obj) {
|
|
||||||
if (this == obj)
|
|
||||||
return true;
|
|
||||||
if (obj == null)
|
|
||||||
return false;
|
|
||||||
if (getClass() != obj.getClass())
|
|
||||||
return false;
|
|
||||||
RMStateVersion other = (RMStateVersion) obj;
|
|
||||||
if (this.getMajorVersion() == other.getMajorVersion()
|
|
||||||
&& this.getMinorVersion() == other.getMinorVersion()) {
|
|
||||||
return true;
|
|
||||||
} else {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -55,13 +55,13 @@ import org.apache.hadoop.yarn.event.Dispatcher;
|
||||||
import org.apache.hadoop.yarn.event.EventHandler;
|
import org.apache.hadoop.yarn.event.EventHandler;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationAttemptState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMDTSecretManagerState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
@ -111,8 +111,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||||
interface RMStateStoreHelper {
|
interface RMStateStoreHelper {
|
||||||
RMStateStore getRMStateStore() throws Exception;
|
RMStateStore getRMStateStore() throws Exception;
|
||||||
boolean isFinalStateValid() throws Exception;
|
boolean isFinalStateValid() throws Exception;
|
||||||
void writeVersion(RMStateVersion version) throws Exception;
|
void writeVersion(Version version) throws Exception;
|
||||||
RMStateVersion getCurrentVersion() throws Exception;
|
Version getCurrentVersion() throws Exception;
|
||||||
boolean appExists(RMApp app) throws Exception;
|
boolean appExists(RMApp app) throws Exception;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -477,13 +477,13 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||||
store.setRMDispatcher(new TestDispatcher());
|
store.setRMDispatcher(new TestDispatcher());
|
||||||
|
|
||||||
// default version
|
// default version
|
||||||
RMStateVersion defaultVersion = stateStoreHelper.getCurrentVersion();
|
Version defaultVersion = stateStoreHelper.getCurrentVersion();
|
||||||
store.checkVersion();
|
store.checkVersion();
|
||||||
Assert.assertEquals(defaultVersion, store.loadVersion());
|
Assert.assertEquals(defaultVersion, store.loadVersion());
|
||||||
|
|
||||||
// compatible version
|
// compatible version
|
||||||
RMStateVersion compatibleVersion =
|
Version compatibleVersion =
|
||||||
RMStateVersion.newInstance(defaultVersion.getMajorVersion(),
|
Version.newInstance(defaultVersion.getMajorVersion(),
|
||||||
defaultVersion.getMinorVersion() + 2);
|
defaultVersion.getMinorVersion() + 2);
|
||||||
stateStoreHelper.writeVersion(compatibleVersion);
|
stateStoreHelper.writeVersion(compatibleVersion);
|
||||||
Assert.assertEquals(compatibleVersion, store.loadVersion());
|
Assert.assertEquals(compatibleVersion, store.loadVersion());
|
||||||
|
@ -492,8 +492,8 @@ public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||||
Assert.assertEquals(defaultVersion, store.loadVersion());
|
Assert.assertEquals(defaultVersion, store.loadVersion());
|
||||||
|
|
||||||
// incompatible version
|
// incompatible version
|
||||||
RMStateVersion incompatibleVersion =
|
Version incompatibleVersion =
|
||||||
RMStateVersion.newInstance(defaultVersion.getMajorVersion() + 2,
|
Version.newInstance(defaultVersion.getMajorVersion() + 2,
|
||||||
defaultVersion.getMinorVersion());
|
defaultVersion.getMinorVersion());
|
||||||
stateStoreHelper.writeVersion(incompatibleVersion);
|
stateStoreHelper.writeVersion(incompatibleVersion);
|
||||||
try {
|
try {
|
||||||
|
|
|
@ -36,9 +36,9 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||||
|
@ -70,7 +70,7 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
|
||||||
return new Path(new Path(workingDirPathURI, ROOT_DIR_NAME), VERSION_NODE);
|
return new Path(new Path(workingDirPathURI, ROOT_DIR_NAME), VERSION_NODE);
|
||||||
}
|
}
|
||||||
|
|
||||||
public RMStateVersion getCurrentVersion() {
|
public Version getCurrentVersion() {
|
||||||
return CURRENT_VERSION_INFO;
|
return CURRENT_VERSION_INFO;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,13 +111,13 @@ public class TestFSRMStateStore extends RMStateStoreTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeVersion(RMStateVersion version) throws Exception {
|
public void writeVersion(Version version) throws Exception {
|
||||||
store.updateFile(store.getVersionNode(), ((RMStateVersionPBImpl) version)
|
store.updateFile(store.getVersionNode(), ((VersionPBImpl) version)
|
||||||
.getProto().toByteArray());
|
.getProto().toByteArray());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RMStateVersion getCurrentVersion() throws Exception {
|
public Version getCurrentVersion() throws Exception {
|
||||||
return store.getCurrentVersion();
|
return store.getCurrentVersion();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,9 +32,9 @@ import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
||||||
import org.apache.hadoop.service.Service;
|
import org.apache.hadoop.service.Service;
|
||||||
import org.apache.hadoop.yarn.conf.HAUtil;
|
import org.apache.hadoop.yarn.conf.HAUtil;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
|
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMStateVersion;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.RMStateVersionPBImpl;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.zookeeper.ZooKeeper;
|
import org.apache.zookeeper.ZooKeeper;
|
||||||
import org.apache.zookeeper.data.Stat;
|
import org.apache.zookeeper.data.Stat;
|
||||||
|
@ -69,7 +69,7 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE;
|
return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE;
|
||||||
}
|
}
|
||||||
|
|
||||||
public RMStateVersion getCurrentVersion() {
|
public Version getCurrentVersion() {
|
||||||
return CURRENT_VERSION_INFO;
|
return CURRENT_VERSION_INFO;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -96,13 +96,13 @@ public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void writeVersion(RMStateVersion version) throws Exception {
|
public void writeVersion(Version version) throws Exception {
|
||||||
client.setData(store.getVersionNode(), ((RMStateVersionPBImpl) version)
|
client.setData(store.getVersionNode(), ((VersionPBImpl) version)
|
||||||
.getProto().toByteArray(), -1);
|
.getProto().toByteArray(), -1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public RMStateVersion getCurrentVersion() throws Exception {
|
public Version getCurrentVersion() throws Exception {
|
||||||
return store.getCurrentVersion();
|
return store.getCurrentVersion();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue