From 7ed627af6b3503e2b5446e582c83678218996d72 Mon Sep 17 00:00:00 2001
From: Vrushali C
Date: Thu, 11 Oct 2018 21:23:34 -0700
Subject: [PATCH] YARN-3879 [Storage implementation] Create HDFS backing
 storage implementation for ATS reads. Contributed by Abhishek Modi.

---
 .../storage/FileSystemTimelineReaderImpl.java | 87 +++++++++++--------
 1 file changed, 50 insertions(+), 37 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
index a0ee2bea3ae..6260c75f692 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
 
 import java.io.BufferedReader;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -38,6 +37,11 @@ import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.csv.CSVRecord;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
@@ -68,7 +72,9 @@ public class FileSystemTimelineReaderImpl extends AbstractService
   private static final Logger LOG =
       LoggerFactory.getLogger(FileSystemTimelineReaderImpl.class);
 
-  private String rootPath;
+  private FileSystem fs;
+  private Path rootPath;
+  private Path entitiesPath;
   private static final String ENTITIES_DIR = "entities";
 
   /** Default extension for output files. */
@@ -94,7 +100,7 @@ public class FileSystemTimelineReaderImpl extends AbstractService
 
   @VisibleForTesting
   String getRootPath() {
-    return rootPath;
+    return rootPath.toString();
   }
 
   private static ObjectMapper mapper;
@@ -162,12 +168,12 @@ public class FileSystemTimelineReaderImpl extends AbstractService
     if (clusterId == null || appId == null) {
       throw new IOException("Unable to get flow info");
     }
-    String appFlowMappingFile = rootPath + File.separator + ENTITIES_DIR +
-        File.separator + clusterId + File.separator + APP_FLOW_MAPPING_FILE;
+    Path clusterIdPath = new Path(entitiesPath, clusterId);
+    Path appFlowMappingFilePath = new Path(clusterIdPath,
+        APP_FLOW_MAPPING_FILE);
     try (BufferedReader reader =
              new BufferedReader(new InputStreamReader(
-                 new FileInputStream(
-                     appFlowMappingFile), Charset.forName("UTF-8")));
+                 fs.open(appFlowMappingFilePath), Charset.forName("UTF-8")));
          CSVParser parser = new CSVParser(reader, csvFormat)) {
       for (CSVRecord record : parser.getRecords()) {
         if (record.size() < 4) {
@@ -266,7 +272,7 @@ public class FileSystemTimelineReaderImpl extends AbstractService
     return entity;
   }
 
-  private Set<TimelineEntity> getEntities(File dir, String entityType,
+  private Set<TimelineEntity> getEntities(Path dir, String entityType,
       TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
       throws IOException {
     // First sort the selected entities based on created/start time.
@@ -280,15 +286,18 @@ public class FileSystemTimelineReaderImpl extends AbstractService
         }
     );
     if (dir != null) {
-      File[] files = dir.listFiles();
-      if (files != null) {
-        for (File entityFile : files) {
+      RemoteIterator<LocatedFileStatus> fileStatuses = fs.listFiles(dir,
+          false);
+      if (fileStatuses != null) {
+        while (fileStatuses.hasNext()) {
+          LocatedFileStatus locatedFileStatus = fileStatuses.next();
+          Path entityFile = locatedFileStatus.getPath();
           if (!entityFile.getName()
               .contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) {
             continue;
           }
           try (BufferedReader reader = new BufferedReader(
-              new InputStreamReader(new FileInputStream(entityFile),
+              new InputStreamReader(fs.open(entityFile),
                   Charset.forName("UTF-8")))) {
             TimelineEntity entity = readEntityFromFile(reader);
             if (!entity.getType().equals(entityType)) {
@@ -366,25 +375,30 @@ public class FileSystemTimelineReaderImpl extends AbstractService
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
-    rootPath = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+    String outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
         conf.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT);
+    rootPath = new Path(outputRoot);
+    entitiesPath = new Path(rootPath, ENTITIES_DIR);
+    fs = rootPath.getFileSystem(conf);
     super.serviceInit(conf);
   }
 
   @Override
   public TimelineEntity getEntity(TimelineReaderContext context,
       TimelineDataToRetrieve dataToRetrieve) throws IOException {
-    String flowRunPath = getFlowRunPath(context.getUserId(),
+    String flowRunPathStr = getFlowRunPath(context.getUserId(),
         context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
         context.getAppId());
-    File dir = new File(new File(rootPath, ENTITIES_DIR),
-        context.getClusterId() + File.separator + flowRunPath + File.separator +
-        context.getAppId() + File.separator + context.getEntityType());
-    File entityFile = new File(
-        dir, context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
+    Path clusterIdPath = new Path(entitiesPath, context.getClusterId());
+    Path flowRunPath = new Path(clusterIdPath, flowRunPathStr);
+    Path appIdPath = new Path(flowRunPath, context.getAppId());
+    Path entityTypePath = new Path(appIdPath, context.getEntityType());
+    Path entityFilePath = new Path(entityTypePath,
+        context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
+
     try (BufferedReader reader =
              new BufferedReader(new InputStreamReader(
-                 new FileInputStream(entityFile), Charset.forName("UTF-8")))) {
+                 fs.open(entityFilePath), Charset.forName("UTF-8")))) {
       TimelineEntity entity = readEntityFromFile(reader);
       return createEntityToBeReturned(
           entity, dataToRetrieve.getFieldsToRetrieve());
@@ -399,32 +413,31 @@ public class FileSystemTimelineReaderImpl extends AbstractService
   public Set<TimelineEntity> getEntities(TimelineReaderContext context,
       TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
       throws IOException {
-    String flowRunPath = getFlowRunPath(context.getUserId(),
+    String flowRunPathStr = getFlowRunPath(context.getUserId(),
         context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
         context.getAppId());
-    File dir =
-        new File(new File(rootPath, ENTITIES_DIR),
-            context.getClusterId() + File.separator + flowRunPath +
-            File.separator + context.getAppId() + File.separator +
-            context.getEntityType());
-    return getEntities(dir, context.getEntityType(), filters, dataToRetrieve);
+    Path clusterIdPath = new Path(entitiesPath, context.getClusterId());
+    Path flowRunPath = new Path(clusterIdPath, flowRunPathStr);
+    Path appIdPath = new Path(flowRunPath, context.getAppId());
+    Path entityTypePath = new Path(appIdPath, context.getEntityType());
+
+    return getEntities(entityTypePath, context.getEntityType(), filters,
+        dataToRetrieve);
   }
 
   @Override
   public Set<String> getEntityTypes(TimelineReaderContext context) throws IOException {
     Set<String> result = new TreeSet<>();
-    String flowRunPath = getFlowRunPath(context.getUserId(),
+    String flowRunPathStr = getFlowRunPath(context.getUserId(),
         context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
         context.getAppId());
-    File dir = new File(new File(rootPath, ENTITIES_DIR),
-        context.getClusterId() + File.separator + flowRunPath
-        + File.separator + context.getAppId());
-    File[] fileList = dir.listFiles();
-    if (fileList != null) {
-      for (File f : fileList) {
-        if (f.isDirectory()) {
-          result.add(f.getName());
-        }
+    Path clusterIdPath = new Path(entitiesPath, context.getClusterId());
+    Path flowRunPath = new Path(clusterIdPath, flowRunPathStr);
+    Path appIdPath = new Path(flowRunPath, context.getAppId());
+    FileStatus[] fileStatuses = fs.listStatus(appIdPath);
+    for (FileStatus fileStatus : fileStatuses) {
+      if (fileStatus.isDirectory()) {
+        result.add(fileStatus.getPath().getName());
       }
     }
     return result;
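
Reviewer note (not part of the patch): the heart of this change is that the reader now derives a Hadoop FileSystem from the configured root Path in serviceInit() and reads entity files through fs.open() instead of java.io.FileInputStream, so the same code path serves HDFS and the local filesystem. The following is a minimal, self-contained sketch of that pattern, not code from the patch; the hdfs:// URI, the sample file name, and the FsOpenSketch class name are illustrative assumptions.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class FsOpenSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Hypothetical storage root. With an hdfs:// URI, Path.getFileSystem()
    // resolves to the HDFS client; with file:/// it falls back to the local
    // filesystem -- the same resolution the patched serviceInit() relies on.
    Path rootPath = new Path("hdfs://namenode:8020/tmp/timeline_service_data");
    FileSystem fs = rootPath.getFileSystem(conf);

    // Hypothetical file under the "entities" subtree. The real reader builds
    // entities/<clusterId>/<flow run path>/<appId>/<entityType>/<entityId>
    // plus the storage extension, as the patched getEntity() shows.
    Path entityFile = new Path(rootPath, "entities/cluster1/sample-entity-file");

    // Read through the FileSystem API, mirroring the patch's replacement of
    // new FileInputStream(file) with fs.open(path).
    try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(fs.open(entityFile), StandardCharsets.UTF_8))) {
      System.out.println(reader.readLine());
    }
  }
}

Because the FileSystem is resolved from the root path's URI scheme rather than hard-coded, existing local-filesystem test setups keep working while the reader gains HDFS support.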