YARN-9063. ATS 1.5 fails to start if RollingLevelDb files are corrupt or missing (#3728)

Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
This commit is contained in:
Ashutosh Gupta 2021-12-06 16:15:44 +05:30 committed by GitHub
parent 1509ea5de2
commit 5a950b8900
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 91 additions and 24 deletions

View File

@ -21,7 +21,6 @@
import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.Preconditions;
import org.apache.commons.collections.map.LRUMap; import org.apache.commons.collections.map.LRUMap;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
@ -32,7 +31,6 @@
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.timeline.*; import org.apache.hadoop.yarn.api.records.timeline.*;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
@ -41,6 +39,7 @@
import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl; import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder; import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser; import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator; import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
@ -48,7 +47,6 @@
import org.iq80.leveldb.*; import org.iq80.leveldb.*;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.*; import java.util.*;
@ -242,19 +240,7 @@ protected void serviceInit(Configuration conf) throws Exception {
IOUtils.cleanupWithLogger(LOG, localFS); IOUtils.cleanupWithLogger(LOG, localFS);
} }
LOG.info("Using leveldb path " + dbPath); LOG.info("Using leveldb path " + dbPath);
try { db = LeveldbUtils.loadOrRepairLevelDb(factory, dbPath, options);
db = factory.open(new File(dbPath.toString()), options);
} catch (IOException ioe) {
File dbFile = new File(dbPath.toString());
File backupPath = new File(
dbPath.toString() + BACKUP_EXT + Time.monotonicNow());
LOG.warn("Incurred exception while loading LevelDb database. Backing " +
"up at "+ backupPath, ioe);
FileUtils.copyDirectory(dbFile, backupPath);
LOG.warn("Going to try repair");
factory.repair(dbFile, options);
db = factory.open(dbFile, options);
}
checkVersion(); checkVersion();
startTimeWriteCache = startTimeWriteCache =
Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize( Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(

View File

@ -21,7 +21,6 @@
import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions; import org.apache.hadoop.util.Preconditions;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
@ -62,6 +61,7 @@
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.timeline.RollingLevelDB.RollingWriteBatch; import org.apache.hadoop.yarn.server.timeline.RollingLevelDB.RollingWriteBatch;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl; import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder; import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser; import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;
@ -199,6 +199,11 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
static final String STARTTIME = "starttime-ldb"; static final String STARTTIME = "starttime-ldb";
static final String OWNER = "owner-ldb"; static final String OWNER = "owner-ldb";
@VisibleForTesting
//Extension to FILENAME where backup will be stored in case we need to
//call LevelDb recovery
static final String BACKUP_EXT = ".backup-";
private static final byte[] DOMAIN_ID_COLUMN = "d".getBytes(UTF_8); private static final byte[] DOMAIN_ID_COLUMN = "d".getBytes(UTF_8);
private static final byte[] EVENTS_COLUMN = "e".getBytes(UTF_8); private static final byte[] EVENTS_COLUMN = "e".getBytes(UTF_8);
private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes(UTF_8); private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes(UTF_8);
@ -240,6 +245,12 @@ public RollingLevelDBTimelineStore() {
super(RollingLevelDBTimelineStore.class.getName()); super(RollingLevelDBTimelineStore.class.getName());
} }
// LevelDB factory used by serviceInit to open/repair the stores. Left null
// by default and created lazily in serviceInit; tests may pre-populate it
// via setFactory() to simulate open() failures (see testLevelDbRepair).
private JniDBFactory factory;
/**
 * Test-only hook: injects a (possibly mocked) {@link JniDBFactory} so the
 * open-then-repair path in serviceInit can be exercised without producing
 * real on-disk corruption.
 */
@VisibleForTesting
void setFactory(JniDBFactory fact) {
this.factory = fact;
}
@Override @Override
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
protected void serviceInit(Configuration conf) throws Exception { protected void serviceInit(Configuration conf) throws Exception {
@ -284,7 +295,9 @@ protected void serviceInit(Configuration conf) throws Exception {
options.cacheSize(conf.getLong( options.cacheSize(conf.getLong(
TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE)); DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
JniDBFactory factory = new JniDBFactory(); if(factory == null) {
factory = new JniDBFactory();
}
Path dbPath = new Path( Path dbPath = new Path(
conf.get(TIMELINE_SERVICE_LEVELDB_PATH), FILENAME); conf.get(TIMELINE_SERVICE_LEVELDB_PATH), FILENAME);
Path domainDBPath = new Path(dbPath, DOMAIN); Path domainDBPath = new Path(dbPath, DOMAIN);
@ -327,13 +340,13 @@ protected void serviceInit(Configuration conf) throws Exception {
TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE, TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE)); DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE));
LOG.info("Using leveldb path " + dbPath); LOG.info("Using leveldb path " + dbPath);
domaindb = factory.open(new File(domainDBPath.toString()), options); domaindb = LeveldbUtils.loadOrRepairLevelDb(factory, domainDBPath, options);
entitydb = new RollingLevelDB(ENTITY); entitydb = new RollingLevelDB(ENTITY);
entitydb.init(conf); entitydb.init(conf);
indexdb = new RollingLevelDB(INDEX); indexdb = new RollingLevelDB(INDEX);
indexdb.init(conf); indexdb.init(conf);
starttimedb = factory.open(new File(starttimeDBPath.toString()), options); starttimedb = LeveldbUtils.loadOrRepairLevelDb(factory, starttimeDBPath, options);
ownerdb = factory.open(new File(ownerDBPath.toString()), options); ownerdb = LeveldbUtils.loadOrRepairLevelDb(factory, ownerDBPath, options);
checkVersion(); checkVersion();
startTimeWriteCache = Collections.synchronizedMap(new LRUMap( startTimeWriteCache = Collections.synchronizedMap(new LRUMap(
getStartTimeWriteCacheSize(conf))); getStartTimeWriteCacheSize(conf)));

View File

@ -19,16 +19,30 @@
package org.apache.hadoop.yarn.server.timeline.util; package org.apache.hadoop.yarn.server.timeline.util;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.WritableComparator; import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.util.Time;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.nio.charset.StandardCharsets.UTF_8; import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong; import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
public class LeveldbUtils { public class LeveldbUtils {
private static final String BACKUP_EXT = ".backup-";
private static final Logger LOG = LoggerFactory
.getLogger(LeveldbUtils.class);
/** A string builder utility for building timeline server leveldb keys. */ /** A string builder utility for building timeline server leveldb keys. */
public static class KeyBuilder { public static class KeyBuilder {
/** Maximum subkeys that can be added to construct a key. */ /** Maximum subkeys that can be added to construct a key. */
@ -184,4 +198,22 @@ public static boolean prefixMatches(byte[] prefix, int prefixlen,
public static final FsPermission LEVELDB_DIR_UMASK = FsPermission public static final FsPermission LEVELDB_DIR_UMASK = FsPermission
.createImmutable((short) 0700); .createImmutable((short) 0700);
/**
 * Opens the LevelDB database at {@code dbPath}, falling back to an
 * automatic repair attempt if the initial open fails (e.g. the DB files
 * are corrupt or missing). Before repairing, the existing directory is
 * copied to a timestamped sibling ({@code <dbPath>.backup-<monotonicNow>})
 * so the original data is preserved.
 *
 * @param factory leveldbjni factory used to open and repair the database
 * @param dbPath path to the LevelDB directory
 * @param options LevelDB open options
 * @return an open DB handle
 * @throws IOException if the database cannot be opened even after repair,
 *         or if backing up the corrupt directory fails
 */
public static DB loadOrRepairLevelDb(JniDBFactory factory, Path dbPath, Options options)
    throws IOException {
  File dbFile = new File(dbPath.toString());
  try {
    return factory.open(dbFile, options);
  } catch (IOException ioe) {
    // Snapshot the (possibly corrupt) DB before repair mutates it in place.
    File dbBackupPath = new File(
        dbPath.toString() + BACKUP_EXT + Time.monotonicNow());
    LOG.warn("Incurred exception while loading LevelDb database. Backing " +
        "up at "+ dbBackupPath, ioe);
    FileUtils.copyDirectory(dbFile, dbBackupPath);
    LOG.warn("Going to try repair");
    factory.repair(dbFile, options);
    return factory.open(dbFile, options);
  }
}
} }

View File

@ -21,8 +21,10 @@
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import java.io.File; import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -38,11 +40,15 @@
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.Version;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.Options;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.mockito.Mockito;
/** Test class to verify RollingLevelDBTimelineStore. */ /** Test class to verify RollingLevelDBTimelineStore. */
@InterfaceAudience.Private @InterfaceAudience.Private
@ -417,6 +423,36 @@ public void testStorePerformance() throws IOException {
Log.getLog().info("Duration for " + num + ": " + duration); Log.getLog().info("Duration for " + num + ": " + duration);
} }
/**
 * Test that RollingLevelDb repair is attempted at least once during
 * serviceInit for RollingLeveldbTimelineStore in case open fails the
 * first time, and that a timestamped backup of the DB directory is
 * created before the repair.
 */
@Test
public void testLevelDbRepair() throws IOException {
  RollingLevelDBTimelineStore store = new RollingLevelDBTimelineStore();
  // First open() throws to force the repair path; subsequent calls behave
  // normally so the store can finish initializing.
  JniDBFactory factory = Mockito.mock(JniDBFactory.class);
  Mockito.when(factory.open(Mockito.any(File.class), Mockito.any(Options.class)))
      .thenThrow(new IOException()).thenCallRealMethod();
  store.setFactory(factory);
  //Create the LevelDb in a different location
  File path = new File("target", this.getClass().getSimpleName() + "-tmpDir2").getAbsoluteFile();
  Configuration conf = new Configuration(this.config);
  conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, path.getAbsolutePath());
  try {
    store.init(conf);
    // Repair must have been invoked exactly once for the failed open.
    Mockito.verify(factory, Mockito.times(1))
        .repair(Mockito.any(File.class), Mockito.any(Options.class));
    // A ".backup-<timestamp>" copy of the DB must exist after repair.
    FilenameFilter fileFilter =
        new WildcardFileFilter("*" + RollingLevelDBTimelineStore.BACKUP_EXT + "*");
    Assert.assertTrue(new File(path.getAbsolutePath(), RollingLevelDBTimelineStore.FILENAME)
        .list(fileFilter).length > 0);
  } finally {
    store.close();
    fsContext.delete(new Path(path.getAbsolutePath()), true);
  }
}
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
TestRollingLevelDBTimelineStore store = TestRollingLevelDBTimelineStore store =
new TestRollingLevelDBTimelineStore(); new TestRollingLevelDBTimelineStore();