YARN-2837. Support TimeLine server to recover delegation token when restarting. Contributed by Zhijie Shen

This commit is contained in:
Jian He 2014-12-23 18:23:57 -08:00
parent e1ee0d45ea
commit 149512a837
18 changed files with 1421 additions and 182 deletions

View File

@ -149,6 +149,9 @@ Release 2.7.0 - UNRELEASED
YARN-2970. NodeLabel operations in RMAdmin CLI get missing in help command. YARN-2970. NodeLabel operations in RMAdmin CLI get missing in help command.
(Varun Saxena via junping_du) (Varun Saxena via junping_du)
YARN-2837. Support TimeLine server to recover delegation token when
restarting. (Zhijie Shen via jianhe)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -402,4 +402,9 @@
<Bug pattern="DM_DEFAULT_ENCODING" /> <Bug pattern="DM_DEFAULT_ENCODING" />
</Match> </Match>
<!-- Ignore EI_EXPOSE_REP2 in timeline service -->
<Match>
<Class name="org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils$KeyParser" />
<Bug pattern="EI_EXPOSE_REP2" />
</Match>
</FindBugsFilter> </FindBugsFilter>

View File

@ -1137,7 +1137,7 @@ private static void addDeprecatedKeys() {
* OS environment expansion syntax. * OS environment expansion syntax.
* </p> * </p>
* <p> * <p>
* Note: Use {@link DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH} for * Note: Use {@link #DEFAULT_YARN_CROSS_PLATFORM_APPLICATION_CLASSPATH} for
* cross-platform practice i.e. submit an application from a Windows client to * cross-platform practice i.e. submit an application from a Windows client to
* a Linux/Unix server or vice versa. * a Linux/Unix server or vice versa.
* </p> * </p>
@ -1366,6 +1366,22 @@ private static void addDeprecatedKeys() {
public static final long public static final long
DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS = 1000; DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS = 1000;
/** Flag to enable recovery of timeline service */
public static final String TIMELINE_SERVICE_RECOVERY_ENABLED =
TIMELINE_SERVICE_PREFIX + "recovery.enabled";
public static final boolean DEFAULT_TIMELINE_SERVICE_RECOVERY_ENABLED = false;
/** Timeline service state store class */
public static final String TIMELINE_SERVICE_STATE_STORE_CLASS =
TIMELINE_SERVICE_PREFIX + "state-store-class";
public static final String TIMELINE_SERVICE_LEVELDB_STATE_STORE_PREFIX =
TIMELINE_SERVICE_PREFIX + "leveldb-state-store.";
/** Timeline service state store leveldb path */
public static final String TIMELINE_SERVICE_LEVELDB_STATE_STORE_PATH =
TIMELINE_SERVICE_LEVELDB_STATE_STORE_PREFIX + "path";
// /////////////////////////////// // ///////////////////////////////
// Shared Cache Configs // Shared Cache Configs
// /////////////////////////////// // ///////////////////////////////

View File

@ -1378,6 +1378,26 @@
<value>1000</value> <value>1000</value>
</property> </property>
<property>
<description>Enable timeline server to recover state after starting. If
true, then yarn.timeline-service.state-store-class must be specified.
</description>
<name>yarn.timeline-service.recovery.enabled</name>
<value>false</value>
</property>
<property>
<description>Store class name for timeline state store.</description>
<name>yarn.timeline-service.state-store-class</name>
<value>org.apache.hadoop.yarn.server.timeline.recovery.LeveldbTimelineStateStore</value>
</property>
<property>
<description>Store file name for leveldb state store.</description>
<name>yarn.timeline-service.leveldb-state-store.path</name>
<value>${hadoop.tmp.dir}/yarn/timeline</value>
</property>
<!-- Shared Cache Configuration --> <!-- Shared Cache Configuration -->
<property> <property>
<description>Whether the shared cache is enabled</description> <description>Whether the shared cache is enabled</description>

View File

@ -181,4 +181,40 @@
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
</dependencies> </dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-maven-plugins</artifactId>
<executions>
<execution>
<id>compile-protoc</id>
<phase>generate-sources</phase>
<goals>
<goal>protoc</goal>
</goals>
<configuration>
<protocVersion>${protobuf.version}</protocVersion>
<protocCommand>${protoc.path}</protocCommand>
<imports>
<param>${basedir}/../../../../hadoop-common-project/hadoop-common/src/main/proto</param>
<param>${basedir}/../../hadoop-yarn-api/src/main/proto</param>
<param>${basedir}/../../hadoop-yarn-common/src/main/proto/server/</param>
<param>${basedir}/../hadoop-yarn-server-common/src/main/proto</param>
<param>${basedir}/src/main/proto</param>
</imports>
<source>
<directory>${basedir}/src/main/proto</directory>
<includes>
<include>yarn_server_timelineserver_recovery.proto</include>
</includes>
</source>
<output>${project.build.directory}/generated-sources/java</output>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project> </project>

View File

@ -106,9 +106,8 @@ protected void serviceStart() throws Exception {
} catch(IOException ie) { } catch(IOException ie) {
throw new YarnRuntimeException("Failed to login", ie); throw new YarnRuntimeException("Failed to login", ie);
} }
startWebApp();
super.serviceStart(); super.serviceStart();
startWebApp();
} }
@Override @Override

View File

