|
|
|
@ -0,0 +1,691 @@
|
|
|
|
|
/**
|
|
|
|
|
* Licensed to the Apache Software Foundation (ASF) under one
|
|
|
|
|
* or more contributor license agreements. See the NOTICE file
|
|
|
|
|
* distributed with this work for additional information
|
|
|
|
|
* regarding copyright ownership. The ASF licenses this file
|
|
|
|
|
* to you under the Apache License, Version 2.0 (the
|
|
|
|
|
* "License"); you may not use this file except in compliance
|
|
|
|
|
* with the License. You may obtain a copy of the License at
|
|
|
|
|
*
|
|
|
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
*
|
|
|
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
|
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
|
* See the License for the specific language governing permissions and
|
|
|
|
|
* limitations under the License.
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
|
|
|
|
|
|
|
|
|
import static org.fusesource.leveldbjni.JniDBFactory.asString;
|
|
|
|
|
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
|
|
|
|
|
|
|
|
|
|
import java.io.ByteArrayInputStream;
|
|
|
|
|
import java.io.ByteArrayOutputStream;
|
|
|
|
|
import java.io.DataInputStream;
|
|
|
|
|
import java.io.DataOutputStream;
|
|
|
|
|
import java.io.File;
|
|
|
|
|
import java.io.IOException;
|
|
|
|
|
import java.util.Map.Entry;
|
|
|
|
|
|
|
|
|
|
import org.apache.commons.logging.Log;
|
|
|
|
|
import org.apache.commons.logging.LogFactory;
|
|
|
|
|
import org.apache.hadoop.conf.Configuration;
|
|
|
|
|
import org.apache.hadoop.fs.FileSystem;
|
|
|
|
|
import org.apache.hadoop.fs.Path;
|
|
|
|
|
import org.apache.hadoop.fs.permission.FsPermission;
|
|
|
|
|
import org.apache.hadoop.io.IOUtils;
|
|
|
|
|
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
|
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
|
|
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
|
|
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
|
|
|
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
|
|
|
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
|
|
|
|
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.AMRMTokenSecretManagerStateProto;
|
|
|
|
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.EpochProto;
|
|
|
|
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationAttemptStateDataProto;
|
|
|
|
|
import org.apache.hadoop.yarn.proto.YarnServerResourceManagerRecoveryProtos.ApplicationStateDataProto;
|
|
|
|
|
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
|
|
|
|
import org.apache.hadoop.yarn.server.records.Version;
|
|
|
|
|
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
|
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.AMRMTokenSecretManagerState;
|
|
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
|
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
|
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.Epoch;
|
|
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.RMDelegationTokenIdentifierData;
|
|
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.AMRMTokenSecretManagerStatePBImpl;
|
|
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationAttemptStateDataPBImpl;
|
|
|
|
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.impl.pb.ApplicationStateDataPBImpl;
|
|
|
|
|
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
|
|
|
|
|
import org.apache.hadoop.yarn.util.ConverterUtils;
|
|
|
|
|
import org.fusesource.leveldbjni.JniDBFactory;
|
|
|
|
|
import org.fusesource.leveldbjni.internal.NativeDB;
|
|
|
|
|
import org.iq80.leveldb.DB;
|
|
|
|
|
import org.iq80.leveldb.DBException;
|
|
|
|
|
import org.iq80.leveldb.Logger;
|
|
|
|
|
import org.iq80.leveldb.Options;
|
|
|
|
|
import org.iq80.leveldb.WriteBatch;
|
|
|
|
|
|
|
|
|
|
import com.google.common.annotations.VisibleForTesting;
|
|
|
|
|
|
|
|
|
|
public class LeveldbRMStateStore extends RMStateStore {
|
|
|
|
|
|
|
|
|
|
public static final Log LOG =
|
|
|
|
|
LogFactory.getLog(LeveldbRMStateStore.class);
|
|
|
|
|
|
|
|
|
|
private static final String SEPARATOR = "/";
|
|
|
|
|
private static final String DB_NAME = "yarn-rm-state";
|
|
|
|
|
private static final String RM_DT_MASTER_KEY_KEY_PREFIX =
|
|
|
|
|
RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + DELEGATION_KEY_PREFIX;
|
|
|
|
|
private static final String RM_DT_TOKEN_KEY_PREFIX =
|
|
|
|
|
RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + DELEGATION_TOKEN_PREFIX;
|
|
|
|
|
private static final String RM_DT_SEQUENCE_NUMBER_KEY =
|
|
|
|
|
RM_DT_SECRET_MANAGER_ROOT + SEPARATOR + "RMDTSequentialNumber";
|
|
|
|
|
private static final String RM_APP_KEY_PREFIX =
|
|
|
|
|
RM_APP_ROOT + SEPARATOR + ApplicationId.appIdStrPrefix;
|
|
|
|
|
|
|
|
|
|
private static final Version CURRENT_VERSION_INFO = Version
|
|
|
|
|
.newInstance(1, 0);
|
|
|
|
|
|
|
|
|
|
private DB db;
|
|
|
|
|
|
|
|
|
|
private String getApplicationNodeKey(ApplicationId appId) {
|
|
|
|
|
return RM_APP_ROOT + SEPARATOR + appId;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String getApplicationAttemptNodeKey(ApplicationAttemptId attemptId) {
|
|
|
|
|
return getApplicationAttemptNodeKey(
|
|
|
|
|
getApplicationNodeKey(attemptId.getApplicationId()), attemptId);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String getApplicationAttemptNodeKey(String appNodeKey,
|
|
|
|
|
ApplicationAttemptId attemptId) {
|
|
|
|
|
return appNodeKey + SEPARATOR + attemptId;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String getRMDTMasterKeyNodeKey(DelegationKey masterKey) {
|
|
|
|
|
return RM_DT_MASTER_KEY_KEY_PREFIX + masterKey.getKeyId();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private String getRMDTTokenNodeKey(RMDelegationTokenIdentifier tokenId) {
|
|
|
|
|
return RM_DT_TOKEN_KEY_PREFIX + tokenId.getSequenceNumber();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected void initInternal(Configuration conf) throws Exception {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Path getStorageDir() throws IOException {
|
|
|
|
|
Configuration conf = getConfig();
|
|
|
|
|
String storePath = conf.get(YarnConfiguration.RM_LEVELDB_STORE_PATH);
|
|
|
|
|
if (storePath == null) {
|
|
|
|
|
throw new IOException("No store location directory configured in " +
|
|
|
|
|
YarnConfiguration.RM_LEVELDB_STORE_PATH);
|
|
|
|
|
}
|
|
|
|
|
return new Path(storePath, DB_NAME);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private Path createStorageDir() throws IOException {
|
|
|
|
|
Path root = getStorageDir();
|
|
|
|
|
FileSystem fs = FileSystem.getLocal(getConfig());
|
|
|
|
|
fs.mkdirs(root, new FsPermission((short)0700));
|
|
|
|
|
return root;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected void startInternal() throws Exception {
|
|
|
|
|
Path storeRoot = createStorageDir();
|
|
|
|
|
Options options = new Options();
|
|
|
|
|
options.createIfMissing(false);
|
|
|
|
|
options.logger(new LeveldbLogger());
|
|
|
|
|
LOG.info("Using state database at " + storeRoot + " for recovery");
|
|
|
|
|
File dbfile = new File(storeRoot.toString());
|
|
|
|
|
try {
|
|
|
|
|
db = JniDBFactory.factory.open(dbfile, options);
|
|
|
|
|
} catch (NativeDB.DBException e) {
|
|
|
|
|
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
|
|
|
|
|
LOG.info("Creating state database at " + dbfile);
|
|
|
|
|
options.createIfMissing(true);
|
|
|
|
|
try {
|
|
|
|
|
db = JniDBFactory.factory.open(dbfile, options);
|
|
|
|
|
// store version
|
|
|
|
|
storeVersion();
|
|
|
|
|
} catch (DBException dbErr) {
|
|
|
|
|
throw new IOException(dbErr.getMessage(), dbErr);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
throw e;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected void closeInternal() throws Exception {
|
|
|
|
|
if (db != null) {
|
|
|
|
|
db.close();
|
|
|
|
|
db = null;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@VisibleForTesting
|
|
|
|
|
boolean isClosed() {
|
|
|
|
|
return db == null;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected Version loadVersion() throws Exception {
|
|
|
|
|
Version version = null;
|
|
|
|
|
try {
|
|
|
|
|
byte[] data = db.get(bytes(VERSION_NODE));
|
|
|
|
|
if (data != null) {
|
|
|
|
|
version = new VersionPBImpl(VersionProto.parseFrom(data));
|
|
|
|
|
}
|
|
|
|
|
} catch (DBException e) {
|
|
|
|
|
throw new IOException(e);
|
|
|
|
|
}
|
|
|
|
|
return version;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected void storeVersion() throws Exception {
|
|
|
|
|
dbStoreVersion(CURRENT_VERSION_INFO);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
void dbStoreVersion(Version state) throws IOException {
|
|
|
|
|
String key = VERSION_NODE;
|
|
|
|
|
byte[] data = ((VersionPBImpl) state).getProto().toByteArray();
|
|
|
|
|
try {
|
|
|
|
|
db.put(bytes(key), data);
|
|
|
|
|
} catch (DBException e) {
|
|
|
|
|
throw new IOException(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected Version getCurrentVersion() {
|
|
|
|
|
return CURRENT_VERSION_INFO;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public synchronized long getAndIncrementEpoch() throws Exception {
|
|
|
|
|
long currentEpoch = 0;
|
|
|
|
|
byte[] dbKeyBytes = bytes(EPOCH_NODE);
|
|
|
|
|
try {
|
|
|
|
|
byte[] data = db.get(dbKeyBytes);
|
|
|
|
|
if (data != null) {
|
|
|
|
|
currentEpoch = EpochProto.parseFrom(data).getEpoch();
|
|
|
|
|
}
|
|
|
|
|
EpochProto proto = Epoch.newInstance(currentEpoch + 1).getProto();
|
|
|
|
|
db.put(dbKeyBytes, proto.toByteArray());
|
|
|
|
|
} catch (DBException e) {
|
|
|
|
|
throw new IOException(e);
|
|
|
|
|
}
|
|
|
|
|
return currentEpoch;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public RMState loadState() throws Exception {
|
|
|
|
|
RMState rmState = new RMState();
|
|
|
|
|
loadRMDTSecretManagerState(rmState);
|
|
|
|
|
loadRMApps(rmState);
|
|
|
|
|
loadAMRMTokenSecretManagerState(rmState);
|
|
|
|
|
return rmState;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void loadRMDTSecretManagerState(RMState state) throws IOException {
|
|
|
|
|
int numKeys = loadRMDTSecretManagerKeys(state);
|
|
|
|
|
LOG.info("Recovered " + numKeys + " RM delegation token master keys");
|
|
|
|
|
int numTokens = loadRMDTSecretManagerTokens(state);
|
|
|
|
|
LOG.info("Recovered " + numTokens + " RM delegation tokens");
|
|
|
|
|
loadRMDTSecretManagerTokenSequenceNumber(state);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private int loadRMDTSecretManagerKeys(RMState state) throws IOException {
|
|
|
|
|
int numKeys = 0;
|
|
|
|
|
LeveldbIterator iter = null;
|
|
|
|
|
try {
|
|
|
|
|
iter = new LeveldbIterator(db);
|
|
|
|
|
iter.seek(bytes(RM_DT_MASTER_KEY_KEY_PREFIX));
|
|
|
|
|
while (iter.hasNext()) {
|
|
|
|
|
Entry<byte[],byte[]> entry = iter.next();
|
|
|
|
|
String key = asString(entry.getKey());
|
|
|
|
|
if (!key.startsWith(RM_DT_MASTER_KEY_KEY_PREFIX)) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
DelegationKey masterKey = loadDelegationKey(entry.getValue());
|
|
|
|
|
state.rmSecretManagerState.masterKeyState.add(masterKey);
|
|
|
|
|
++numKeys;
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
|
|
LOG.debug("Loaded RM delegation key from " + key
|
|
|
|
|
+ ": keyId=" + masterKey.getKeyId()
|
|
|
|
|
+ ", expirationDate=" + masterKey.getExpiryDate());
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} catch (DBException e) {
|
|
|
|
|
throw new IOException(e);
|
|
|
|
|
} finally {
|
|
|
|
|
if (iter != null) {
|
|
|
|
|
iter.close();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return numKeys;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private DelegationKey loadDelegationKey(byte[] data) throws IOException {
|
|
|
|
|
DelegationKey key = new DelegationKey();
|
|
|
|
|
DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
|
|
|
|
|
try {
|
|
|
|
|
key.readFields(in);
|
|
|
|
|
} finally {
|
|
|
|
|
IOUtils.cleanup(LOG, in);
|
|
|
|
|
}
|
|
|
|
|
return key;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private int loadRMDTSecretManagerTokens(RMState state) throws IOException {
|
|
|
|
|
int numTokens = 0;
|
|
|
|
|
LeveldbIterator iter = null;
|
|
|
|
|
try {
|
|
|
|
|
iter = new LeveldbIterator(db);
|
|
|
|
|
iter.seek(bytes(RM_DT_TOKEN_KEY_PREFIX));
|
|
|
|
|
while (iter.hasNext()) {
|
|
|
|
|
Entry<byte[],byte[]> entry = iter.next();
|
|
|
|
|
String key = asString(entry.getKey());
|
|
|
|
|
if (!key.startsWith(RM_DT_TOKEN_KEY_PREFIX)) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
RMDelegationTokenIdentifierData tokenData = loadDelegationToken(
|
|
|
|
|
entry.getValue());
|
|
|
|
|
RMDelegationTokenIdentifier tokenId = tokenData.getTokenIdentifier();
|
|
|
|
|
long renewDate = tokenData.getRenewDate();
|
|
|
|
|
state.rmSecretManagerState.delegationTokenState.put(tokenId,
|
|
|
|
|
renewDate);
|
|
|
|
|
++numTokens;
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
|
|
LOG.debug("Loaded RM delegation token from " + key
|
|
|
|
|
+ ": tokenId=" + tokenId + ", renewDate=" + renewDate);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} catch (DBException e) {
|
|
|
|
|
throw new IOException(e);
|
|
|
|
|
} finally {
|
|
|
|
|
if (iter != null) {
|
|
|
|
|
iter.close();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return numTokens;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private RMDelegationTokenIdentifierData loadDelegationToken(byte[] data)
|
|
|
|
|
throws IOException {
|
|
|
|
|
RMDelegationTokenIdentifierData tokenData =
|
|
|
|
|
new RMDelegationTokenIdentifierData();
|
|
|
|
|
DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
|
|
|
|
|
try {
|
|
|
|
|
tokenData.readFields(in);
|
|
|
|
|
} finally {
|
|
|
|
|
IOUtils.cleanup(LOG, in);
|
|
|
|
|
}
|
|
|
|
|
return tokenData;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void loadRMDTSecretManagerTokenSequenceNumber(RMState state)
|
|
|
|
|
throws IOException {
|
|
|
|
|
byte[] data = null;
|
|
|
|
|
try {
|
|
|
|
|
data = db.get(bytes(RM_DT_SEQUENCE_NUMBER_KEY));
|
|
|
|
|
} catch (DBException e) {
|
|
|
|
|
throw new IOException(e);
|
|
|
|
|
}
|
|
|
|
|
if (data != null) {
|
|
|
|
|
DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
|
|
|
|
|
try {
|
|
|
|
|
state.rmSecretManagerState.dtSequenceNumber = in.readInt();
|
|
|
|
|
} finally {
|
|
|
|
|
IOUtils.cleanup(LOG, in);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void loadRMApps(RMState state) throws IOException {
|
|
|
|
|
int numApps = 0;
|
|
|
|
|
int numAppAttempts = 0;
|
|
|
|
|
LeveldbIterator iter = null;
|
|
|
|
|
try {
|
|
|
|
|
iter = new LeveldbIterator(db);
|
|
|
|
|
iter.seek(bytes(RM_APP_KEY_PREFIX));
|
|
|
|
|
while (iter.hasNext()) {
|
|
|
|
|
Entry<byte[],byte[]> entry = iter.next();
|
|
|
|
|
String key = asString(entry.getKey());
|
|
|
|
|
if (!key.startsWith(RM_APP_KEY_PREFIX)) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String appIdStr = key.substring(RM_APP_ROOT.length() + 1);
|
|
|
|
|
if (appIdStr.contains(SEPARATOR)) {
|
|
|
|
|
LOG.warn("Skipping extraneous data " + key);
|
|
|
|
|
continue;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
numAppAttempts += loadRMApp(state, iter, appIdStr, entry.getValue());
|
|
|
|
|
++numApps;
|
|
|
|
|
}
|
|
|
|
|
} catch (DBException e) {
|
|
|
|
|
throw new IOException(e);
|
|
|
|
|
} finally {
|
|
|
|
|
if (iter != null) {
|
|
|
|
|
iter.close();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
LOG.info("Recovered " + numApps + " applications and " + numAppAttempts
|
|
|
|
|
+ " application attempts");
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private int loadRMApp(RMState rmState, LeveldbIterator iter, String appIdStr,
|
|
|
|
|
byte[] appData) throws IOException {
|
|
|
|
|
ApplicationStateData appState = createApplicationState(appIdStr, appData);
|
|
|
|
|
ApplicationId appId =
|
|
|
|
|
appState.getApplicationSubmissionContext().getApplicationId();
|
|
|
|
|
rmState.appState.put(appId, appState);
|
|
|
|
|
String attemptNodePrefix = getApplicationNodeKey(appId) + SEPARATOR;
|
|
|
|
|
while (iter.hasNext()) {
|
|
|
|
|
Entry<byte[],byte[]> entry = iter.peekNext();
|
|
|
|
|
String key = asString(entry.getKey());
|
|
|
|
|
if (!key.startsWith(attemptNodePrefix)) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String attemptId = key.substring(attemptNodePrefix.length());
|
|
|
|
|
if (attemptId.startsWith(ApplicationAttemptId.appAttemptIdStrPrefix)) {
|
|
|
|
|
ApplicationAttemptStateData attemptState =
|
|
|
|
|
createAttemptState(attemptId, entry.getValue());
|
|
|
|
|
appState.attempts.put(attemptState.getAttemptId(), attemptState);
|
|
|
|
|
} else {
|
|
|
|
|
LOG.warn("Ignoring unknown application key: " + key);
|
|
|
|
|
}
|
|
|
|
|
iter.next();
|
|
|
|
|
}
|
|
|
|
|
int numAttempts = appState.attempts.size();
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
|
|
LOG.debug("Loaded application " + appId + " with " + numAttempts
|
|
|
|
|
+ " attempts");
|
|
|
|
|
}
|
|
|
|
|
return numAttempts;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private ApplicationStateData createApplicationState(String appIdStr,
|
|
|
|
|
byte[] data) throws IOException {
|
|
|
|
|
ApplicationId appId = ConverterUtils.toApplicationId(appIdStr);
|
|
|
|
|
ApplicationStateDataPBImpl appState =
|
|
|
|
|
new ApplicationStateDataPBImpl(
|
|
|
|
|
ApplicationStateDataProto.parseFrom(data));
|
|
|
|
|
if (!appId.equals(
|
|
|
|
|
appState.getApplicationSubmissionContext().getApplicationId())) {
|
|
|
|
|
throw new YarnRuntimeException("The database entry for " + appId
|
|
|
|
|
+ " contains data for "
|
|
|
|
|
+ appState.getApplicationSubmissionContext().getApplicationId());
|
|
|
|
|
}
|
|
|
|
|
return appState;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@VisibleForTesting
|
|
|
|
|
ApplicationStateData loadRMAppState(ApplicationId appId) throws IOException {
|
|
|
|
|
String appKey = getApplicationNodeKey(appId);
|
|
|
|
|
byte[] data = null;
|
|
|
|
|
try {
|
|
|
|
|
data = db.get(bytes(appKey));
|
|
|
|
|
} catch (DBException e) {
|
|
|
|
|
throw new IOException(e);
|
|
|
|
|
}
|
|
|
|
|
if (data == null) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
return createApplicationState(appId.toString(), data);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private ApplicationAttemptStateData createAttemptState(String itemName,
|
|
|
|
|
byte[] data) throws IOException {
|
|
|
|
|
ApplicationAttemptId attemptId =
|
|
|
|
|
ConverterUtils.toApplicationAttemptId(itemName);
|
|
|
|
|
ApplicationAttemptStateDataPBImpl attemptState =
|
|
|
|
|
new ApplicationAttemptStateDataPBImpl(
|
|
|
|
|
ApplicationAttemptStateDataProto.parseFrom(data));
|
|
|
|
|
if (!attemptId.equals(attemptState.getAttemptId())) {
|
|
|
|
|
throw new YarnRuntimeException("The database entry for " + attemptId
|
|
|
|
|
+ " contains data for " + attemptState.getAttemptId());
|
|
|
|
|
}
|
|
|
|
|
return attemptState;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private void loadAMRMTokenSecretManagerState(RMState rmState)
|
|
|
|
|
throws IOException {
|
|
|
|
|
try {
|
|
|
|
|
byte[] data = db.get(bytes(AMRMTOKEN_SECRET_MANAGER_ROOT));
|
|
|
|
|
if (data != null) {
|
|
|
|
|
AMRMTokenSecretManagerStatePBImpl stateData =
|
|
|
|
|
new AMRMTokenSecretManagerStatePBImpl(
|
|
|
|
|
AMRMTokenSecretManagerStateProto.parseFrom(data));
|
|
|
|
|
rmState.amrmTokenSecretManagerState =
|
|
|
|
|
AMRMTokenSecretManagerState.newInstance(
|
|
|
|
|
stateData.getCurrentMasterKey(),
|
|
|
|
|
stateData.getNextMasterKey());
|
|
|
|
|
}
|
|
|
|
|
} catch (DBException e) {
|
|
|
|
|
throw new IOException(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected void storeApplicationStateInternal(ApplicationId appId,
|
|
|
|
|
ApplicationStateData appStateData) throws IOException {
|
|
|
|
|
String key = getApplicationNodeKey(appId);
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
|
|
LOG.debug("Storing state for app " + appId + " at " + key);
|
|
|
|
|
}
|
|
|
|
|
try {
|
|
|
|
|
db.put(bytes(key), appStateData.getProto().toByteArray());
|
|
|
|
|
} catch (DBException e) {
|
|
|
|
|
throw new IOException(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected void updateApplicationStateInternal(ApplicationId appId,
|
|
|
|
|
ApplicationStateData appStateData) throws IOException {
|
|
|
|
|
storeApplicationStateInternal(appId, appStateData);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected void storeApplicationAttemptStateInternal(
|
|
|
|
|
ApplicationAttemptId attemptId,
|
|
|
|
|
ApplicationAttemptStateData attemptStateData) throws IOException {
|
|
|
|
|
String key = getApplicationAttemptNodeKey(attemptId);
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
|
|
LOG.debug("Storing state for attempt " + attemptId + " at " + key);
|
|
|
|
|
}
|
|
|
|
|
try {
|
|
|
|
|
db.put(bytes(key), attemptStateData.getProto().toByteArray());
|
|
|
|
|
} catch (DBException e) {
|
|
|
|
|
throw new IOException(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected void updateApplicationAttemptStateInternal(
|
|
|
|
|
ApplicationAttemptId attemptId,
|
|
|
|
|
ApplicationAttemptStateData attemptStateData) throws IOException {
|
|
|
|
|
storeApplicationAttemptStateInternal(attemptId, attemptStateData);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected void removeApplicationStateInternal(ApplicationStateData appState)
|
|
|
|
|
throws IOException {
|
|
|
|
|
ApplicationId appId =
|
|
|
|
|
appState.getApplicationSubmissionContext().getApplicationId();
|
|
|
|
|
String appKey = getApplicationNodeKey(appId);
|
|
|
|
|
try {
|
|
|
|
|
WriteBatch batch = db.createWriteBatch();
|
|
|
|
|
try {
|
|
|
|
|
batch.delete(bytes(appKey));
|
|
|
|
|
for (ApplicationAttemptId attemptId : appState.attempts.keySet()) {
|
|
|
|
|
String attemptKey = getApplicationAttemptNodeKey(appKey, attemptId);
|
|
|
|
|
batch.delete(bytes(attemptKey));
|
|
|
|
|
}
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
|
|
LOG.debug("Removing state for app " + appId + " and "
|
|
|
|
|
+ appState.attempts.size() + " attempts" + " at " + appKey);
|
|
|
|
|
}
|
|
|
|
|
db.write(batch);
|
|
|
|
|
} finally {
|
|
|
|
|
batch.close();
|
|
|
|
|
}
|
|
|
|
|
} catch (DBException e) {
|
|
|
|
|
throw new IOException(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected void storeRMDelegationTokenAndSequenceNumberState(
|
|
|
|
|
RMDelegationTokenIdentifier tokenId, Long renewDate,
|
|
|
|
|
int latestSequenceNumber) throws IOException {
|
|
|
|
|
String tokenKey = getRMDTTokenNodeKey(tokenId);
|
|
|
|
|
RMDelegationTokenIdentifierData tokenData =
|
|
|
|
|
new RMDelegationTokenIdentifierData(tokenId, renewDate);
|
|
|
|
|
ByteArrayOutputStream bs = new ByteArrayOutputStream();
|
|
|
|
|
DataOutputStream ds = new DataOutputStream(bs);
|
|
|
|
|
try {
|
|
|
|
|
ds.writeInt(latestSequenceNumber);
|
|
|
|
|
} finally {
|
|
|
|
|
ds.close();
|
|
|
|
|
}
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
|
|
LOG.debug("Storing token to " + tokenKey);
|
|
|
|
|
LOG.debug("Storing " + latestSequenceNumber + " to "
|
|
|
|
|
+ RM_DT_SEQUENCE_NUMBER_KEY);
|
|
|
|
|
}
|
|
|
|
|
try {
|
|
|
|
|
WriteBatch batch = db.createWriteBatch();
|
|
|
|
|
try {
|
|
|
|
|
batch.put(bytes(tokenKey), tokenData.toByteArray());
|
|
|
|
|
batch.put(bytes(RM_DT_SEQUENCE_NUMBER_KEY), bs.toByteArray());
|
|
|
|
|
db.write(batch);
|
|
|
|
|
} finally {
|
|
|
|
|
batch.close();
|
|
|
|
|
}
|
|
|
|
|
} catch (DBException e) {
|
|
|
|
|
throw new IOException(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected void updateRMDelegationTokenAndSequenceNumberInternal(
|
|
|
|
|
RMDelegationTokenIdentifier tokenId, Long renewDate,
|
|
|
|
|
int latestSequenceNumber) throws IOException {
|
|
|
|
|
storeRMDelegationTokenAndSequenceNumberState(tokenId, renewDate,
|
|
|
|
|
latestSequenceNumber);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected void removeRMDelegationTokenState(
|
|
|
|
|
RMDelegationTokenIdentifier tokenId) throws IOException {
|
|
|
|
|
String tokenKey = getRMDTTokenNodeKey(tokenId);
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
|
|
LOG.debug("Removing token at " + tokenKey);
|
|
|
|
|
}
|
|
|
|
|
try {
|
|
|
|
|
db.delete(bytes(tokenKey));
|
|
|
|
|
} catch (DBException e) {
|
|
|
|
|
throw new IOException(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected void storeRMDTMasterKeyState(DelegationKey masterKey)
|
|
|
|
|
throws IOException {
|
|
|
|
|
String dbKey = getRMDTMasterKeyNodeKey(masterKey);
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
|
|
LOG.debug("Storing token master key to " + dbKey);
|
|
|
|
|
}
|
|
|
|
|
ByteArrayOutputStream os = new ByteArrayOutputStream();
|
|
|
|
|
DataOutputStream out = new DataOutputStream(os);
|
|
|
|
|
try {
|
|
|
|
|
masterKey.write(out);
|
|
|
|
|
} finally {
|
|
|
|
|
out.close();
|
|
|
|
|
}
|
|
|
|
|
try {
|
|
|
|
|
db.put(bytes(dbKey), os.toByteArray());
|
|
|
|
|
} catch (DBException e) {
|
|
|
|
|
throw new IOException(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
protected void removeRMDTMasterKeyState(DelegationKey masterKey)
|
|
|
|
|
throws IOException {
|
|
|
|
|
String dbKey = getRMDTMasterKeyNodeKey(masterKey);
|
|
|
|
|
if (LOG.isDebugEnabled()) {
|
|
|
|
|
LOG.debug("Removing token master key at " + dbKey);
|
|
|
|
|
}
|
|
|
|
|
try {
|
|
|
|
|
db.delete(bytes(dbKey));
|
|
|
|
|
} catch (DBException e) {
|
|
|
|
|
throw new IOException(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void storeOrUpdateAMRMTokenSecretManagerState(
|
|
|
|
|
AMRMTokenSecretManagerState state, boolean isUpdate) {
|
|
|
|
|
AMRMTokenSecretManagerState data =
|
|
|
|
|
AMRMTokenSecretManagerState.newInstance(state);
|
|
|
|
|
byte[] stateData = data.getProto().toByteArray();
|
|
|
|
|
try {
|
|
|
|
|
db.put(bytes(AMRMTOKEN_SECRET_MANAGER_ROOT), stateData);
|
|
|
|
|
} catch (DBException e) {
|
|
|
|
|
notifyStoreOperationFailed(e);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void deleteStore() throws IOException {
|
|
|
|
|
Path root = getStorageDir();
|
|
|
|
|
LOG.info("Deleting state database at " + root);
|
|
|
|
|
db.close();
|
|
|
|
|
db = null;
|
|
|
|
|
FileSystem fs = FileSystem.getLocal(getConfig());
|
|
|
|
|
fs.delete(root, true);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@VisibleForTesting
|
|
|
|
|
int getNumEntriesInDatabase() throws IOException {
|
|
|
|
|
int numEntries = 0;
|
|
|
|
|
LeveldbIterator iter = null;
|
|
|
|
|
try {
|
|
|
|
|
iter = new LeveldbIterator(db);
|
|
|
|
|
iter.seekToFirst();
|
|
|
|
|
while (iter.hasNext()) {
|
|
|
|
|
Entry<byte[],byte[]> entry = iter.next();
|
|
|
|
|
LOG.info("entry: " + asString(entry.getKey()));
|
|
|
|
|
++numEntries;
|
|
|
|
|
}
|
|
|
|
|
} catch (DBException e) {
|
|
|
|
|
throw new IOException(e);
|
|
|
|
|
} finally {
|
|
|
|
|
if (iter != null) {
|
|
|
|
|
iter.close();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
return numEntries;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static class LeveldbLogger implements Logger {
|
|
|
|
|
private static final Log LOG = LogFactory.getLog(LeveldbLogger.class);
|
|
|
|
|
|
|
|
|
|
@Override
|
|
|
|
|
public void log(String message) {
|
|
|
|
|
LOG.info(message);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|