YARN-10975 EntityGroupFSTimelineStore#ActiveLogParser parses already processed files (#3735)

Contributed by  sravuri <sravuri@microsoft.com> and reviewed by Prabhu Joseph
This commit is contained in:
Sushmasree-28 2021-11-30 00:00:25 +05:30 committed by GitHub
parent c65c87f211
commit 7b840f2693
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 295 additions and 46 deletions

View File

@ -3189,6 +3189,12 @@ public static boolean isAclEnabled(Configuration conf) {
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RETAIN_SECONDS_DEFAULT =
7 * 24 * 60 * 60;
/**
 * Configuration key controlling whether the entity-group file-system
 * timeline store checkpoints the last processed time and offset of its
 * summary log files and restores them when the service starts.
 */
public static final String
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RECOVERY_ENABLED =
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_PREFIX + "recovery-enabled";
/** Default for the recovery-enabled key: recovery is on. */
public static final boolean
TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RECOVERY_ENABLED_DEFAULT = true;
// how old the most recent log of an UNKNOWN app needs to be in the active
// directory before we treat it as COMPLETED
public static final String

View File

@ -25,14 +25,19 @@
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.service.ServiceOperations;
import org.apache.hadoop.ipc.CallerContext;
@ -59,6 +64,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.MalformedURLException;
@ -70,6 +77,7 @@
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
@ -132,6 +140,11 @@ public class EntityGroupFSTimelineStore extends CompositeService
private long logRetainMillis;
private long unknownActiveMillis;
private int appCacheMaxSize = 0;
private boolean recoveryEnabled;
private Path checkpointFile;
private ConcurrentMap<String, Pair<Long, Long>> recoveredLogs =
new ConcurrentHashMap<String, Pair<Long, Long>>();
private List<TimelineEntityGroupPlugin> cacheIdPlugins;
private Map<TimelineEntityGroupId, EntityCacheItem> cachedLogs;
private boolean aclsEnabled;
@ -205,6 +218,11 @@ protected boolean removeEldestEntry(
YarnConfiguration
.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR_DEFAULT));
fs = activeRootPath.getFileSystem(conf);
checkpointFile = new Path(fs.getHomeDirectory(), "atscheckpoint");
recoveryEnabled = conf.getBoolean(
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RECOVERY_ENABLED,
YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_RECOVERY_ENABLED_DEFAULT);
aclsEnabled = conf.getBoolean(YarnConfiguration.YARN_ACL_ENABLE,
YarnConfiguration.DEFAULT_YARN_ACL_ENABLE);
CallerContext.setCurrent(
@ -293,6 +311,15 @@ protected void serviceStart() throws Exception {
fs.setPermission(doneRootPath, DONE_DIR_PERMISSION);
}
// Recover the lastProcessedTime and offset for logfiles
if (recoveryEnabled && fs.exists(checkpointFile)) {
try (FSDataInputStream in = fs.open(checkpointFile)) {
recoveredLogs.putAll(recoverLogFiles(in));
} catch (IOException e) {
LOG.warn("Failed to recover summarylog files from the checkpointfile", e);
}
}
objMapper = new ObjectMapper();
objMapper.setAnnotationIntrospector(
new JaxbAnnotationIntrospector(TypeFactory.defaultInstance()));
@ -352,10 +379,62 @@ protected void serviceStop() throws Exception {
super.serviceStop();
}
/**
 * Reads summary log bookkeeping back from a checkpoint stream.
 *
 * The stream is expected to start with a long entry count followed by one
 * record per entry: attempt directory name, file name, last processed time
 * and offset.
 *
 * @param in stream positioned at the start of the checkpoint data
 * @return map keyed by {@code attemptDirName + Path.SEPARATOR + fileName};
 *         each value pairs the last processed time (left) with the offset
 *         (right)
 * @throws IOException if the stream cannot be read
 */
HashMap<String, Pair<Long, Long>> recoverLogFiles(
    DataInputStream in) throws IOException {
  HashMap<String, Pair<Long, Long>> recovered = new HashMap<>();
  final long totalEntries = in.readLong();
  for (long remaining = totalEntries; remaining > 0; remaining--) {
    Text attemptDir = new Text();
    attemptDir.readFields(in);
    Text file = new Text();
    file.readFields(in);
    LongWritable processedTime = new LongWritable();
    processedTime.readFields(in);
    LongWritable position = new LongWritable();
    position.readFields(in);

    String key = attemptDir.toString() + Path.SEPARATOR + file.toString();
    recovered.put(key, Pair.of(processedTime.get(), position.get()));
  }
  LOG.info("Recovered {} summarylog files", totalEntries);
  return recovered;
}
/**
 * Writes the bookkeeping of every summary log to a checkpoint stream.
 *
 * The stream starts with a long entry count, followed by one record per
 * summary log: attempt directory name, file name, last processed time and
 * offset.
 *
 * @param appLogs application logs whose summary logs are checkpointed
 * @param checkPointStream destination stream; not closed by this method
 * @throws IOException if writing to the stream fails
 */
void storeLogFiles(Collection<AppLogs> appLogs,
    DataOutputStream checkPointStream) throws IOException {
  // Take one snapshot and derive both the header count and the records from
  // it. Counting and writing in two separate passes over the live lists
  // would let the header disagree with the data if a list changed in
  // between, which makes the checkpoint unreadable on recovery.
  List<LogInfo> snapshot = new LinkedList<>();
  for (AppLogs appLog : appLogs) {
    snapshot.addAll(appLog.summaryLogs);
  }

  long totalEntries = snapshot.size();
  checkPointStream.writeLong(totalEntries);
  for (LogInfo summaryLog : snapshot) {
    new Text(summaryLog.getAttemptDirName()).write(checkPointStream);
    new Text(summaryLog.getFilename()).write(checkPointStream);
    new LongWritable(summaryLog.getLastProcessedTime()).write(checkPointStream);
    new LongWritable(summaryLog.getOffset()).write(checkPointStream);
  }
  LOG.info("Stored {} summarylog files into checkPointFile", totalEntries);
}
/**
 * Checkpoints the summary log state (when recovery is enabled and there is
 * at least one known application) and then scans the active root directory.
 * The elapsed time of the whole call is reported to the metrics.
 *
 * @return the value returned by scanning {@code activeRootPath}
 * @throws IOException if the directory scan fails
 */
@InterfaceAudience.Private
@VisibleForTesting
int scanActiveLogs() throws IOException {
long startTime = Time.monotonicNow();
// Persist the last processed time and offset of the summary logs before
// scanning; the checkpoint file is created with overwrite enabled.
if (recoveryEnabled && appIdLogMap.size() > 0) {
try (FSDataOutputStream checkPointStream = fs.create(checkpointFile, true)) {
storeLogFiles(appIdLogMap.values(), checkPointStream);
} catch (Exception e) {
// Checkpointing is best effort: a failure is logged and the scan proceeds.
LOG.warn("Failed to checkpoint the summarylog files", e);
}
}
int logsToScanCount = scanActiveLogs(activeRootPath);
metrics.addActiveLogDirScanTime(Time.monotonicNow() - startTime);
return logsToScanCount;
@ -824,6 +903,15 @@ private void addSummaryLog(String attemptDirName,
log = new EntityLogInfo(attemptDirName, filename, owner);
summaryLogs.add(log);
}
// This is to avoid processing summary files again during Restart of ATS
if (recoveryEnabled) {
Pair<Long, Long> pair = recoveredLogs.remove(log.getAttemptDirName()
+ Path.SEPARATOR + log.getFilename());
if (pair != null) {
log.setLastProcessedTime(pair.getKey());
log.setOffset(pair.getValue());
}
}
}
private synchronized void addDetailLog(String attemptDirName,

View File

@ -58,7 +58,17 @@ public void setOffset(long newOffset) {
this.offset = newOffset;
}
/**
 * Returns the last processed time recorded for this log.
 *
 * @return the stored last processed time
 */
public long getLastProcessedTime() {
return lastProcessedTime;
}
/**
 * Records the last processed time for this log.
 *
 * @param lastProcessedTime the value to store
 */
public void setLastProcessedTime(long lastProcessedTime) {
this.lastProcessedTime = lastProcessedTime;
}
private String attemptDirName;
private long lastProcessedTime = -1;
private String filename;
private String user;
private long offset = 0;
@ -108,22 +118,31 @@ public long parseForStore(TimelineDataManager tdm, Path appDirPath,
FileStatus status = fs.getFileStatus(logPath);
long numParsed = 0;
if (status != null) {
long startTime = Time.monotonicNow();
try {
LOG.debug("Parsing {} at offset {}", logPath, offset);
long count = parsePath(tdm, logPath, appCompleted, jsonFactory,
objMapper, fs);
LOG.info("Parsed {} entities from {} in {} msec",
count, logPath, Time.monotonicNow() - startTime);
numParsed += count;
} catch (RuntimeException e) {
// If AppLogs cannot parse this log, it may be corrupted or just empty
if (e.getCause() instanceof JsonParseException &&
(status.getLen() > 0 || offset > 0)) {
// log on parse problems if the file as been read in the past or
// is visibly non-empty
LOG.info("Log {} appears to be corrupted. Skip. ", logPath);
long curModificationTime = status.getModificationTime();
if (curModificationTime > getLastProcessedTime()) {
long startTime = Time.monotonicNow();
try {
LOG.info("Parsing {} at offset {}", logPath, offset);
long count =
parsePath(tdm, logPath, appCompleted, jsonFactory, objMapper, fs);
setLastProcessedTime(curModificationTime);
LOG.info("Parsed {} entities from {} in {} msec", count, logPath,
Time.monotonicNow() - startTime);
numParsed += count;
} catch (RuntimeException e) {
// If AppLogs cannot parse this log, it may be corrupted or just empty
if (e.getCause() instanceof JsonParseException
&& (status.getLen() > 0 || offset > 0)) {
// log on parse problems if the file as been read in the past or
// is visibly non-empty
LOG.info("Log {} appears to be corrupted. Skip. ", logPath);
} else {
LOG.error("Failed to parse " + logPath + " from offset " + offset,
e);
}
}
} else {
LOG.info("Skip Parsing {} as there is no change", logPath);
}
} else {
LOG.warn("{} no longer exists. Skip for scanning. ", logPath);
@ -182,21 +201,19 @@ protected long doParse(TimelineDataManager tdm, JsonParser parser,
long count = 0;
TimelineEntities entities = new TimelineEntities();
ArrayList<TimelineEntity> entityList = new ArrayList<TimelineEntity>(1);
long bytesParsed;
long bytesParsedLastBatch = 0;
boolean postError = false;
try {
MappingIterator<TimelineEntity> iter = objMapper.readValues(parser,
TimelineEntity.class);
long curPos;
while (iter.hasNext()) {
TimelineEntity entity = iter.next();
String etype = entity.getEntityType();
String eid = entity.getEntityId();
LOG.trace("Read entity {}", etype);
LOG.debug("Read entity {} of {}", eid, etype);
++count;
bytesParsed = parser.getCurrentLocation().getCharOffset() + 1;
LOG.trace("Parser now at offset {}", bytesParsed);
curPos = ((FSDataInputStream) parser.getInputSource()).getPos();
LOG.debug("Parser now at offset {}", curPos);
try {
LOG.debug("Adding {}({}) to store", eid, etype);
@ -208,8 +225,7 @@ protected long doParse(TimelineDataManager tdm, JsonParser parser,
LOG.warn("Error putting entity: {} ({}): {}",
e.getEntityId(), e.getEntityType(), e.getErrorCode());
}
setOffset(getOffset() + bytesParsed - bytesParsedLastBatch);
bytesParsedLastBatch = bytesParsed;
setOffset(curPos);
entityList.clear();
} catch (YarnException e) {
postError = true;
@ -247,8 +263,7 @@ protected long doParse(TimelineDataManager tdm, JsonParser parser,
ObjectMapper objMapper, UserGroupInformation ugi, boolean appCompleted)
throws IOException {
long count = 0;
long bytesParsed;
long bytesParsedLastBatch = 0;
long curPos;
boolean putError = false;
try {
MappingIterator<TimelineDomain> iter = objMapper.readValues(parser,
@ -259,13 +274,12 @@ protected long doParse(TimelineDataManager tdm, JsonParser parser,
domain.setOwner(ugi.getShortUserName());
LOG.trace("Read domain {}", domain.getId());
++count;
bytesParsed = parser.getCurrentLocation().getCharOffset() + 1;
LOG.trace("Parser now at offset {}", bytesParsed);
curPos = ((FSDataInputStream) parser.getInputSource()).getPos();
LOG.debug("Parser now at offset {}", curPos);
try {
tdm.putDomain(domain, ugi);
setOffset(getOffset() + bytesParsed - bytesParsedLastBatch);
bytesParsedLastBatch = bytesParsed;
setOffset(curPos);
} catch (YarnException e) {
putError = true;
throw new IOException("Error posting domain", e);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.timeline;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileContext;
@ -37,6 +38,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timeline.EntityGroupFSTimelineStore.AppState;
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
@ -50,8 +52,19 @@
import org.junit.Test;
import org.junit.rules.TestName;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.util.MinimalPrettyPrinter;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.type.TypeFactory;
import com.fasterxml.jackson.module.jaxb.JaxbAnnotationIntrospector;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
@ -61,6 +74,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
@ -602,6 +617,149 @@ public void testScanActiveLogsAndMoveToDonePluginRead() throws Exception {
}
}
/**
 * Summary store stub used to validate put calls: instead of persisting
 * entities it adds the size of every batch passed to {@code put} to a
 * shared static counter.
 */
static class TestTimelineStore extends LeveldbTimelineStore {
  static final AtomicInteger ENTITIES_COUNT = new AtomicInteger(0);

  TestTimelineStore() {
    super();
  }

  @Override
  public TimelinePutResponse put(TimelineEntities entities) {
    int received = entities.getEntities().size();
    ENTITIES_COUNT.addAndGet(received);
    return new TimelinePutResponse();
  }

  /** @return total number of entities passed to put so far */
  public static int getEntitiesCount() {
    return ENTITIES_COUNT.intValue();
  }
}
/**
 * Verifies that a summary log which grows between two scans is only parsed
 * from where the previous scan stopped, i.e. entities that were already
 * posted to the summary store are not posted a second time.
 */
@Test
public void testIfAnyDuplicateEntities() throws Exception {
  // The counter is static; start from zero so a repeated run in the same JVM
  // does not inherit entities counted by an earlier execution.
  TestTimelineStore.ENTITIES_COUNT.set(0);

  // Create an application with some entities
  ApplicationId appId =
      ApplicationId.fromString("application_1501509265053_0002");
  String user = UserGroupInformation.getCurrentUser().getShortUserName();
  Path activeDirPath = getTestRootPath("active1");
  Path doneDirPath = getTestRootPath("done1");
  Path userBase = new Path(activeDirPath, user);
  Path userAppRoot = new Path(userBase, appId.toString());
  Path attemptDirPath = new Path(userAppRoot, getAttemptDirName(appId));
  String logFileName = EntityGroupFSTimelineStore.ENTITY_LOG_PREFIX
      + EntityGroupPlugInForTest.getStandardTimelineGroupId(appId);
  createTestFiles(appId, attemptDirPath, logFileName);

  // stop the default store before creating new store to get the lock
  store.stop();
  EntityGroupFSTimelineStore newStore = new EntityGroupFSTimelineStore() {
    @Override
    protected AppState getAppState(ApplicationId appId) throws IOException {
      return AppState.ACTIVE;
    }
  };

  try {
    // Start ATS with TestTimelineStore
    Configuration newConfig = new YarnConfiguration(config);
    newConfig.set(
        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_SUMMARY_STORE,
        TestTimelineStore.class.getName());
    newConfig.set(
        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_DONE_DIR,
        doneDirPath.toString());
    newConfig.set(
        YarnConfiguration.TIMELINE_SERVICE_ENTITYGROUP_FS_STORE_ACTIVE_DIR,
        activeDirPath.toString());
    newStore.init(newConfig);
    newStore.setFs(fs);
    newStore.start();

    // Validate if the initial entities count are correct
    newStore.scanActiveLogs();
    waitForEntitiesCount(2);
    assertEquals("Wrong Initial Entities Count",
        2, TestTimelineStore.getEntitiesCount());

    // Append the Summary log file with few more entities
    TimelineEntities entities = PluginStoreTestUtils.generateTestEntities();
    Path summaryLogPath =
        new Path(attemptDirPath, TEST_SUMMARY_LOG_FILE_NAME);
    // try-with-resources so the stream is released even if writing fails
    try (FSDataOutputStream outStream = fs.append(summaryLogPath)) {
      JsonGenerator jsonGenerator
          = new JsonFactory().createGenerator((OutputStream) outStream);
      jsonGenerator.setPrettyPrinter(new MinimalPrettyPrinter("\n"));
      ObjectMapper objMapper = new ObjectMapper();
      objMapper.setAnnotationIntrospector(
          new JaxbAnnotationIntrospector(TypeFactory.defaultInstance()));
      objMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
      for (TimelineEntity entity : entities.getEntities()) {
        objMapper.writeValue(jsonGenerator, entity);
      }
      // Do not rely on the mapper's flush-after-write default.
      jsonGenerator.flush();
    }

    // Validate if there are any duplicates
    newStore.scanActiveLogs();
    waitForEntitiesCount(4);
    assertEquals("Duplicate Entities present",
        4, TestTimelineStore.getEntitiesCount());
  } finally {
    newStore.stop();
    fs.delete(userAppRoot, true);
  }
}

// Blocks until TestTimelineStore has counted exactly the expected number of
// entities, polling every 100 ms for at most 10 seconds.
private static void waitForEntitiesCount(final int expected)
    throws Exception {
  GenericTestUtils.waitFor(new Supplier<Boolean>() {
    @Override
    public Boolean get() {
      return TestTimelineStore.getEntitiesCount() == expected;
    }
  }, 100, 10000);
}
/**
 * Round-trips summary log bookkeeping through a checkpoint file: everything
 * written by storeLogFiles must come back unchanged from recoverLogFiles.
 */
@Test
public void testStateStoreAndRecovery() throws Exception {
  // Prepare the AppLogs Data
  EntityGroupFSTimelineStore.AppLogs appLogs =
      store.new AppLogs(mainTestAppId, mainTestAppDirPath, AppState.COMPLETED);
  appLogs.scanForLogs();
  List<LogInfo> summaryLogs = appLogs.getSummaryLogs();
  List<EntityGroupFSTimelineStore.AppLogs> logsList = new ArrayList<>();
  logsList.add(appLogs);

  Path checkpointFile = new Path(fs.getHomeDirectory(), "atscheckpoint");
  try {
    // Store the Log files. I/O failures propagate so that the original
    // exception and stack trace show up in the test report.
    try (DataOutputStream dataOutputStream = fs.create(checkpointFile)) {
      store.storeLogFiles(logsList, dataOutputStream);
    }

    // Recover the Log files and validate the contents
    try (DataInputStream dataInputStream = fs.open(checkpointFile)) {
      Map<String, Pair<Long, Long>> logFiles =
          store.recoverLogFiles(dataInputStream);
      assertEquals(summaryLogs.size(), logFiles.size());
      for (LogInfo logInfo : summaryLogs) {
        String logFileName = logInfo.getAttemptDirName() +
            Path.SEPARATOR + logInfo.getFilename();
        Pair<Long, Long> pair = logFiles.get(logFileName);
        assertNotNull("Failed to recover " + logFileName, pair);
        // longValue() selects assertEquals(String, long, long) unambiguously
        // and reports both values on mismatch.
        assertEquals("LastProcessedTime is not same",
            logInfo.getLastProcessedTime(), pair.getLeft().longValue());
        assertEquals("Offset is not same",
            logInfo.getOffset(), pair.getRight().longValue());
      }
    }
  } finally {
    // Do not leave the checkpoint behind in the home directory.
    fs.delete(checkpointFile, false);
  }
}
private EntityGroupFSTimelineStore createAndStartTimelineStore(
AppState appstate) {
// stop before creating new store to get the lock

View File

@ -156,23 +156,6 @@ public void testParseEntity() throws Exception {
fs);
// Verify for the first batch
PluginStoreTestUtils.verifyTestEntities(tdm);
// Load new data
TimelineEntity entityNew = PluginStoreTestUtils
.createEntity("id_3", "type_3", 789l, null, null,
null, null, "domain_id_1");
TimelineEntities entityList = new TimelineEntities();
entityList.addEntity(entityNew);
writeEntitiesLeaveOpen(entityList,
new Path(getTestRootPath(TEST_ATTEMPT_DIR_NAME), TEST_ENTITY_FILE_NAME));
testLogInfo.parseForStore(tdm, getTestRootPath(), true, jsonFactory, objMapper,
fs);
// Verify the newly added data
TimelineEntity entity3 = tdm.getEntity(entityNew.getEntityType(),
entityNew.getEntityId(), EnumSet.allOf(TimelineReader.Field.class),
UserGroupInformation.getLoginUser());
assertNotNull(entity3);
assertEquals("Failed to read out entity new",
entityNew.getStartTime(), entity3.getStartTime());
tdm.close();
}