YARN-9063. ATS 1.5 fails to start if RollingLevelDb files are corrupt or missing (#3728)
Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
(cherry picked from commit 5a950b8900
)
This commit is contained in:
parent
63495978b2
commit
53f76b9b1c
|
@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.timeline;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
||||||
import org.apache.commons.collections.map.LRUMap;
|
import org.apache.commons.collections.map.LRUMap;
|
||||||
import org.apache.commons.io.FileUtils;
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
@ -32,7 +31,6 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.WritableComparator;
|
import org.apache.hadoop.io.WritableComparator;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.util.Time;
|
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.*;
|
import org.apache.hadoop.yarn.api.records.timeline.*;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
|
||||||
|
@ -41,6 +39,7 @@ import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
|
||||||
import org.apache.hadoop.yarn.server.records.Version;
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
|
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils;
|
||||||
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
|
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
|
||||||
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;
|
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;
|
||||||
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
|
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
|
||||||
|
@ -48,7 +47,6 @@ import org.fusesource.leveldbjni.JniDBFactory;
|
||||||
import org.iq80.leveldb.*;
|
import org.iq80.leveldb.*;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.nio.charset.Charset;
|
import java.nio.charset.Charset;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
@ -242,19 +240,7 @@ public class LeveldbTimelineStore extends AbstractService
|
||||||
IOUtils.cleanupWithLogger(LOG, localFS);
|
IOUtils.cleanupWithLogger(LOG, localFS);
|
||||||
}
|
}
|
||||||
LOG.info("Using leveldb path " + dbPath);
|
LOG.info("Using leveldb path " + dbPath);
|
||||||
try {
|
db = LeveldbUtils.loadOrRepairLevelDb(factory, dbPath, options);
|
||||||
db = factory.open(new File(dbPath.toString()), options);
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
File dbFile = new File(dbPath.toString());
|
|
||||||
File backupPath = new File(
|
|
||||||
dbPath.toString() + BACKUP_EXT + Time.monotonicNow());
|
|
||||||
LOG.warn("Incurred exception while loading LevelDb database. Backing " +
|
|
||||||
"up at "+ backupPath, ioe);
|
|
||||||
FileUtils.copyDirectory(dbFile, backupPath);
|
|
||||||
LOG.warn("Going to try repair");
|
|
||||||
factory.repair(dbFile, options);
|
|
||||||
db = factory.open(dbFile, options);
|
|
||||||
}
|
|
||||||
checkVersion();
|
checkVersion();
|
||||||
startTimeWriteCache =
|
startTimeWriteCache =
|
||||||
Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(
|
Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(
|
||||||
|
|
|
@ -21,7 +21,6 @@ package org.apache.hadoop.yarn.server.timeline;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
||||||
|
|
||||||
import java.io.File;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -62,6 +61,7 @@ import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
||||||
import org.apache.hadoop.yarn.server.timeline.RollingLevelDB.RollingWriteBatch;
|
import org.apache.hadoop.yarn.server.timeline.RollingLevelDB.RollingWriteBatch;
|
||||||
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
|
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils;
|
||||||
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
|
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
|
||||||
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;
|
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;
|
||||||
|
|
||||||
|
@ -199,6 +199,11 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
||||||
static final String STARTTIME = "starttime-ldb";
|
static final String STARTTIME = "starttime-ldb";
|
||||||
static final String OWNER = "owner-ldb";
|
static final String OWNER = "owner-ldb";
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
//Extension to FILENAME where backup will be stored in case we need to
|
||||||
|
//call LevelDb recovery
|
||||||
|
static final String BACKUP_EXT = ".backup-";
|
||||||
|
|
||||||
private static final byte[] DOMAIN_ID_COLUMN = "d".getBytes(UTF_8);
|
private static final byte[] DOMAIN_ID_COLUMN = "d".getBytes(UTF_8);
|
||||||
private static final byte[] EVENTS_COLUMN = "e".getBytes(UTF_8);
|
private static final byte[] EVENTS_COLUMN = "e".getBytes(UTF_8);
|
||||||
private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes(UTF_8);
|
private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes(UTF_8);
|
||||||
|
@ -240,6 +245,12 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
||||||
super(RollingLevelDBTimelineStore.class.getName());
|
super(RollingLevelDBTimelineStore.class.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private JniDBFactory factory;
|
||||||
|
@VisibleForTesting
|
||||||
|
void setFactory(JniDBFactory fact) {
|
||||||
|
this.factory = fact;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
|
@ -284,7 +295,9 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
||||||
options.cacheSize(conf.getLong(
|
options.cacheSize(conf.getLong(
|
||||||
TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
|
TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
|
||||||
DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
|
DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
|
||||||
JniDBFactory factory = new JniDBFactory();
|
if(factory == null) {
|
||||||
|
factory = new JniDBFactory();
|
||||||
|
}
|
||||||
Path dbPath = new Path(
|
Path dbPath = new Path(
|
||||||
conf.get(TIMELINE_SERVICE_LEVELDB_PATH), FILENAME);
|
conf.get(TIMELINE_SERVICE_LEVELDB_PATH), FILENAME);
|
||||||
Path domainDBPath = new Path(dbPath, DOMAIN);
|
Path domainDBPath = new Path(dbPath, DOMAIN);
|
||||||
|
@ -327,13 +340,13 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
||||||
TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
|
TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE,
|
||||||
DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE));
|
DEFAULT_TIMELINE_SERVICE_LEVELDB_WRITE_BUFFER_SIZE));
|
||||||
LOG.info("Using leveldb path " + dbPath);
|
LOG.info("Using leveldb path " + dbPath);
|
||||||
domaindb = factory.open(new File(domainDBPath.toString()), options);
|
domaindb = LeveldbUtils.loadOrRepairLevelDb(factory, domainDBPath, options);
|
||||||
entitydb = new RollingLevelDB(ENTITY);
|
entitydb = new RollingLevelDB(ENTITY);
|
||||||
entitydb.init(conf);
|
entitydb.init(conf);
|
||||||
indexdb = new RollingLevelDB(INDEX);
|
indexdb = new RollingLevelDB(INDEX);
|
||||||
indexdb.init(conf);
|
indexdb.init(conf);
|
||||||
starttimedb = factory.open(new File(starttimeDBPath.toString()), options);
|
starttimedb = LeveldbUtils.loadOrRepairLevelDb(factory, starttimeDBPath, options);
|
||||||
ownerdb = factory.open(new File(ownerDBPath.toString()), options);
|
ownerdb = LeveldbUtils.loadOrRepairLevelDb(factory, ownerDBPath, options);
|
||||||
checkVersion();
|
checkVersion();
|
||||||
startTimeWriteCache = Collections.synchronizedMap(new LRUMap(
|
startTimeWriteCache = Collections.synchronizedMap(new LRUMap(
|
||||||
getStartTimeWriteCacheSize(conf)));
|
getStartTimeWriteCacheSize(conf)));
|
||||||
|
@ -346,7 +359,7 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
||||||
|
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
if (getConfig().getBoolean(TIMELINE_SERVICE_TTL_ENABLE, true)) {
|
if (getConfig().getBoolean(TIMELINE_SERVICE_TTL_ENABLE, true)) {
|
||||||
|
@ -1816,4 +1829,4 @@ public class RollingLevelDBTimelineStore extends AbstractService implements
|
||||||
return domain;
|
return domain;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -19,16 +19,30 @@
|
||||||
package org.apache.hadoop.yarn.server.timeline.util;
|
package org.apache.hadoop.yarn.server.timeline.util;
|
||||||
|
|
||||||
|
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.io.WritableComparator;
|
import org.apache.hadoop.io.WritableComparator;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.fusesource.leveldbjni.JniDBFactory;
|
||||||
|
import org.iq80.leveldb.DB;
|
||||||
|
import org.iq80.leveldb.Options;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import static java.nio.charset.StandardCharsets.UTF_8;
|
import static java.nio.charset.StandardCharsets.UTF_8;
|
||||||
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
|
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
|
||||||
|
|
||||||
public class LeveldbUtils {
|
public class LeveldbUtils {
|
||||||
|
|
||||||
|
private static final String BACKUP_EXT = ".backup-";
|
||||||
|
private static final Logger LOG = LoggerFactory
|
||||||
|
.getLogger(LeveldbUtils.class);
|
||||||
|
|
||||||
/** A string builder utility for building timeline server leveldb keys. */
|
/** A string builder utility for building timeline server leveldb keys. */
|
||||||
public static class KeyBuilder {
|
public static class KeyBuilder {
|
||||||
/** Maximum subkeys that can be added to construct a key. */
|
/** Maximum subkeys that can be added to construct a key. */
|
||||||
|
@ -184,4 +198,22 @@ public class LeveldbUtils {
|
||||||
public static final FsPermission LEVELDB_DIR_UMASK = FsPermission
|
public static final FsPermission LEVELDB_DIR_UMASK = FsPermission
|
||||||
.createImmutable((short) 0700);
|
.createImmutable((short) 0700);
|
||||||
|
|
||||||
|
public static DB loadOrRepairLevelDb(JniDBFactory factory, Path dbPath, Options options)
|
||||||
|
throws IOException {
|
||||||
|
DB db;
|
||||||
|
try{
|
||||||
|
db = factory.open(new File(dbPath.toString()), options);
|
||||||
|
} catch (IOException ioe){
|
||||||
|
File dbFile = new File(dbPath.toString());
|
||||||
|
File dbBackupPath = new File(
|
||||||
|
dbPath.toString() + BACKUP_EXT + Time.monotonicNow());
|
||||||
|
LOG.warn("Incurred exception while loading LevelDb database. Backing " +
|
||||||
|
"up at "+ dbBackupPath, ioe);
|
||||||
|
FileUtils.copyDirectory(dbFile, dbBackupPath);
|
||||||
|
factory.repair(dbFile, options);
|
||||||
|
db = factory.open(dbFile, options);
|
||||||
|
}
|
||||||
|
return db;
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,8 +21,10 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FilenameFilter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.commons.io.filefilter.WildcardFileFilter;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -38,11 +40,15 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.records.Version;
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
|
|
||||||
|
import org.fusesource.leveldbjni.JniDBFactory;
|
||||||
|
import org.iq80.leveldb.Options;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.eclipse.jetty.util.log.Log;
|
import org.eclipse.jetty.util.log.Log;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
/** Test class to verify RollingLevelDBTimelineStore. */
|
/** Test class to verify RollingLevelDBTimelineStore. */
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
|
@ -417,6 +423,36 @@ public class TestRollingLevelDBTimelineStore extends TimelineStoreTestUtils {
|
||||||
Log.getLog().info("Duration for " + num + ": " + duration);
|
Log.getLog().info("Duration for " + num + ": " + duration);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
/**
|
||||||
|
* Test that RollingLevelDb repair is attempted at least once during
|
||||||
|
* serviceInit for RollingLeveldbTimelineStore in case open fails the
|
||||||
|
* first time.
|
||||||
|
*/ public void testLevelDbRepair() throws IOException {
|
||||||
|
RollingLevelDBTimelineStore store = new RollingLevelDBTimelineStore();
|
||||||
|
JniDBFactory factory = Mockito.mock(JniDBFactory.class);
|
||||||
|
Mockito.when(factory.open(Mockito.any(File.class), Mockito.any(Options.class)))
|
||||||
|
.thenThrow(new IOException()).thenCallRealMethod();
|
||||||
|
store.setFactory(factory);
|
||||||
|
|
||||||
|
//Create the LevelDb in a different location
|
||||||
|
File path = new File("target", this.getClass().getSimpleName() + "-tmpDir2").getAbsoluteFile();
|
||||||
|
Configuration conf = new Configuration(this.config);
|
||||||
|
conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, path.getAbsolutePath());
|
||||||
|
try {
|
||||||
|
store.init(conf);
|
||||||
|
Mockito.verify(factory, Mockito.times(1))
|
||||||
|
.repair(Mockito.any(File.class), Mockito.any(Options.class));
|
||||||
|
FilenameFilter fileFilter =
|
||||||
|
new WildcardFileFilter("*" + RollingLevelDBTimelineStore.BACKUP_EXT + "*");
|
||||||
|
Assert.assertTrue(new File(path.getAbsolutePath(), RollingLevelDBTimelineStore.FILENAME)
|
||||||
|
.list(fileFilter).length > 0);
|
||||||
|
} finally {
|
||||||
|
store.close();
|
||||||
|
fsContext.delete(new Path(path.getAbsolutePath()), true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public static void main(String[] args) throws Exception {
|
public static void main(String[] args) throws Exception {
|
||||||
TestRollingLevelDBTimelineStore store =
|
TestRollingLevelDBTimelineStore store =
|
||||||
new TestRollingLevelDBTimelineStore();
|
new TestRollingLevelDBTimelineStore();
|
||||||
|
@ -424,4 +460,4 @@ public class TestRollingLevelDBTimelineStore extends TimelineStoreTestUtils {
|
||||||
store.testStorePerformance();
|
store.testStorePerformance();
|
||||||
store.tearDown();
|
store.tearDown();
|
||||||
}
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue