YARN-3879 [Storage implementation] Create HDFS backing storage implementation for ATS reads. Contributed by Abhishek Modi.
This commit is contained in:
parent
74db993a61
commit
bca928d3c7
|
@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
|
|||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStreamReader;
|
||||
|
@ -41,6 +40,11 @@ import org.apache.commons.csv.CSVFormat;
|
|||
import org.apache.commons.csv.CSVParser;
|
||||
import org.apache.commons.csv.CSVRecord;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileStatus;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.LocatedFileStatus;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.RemoteIterator;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||
|
@ -68,7 +72,9 @@ public class FileSystemTimelineReaderImpl extends AbstractService
|
|||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(FileSystemTimelineReaderImpl.class);
|
||||
|
||||
private String rootPath;
|
||||
private FileSystem fs;
|
||||
private Path rootPath;
|
||||
private Path entitiesPath;
|
||||
private static final String ENTITIES_DIR = "entities";
|
||||
|
||||
/** Default extension for output files. */
|
||||
|
@ -94,7 +100,7 @@ public class FileSystemTimelineReaderImpl extends AbstractService
|
|||
|
||||
@VisibleForTesting
|
||||
String getRootPath() {
|
||||
return rootPath;
|
||||
return rootPath.toString();
|
||||
}
|
||||
|
||||
private static ObjectMapper mapper;
|
||||
|
@ -162,12 +168,12 @@ public class FileSystemTimelineReaderImpl extends AbstractService
|
|||
if (clusterId == null || appId == null) {
|
||||
throw new IOException("Unable to get flow info");
|
||||
}
|
||||
String appFlowMappingFile = rootPath + File.separator + ENTITIES_DIR +
|
||||
File.separator + clusterId + File.separator + APP_FLOW_MAPPING_FILE;
|
||||
Path clusterIdPath = new Path(entitiesPath, clusterId);
|
||||
Path appFlowMappingFilePath = new Path(clusterIdPath,
|
||||
APP_FLOW_MAPPING_FILE);
|
||||
try (BufferedReader reader =
|
||||
new BufferedReader(new InputStreamReader(
|
||||
new FileInputStream(
|
||||
appFlowMappingFile), Charset.forName("UTF-8")));
|
||||
fs.open(appFlowMappingFilePath), Charset.forName("UTF-8")));
|
||||
CSVParser parser = new CSVParser(reader, csvFormat)) {
|
||||
for (CSVRecord record : parser.getRecords()) {
|
||||
if (record.size() < 4) {
|
||||
|
@ -266,7 +272,7 @@ public class FileSystemTimelineReaderImpl extends AbstractService
|
|||
return entity;
|
||||
}
|
||||
|
||||
private Set<TimelineEntity> getEntities(File dir, String entityType,
|
||||
private Set<TimelineEntity> getEntities(Path dir, String entityType,
|
||||
TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
|
||||
throws IOException {
|
||||
// First sort the selected entities based on created/start time.
|
||||
|
@ -280,15 +286,18 @@ public class FileSystemTimelineReaderImpl extends AbstractService
|
|||
}
|
||||
);
|
||||
if (dir != null) {
|
||||
File[] files = dir.listFiles();
|
||||
if (files != null) {
|
||||
for (File entityFile : files) {
|
||||
RemoteIterator<LocatedFileStatus> fileStatuses = fs.listFiles(dir,
|
||||
false);
|
||||
if (fileStatuses != null) {
|
||||
while (fileStatuses.hasNext()) {
|
||||
LocatedFileStatus locatedFileStatus = fileStatuses.next();
|
||||
Path entityFile = locatedFileStatus.getPath();
|
||||
if (!entityFile.getName()
|
||||
.contains(TIMELINE_SERVICE_STORAGE_EXTENSION)) {
|
||||
continue;
|
||||
}
|
||||
try (BufferedReader reader = new BufferedReader(
|
||||
new InputStreamReader(new FileInputStream(entityFile),
|
||||
new InputStreamReader(fs.open(entityFile),
|
||||
Charset.forName("UTF-8")))) {
|
||||
TimelineEntity entity = readEntityFromFile(reader);
|
||||
if (!entity.getType().equals(entityType)) {
|
||||
|
@ -366,25 +375,30 @@ public class FileSystemTimelineReaderImpl extends AbstractService
|
|||
|
||||
@Override
|
||||
public void serviceInit(Configuration conf) throws Exception {
|
||||
rootPath = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
|
||||
String outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
|
||||
conf.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT);
|
||||
rootPath = new Path(outputRoot);
|
||||
entitiesPath = new Path(rootPath, ENTITIES_DIR);
|
||||
fs = rootPath.getFileSystem(conf);
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
public TimelineEntity getEntity(TimelineReaderContext context,
|
||||
TimelineDataToRetrieve dataToRetrieve) throws IOException {
|
||||
String flowRunPath = getFlowRunPath(context.getUserId(),
|
||||
String flowRunPathStr = getFlowRunPath(context.getUserId(),
|
||||
context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
|
||||
context.getAppId());
|
||||
File dir = new File(new File(rootPath, ENTITIES_DIR),
|
||||
context.getClusterId() + File.separator + flowRunPath + File.separator +
|
||||
context.getAppId() + File.separator + context.getEntityType());
|
||||
File entityFile = new File(
|
||||
dir, context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
|
||||
Path clusterIdPath = new Path(entitiesPath, context.getClusterId());
|
||||
Path flowRunPath = new Path(clusterIdPath, flowRunPathStr);
|
||||
Path appIdPath = new Path(flowRunPath, context.getAppId());
|
||||
Path entityTypePath = new Path(appIdPath, context.getEntityType());
|
||||
Path entityFilePath = new Path(entityTypePath,
|
||||
context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
|
||||
|
||||
try (BufferedReader reader =
|
||||
new BufferedReader(new InputStreamReader(
|
||||
new FileInputStream(entityFile), Charset.forName("UTF-8")))) {
|
||||
fs.open(entityFilePath), Charset.forName("UTF-8")))) {
|
||||
TimelineEntity entity = readEntityFromFile(reader);
|
||||
return createEntityToBeReturned(
|
||||
entity, dataToRetrieve.getFieldsToRetrieve());
|
||||
|
@ -399,32 +413,31 @@ public class FileSystemTimelineReaderImpl extends AbstractService
|
|||
public Set<TimelineEntity> getEntities(TimelineReaderContext context,
|
||||
TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve)
|
||||
throws IOException {
|
||||
String flowRunPath = getFlowRunPath(context.getUserId(),
|
||||
String flowRunPathStr = getFlowRunPath(context.getUserId(),
|
||||
context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
|
||||
context.getAppId());
|
||||
File dir =
|
||||
new File(new File(rootPath, ENTITIES_DIR),
|
||||
context.getClusterId() + File.separator + flowRunPath +
|
||||
File.separator + context.getAppId() + File.separator +
|
||||
context.getEntityType());
|
||||
return getEntities(dir, context.getEntityType(), filters, dataToRetrieve);
|
||||
Path clusterIdPath = new Path(entitiesPath, context.getClusterId());
|
||||
Path flowRunPath = new Path(clusterIdPath, flowRunPathStr);
|
||||
Path appIdPath = new Path(flowRunPath, context.getAppId());
|
||||
Path entityTypePath = new Path(appIdPath, context.getEntityType());
|
||||
|
||||
return getEntities(entityTypePath, context.getEntityType(), filters,
|
||||
dataToRetrieve);
|
||||
}
|
||||
|
||||
@Override public Set<String> getEntityTypes(TimelineReaderContext context)
|
||||
throws IOException {
|
||||
Set<String> result = new TreeSet<>();
|
||||
String flowRunPath = getFlowRunPath(context.getUserId(),
|
||||
String flowRunPathStr = getFlowRunPath(context.getUserId(),
|
||||
context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
|
||||
context.getAppId());
|
||||
File dir = new File(new File(rootPath, ENTITIES_DIR),
|
||||
context.getClusterId() + File.separator + flowRunPath
|
||||
+ File.separator + context.getAppId());
|
||||
File[] fileList = dir.listFiles();
|
||||
if (fileList != null) {
|
||||
for (File f : fileList) {
|
||||
if (f.isDirectory()) {
|
||||
result.add(f.getName());
|
||||
}
|
||||
Path clusterIdPath = new Path(entitiesPath, context.getClusterId());
|
||||
Path flowRunPath = new Path(clusterIdPath, flowRunPathStr);
|
||||
Path appIdPath = new Path(flowRunPath, context.getAppId());
|
||||
FileStatus[] fileStatuses = fs.listStatus(appIdPath);
|
||||
for (FileStatus fileStatus : fileStatuses) {
|
||||
if (fileStatus.isDirectory()) {
|
||||
result.add(fileStatus.getPath().getName());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
|
|
Loading…
Reference in New Issue