diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java index ac0902fc2b2..c284f8ffcc6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -18,16 +18,16 @@ package org.apache.hadoop.yarn.server.timelineservice.storage; -import java.io.BufferedWriter; import java.io.File; -import java.io.FileOutputStream; import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.PrintWriter; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; @@ -35,14 +35,17 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError; +import org.apache.hadoop.yarn.client.api.impl.FileSystemTimelineWriter; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** - * This implements a local file based backend for storing application timeline + * This implements a FileSystem based backend for storing application timeline * information. This implementation may not provide a complete implementation of * all the necessary features. This implementation is provided solely for basic * testing purposes, and should not be used in a non-test situation. @@ -52,20 +55,36 @@ import com.google.common.annotations.VisibleForTesting; public class FileSystemTimelineWriterImpl extends AbstractService implements TimelineWriter { - private String outputRoot; - /** Config param for timeline service storage tmp root for FILE YARN-3264. */ - public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT - = YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir"; + public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir"; + + public static final String TIMELINE_FS_WRITER_NUM_RETRIES = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.num-retries"; + public static final int DEFAULT_TIMELINE_FS_WRITER_NUM_RETRIES = 0; + + public static final String TIMELINE_FS_WRITER_RETRY_INTERVAL_MS = + YarnConfiguration.TIMELINE_SERVICE_PREFIX + + "fs-writer.retry-interval-ms"; + public static final long DEFAULT_TIMELINE_FS_WRITER_RETRY_INTERVAL_MS = 1000L; public static final String ENTITIES_DIR = "entities"; /** Default extension for output files. */ public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist"; + private FileSystem fs; + private Path rootPath; + private int fsNumRetries; + private long fsRetryInterval; + private Path entitiesPath; + /** default value for storage location on local disk. */ private static final String STORAGE_DIR_ROOT = "timeline_service_data"; + private static final Logger LOG = + LoggerFactory.getLogger(FileSystemTimelineWriter.class); + FileSystemTimelineWriterImpl() { super((FileSystemTimelineWriterImpl.class.getName())); } @@ -83,8 +102,8 @@ public class FileSystemTimelineWriterImpl extends AbstractService String appId = context.getAppId(); for (TimelineEntity entity : entities.getEntities()) { - write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity, - response); + writeInternal(clusterId, userId, flowName, flowVersion, + flowRunId, appId, entity, response); } return response; } @@ -97,59 +116,78 @@ public class FileSystemTimelineWriterImpl extends AbstractService return null; } - private synchronized void write(String clusterId, String userId, - String flowName, String flowVersion, long flowRun, String appId, - TimelineEntity entity, TimelineWriteResponse response) - throws IOException { - PrintWriter out = null; + private synchronized void writeInternal(String clusterId, String userId, + String flowName, String flowVersion, + long flowRun, String appId, + TimelineEntity entity, + TimelineWriteResponse response) + throws IOException { + Path clusterIdPath = new Path(entitiesPath, clusterId); + Path userIdPath = new Path(clusterIdPath, userId); + Path flowNamePath = new Path(userIdPath, escape(flowName)); + Path flowVersionPath = new Path(flowNamePath, escape(flowVersion)); + Path flowRunPath = new Path(flowVersionPath, String.valueOf(flowRun)); + Path appIdPath = new Path(flowRunPath, appId); + Path entityTypePath = new Path(appIdPath, entity.getType()); try { - String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId, - escape(flowName), escape(flowVersion), String.valueOf(flowRun), appId, - entity.getType()); - String fileName = dir + entity.getId() + - TIMELINE_SERVICE_STORAGE_EXTENSION; - out = - new PrintWriter(new BufferedWriter(new OutputStreamWriter( - new FileOutputStream(fileName, true), "UTF-8"))); - out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity)); - out.write("\n"); - } catch (IOException ioe) { - TimelineWriteError error = new TimelineWriteError(); - error.setEntityId(entity.getId()); - error.setEntityType(entity.getType()); + mkdirs(rootPath, entitiesPath, clusterIdPath, userIdPath, + flowNamePath, flowVersionPath, flowRunPath, appIdPath, + entityTypePath); + Path filePath = + new Path(entityTypePath, + entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION); + createFileWithRetries(filePath); + + byte[] record = new StringBuilder() + .append(TimelineUtils.dumpTimelineRecordtoJSON(entity)) + .append("\n").toString().getBytes("UTF-8"); + writeFileWithRetries(filePath, record); + } catch (Exception ioe) { + LOG.warn("Interrupted operation:" + ioe.getMessage()); + TimelineWriteError error = createTimelineWriteError(entity); /* * TODO: set an appropriate error code after PoC could possibly be: * error.setErrorCode(TimelineWriteError.IO_EXCEPTION); */ response.addError(error); - } finally { - if (out != null) { - out.close(); - } } } + private TimelineWriteError createTimelineWriteError(TimelineEntity entity) { + TimelineWriteError error = new TimelineWriteError(); + error.setEntityId(entity.getId()); + error.setEntityType(entity.getType()); + return error; + } + @Override public TimelineWriteResponse aggregate(TimelineEntity data, TimelineAggregationTrack track) throws IOException { return null; - } @VisibleForTesting String getOutputRoot() { - return outputRoot; + return rootPath.toString(); } @Override public void serviceInit(Configuration conf) throws Exception { - outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT, + String outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT, conf.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT); + rootPath = new Path(outputRoot); + entitiesPath = new Path(rootPath, ENTITIES_DIR); + fsNumRetries = conf.getInt(TIMELINE_FS_WRITER_NUM_RETRIES, + DEFAULT_TIMELINE_FS_WRITER_NUM_RETRIES); + fsRetryInterval = conf.getLong(TIMELINE_FS_WRITER_RETRY_INTERVAL_MS, + DEFAULT_TIMELINE_FS_WRITER_RETRY_INTERVAL_MS); + fs = rootPath.getFileSystem(getConfig()); } @Override public void serviceStart() throws Exception { - mkdirs(outputRoot, ENTITIES_DIR); + mkdirsWithRetries(rootPath); + mkdirsWithRetries(entitiesPath); } @Override @@ -157,18 +195,103 @@ public class FileSystemTimelineWriterImpl extends AbstractService // no op } - private static String mkdirs(String... dirStrs) throws IOException { - StringBuilder path = new StringBuilder(); - for (String dirStr : dirStrs) { - path.append(dirStr).append(File.separatorChar); - File dir = new File(path.toString()); - if (!dir.exists()) { - if (!dir.mkdirs()) { - throw new IOException("Could not create directories for " + dir); + private void mkdirs(Path... paths) throws IOException, InterruptedException { + for (Path path: paths) { + if (!existsWithRetries(path)) { + mkdirsWithRetries(path); + } + } + } + + // Code from FSRMStateStore. + private void mkdirsWithRetries(final Path dirPath) + throws IOException, InterruptedException { + new FSAction() { + @Override + public Void run() throws IOException { + fs.mkdirs(dirPath); + return null; + } + }.runWithRetries(); + } + + private void writeFileWithRetries(final Path outputPath, final byte[] data) + throws Exception { + new FSAction() { + @Override + public Void run() throws IOException { + writeFile(outputPath, data); + return null; + } + }.runWithRetries(); + } + + private boolean createFileWithRetries(final Path newFile) + throws IOException, InterruptedException { + return new FSAction() { + @Override + public Boolean run() throws IOException { + return createFile(newFile); + } + }.runWithRetries(); + } + + private boolean existsWithRetries(final Path path) + throws IOException, InterruptedException { + return new FSAction() { + @Override + public Boolean run() throws IOException { + return fs.exists(path); + } + }.runWithRetries(); + } + + private abstract class FSAction { + abstract T run() throws IOException; + + T runWithRetries() throws IOException, InterruptedException { + int retry = 0; + while (true) { + try { + return run(); + } catch (IOException e) { + LOG.info("Exception while executing a FS operation.", e); + if (++retry > fsNumRetries) { + LOG.info("Maxed out FS retries. Giving up!"); + throw e; + } + LOG.info("Will retry operation on FS. Retry no. " + retry + + " after sleeping for " + fsRetryInterval + " seconds"); + Thread.sleep(fsRetryInterval); } } } - return path.toString(); + } + + private boolean createFile(Path newFile) throws IOException { + return fs.createNewFile(newFile); + } + + /** + * In order to make this writeInternal atomic as a part of writeInternal + * we will first writeInternal data to .tmp file and then rename it. + * Here we are assuming that rename is atomic for underlying file system. + */ + protected void writeFile(Path outputPath, byte[] data) throws IOException { + Path tempPath = + new Path(outputPath.getParent(), outputPath.getName() + ".tmp"); + FSDataOutputStream fsOut = null; + // This file will be overwritten when app/attempt finishes for saving the + // final status. + try { + fsOut = fs.create(tempPath, true); + fsOut.write(data); + fsOut.close(); + fsOut = null; + fs.rename(tempPath, outputPath); + } finally { + IOUtils.cleanupWithLogger(LOG, fsOut); + } } // specifically escape the separator character diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java index bb9f54f9379..4073b85d1b7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java @@ -20,16 +20,19 @@ package org.apache.hadoop.yarn.server.timelineservice.storage; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import java.io.BufferedReader; import java.io.File; -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; +import java.io.IOException; +import java.io.InputStreamReader; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; @@ -96,16 +99,20 @@ public class TestFileSystemTimelineWriterImpl { "flow_version", 12345678L, "app_id"), te, UserGroupInformation.createRemoteUser("user_id")); - String fileName = fsi.getOutputRoot() + File.separator + "entities" + + String fileName = outputRoot + File.separator + "entities" + File.separator + "cluster_id" + File.separator + "user_id" + File.separator + "flow_name" + File.separator + "flow_version" + File.separator + "12345678" + File.separator + "app_id" + File.separator + type + File.separator + id + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; - Path path = Paths.get(fileName); - File f = new File(fileName); - assertTrue(f.exists() && !f.isDirectory()); - List data = Files.readAllLines(path, StandardCharsets.UTF_8); + Path path = new Path(fileName); + FileSystem fs = FileSystem.get(conf); + assertTrue("Specified path(" + fileName + ") should exist: ", + fs.exists(path)); + FileStatus fileStatus = fs.getFileStatus(path); + assertTrue("Specified path should be a file", + !fileStatus.isDirectory()); + List data = readFromFile(fs, path); // ensure there's only one entity + 1 new line assertTrue("data size is:" + data.size(), data.size() == 2); String d = data.get(0); @@ -119,12 +126,15 @@ public class TestFileSystemTimelineWriterImpl { File.separator + "12345678" + File.separator + "app_id" + File.separator + type2 + File.separator + id2 + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; - Path path2 = Paths.get(fileName2); - File file = new File(fileName2); - assertTrue(file.exists() && !file.isDirectory()); - List data2 = Files.readAllLines(path2, StandardCharsets.UTF_8); + Path path2 = new Path(fileName2); + assertTrue("Specified path(" + fileName + ") should exist: ", + fs.exists(path2)); + FileStatus fileStatus2 = fs.getFileStatus(path2); + assertTrue("Specified path should be a file", + !fileStatus2.isDirectory()); + List data2 = readFromFile(fs, path2); // ensure there's only one entity + 1 new line - assertTrue("data size is:" + data.size(), data2.size() == 2); + assertTrue("data size is:" + data2.size(), data2.size() == 2); String metricToString = data2.get(0); // confirm the contents same as what was written assertEquals(metricToString, @@ -136,4 +146,17 @@ public class TestFileSystemTimelineWriterImpl { } } + private List readFromFile(FileSystem fs, Path path) + throws IOException { + BufferedReader br = new BufferedReader( + new InputStreamReader(fs.open(path))); + List data = new ArrayList<>(); + String line = br.readLine(); + data.add(line); + while(line != null) { + line = br.readLine(); + data.add(line); + } + return data; + } }