@ -18,28 +18,8 @@
package org.apache.hadoop.yarn.server.timeline; package org.apache.hadoop.yarn.server.timeline;
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong; import com.google.common.annotations.VisibleForTesting;
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong; import com.google.common.base.Preconditions;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.collections.map.LRUMap; import org.apache.commons.collections.map.LRUMap;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -53,30 +33,30 @@
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.*;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.fusesource.leveldbjni.JniDBFactory; import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB; import org.iq80.leveldb.*;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.ReadOptions;
import org.iq80.leveldb.WriteBatch;
import org.iq80.leveldb.WriteOptions;
import com.google.common.annotations.VisibleForTesting; import java.io.File;
import com.google.common.base.Preconditions; import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong;
import static org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.prefixMatches;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
/** /**
* <p>An implementation of an application timeline store backed by leveldb.</p> * <p>An implementation of an application timeline store backed by leveldb.</p>
@ -357,102 +337,6 @@ synchronized void returnLock(CountingReentrantLock<K> lock) {
} }
} }
private static class KeyBuilder {
private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10;
private byte[][] b;
private boolean[] useSeparator;
private int index;
private int length;
public KeyBuilder(int size) {
b = new byte[size][];
useSeparator = new boolean[size];
index = 0;
length = 0;
}
public static KeyBuilder newInstance() {
return new KeyBuilder(MAX_NUMBER_OF_KEY_ELEMENTS);
}
public KeyBuilder add(String s) {
return add(s.getBytes(), true);
}
public KeyBuilder add(byte[] t) {
return add(t, false);
}
public KeyBuilder add(byte[] t, boolean sep) {
b[index] = t;
useSeparator[index] = sep;
length += t.length;
if (sep) {
length++;
}
index++;
return this;
}
public byte[] getBytes() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
for (int i = 0; i < index; i++) {
baos.write(b[i]);
if (i < index-1 && useSeparator[i]) {
baos.write(0x0);
}
}
return baos.toByteArray();
}
public byte[] getBytesForLookup() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
for (int i = 0; i < index; i++) {
baos.write(b[i]);
if (useSeparator[i]) {
baos.write(0x0);
}
}
return baos.toByteArray();
}
}
private static class KeyParser {
private final byte[] b;
private int offset;
public KeyParser(byte[] b, int offset) {
this.b = b;
this.offset = offset;
}
public String getNextString() throws IOException {
if (offset >= b.length) {
throw new IOException(
"tried to read nonexistent string from byte array");
}
int i = 0;
while (offset+i < b.length && b[offset+i] != 0x0) {
i++;
}
String s = new String(b, offset, i);
offset = offset + i + 1;
return s;
}
public long getNextLong() throws IOException {
if (offset+8 >= b.length) {
throw new IOException("byte array ran out when trying to read long");
}
long l = readReverseOrderedLong(b, offset);
offset += 8;
return l;
}
public int getOffset() {
return offset;
}
}
@Override @Override
public TimelineEntity getEntity(String entityId, String entityType, public TimelineEntity getEntity(String entityId, String entityType,
@ -660,18 +544,6 @@ public int compare(byte[] o1, byte[] o2) {
return events; return events;
} }
/**
* Returns true if the byte array begins with the specified prefix.
*/
private static boolean prefixMatches(byte[] prefix, int prefixlen,
byte[] b) {
if (b.length < prefixlen) {
return false;
}
return WritableComparator.compareBytes(prefix, 0, prefixlen, b, 0,
prefixlen) == 0;
}
@Override @Override
public TimelineEntities getEntities(String entityType, public TimelineEntities getEntities(String entityType,
Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs, Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs,

View File

@ -0,0 +1,420 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline.recovery;
import static org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.prefixMatches;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.timeline.recovery.records.TimelineDelegationTokenIdentifierData;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
import org.fusesource.leveldbjni.JniDBFactory;
import org.fusesource.leveldbjni.internal.NativeDB;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
/**
* A timeline service state storage implementation that supports any persistent
* storage that adheres to the LevelDB interface.
*/
public class LeveldbTimelineStateStore extends
TimelineStateStore {
public static final Log LOG =
LogFactory.getLog(LeveldbTimelineStateStore.class);
private static final String DB_NAME = "timeline-state-store.ldb";
private static final FsPermission LEVELDB_DIR_UMASK = FsPermission
.createImmutable((short) 0700);
private static final byte[] TOKEN_ENTRY_PREFIX = bytes("t");
private static final byte[] TOKEN_MASTER_KEY_ENTRY_PREFIX = bytes("k");
private static final byte[] LATEST_SEQUENCE_NUMBER_KEY = bytes("s");
private static final Version CURRENT_VERSION_INFO = Version.newInstance(1, 0);
private static final byte[] TIMELINE_STATE_STORE_VERSION_KEY = bytes("v");
private DB db;
public LeveldbTimelineStateStore() {
super(LeveldbTimelineStateStore.class.getName());
}
@Override
protected void initStorage(Configuration conf) throws IOException {
}
@Override
protected void startStorage() throws IOException {
Options options = new Options();
Path dbPath =
new Path(
getConfig().get(
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_STATE_STORE_PATH),
DB_NAME);
FileSystem localFS = null;
try {
localFS = FileSystem.getLocal(getConfig());
if (!localFS.exists(dbPath)) {
if (!localFS.mkdirs(dbPath)) {
throw new IOException("Couldn't create directory for leveldb " +
"timeline store " + dbPath);
}
localFS.setPermission(dbPath, LEVELDB_DIR_UMASK);
}
} finally {
IOUtils.cleanup(LOG, localFS);
}
JniDBFactory factory = new JniDBFactory();
try {
options.createIfMissing(false);
db = factory.open(new File(dbPath.toString()), options);
LOG.info("Loading the existing database at th path: " + dbPath.toString());
checkVersion();
} catch (NativeDB.DBException e) {
if (e.isNotFound() || e.getMessage().contains(" does not exist ")) {
try {
options.createIfMissing(true);
db = factory.open(new File(dbPath.toString()), options);
LOG.info("Creating a new database at th path: " + dbPath.toString());
storeVersion(CURRENT_VERSION_INFO);
} catch (DBException ex) {
throw new IOException(ex);
}
} else {
throw new IOException(e);
}
} catch (DBException e) {
throw new IOException(e);
}
}
@Override
protected void closeStorage() throws IOException {
IOUtils.cleanup(LOG, db);
}
@Override
public TimelineServiceState loadState() throws IOException {
LOG.info("Loading timeline service state from leveldb");
TimelineServiceState state = new TimelineServiceState();
int numKeys = loadTokenMasterKeys(state);
int numTokens = loadTokens(state);
loadLatestSequenceNumber(state);
LOG.info("Loaded " + numKeys + " master keys and " + numTokens
+ " tokens from leveldb, and latest sequence number is "
+ state.getLatestSequenceNumber());
return state;
}
@Override
public void storeToken(TimelineDelegationTokenIdentifier tokenId,
Long renewDate) throws IOException {
DataOutputStream ds = null;
WriteBatch batch = null;
try {
byte[] k = createTokenEntryKey(tokenId.getSequenceNumber());
if (db.get(k) != null) {
throw new IOException(tokenId + " already exists");
}
byte[] v = buildTokenData(tokenId, renewDate);
ByteArrayOutputStream bs = new ByteArrayOutputStream();
ds = new DataOutputStream(bs);
ds.writeInt(tokenId.getSequenceNumber());
batch = db.createWriteBatch();
batch.put(k, v);
batch.put(LATEST_SEQUENCE_NUMBER_KEY, bs.toByteArray());
db.write(batch);
} catch (DBException e) {
throw new IOException(e);
} finally {
IOUtils.cleanup(LOG, ds);
IOUtils.cleanup(LOG, batch);
}
}
@Override
public void updateToken(TimelineDelegationTokenIdentifier tokenId,
Long renewDate) throws IOException {
try {
byte[] k = createTokenEntryKey(tokenId.getSequenceNumber());
if (db.get(k) == null) {
throw new IOException(tokenId + " doesn't exist");
}
byte[] v = buildTokenData(tokenId, renewDate);
db.put(k, v);
} catch (DBException e) {
throw new IOException(e);
}
}
@Override
public void removeToken(TimelineDelegationTokenIdentifier tokenId)
throws IOException {
try {
byte[] key = createTokenEntryKey(tokenId.getSequenceNumber());
db.delete(key);
} catch (DBException e) {
throw new IOException(e);
}
}
@Override
public void storeTokenMasterKey(DelegationKey key) throws IOException {
try {
byte[] k = createTokenMasterKeyEntryKey(key.getKeyId());
if (db.get(k) != null) {
throw new IOException(key + " already exists");
}
byte[] v = buildTokenMasterKeyData(key);
db.put(k, v);
} catch (DBException e) {
throw new IOException(e);
}
}
@Override
public void removeTokenMasterKey(DelegationKey key) throws IOException {
try {
byte[] k = createTokenMasterKeyEntryKey(key.getKeyId());
db.delete(k);
} catch (DBException e) {
throw new IOException(e);
}
}
private static byte[] buildTokenData(
TimelineDelegationTokenIdentifier tokenId, Long renewDate)
throws IOException {
TimelineDelegationTokenIdentifierData data =
new TimelineDelegationTokenIdentifierData(tokenId, renewDate);
return data.toByteArray();
}
private static byte[] buildTokenMasterKeyData(DelegationKey key)
throws IOException {
ByteArrayOutputStream memStream = new ByteArrayOutputStream();
DataOutputStream dataStream = new DataOutputStream(memStream);
try {
key.write(dataStream);
dataStream.close();
} finally {
IOUtils.cleanup(LOG, dataStream);
}
return memStream.toByteArray();
}
private static void loadTokenMasterKeyData(TimelineServiceState state,
byte[] keyData)
throws IOException {
DelegationKey key = new DelegationKey();
DataInputStream in =
new DataInputStream(new ByteArrayInputStream(keyData));
try {
key.readFields(in);
} finally {
IOUtils.cleanup(LOG, in);
}
state.tokenMasterKeyState.add(key);
}
private static void loadTokenData(TimelineServiceState state, byte[] tokenData)
throws IOException {
TimelineDelegationTokenIdentifierData data =
new TimelineDelegationTokenIdentifierData();
DataInputStream in =
new DataInputStream(new ByteArrayInputStream(tokenData));
try {
data.readFields(in);
} finally {
IOUtils.cleanup(LOG, in);
}
state.tokenState.put(data.getTokenIdentifier(), data.getRenewDate());
}
private int loadTokenMasterKeys(TimelineServiceState state)
throws IOException {
byte[] base = KeyBuilder.newInstance().add(TOKEN_MASTER_KEY_ENTRY_PREFIX)
.getBytesForLookup();
int numKeys = 0;
LeveldbIterator iterator = null;
try {
for (iterator = new LeveldbIterator(db), iterator.seek(base);
iterator.hasNext(); iterator.next()) {
byte[] k = iterator.peekNext().getKey();
if (!prefixMatches(base, base.length, k)) {
break;
}
byte[] v = iterator.peekNext().getValue();
loadTokenMasterKeyData(state, v);
++numKeys;
}
} finally {
IOUtils.cleanup(LOG, iterator);
}
return numKeys;
}
private int loadTokens(TimelineServiceState state) throws IOException {
byte[] base = KeyBuilder.newInstance().add(TOKEN_ENTRY_PREFIX)
.getBytesForLookup();
int numTokens = 0;
LeveldbIterator iterator = null;
try {
for (iterator = new LeveldbIterator(db), iterator.seek(base);
iterator.hasNext(); iterator.next()) {
byte[] k = iterator.peekNext().getKey();
if (!prefixMatches(base, base.length, k)) {
break;
}
byte[] v = iterator.peekNext().getValue();
loadTokenData(state, v);
++numTokens;
}
} catch (DBException e) {
throw new IOException(e);
} finally {
IOUtils.cleanup(LOG, iterator);
}
return numTokens;
}
private void loadLatestSequenceNumber(TimelineServiceState state)
throws IOException {
byte[] data = null;
try {
data = db.get(LATEST_SEQUENCE_NUMBER_KEY);
} catch (DBException e) {
throw new IOException(e);
}
if (data != null) {
DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
try {
state.latestSequenceNumber = in.readInt();
} finally {
IOUtils.cleanup(LOG, in);
}
}
}
/**
* Creates a domain entity key with column name suffix, of the form
* TOKEN_ENTRY_PREFIX + sequence number.
*/
private static byte[] createTokenEntryKey(int seqNum) throws IOException {
return KeyBuilder.newInstance().add(TOKEN_ENTRY_PREFIX)
.add(Integer.toString(seqNum)).getBytes();
}
/**
* Creates a domain entity key with column name suffix, of the form
* TOKEN_MASTER_KEY_ENTRY_PREFIX + sequence number.
*/
private static byte[] createTokenMasterKeyEntryKey(int keyId)
throws IOException {
return KeyBuilder.newInstance().add(TOKEN_MASTER_KEY_ENTRY_PREFIX)
.add(Integer.toString(keyId)).getBytes();
}
@VisibleForTesting
Version loadVersion() throws IOException {
try {
byte[] data = db.get(TIMELINE_STATE_STORE_VERSION_KEY);
// if version is not stored previously, treat it as CURRENT_VERSION_INFO.
if (data == null || data.length == 0) {
return getCurrentVersion();
}
Version version =
new VersionPBImpl(
YarnServerCommonProtos.VersionProto.parseFrom(data));
return version;
} catch (DBException e) {
throw new IOException(e);
}
}
@VisibleForTesting
void storeVersion(Version state) throws IOException {
byte[] data =
((VersionPBImpl) state).getProto().toByteArray();
try {
db.put(TIMELINE_STATE_STORE_VERSION_KEY, data);
} catch (DBException e) {
throw new IOException(e);
}
}
@VisibleForTesting
Version getCurrentVersion() {
return CURRENT_VERSION_INFO;
}
/**
* 1) Versioning timeline state store:
* major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
* 2) Any incompatible change of TS-store is a major upgrade, and any
* compatible change of TS-store is a minor upgrade.
* 3) Within a minor upgrade, say 1.1 to 1.2:
* overwrite the version info and proceed as normal.
* 4) Within a major upgrade, say 1.2 to 2.0:
* throw exception and indicate user to use a separate upgrade tool to
* upgrade timeline store or remove incompatible old state.
*/
private void checkVersion() throws IOException {
Version loadedVersion = loadVersion();
LOG.info("Loaded timeline state store version info " + loadedVersion);
if (loadedVersion.equals(getCurrentVersion())) {
return;
}
if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
LOG.info("Storing timeline state store version info " + getCurrentVersion());
storeVersion(CURRENT_VERSION_INFO);
} else {
String incompatibleMessage =
"Incompatible version for timeline state store: expecting version "
+ getCurrentVersion() + ", but loading version " + loadedVersion;
LOG.fatal(incompatibleMessage);
throw new IOException(incompatibleMessage);
}
}
}

View File

@ -0,0 +1,96 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline.recovery;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
/**
* A state store backed by memory for unit tests
*/
public class MemoryTimelineStateStore
extends TimelineStateStore {
private TimelineServiceState state;
@Override
protected void initStorage(Configuration conf) throws IOException {
}
@Override
protected void startStorage() throws IOException {
state = new TimelineServiceState();
}
@Override
protected void closeStorage() throws IOException {
state = null;
}
@Override
public TimelineServiceState loadState() throws IOException {
TimelineServiceState result = new TimelineServiceState();
result.tokenState.putAll(state.tokenState);
result.tokenMasterKeyState.addAll(state.tokenMasterKeyState);
result.latestSequenceNumber = state.latestSequenceNumber;
return result;
}
@Override
public void storeToken(TimelineDelegationTokenIdentifier tokenId,
Long renewDate) throws IOException {
if (state.tokenState.containsKey(tokenId)) {
throw new IOException("token " + tokenId + " was stored twice");
}
state.tokenState.put(tokenId, renewDate);
state.latestSequenceNumber = tokenId.getSequenceNumber();
}
@Override
public void updateToken(TimelineDelegationTokenIdentifier tokenId,
Long renewDate) throws IOException {
if (!state.tokenState.containsKey(tokenId)) {
throw new IOException("token " + tokenId + " not in store");
}
state.tokenState.put(tokenId, renewDate);
}
@Override
public void removeToken(TimelineDelegationTokenIdentifier tokenId)
throws IOException {
state.tokenState.remove(tokenId);
}
@Override
public void storeTokenMasterKey(DelegationKey key)
throws IOException {
if (state.tokenMasterKeyState.contains(key)) {
throw new IOException("token master key " + key + " was stored twice");
}
state.tokenMasterKeyState.add(key);
}
@Override
public void removeTokenMasterKey(DelegationKey key)
throws IOException {
state.tokenMasterKeyState.remove(key);
}
}

View File

@ -0,0 +1,193 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline.recovery;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
@Private
@Unstable
/**
* Base class for timeline service state storage.
* Storage implementations need to implement blocking store and load methods
* to actually store and load the state.
*/
public abstract class TimelineStateStore extends AbstractService {
public static class TimelineServiceState {
int latestSequenceNumber = 0;
Map<TimelineDelegationTokenIdentifier, Long> tokenState =
new HashMap<TimelineDelegationTokenIdentifier, Long>();
Set<DelegationKey> tokenMasterKeyState = new HashSet<DelegationKey>();
public int getLatestSequenceNumber() {
return latestSequenceNumber;
}
public Map<TimelineDelegationTokenIdentifier, Long> getTokenState() {
return tokenState;
}
public Set<DelegationKey> getTokenMasterKeyState() {
return tokenMasterKeyState;
}
}
public TimelineStateStore() {
super(TimelineStateStore.class.getName());
}
public TimelineStateStore(String name) {
super(name);
}
/**
* Initialize the state storage
*
* @param conf the configuration
* @throws IOException
*/
@Override
public void serviceInit(Configuration conf) throws IOException {
initStorage(conf);
}
/**
* Start the state storage for use
*
* @throws IOException
*/
@Override
public void serviceStart() throws IOException {
startStorage();
}
/**
* Shutdown the state storage.
*
* @throws IOException
*/
@Override
public void serviceStop() throws IOException {
closeStorage();
}
/**
* Implementation-specific initialization.
*
* @param conf the configuration
* @throws IOException
*/
protected abstract void initStorage(Configuration conf) throws IOException;
/**
* Implementation-specific startup.
*
* @throws IOException
*/
protected abstract void startStorage() throws IOException;
/**
* Implementation-specific shutdown.
*
* @throws IOException
*/
protected abstract void closeStorage() throws IOException;
/**
* Load the timeline service state from the state storage.
*
* @throws IOException
*/
public abstract TimelineServiceState loadState() throws IOException;
/**
* Blocking method to store a delegation token along with the current token
* sequence number to the state storage.
*
* Implementations must not return from this method until the token has been
* committed to the state store.
*
* @param tokenId the token to store
* @param renewDate the token renewal deadline
* @throws IOException
*/
public abstract void storeToken(TimelineDelegationTokenIdentifier tokenId,
Long renewDate) throws IOException;
/**
* Blocking method to update the expiration of a delegation token
* in the state storage.
*
* Implementations must not return from this method until the expiration
* date of the token has been updated in the state store.
*
* @param tokenId the token to update
* @param renewDate the new token renewal deadline
* @throws IOException
*/
public abstract void updateToken(TimelineDelegationTokenIdentifier tokenId,
Long renewDate) throws IOException;
/**
* Blocking method to remove a delegation token from the state storage.
*
* Implementations must not return from this method until the token has been
* removed from the state store.
*
* @param tokenId the token to remove
* @throws IOException
*/
public abstract void removeToken(TimelineDelegationTokenIdentifier tokenId)
throws IOException;
/**
* Blocking method to store a delegation token master key.
*
* Implementations must not return from this method until the key has been
* committed to the state store.
*
* @param key the master key to store
* @throws IOException
*/
public abstract void storeTokenMasterKey(
DelegationKey key) throws IOException;
/**
* Blocking method to remove a delegation token master key.
*
* Implementations must not return from this method until the key has been
* removed from the state store.
*
* @param key the master key to remove
* @throws IOException
*/
public abstract void removeTokenMasterKey(DelegationKey key)
throws IOException;
}

View File

@ -0,0 +1,63 @@
package org.apache.hadoop.yarn.server.timeline.recovery.records;
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import org.apache.hadoop.yarn.proto.YarnServerTimelineServerRecoveryProtos.TimelineDelegationTokenIdentifierDataProto;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
public class TimelineDelegationTokenIdentifierData {
TimelineDelegationTokenIdentifierDataProto.Builder builder =
TimelineDelegationTokenIdentifierDataProto.newBuilder();
public TimelineDelegationTokenIdentifierData() {
}
public TimelineDelegationTokenIdentifierData(
TimelineDelegationTokenIdentifier identifier, long renewdate) {
builder.setTokenIdentifier(identifier.getProto());
builder.setRenewDate(renewdate);
}
public void readFields(DataInput in) throws IOException {
builder.mergeFrom((DataInputStream) in);
}
public byte[] toByteArray() throws IOException {
return builder.build().toByteArray();
}
public TimelineDelegationTokenIdentifier getTokenIdentifier()
throws IOException {
ByteArrayInputStream in =
new ByteArrayInputStream(builder.getTokenIdentifier().toByteArray());
TimelineDelegationTokenIdentifier identifer =
new TimelineDelegationTokenIdentifier();
identifer.readFields(new DataInputStream(in));
return identifer;
}
public long getRenewDate() {
return builder.getRenewDate();
}
}

View File

@ -18,33 +18,34 @@
package org.apache.hadoop.yarn.server.timeline.security; package org.apache.hadoop.yarn.server.timeline.security;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier; import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.server.timeline.recovery.LeveldbTimelineStateStore;
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore.TimelineServiceState;
/** /**
* The service wrapper of {@link TimelineDelegationTokenSecretManager} * The service wrapper of {@link TimelineDelegationTokenSecretManager}
*/ */
@Private @Private
@Unstable @Unstable
public class TimelineDelegationTokenSecretManagerService extends AbstractService { public class TimelineDelegationTokenSecretManagerService extends
AbstractService {
private TimelineDelegationTokenSecretManager secretManager = null; private TimelineDelegationTokenSecretManager secretManager = null;
private InetSocketAddress serviceAddr = null; private TimelineStateStore stateStore = null;
public TimelineDelegationTokenSecretManagerService() { public TimelineDelegationTokenSecretManagerService() {
super(TimelineDelegationTokenSecretManagerService.class.getName()); super(TimelineDelegationTokenSecretManagerService.class.getName());
@ -52,6 +53,12 @@ public TimelineDelegationTokenSecretManagerService() {
@Override @Override
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_RECOVERY_ENABLED,
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_RECOVERY_ENABLED)) {
stateStore = createStateStore(conf);
stateStore.init(conf);
}
long secretKeyInterval = long secretKeyInterval =
conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY, conf.getLong(YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_KEY,
YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT); YarnConfiguration.DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT);
@ -62,50 +69,77 @@ protected void serviceInit(Configuration conf) throws Exception {
conf.getLong(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_KEY, conf.getLong(YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT); YarnConfiguration.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT);
secretManager = new TimelineDelegationTokenSecretManager(secretKeyInterval, secretManager = new TimelineDelegationTokenSecretManager(secretKeyInterval,
tokenMaxLifetime, tokenRenewInterval, tokenMaxLifetime, tokenRenewInterval, 3600000, stateStore);
3600000);
secretManager.startThreads();
serviceAddr = TimelineUtils.getTimelineTokenServiceAddress(getConfig());
super.init(conf); super.init(conf);
} }
@Override
protected void serviceStart() throws Exception {
if (stateStore != null) {
stateStore.start();
TimelineServiceState state = stateStore.loadState();
secretManager.recover(state);
}
secretManager.startThreads();
super.serviceStart();
}
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
if (stateStore != null) {
stateStore.stop();
}
secretManager.stopThreads(); secretManager.stopThreads();
super.stop(); super.stop();
} }
/** protected TimelineStateStore createStateStore(
* Ge the instance of {link #TimelineDelegationTokenSecretManager} Configuration conf) {
* @return the instance of {link #TimelineDelegationTokenSecretManager} return ReflectionUtils.newInstance(
*/ conf.getClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
public TimelineDelegationTokenSecretManager getTimelineDelegationTokenSecretManager() { LeveldbTimelineStateStore.class,
return secretManager; TimelineStateStore.class), conf);
} }
/** /**
* Create a timeline secret manager * Ge the instance of {link #TimelineDelegationTokenSecretManager}
* *
* @param delegationKeyUpdateInterval * @return the instance of {link #TimelineDelegationTokenSecretManager}
* the number of seconds for rolling new secret keys.
* @param delegationTokenMaxLifetime
* the maximum lifetime of the delegation tokens
* @param delegationTokenRenewInterval
* how often the tokens must be renewed
* @param delegationTokenRemoverScanInterval
* how often the tokens are scanned for expired tokens
*/ */
public TimelineDelegationTokenSecretManager
getTimelineDelegationTokenSecretManager() {
return secretManager;
}
@Private @Private
@Unstable @Unstable
public static class TimelineDelegationTokenSecretManager extends public static class TimelineDelegationTokenSecretManager extends
AbstractDelegationTokenSecretManager<TimelineDelegationTokenIdentifier> { AbstractDelegationTokenSecretManager<TimelineDelegationTokenIdentifier> {
public TimelineDelegationTokenSecretManager(long delegationKeyUpdateInterval, public static final Log LOG =
long delegationTokenMaxLifetime, long delegationTokenRenewInterval, LogFactory.getLog(TimelineDelegationTokenSecretManager.class);
long delegationTokenRemoverScanInterval) {
private TimelineStateStore stateStore;
/**
* Create a timeline secret manager
*
* @param delegationKeyUpdateInterval the number of seconds for rolling new secret keys.
* @param delegationTokenMaxLifetime the maximum lifetime of the delegation tokens
* @param delegationTokenRenewInterval how often the tokens must be renewed
* @param delegationTokenRemoverScanInterval how often the tokens are scanned for expired tokens
*/
public TimelineDelegationTokenSecretManager(
long delegationKeyUpdateInterval,
long delegationTokenMaxLifetime,
long delegationTokenRenewInterval,
long delegationTokenRemoverScanInterval,
TimelineStateStore stateStore) {
super(delegationKeyUpdateInterval, delegationTokenMaxLifetime, super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
delegationTokenRenewInterval, delegationTokenRemoverScanInterval); delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
this.stateStore = stateStore;
} }
@Override @Override
@ -113,6 +147,90 @@ public TimelineDelegationTokenIdentifier createIdentifier() {
return new TimelineDelegationTokenIdentifier(); return new TimelineDelegationTokenIdentifier();
} }
@Override
protected void storeNewMasterKey(DelegationKey key) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Storing master key " + key.getKeyId());
}
try {
if (stateStore != null) {
stateStore.storeTokenMasterKey(key);
}
} catch (IOException e) {
LOG.error("Unable to store master key " + key.getKeyId(), e);
}
}
@Override
protected void removeStoredMasterKey(DelegationKey key) {
if (LOG.isDebugEnabled()) {
LOG.debug("Removing master key " + key.getKeyId());
}
try {
if (stateStore != null) {
stateStore.removeTokenMasterKey(key);
}
} catch (IOException e) {
LOG.error("Unable to remove master key " + key.getKeyId(), e);
}
}
@Override
protected void storeNewToken(TimelineDelegationTokenIdentifier tokenId,
long renewDate) {
if (LOG.isDebugEnabled()) {
LOG.debug("Storing token " + tokenId.getSequenceNumber());
}
try {
if (stateStore != null) {
stateStore.storeToken(tokenId, renewDate);
}
} catch (IOException e) {
LOG.error("Unable to store token " + tokenId.getSequenceNumber(), e);
}
}
@Override
protected void removeStoredToken(TimelineDelegationTokenIdentifier tokenId)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Storing token " + tokenId.getSequenceNumber());
}
try {
if (stateStore != null) {
stateStore.removeToken(tokenId);
}
} catch (IOException e) {
LOG.error("Unable to remove token " + tokenId.getSequenceNumber(), e);
}
}
@Override
protected void updateStoredToken(TimelineDelegationTokenIdentifier tokenId,
long renewDate) {
if (LOG.isDebugEnabled()) {
LOG.debug("Updating token " + tokenId.getSequenceNumber());
}
try {
if (stateStore != null) {
stateStore.updateToken(tokenId, renewDate);
}
} catch (IOException e) {
LOG.error("Unable to update token " + tokenId.getSequenceNumber(), e);
}
}
public void recover(TimelineServiceState state) throws IOException {
LOG.info("Recovering " + getClass().getSimpleName());
for (DelegationKey key : state.getTokenMasterKeyState()) {
addKey(key);
}
this.delegationTokenSequenceNumber = state.getLatestSequenceNumber();
for (Entry<TimelineDelegationTokenIdentifier, Long> entry :
state.getTokenState().entrySet()) {
addPersistedDelegationToken(entry.getKey(), entry.getValue());
}
}
} }
} }

