YARN-6054. TimelineServer fails to start when some LevelDb state files are missing. Contributed by Ravi Prakash.

(cherry picked from commit 4c431a6940)
Naganarasimha 2017-01-10 15:54:16 +05:30
parent ea38256fab
commit a7cdd39234
2 changed files with 70 additions and 2 deletions
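
In short, the patch lets the timeline store survive a LevelDB directory with missing or corrupt state files: if the initial open fails, the store copies the directory to a timestamped ".backup-" location, runs LevelDB's repair, and retries the open. A condensed, self-contained sketch of that flow follows; the wrapper class and method name are illustrative only, and the timestamp call is simplified to System.currentTimeMillis() rather than Hadoop's Time.monotonicNow(), but the factory, backup, and repair calls mirror the diff below.

    import java.io.File;
    import java.io.IOException;

    import org.apache.commons.io.FileUtils;
    import org.fusesource.leveldbjni.JniDBFactory;
    import org.iq80.leveldb.DB;
    import org.iq80.leveldb.Options;

    public class LeveldbOpenWithRepairSketch {
      // Suffix appended (with a timestamp) to the store directory for the backup copy.
      static final String BACKUP_EXT = ".backup-";

      // Open the LevelDB store; on failure, back up the directory, repair it,
      // and retry the open once. Illustrative helper, not the actual Hadoop method.
      static DB openOrRepair(JniDBFactory factory, File dbFile, Options options)
          throws IOException {
        try {
          return factory.open(dbFile, options);
        } catch (IOException ioe) {
          // Preserve whatever is on disk before repair rewrites the directory.
          File backup = new File(
              dbFile.getPath() + BACKUP_EXT + System.currentTimeMillis());
          FileUtils.copyDirectory(dbFile, backup);
          factory.repair(dbFile, options);
          return factory.open(dbFile, options);
        }
      }
    }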

LeveldbTimelineStore.java

@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.timeline;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.commons.collections.map.LRUMap;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -33,6 +34,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.timeline.*;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
@@ -123,6 +125,11 @@ public class LeveldbTimelineStore extends AbstractService
   @VisibleForTesting
   static final String FILENAME = "leveldb-timeline-store.ldb";
+  @VisibleForTesting
+  //Extension to FILENAME where backup will be stored in case we need to
+  //call LevelDb recovery
+  static final String BACKUP_EXT = ".backup-";
 
   private static final byte[] START_TIME_LOOKUP_PREFIX = "k".getBytes(Charset.forName("UTF-8"));
   private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes(Charset.forName("UTF-8"));
   private static final byte[] INDEXED_ENTRY_PREFIX = "i".getBytes(Charset.forName("UTF-8"));
@@ -175,6 +182,13 @@ public class LeveldbTimelineStore extends AbstractService
     super(LeveldbTimelineStore.class.getName());
   }
 
+  private JniDBFactory factory;
+
+  @VisibleForTesting
+  void setFactory(JniDBFactory fact) {
+    this.factory = fact;
+  }
+
   @Override
   @SuppressWarnings("unchecked")
   protected void serviceInit(Configuration conf) throws Exception {
@@ -209,7 +223,10 @@ public class LeveldbTimelineStore extends AbstractService
     options.cacheSize(conf.getLong(
         YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
         YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
-    JniDBFactory factory = new JniDBFactory();
+    if(factory == null) {
+      factory = new JniDBFactory();
+    }
     Path dbPath = new Path(
         conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH), FILENAME);
     FileSystem localFS = null;
@@ -226,7 +243,19 @@ public class LeveldbTimelineStore extends AbstractService
       IOUtils.cleanup(LOG, localFS);
     }
     LOG.info("Using leveldb path " + dbPath);
-    db = factory.open(new File(dbPath.toString()), options);
+    try {
+      db = factory.open(new File(dbPath.toString()), options);
+    } catch (IOException ioe) {
+      File dbFile = new File(dbPath.toString());
+      File backupPath = new File(
+          dbPath.toString() + BACKUP_EXT + Time.monotonicNow());
+      LOG.warn("Incurred exception while loading LevelDb database. Backing " +
+          "up at "+ backupPath, ioe);
+      FileUtils.copyDirectory(dbFile, backupPath);
+      LOG.warn("Going to try repair");
+      factory.repair(dbFile, options);
+      db = factory.open(dbFile, options);
+    }
     checkVersion();
     startTimeWriteCache =
         Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(

TestLeveldbTimelineStore.java

@@ -22,12 +22,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 
 import java.io.File;
+import java.io.FileFilter;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.commons.io.filefilter.WildcardFileFilter;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -44,11 +46,14 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.records.Version;
 import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
+import org.fusesource.leveldbjni.JniDBFactory;
 import org.iq80.leveldb.DBException;
+import org.iq80.leveldb.Options;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
@@ -464,4 +469,38 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
         .iterator().next().size());
   }
 
+  @Test
+  /**
+   * Test that LevelDb repair is attempted at least once during
+   * serviceInit for LeveldbTimelineStore in case open fails the
+   * first time.
+   */
+  public void testLevelDbRepair() throws IOException {
+    LeveldbTimelineStore store = new LeveldbTimelineStore();
+    JniDBFactory factory = Mockito.mock(JniDBFactory.class);
+    Mockito.when(
+        factory.open(Mockito.any(File.class), Mockito.any(Options.class)))
+        .thenThrow(new IOException()).thenCallRealMethod();
+    store.setFactory(factory);
+
+    //Create the LevelDb in a different location
+    File path = new File("target", this.getClass().getSimpleName() +
+        "-tmpDir1").getAbsoluteFile();
+    Configuration conf = new Configuration(this.config);
+    conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
+        path.getAbsolutePath());
+    try {
+      store.init(conf);
+      Mockito.verify(factory, Mockito.times(1))
+          .repair(Mockito.any(File.class), Mockito.any(Options.class));
+      FileFilter fileFilter = new WildcardFileFilter(
+          "*" + LeveldbTimelineStore.BACKUP_EXT +"*");
+      Assert.assertTrue(path.listFiles(fileFilter).length > 0);
+    } finally {
+      store.close();
+      fsContext.delete(new Path(path.getAbsolutePath()), true);
+    }
+  }
 }