YARN-6354. LeveldbRMStateStore can parse invalid keys when recovering reservations. Contributed by Jason Lowe
(cherry picked from commit 318bfb01bc
)
This commit is contained in:
parent
14e5a8ed28
commit
800d632caa
|
@ -214,6 +214,11 @@ public class LeveldbRMStateStore extends RMStateStore {
|
||||||
return db == null;
|
return db == null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
DB getDatabase() {
|
||||||
|
return db;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected Version loadVersion() throws Exception {
|
protected Version loadVersion() throws Exception {
|
||||||
Version version = null;
|
Version version = null;
|
||||||
|
@ -284,6 +289,9 @@ public class LeveldbRMStateStore extends RMStateStore {
|
||||||
while (iter.hasNext()) {
|
while (iter.hasNext()) {
|
||||||
Entry<byte[],byte[]> entry = iter.next();
|
Entry<byte[],byte[]> entry = iter.next();
|
||||||
String key = asString(entry.getKey());
|
String key = asString(entry.getKey());
|
||||||
|
if (!key.startsWith(RM_RESERVATION_KEY_PREFIX)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
String planReservationString =
|
String planReservationString =
|
||||||
key.substring(RM_RESERVATION_KEY_PREFIX.length());
|
key.substring(RM_RESERVATION_KEY_PREFIX.length());
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.records.Version;
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
|
import org.fusesource.leveldbjni.JniDBFactory;
|
||||||
import org.iq80.leveldb.DB;
|
import org.iq80.leveldb.DB;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
@ -119,17 +120,27 @@ public class TestLeveldbRMStateStore extends RMStateStoreTestBase {
|
||||||
public void testCompactionCycle() throws Exception {
|
public void testCompactionCycle() throws Exception {
|
||||||
final DB mockdb = mock(DB.class);
|
final DB mockdb = mock(DB.class);
|
||||||
conf.setLong(YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS, 1);
|
conf.setLong(YarnConfiguration.RM_LEVELDB_COMPACTION_INTERVAL_SECS, 1);
|
||||||
LeveldbRMStateStore store = new LeveldbRMStateStore() {
|
stateStore = new LeveldbRMStateStore() {
|
||||||
@Override
|
@Override
|
||||||
protected DB openDatabase() throws Exception {
|
protected DB openDatabase() throws Exception {
|
||||||
return mockdb;
|
return mockdb;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
store.init(conf);
|
stateStore.init(conf);
|
||||||
store.start();
|
stateStore.start();
|
||||||
verify(mockdb, timeout(10000)).compactRange(
|
verify(mockdb, timeout(10000)).compactRange(
|
||||||
(byte[]) isNull(), (byte[]) isNull());
|
(byte[]) isNull(), (byte[]) isNull());
|
||||||
store.close();
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testBadKeyIteration() throws Exception {
|
||||||
|
stateStore = new LeveldbRMStateStore();
|
||||||
|
stateStore.init(conf);
|
||||||
|
stateStore.start();
|
||||||
|
DB db = stateStore.getDatabase();
|
||||||
|
// add an entry that appears at the end of the database when iterating
|
||||||
|
db.put(JniDBFactory.bytes("zzz"), JniDBFactory.bytes("z"));
|
||||||
|
stateStore.loadState();
|
||||||
}
|
}
|
||||||
|
|
||||||
class LeveldbStateStoreTester implements RMStateStoreHelper {
|
class LeveldbStateStoreTester implements RMStateStoreHelper {
|
||||||
|
|
Loading…
Reference in New Issue