View File

@ -0,0 +1,141 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline.util;
import org.apache.hadoop.io.WritableComparator;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
public class LeveldbUtils {
public static class KeyBuilder {
private static final int MAX_NUMBER_OF_KEY_ELEMENTS = 10;
private byte[][] b;
private boolean[] useSeparator;
private int index;
private int length;
public KeyBuilder(int size) {
b = new byte[size][];
useSeparator = new boolean[size];
index = 0;
length = 0;
}
public static KeyBuilder newInstance() {
return new KeyBuilder(MAX_NUMBER_OF_KEY_ELEMENTS);
}
public KeyBuilder add(String s) {
return add(s.getBytes(Charset.forName("UTF-8")), true);
}
public KeyBuilder add(byte[] t) {
return add(t, false);
}
public KeyBuilder add(byte[] t, boolean sep) {
b[index] = t;
useSeparator[index] = sep;
length += t.length;
if (sep) {
length++;
}
index++;
return this;
}
public byte[] getBytes() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
for (int i = 0; i < index; i++) {
baos.write(b[i]);
if (i < index - 1 && useSeparator[i]) {
baos.write(0x0);
}
}
return baos.toByteArray();
}
public byte[] getBytesForLookup() throws IOException {
ByteArrayOutputStream baos = new ByteArrayOutputStream(length);
for (int i = 0; i < index; i++) {
baos.write(b[i]);
if (useSeparator[i]) {
baos.write(0x0);
}
}
return baos.toByteArray();
}
}
public static class KeyParser {
private final byte[] b;
private int offset;
public KeyParser(byte[] b, int offset) {
this.b = b;
this.offset = offset;
}
public String getNextString() throws IOException {
if (offset >= b.length) {
throw new IOException(
"tried to read nonexistent string from byte array");
}
int i = 0;
while (offset + i < b.length && b[offset + i] != 0x0) {
i++;
}
String s = new String(b, offset, i, Charset.forName("UTF-8"));
offset = offset + i + 1;
return s;
}
public long getNextLong() throws IOException {
if (offset + 8 >= b.length) {
throw new IOException("byte array ran out when trying to read long");
}
long l = readReverseOrderedLong(b, offset);
offset += 8;
return l;
}
public int getOffset() {
return offset;
}
}
/**
* Returns true if the byte array begins with the specified prefix.
*/
public static boolean prefixMatches(byte[] prefix, int prefixlen,
byte[] b) {
if (b.length < prefixlen) {
return false;
}
return WritableComparator.compareBytes(prefix, 0, prefixlen, b, 0,
prefixlen) == 0;
}
}

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
option java_package = "org.apache.hadoop.yarn.proto";
option java_outer_classname = "YarnServerTimelineServerRecoveryProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;
import "yarn_security_token.proto";
message TimelineDelegationTokenIdentifierDataProto {
optional YARNDelegationTokenIdentifierProto token_identifier = 1;
optional int64 renewDate = 2;
}

