YARN-6054. TimelineServer fails to start when some LevelDb state files are missing. Contributed by Ravi Prakash.
This commit is contained in:
parent
41db07d532
commit
4c431a6940
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.timeline;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.apache.commons.collections.map.LRUMap;
|
import org.apache.commons.collections.map.LRUMap;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
@ -33,6 +34,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.WritableComparator;
|
import org.apache.hadoop.io.WritableComparator;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.*;
|
import org.apache.hadoop.yarn.api.records.timeline.*;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
|
||||||
|
@ -123,6 +125,11 @@ public class LeveldbTimelineStore extends AbstractService
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
static final String FILENAME = "leveldb-timeline-store.ldb";
|
static final String FILENAME = "leveldb-timeline-store.ldb";
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
//Suffix appended to FILENAME (followed by a Time.monotonicNow() value) to
|
||||||
|
//name the backup copy of the database made before a LevelDb repair
|
||||||
|
static final String BACKUP_EXT = ".backup-";
|
||||||
|
|
||||||
private static final byte[] START_TIME_LOOKUP_PREFIX = "k".getBytes(Charset.forName("UTF-8"));
|
private static final byte[] START_TIME_LOOKUP_PREFIX = "k".getBytes(Charset.forName("UTF-8"));
|
||||||
private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes(Charset.forName("UTF-8"));
|
private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes(Charset.forName("UTF-8"));
|
||||||
private static final byte[] INDEXED_ENTRY_PREFIX = "i".getBytes(Charset.forName("UTF-8"));
|
private static final byte[] INDEXED_ENTRY_PREFIX = "i".getBytes(Charset.forName("UTF-8"));
|
||||||
|
@ -175,6 +182,13 @@ public class LeveldbTimelineStore extends AbstractService
|
||||||
super(LeveldbTimelineStore.class.getName());
|
super(LeveldbTimelineStore.class.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private JniDBFactory factory;
|
||||||
|
|
||||||
|
/**
 * Overrides the JniDBFactory used to open (and, if opening fails, repair)
 * the LevelDb database. serviceInit only creates its own factory when
 * none has been set, so tests call this before init to inject a mock.
 *
 * @param fact the factory to use; null leaves serviceInit to create one
 */
@VisibleForTesting
|
||||||
|
void setFactory(JniDBFactory fact) {
|
||||||
|
this.factory = fact;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
|
@ -209,7 +223,10 @@ public class LeveldbTimelineStore extends AbstractService
|
||||||
options.cacheSize(conf.getLong(
|
options.cacheSize(conf.getLong(
|
||||||
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
|
YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
|
||||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
|
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
|
||||||
JniDBFactory factory = new JniDBFactory();
|
if(factory == null) {
|
||||||
|
factory = new JniDBFactory();
|
||||||
|
}
|
||||||
|
|
||||||
Path dbPath = new Path(
|
Path dbPath = new Path(
|
||||||
conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH), FILENAME);
|
conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH), FILENAME);
|
||||||
FileSystem localFS = null;
|
FileSystem localFS = null;
|
||||||
|
@ -226,7 +243,19 @@ public class LeveldbTimelineStore extends AbstractService
|
||||||
IOUtils.cleanup(LOG, localFS);
|
IOUtils.cleanup(LOG, localFS);
|
||||||
}
|
}
|
||||||
LOG.info("Using leveldb path " + dbPath);
|
LOG.info("Using leveldb path " + dbPath);
|
||||||
|
try {
|
||||||
db = factory.open(new File(dbPath.toString()), options);
|
db = factory.open(new File(dbPath.toString()), options);
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
File dbFile = new File(dbPath.toString());
|
||||||
|
File backupPath = new File(
|
||||||
|
dbPath.toString() + BACKUP_EXT + Time.monotonicNow());
|
||||||
|
LOG.warn("Incurred exception while loading LevelDb database. Backing " +
|
||||||
|
"up at "+ backupPath, ioe);
|
||||||
|
FileUtils.copyDirectory(dbFile, backupPath);
|
||||||
|
LOG.warn("Going to try repair");
|
||||||
|
factory.repair(dbFile, options);
|
||||||
|
db = factory.open(dbFile, options);
|
||||||
|
}
|
||||||
checkVersion();
|
checkVersion();
|
||||||
startTimeWriteCache =
|
startTimeWriteCache =
|
||||||
Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(
|
Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(
|
||||||
|
|
|
@ -22,12 +22,14 @@ import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileFilter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.commons.io.filefilter.WildcardFileFilter;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -44,11 +46,14 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelineP
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.records.Version;
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
|
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
|
||||||
|
import org.fusesource.leveldbjni.JniDBFactory;
|
||||||
import org.iq80.leveldb.DBException;
|
import org.iq80.leveldb.DBException;
|
||||||
|
import org.iq80.leveldb.Options;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
|
@ -464,4 +469,38 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
|
||||||
.iterator().next().size());
|
.iterator().next().size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
/**
|
||||||
|
* Test that LevelDb repair is attempted exactly once during
|
||||||
|
* serviceInit for LeveldbTimelineStore in case open fails the
|
||||||
|
* first time, and that a backup of the database directory is left behind.
|
||||||
|
*/
|
||||||
|
public void testLevelDbRepair() throws IOException {
|
||||||
|
LeveldbTimelineStore store = new LeveldbTimelineStore();
|
||||||
|
|
||||||
|
//Mocked factory: the first open() throws IOException, later calls fall
//through to the real JniDBFactory.open()
JniDBFactory factory = Mockito.mock(JniDBFactory.class);
|
||||||
|
Mockito.when(
|
||||||
|
factory.open(Mockito.any(File.class), Mockito.any(Options.class)))
|
||||||
|
.thenThrow(new IOException()).thenCallRealMethod();
|
||||||
|
store.setFactory(factory);
|
||||||
|
|
||||||
|
//Create the LevelDb in a different location
|
||||||
|
File path = new File("target", this.getClass().getSimpleName() +
|
||||||
|
"-tmpDir1").getAbsoluteFile();
|
||||||
|
Configuration conf = new Configuration(this.config);
|
||||||
|
conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
|
||||||
|
path.getAbsolutePath());
|
||||||
|
|
||||||
|
try {
|
||||||
|
store.init(conf);
|
||||||
|
//The failed open must have triggered exactly one repair() call
Mockito.verify(factory, Mockito.times(1))
|
||||||
|
.repair(Mockito.any(File.class), Mockito.any(Options.class));
|
||||||
|
//At least one entry whose name contains BACKUP_EXT must exist directly
//under the configured LevelDb path
FileFilter fileFilter = new WildcardFileFilter(
|
||||||
|
"*" + LeveldbTimelineStore.BACKUP_EXT +"*");
|
||||||
|
Assert.assertTrue(path.listFiles(fileFilter).length > 0);
|
||||||
|
} finally {
|
||||||
|
//Close the store and remove the temporary directory whether or not the
//assertions passed
store.close();
|
||||||
|
fsContext.delete(new Path(path.getAbsolutePath()), true);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue