diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 6cc44000170..57cc247a941 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3189,6 +3189,12 @@ public class YarnConfiguration extends Configuration { TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS_DEFAULT = 7 * 24 * 60 * 60; + public static final String + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RECOVERY_ENABLED = + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "recovery-enabled"; + public static final boolean + TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RECOVERY_ENABLED_DEFAULT = true; + // how old the most recent log of an UNKNOWN app needs to be in the active // directory before we treat it as COMPLETED public static final String diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java index 5eab08badfd..1f4a9f42a9f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/EntityGroupFSTimelineStore.java @@ -25,14 +25,19 @@ import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.lang3.mutable.MutableBoolean; +import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.service.ServiceOperations; import org.apache.hadoop.ipc.CallerContext; @@ -59,6 +64,8 @@ import org.apache.hadoop.yarn.util.Apps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.net.MalformedURLException; @@ -70,6 +77,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.EnumSet; +import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedList; @@ -132,6 +140,11 @@ public class EntityGroupFSTimelineStore extends CompositeService private long logRetainMillis; private long unknownActiveMillis; private int appCacheMaxSize = 0; + private boolean recoveryEnabled; + private Path checkpointFile; + private ConcurrentMap> recoveredLogs = + new ConcurrentHashMap>(); + private List cacheIdPlugins; private Map cachedLogs; private boolean aclsEnabled; @@ -205,6 +218,11 @@ public class EntityGroupFSTimelineStore extends CompositeService YarnConfiguration .TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT)); fs = activeRootPath.getFileSystem(conf); + checkpointFile = new Path(fs.getHomeDirectory(), "atscheckpoint"); + recoveryEnabled = conf.getBoolean( + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RECOVERY_ENABLED, + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RECOVERY_ENABLED_DEFAULT); + aclsEnabled = conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE, YarnConfiguration.DEFAULT_YARN_ACL_ENABLE); CallerContext.setCurrent( @@ -293,6 +311,15 @@ public class EntityGroupFSTimelineStore extends CompositeService fs.setPermission(doneRootPath, DONE_DIR_PERMISSION); } + // Recover the lastProcessedTime and offset for logfiles + if (recoveryEnabled && fs.exists(checkpointFile)) { + try (FSDataInputStream in = fs.open(checkpointFile)) { + recoveredLogs.putAll(recoverLogFiles(in)); + } catch (IOException e) { + LOG.warn("Failed to recover summarylog files from the checkpointfile", e); + } + } + objMapper = new ObjectMapper(); objMapper.setAnnotationIntrospector( new JaxbAnnotationIntrospector(TypeFactory.defaultInstance())); @@ -352,10 +379,62 @@ public class EntityGroupFSTimelineStore extends CompositeService super.serviceStop(); } + /* Returns Map of SummaryLog files. The Value Pair has + lastProcessedTime and offset */ + HashMap> recoverLogFiles( + DataInputStream in) throws IOException { + HashMap> logFiles = new HashMap<>(); + long totalEntries = in.readLong(); + for (long i = 0; i < totalEntries; i++) { + Text attemptDirName = new Text(); + attemptDirName.readFields(in); + Text fileName = new Text(); + fileName.readFields(in); + LongWritable lastProcessedTime = new LongWritable(); + lastProcessedTime.readFields(in); + LongWritable offset = new LongWritable(); + offset.readFields(in); + Pair pair = Pair.of(lastProcessedTime.get(), offset.get()); + logFiles.put(attemptDirName + Path.SEPARATOR + fileName, pair); + } + LOG.info("Recovered {} summarylog files", totalEntries); + return logFiles; + } + + // Stores set of SummaryLog files + void storeLogFiles(Collection appLogs, + DataOutputStream checkPointStream) throws IOException { + long totalEntries = 0L; + for (AppLogs appLog : appLogs) { + totalEntries += appLog.summaryLogs.size(); + } + checkPointStream.writeLong(totalEntries); + for (AppLogs appLog : appLogs) { + for (LogInfo summaryLog : appLog.summaryLogs) { + new Text(summaryLog.getAttemptDirName()).write(checkPointStream); + new Text(summaryLog.getFilename()).write(checkPointStream); + new LongWritable(summaryLog.getLastProcessedTime()).write(checkPointStream); + new LongWritable(summaryLog.getOffset()).write(checkPointStream); + } + } + LOG.info("Stored {} summarylog files into checkPointFile", totalEntries); + } + @InterfaceAudience.Private @VisibleForTesting int scanActiveLogs() throws IOException { long startTime = Time.monotonicNow(); + // Store the Last Processed Time and Offset + if (recoveryEnabled && appIdLogMap.size() > 0) { + + try (FSDataOutputStream checkPointStream = fs.create(checkpointFile, true)) { + + storeLogFiles(appIdLogMap.values(), checkPointStream); + + } catch (Exception e) { + LOG.warn("Failed to checkpoint the summarylog files", e); + } + } int logsToScanCount = scanActiveLogs(activeRootPath); metrics.addActiveLogDirScanTime(Time.monotonicNow() - startTime); return logsToScanCount; @@ -824,6 +903,15 @@ public class EntityGroupFSTimelineStore extends CompositeService log = new EntityLogInfo(attemptDirName, filename, owner); summaryLogs.add(log); } + // This is to avoid processing summary files again during Restart of ATS + if (recoveryEnabled) { + Pair pair = recoveredLogs.remove(log.getAttemptDirName() + + Path.SEPARATOR + log.getFilename()); + if (pair != null) { + log.setLastProcessedTime(pair.getKey()); + log.setOffset(pair.getValue()); + } + } } private synchronized void addDetailLog(String attemptDirName, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java index a8cb9330b32..8cc6b72ab9c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/main/java/org/apache/hadoop/yarn/server/timeline/LogInfo.java @@ -58,7 +58,17 @@ abstract class LogInfo { this.offset = newOffset; } + + public long getLastProcessedTime() { + return lastProcessedTime; + } + + public void setLastProcessedTime(long lastProcessedTime) { + this.lastProcessedTime = lastProcessedTime; + } + private String attemptDirName; + private long lastProcessedTime = -1; private String filename; private String user; private long offset = 0; @@ -108,22 +118,31 @@ abstract class LogInfo { FileStatus status = fs.getFileStatus(logPath); long numParsed = 0; if (status != null) { - long startTime = Time.monotonicNow(); - try { - LOG.debug("Parsing {} at offset {}", logPath, offset); - long count = parsePath(tdm, logPath, appCompleted, jsonFactory, - objMapper, fs); - LOG.info("Parsed {} entities from {} in {} msec", - count, logPath, Time.monotonicNow() - startTime); - numParsed += count; - } catch (RuntimeException e) { - // If AppLogs cannot parse this log, it may be corrupted or just empty - if (e.getCause() instanceof JsonParseException && - (status.getLen() > 0 || offset > 0)) { - // log on parse problems if the file as been read in the past or - // is visibly non-empty - LOG.info("Log {} appears to be corrupted. Skip. ", logPath); + long curModificationTime = status.getModificationTime(); + if (curModificationTime > getLastProcessedTime()) { + long startTime = Time.monotonicNow(); + try { + LOG.info("Parsing {} at offset {}", logPath, offset); + long count = + parsePath(tdm, logPath, appCompleted, jsonFactory, objMapper, fs); + setLastProcessedTime(curModificationTime); + LOG.info("Parsed {} entities from {} in {} msec", count, logPath, + Time.monotonicNow() - startTime); + numParsed += count; + } catch (RuntimeException e) { + // If AppLogs cannot parse this log, it may be corrupted or just empty + if (e.getCause() instanceof JsonParseException + && (status.getLen() > 0 || offset > 0)) { + // log on parse problems if the file as been read in the past or + // is visibly non-empty + LOG.info("Log {} appears to be corrupted. Skip. ", logPath); + } else { + LOG.error("Failed to parse " + logPath + " from offset " + offset, + e); + } } + } else { + LOG.info("Skip Parsing {} as there is no change", logPath); } } else { LOG.warn("{} no longer exists. Skip for scanning. ", logPath); @@ -182,21 +201,19 @@ class EntityLogInfo extends LogInfo { long count = 0; TimelineEntities entities = new TimelineEntities(); ArrayList entityList = new ArrayList(1); - long bytesParsed; - long bytesParsedLastBatch = 0; boolean postError = false; try { MappingIterator iter = objMapper.readValues(parser, TimelineEntity.class); - + long curPos; while (iter.hasNext()) { TimelineEntity entity = iter.next(); String etype = entity.getEntityType(); String eid = entity.getEntityId(); - LOG.trace("Read entity {}", etype); + LOG.debug("Read entity {} of {}", eid, etype); ++count; - bytesParsed = parser.getCurrentLocation().getCharOffset() + 1; - LOG.trace("Parser now at offset {}", bytesParsed); + curPos = ((FSDataInputStream) parser.getInputSource()).getPos(); + LOG.debug("Parser now at offset {}", curPos); try { LOG.debug("Adding {}({}) to store", eid, etype); @@ -208,8 +225,7 @@ class EntityLogInfo extends LogInfo { LOG.warn("Error putting entity: {} ({}): {}", e.getEntityId(), e.getEntityType(), e.getErrorCode()); } - setOffset(getOffset() + bytesParsed - bytesParsedLastBatch); - bytesParsedLastBatch = bytesParsed; + setOffset(curPos); entityList.clear(); } catch (YarnException e) { postError = true; @@ -247,8 +263,7 @@ class DomainLogInfo extends LogInfo { ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted) throws IOException { long count = 0; - long bytesParsed; - long bytesParsedLastBatch = 0; + long curPos; boolean putError = false; try { MappingIterator iter = objMapper.readValues(parser, @@ -259,13 +274,12 @@ class DomainLogInfo extends LogInfo { domain.setOwner(ugi.getShortUserName()); LOG.trace("Read domain {}", domain.getId()); ++count; - bytesParsed = parser.getCurrentLocation().getCharOffset() + 1; - LOG.trace("Parser now at offset {}", bytesParsed); + curPos = ((FSDataInputStream) parser.getInputSource()).getPos(); + LOG.debug("Parser now at offset {}", curPos); try { tdm.putDomain(domain, ugi); - setOffset(getOffset() + bytesParsed - bytesParsedLastBatch); - bytesParsedLastBatch = bytesParsed; + setOffset(curPos); } catch (YarnException e) { putError = true; throw new IOException("Error posting domain", e); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java index fdd47727c50..d421d35ce28 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestEntityGroupFSTimelineStore.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.timeline; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileContext; @@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState; import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; @@ -50,8 +52,19 @@ import org.junit.Rule; import org.junit.Test; import org.junit.rules.TestName; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.core.util.MinimalPrettyPrinter; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.TypeFactory; +import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector; + +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.io.File; import java.io.IOException; +import java.io.OutputStream; import java.net.URL; import java.net.URLClassLoader; import java.util.ArrayList; @@ -61,6 +74,8 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertEquals; @@ -602,6 +617,149 @@ public class TestEntityGroupFSTimelineStore extends TimelineStoreTestUtils { } } + // TestTimelineStore to validate the put entities call + static class TestTimelineStore extends LeveldbTimelineStore { + static final AtomicInteger ENTITIES_COUNT = new AtomicInteger(0); + + TestTimelineStore() { + super(); + } + + @Override + public TimelinePutResponse put(TimelineEntities entities) { + ENTITIES_COUNT.getAndAdd(entities.getEntities().size()); + return new TimelinePutResponse(); + } + + public static int getEntitiesCount() { + return ENTITIES_COUNT.get(); + } + } + + @Test + public void testIfAnyDuplicateEntities() throws Exception { + // Create an application with some entities + ApplicationId appId = + ApplicationId.fromString("application_1501509265053_0002"); + String user = UserGroupInformation.getCurrentUser().getShortUserName(); + Path activeDirPath = getTestRootPath("active1"); + Path doneDirPath = getTestRootPath("done1"); + Path userBase = new Path(activeDirPath, user); + Path userAppRoot = new Path(userBase, appId.toString()); + Path attemptDirPath = new Path(userAppRoot, getAttemptDirName(appId)); + + String logFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX + + EntityGroupPlugInForTest.getStandardTimelineGroupId(appId); + createTestFiles(appId, attemptDirPath, logFileName); + + // stop the default store before creating new store to get the lock + store.stop(); + EntityGroupFSTimelineStore newStore = new EntityGroupFSTimelineStore() { + @Override + protected AppState getAppState(ApplicationId appId) throws IOException { + return AppState.ACTIVE; + } + }; + + try { + // Start ATS with TestTimelineStore + Configuration newConfig = new YarnConfiguration(config); + newConfig.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_STORE, + TestTimelineStore.class.getName()); + newConfig.set(YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR, + doneDirPath.toString()); + newConfig.set( + YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR, + activeDirPath.toString()); + newStore.init(newConfig); + newStore.setFs(fs); + newStore.start(); + + // Validate if the initial entities count are correct + newStore.scanActiveLogs(); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return TestTimelineStore.getEntitiesCount() == 2; + } + }, 100, 10000); + assertEquals("Wrong Initial Entities Count", + 2, TestTimelineStore.getEntitiesCount()); + + // Append the Summary log file with few more entities + TimelineEntities entities = PluginStoreTestUtils.generateTestEntities(); + FSDataOutputStream outStream = fs.append( + new Path(attemptDirPath, TEST_SUMMARY_LOG_FILE_NAME)); + JsonGenerator jsonGenerator + = new JsonFactory().createGenerator((OutputStream)outStream); + jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n")); + ObjectMapper objMapper = new ObjectMapper(); + objMapper.setAnnotationIntrospector( + new JaxbAnnotationIntrospector(TypeFactory.defaultInstance())); + objMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); + for (TimelineEntity entity : entities.getEntities()) { + objMapper.writeValue(jsonGenerator, entity); + } + outStream.close(); + + // Validate if there are any duplicates + newStore.scanActiveLogs(); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + return TestTimelineStore.getEntitiesCount() == 4; + } + }, 100, 10000); + assertEquals("Duplicate Entities present", + 4, TestTimelineStore.getEntitiesCount()); + + } finally { + if (newStore != null) { + newStore.stop(); + } + fs.delete(userAppRoot, true); + } + } + + @Test + public void testStateStoreAndRecovery() throws Exception { + // Prepare the AppLogs Data + EntityGroupFSTimelineStore.AppLogs appLogs = + store.new AppLogs(mainTestAppId, mainTestAppDirPath, AppState.COMPLETED); + appLogs.scanForLogs(); + List summaryLogs = appLogs.getSummaryLogs(); + List logsList = new ArrayList<>(); + logsList.add(appLogs); + + // Store the Log files + Path checkpointFile = new Path(fs.getHomeDirectory(), "atscheckpoint"); + try (DataOutputStream dataOutputStream = fs.create(checkpointFile)) { + store.storeLogFiles(logsList, dataOutputStream); + } catch (IOException e) { + Assert.fail("Failed to store the log files"); + } + + // Recover the Log files and validate the contents + try (DataInputStream dataInputStream = fs.open(checkpointFile)) { + HashMap> logFiles = + store.recoverLogFiles(dataInputStream); + assertEquals(summaryLogs.size(), logFiles.size()); + for (LogInfo logInfo : summaryLogs) { + String logFileName = logInfo.getAttemptDirName() + + Path.SEPARATOR + logInfo.getFilename(); + Pair pair = logFiles.get(logFileName); + assertNotNull("Failed to recover " + logFileName, pair); + assertTrue("LastProcessedTime is not same", + logInfo.getLastProcessedTime() == pair.getLeft()); + assertTrue("Offset is not same", + logInfo.getOffset() == pair.getRight()); + } + } catch (IOException e) { + Assert.fail("Failed to recover the log files"); + } + } + + private EntityGroupFSTimelineStore createAndStartTimelineStore( AppState appstate) { // stop before creating new store to get the lock diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLogInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLogInfo.java index bccf4b84406..dd38565cc59 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLogInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timeline-pluginstorage/src/test/java/org/apache/hadoop/yarn/server/timeline/TestLogInfo.java @@ -156,23 +156,6 @@ public class TestLogInfo { fs); // Verify for the first batch PluginStoreTestUtils.verifyTestEntities(tdm); - // Load new data - TimelineEntity entityNew = PluginStoreTestUtils - .createEntity("id_3", "type_3", 789l, null, null, - null, null, "domain_id_1"); - TimelineEntities entityList = new TimelineEntities(); - entityList.addEntity(entityNew); - writeEntitiesLeaveOpen(entityList, - new Path(getTestRootPath(TEST_ATTEMPT_DIR_NAME), TEST_ENTITY_FILE_NAME)); - testLogInfo.parseForStore(tdm, getTestRootPath(), true, jsonFactory, objMapper, - fs); - // Verify the newly added data - TimelineEntity entity3 = tdm.getEntity(entityNew.getEntityType(), - entityNew.getEntityId(), EnumSet.allOf(TimelineReader.Field.class), - UserGroupInformation.getLoginUser()); - assertNotNull(entity3); - assertEquals("Failed to read out entity new", - entityNew.getStartTime(), entity3.getStartTime()); tdm.close(); }