View File

@ -31,6 +31,8 @@
import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp; import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore; import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
import org.apache.hadoop.yarn.server.timeline.TimelineStore; import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer; import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
@ -48,6 +50,8 @@ public void testStartStopServer() throws Exception {
Configuration config = new YarnConfiguration(); Configuration config = new YarnConfiguration();
config.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE, config.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
MemoryTimelineStore.class, TimelineStore.class); MemoryTimelineStore.class, TimelineStore.class);
config.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
MemoryTimelineStateStore.class, TimelineStateStore.class);
config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "localhost:0"); config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "localhost:0");
try { try {
try { try {
@ -128,6 +132,8 @@ public void testFilterOverrides() throws Exception {
Configuration config = new YarnConfiguration(); Configuration config = new YarnConfiguration();
config.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE, config.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
MemoryTimelineStore.class, TimelineStore.class); MemoryTimelineStore.class, TimelineStore.class);
config.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
MemoryTimelineStateStore.class, TimelineStateStore.class);
config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "localhost:0"); config.set(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS, "localhost:0");
try { try {
config.set("hadoop.http.filter.initializers", filterInitializer); config.set("hadoop.http.filter.initializers", filterInitializer);

View File

@ -0,0 +1,213 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline.recovery;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore.TimelineServiceState;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class TestLeveldbTimelineStateStore {
private FileContext fsContext;
private File fsPath;
private Configuration conf;
private TimelineStateStore store;
@Before
public void setup() throws Exception {
fsPath = new File("target", getClass().getSimpleName() +
"-tmpDir").getAbsoluteFile();
fsContext = FileContext.getLocalFSFileContext();
fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
conf = new YarnConfiguration();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_RECOVERY_ENABLED, true);
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
LeveldbTimelineStateStore.class,
TimelineStateStore.class);
conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_STATE_STORE_PATH,
fsPath.getAbsolutePath());
}
@After
public void tearDown() throws Exception {
if (store != null) {
store.stop();
}
if (fsContext != null) {
fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
}
}
private LeveldbTimelineStateStore initAndStartTimelineServiceStateStoreService() {
store = new LeveldbTimelineStateStore();
store.init(conf);
store.start();
return (LeveldbTimelineStateStore) store;
}
@Test
public void testTokenStore() throws Exception {
initAndStartTimelineServiceStateStoreService();
TimelineServiceState state = store.loadState();
assertTrue("token state not empty", state.tokenState.isEmpty());
assertTrue("key state not empty", state.tokenMasterKeyState.isEmpty());
final DelegationKey key1 = new DelegationKey(1, 2, "keyData1".getBytes());
final TimelineDelegationTokenIdentifier token1 =
new TimelineDelegationTokenIdentifier(new Text("tokenOwner1"),
new Text("tokenRenewer1"), new Text("tokenUser1"));
token1.setSequenceNumber(1);
token1.getBytes();
final Long tokenDate1 = 1L;
final TimelineDelegationTokenIdentifier token2 =
new TimelineDelegationTokenIdentifier(new Text("tokenOwner2"),
new Text("tokenRenewer2"), new Text("tokenUser2"));
token2.setSequenceNumber(12345678);
token2.getBytes();
final Long tokenDate2 = 87654321L;
store.storeTokenMasterKey(key1);
try {
store.storeTokenMasterKey(key1);
fail("redundant store of key undetected");
} catch (IOException e) {
// expected
}
store.storeToken(token1, tokenDate1);
store.storeToken(token2, tokenDate2);
try {
store.storeToken(token1, tokenDate1);
fail("redundant store of token undetected");
} catch (IOException e) {
// expected
}
store.close();
initAndStartTimelineServiceStateStoreService();
state = store.loadState();
assertEquals("incorrect loaded token count", 2, state.tokenState.size());
assertTrue("missing token 1", state.tokenState.containsKey(token1));
assertEquals("incorrect token 1 date", tokenDate1,
state.tokenState.get(token1));
assertTrue("missing token 2", state.tokenState.containsKey(token2));
assertEquals("incorrect token 2 date", tokenDate2,
state.tokenState.get(token2));
assertEquals("incorrect master key count", 1,
state.tokenMasterKeyState.size());
assertTrue("missing master key 1",
state.tokenMasterKeyState.contains(key1));
assertEquals("incorrect latest sequence number", 12345678,
state.getLatestSequenceNumber());
final DelegationKey key2 = new DelegationKey(3, 4, "keyData2".getBytes());
final DelegationKey key3 = new DelegationKey(5, 6, "keyData3".getBytes());
final TimelineDelegationTokenIdentifier token3 =
new TimelineDelegationTokenIdentifier(new Text("tokenOwner3"),
new Text("tokenRenewer3"), new Text("tokenUser3"));
token3.setSequenceNumber(12345679);
token3.getBytes();
final Long tokenDate3 = 87654321L;
store.removeToken(token1);
store.storeTokenMasterKey(key2);
final Long newTokenDate2 = 975318642L;
store.updateToken(token2, newTokenDate2);
store.removeTokenMasterKey(key1);
store.storeTokenMasterKey(key3);
store.storeToken(token3, tokenDate3);
store.close();
initAndStartTimelineServiceStateStoreService();
state = store.loadState();
assertEquals("incorrect loaded token count", 2, state.tokenState.size());
assertFalse("token 1 not removed", state.tokenState.containsKey(token1));
assertTrue("missing token 2", state.tokenState.containsKey(token2));
assertEquals("incorrect token 2 date", newTokenDate2,
state.tokenState.get(token2));
assertTrue("missing token 3", state.tokenState.containsKey(token3));
assertEquals("incorrect token 3 date", tokenDate3,
state.tokenState.get(token3));
assertEquals("incorrect master key count", 2,
state.tokenMasterKeyState.size());
assertFalse("master key 1 not removed",
state.tokenMasterKeyState.contains(key1));
assertTrue("missing master key 2",
state.tokenMasterKeyState.contains(key2));
assertTrue("missing master key 3",
state.tokenMasterKeyState.contains(key3));
assertEquals("incorrect latest sequence number", 12345679,
state.getLatestSequenceNumber());
store.close();
}
@Test
public void testCheckVersion() throws IOException {
LeveldbTimelineStateStore store =
initAndStartTimelineServiceStateStoreService();
// default version
Version defaultVersion = store.getCurrentVersion();
Assert.assertEquals(defaultVersion, store.loadVersion());
// compatible version
Version compatibleVersion =
Version.newInstance(defaultVersion.getMajorVersion(),
defaultVersion.getMinorVersion() + 2);
store.storeVersion(compatibleVersion);
Assert.assertEquals(compatibleVersion, store.loadVersion());
store.stop();
// overwrite the compatible version
store = initAndStartTimelineServiceStateStoreService();
Assert.assertEquals(defaultVersion, store.loadVersion());
// incompatible version
Version incompatibleVersion =
Version.newInstance(defaultVersion.getMajorVersion() + 1,
defaultVersion.getMinorVersion());
store.storeVersion(incompatibleVersion);
store.stop();
try {
initAndStartTimelineServiceStateStoreService();
Assert.fail("Incompatible version, should expect fail here.");
} catch (ServiceStateException e) {
Assert.assertTrue("Exception message mismatch",
e.getMessage().contains("Incompatible version for timeline state store"));
}
}
}

View File

@ -50,6 +50,8 @@
import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore; import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
import org.apache.hadoop.yarn.server.timeline.TimelineStore; import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -68,6 +70,8 @@ public static void setup() throws Exception {
conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true); conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true);
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE, conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
MemoryTimelineStore.class, TimelineStore.class); MemoryTimelineStore.class, TimelineStore.class);
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
MemoryTimelineStateStore.class, TimelineStateStore.class);
conf.setInt( conf.setInt(
YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
2); 2);

View File

@ -71,6 +71,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptUnregistrationEvent;
import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore; import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
import org.apache.hadoop.yarn.server.timeline.TimelineStore; import org.apache.hadoop.yarn.server.timeline.TimelineStore;
import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
@ -682,6 +684,8 @@ protected synchronized void serviceInit(Configuration conf)
MemoryApplicationHistoryStore.class, ApplicationHistoryStore.class); MemoryApplicationHistoryStore.class, ApplicationHistoryStore.class);
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE, conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
MemoryTimelineStore.class, TimelineStore.class); MemoryTimelineStore.class, TimelineStore.class);
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
MemoryTimelineStateStore.class, TimelineStateStore.class);
appHistoryServer.init(conf); appHistoryServer.init(conf);
super.serviceInit(conf); super.serviceInit(conf);
} }