diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java index c353cf04087..95a008a8753 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/security/TestTimelineAuthFilterForV2.java @@ -44,7 +44,9 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.http.HttpConfig; import org.apache.hadoop.io.Text; import org.apache.hadoop.minikdc.MiniKdc; @@ -144,6 +146,8 @@ public static void setup() { // Setup timeline service v2. try { conf = new Configuration(false); + conf.setClass("fs.file.impl", RawLocalFileSystem.class, + FileSystem.class); conf.setStrings(TimelineAuthenticationFilterInitializer.PREFIX + "type", "kerberos"); conf.set(TimelineAuthenticationFilterInitializer.PREFIX + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java index c284f8ffcc6..023d496bed5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java @@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -78,6 +79,7 @@ public class FileSystemTimelineWriterImpl extends AbstractService private int fsNumRetries; private long fsRetryInterval; private Path entitiesPath; + private Configuration config; /** default value for storage location on local disk. */ private static final String STORAGE_DIR_ROOT = "timeline_service_data"; @@ -122,17 +124,13 @@ private synchronized void writeInternal(String clusterId, String userId, TimelineEntity entity, TimelineWriteResponse response) throws IOException { - Path clusterIdPath = new Path(entitiesPath, clusterId); - Path userIdPath = new Path(clusterIdPath, userId); - Path flowNamePath = new Path(userIdPath, escape(flowName)); - Path flowVersionPath = new Path(flowNamePath, escape(flowVersion)); - Path flowRunPath = new Path(flowVersionPath, String.valueOf(flowRun)); - Path appIdPath = new Path(flowRunPath, appId); - Path entityTypePath = new Path(appIdPath, entity.getType()); + String entityTypePathStr = clusterId + File.separator + userId + + File.separator + escape(flowName) + File.separator + + escape(flowVersion) + File.separator + flowRun + File.separator + appId + + File.separator + entity.getType(); + Path entityTypePath = new Path(entitiesPath, entityTypePathStr); try { - mkdirs(rootPath, entitiesPath, clusterIdPath, userIdPath, - flowNamePath, flowVersionPath, flowRunPath, appIdPath, - entityTypePath); + mkdirs(entityTypePath); Path filePath = new Path(entityTypePath, entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION); @@ -181,7 +179,8 @@ public void serviceInit(Configuration conf) throws Exception { DEFAULT_TIMELINE_FS_WRITER_NUM_RETRIES); fsRetryInterval = conf.getLong(TIMELINE_FS_WRITER_RETRY_INTERVAL_MS, DEFAULT_TIMELINE_FS_WRITER_RETRY_INTERVAL_MS); - fs = rootPath.getFileSystem(getConfig()); + config = conf; + fs = rootPath.getFileSystem(config); } @Override @@ -285,12 +284,15 @@ protected void writeFile(Path outputPath, byte[] data) throws IOException { // final status. try { fsOut = fs.create(tempPath, true); + FSDataInputStream fsIn = fs.open(outputPath); + IOUtils.copyBytes(fsIn, fsOut, config, false); + fsIn.close(); + fs.delete(outputPath, false); fsOut.write(data); fsOut.close(); - fsOut = null; fs.rename(tempPath, outputPath); - } finally { - IOUtils.cleanupWithLogger(LOG, fsOut); + } catch (IOException ie) { + LOG.error("Got an exception while writing file", ie); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java index 4073b85d1b7..b880b9a6482 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java @@ -146,6 +146,125 @@ public void testWriteEntityToFile() throws Exception { } } + @Test + public void testWriteMultipleEntities() throws Exception { + String id = "appId"; + String type = "app"; + + TimelineEntities te1 = new TimelineEntities(); + TimelineEntity entity = new TimelineEntity(); + entity.setId(id); + entity.setType(type); + entity.setCreatedTime(1425016501000L); + te1.addEntity(entity); + + TimelineEntities te2 = new TimelineEntities(); + TimelineEntity entity2 = new TimelineEntity(); + entity2.setId(id); + entity2.setType(type); + entity2.setCreatedTime(1425016503000L); + te2.addEntity(entity2); + + FileSystemTimelineWriterImpl fsi = null; + try { + fsi = new FileSystemTimelineWriterImpl(); + Configuration conf = new YarnConfiguration(); + String outputRoot = tmpFolder.newFolder().getAbsolutePath(); + conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT, + outputRoot); + fsi.init(conf); + fsi.start(); + fsi.write( + new TimelineCollectorContext("cluster_id", "user_id", "flow_name", + "flow_version", 12345678L, "app_id"), + te1, UserGroupInformation.createRemoteUser("user_id")); + fsi.write( + new TimelineCollectorContext("cluster_id", "user_id", "flow_name", + "flow_version", 12345678L, "app_id"), + te2, UserGroupInformation.createRemoteUser("user_id")); + + String fileName = outputRoot + File.separator + "entities" + + File.separator + "cluster_id" + File.separator + "user_id" + + File.separator + "flow_name" + File.separator + "flow_version" + + File.separator + "12345678" + File.separator + "app_id" + + File.separator + type + File.separator + id + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + Path path = new Path(fileName); + FileSystem fs = FileSystem.get(conf); + assertTrue("Specified path(" + fileName + ") should exist: ", + fs.exists(path)); + FileStatus fileStatus = fs.getFileStatus(path); + assertTrue("Specified path should be a file", + !fileStatus.isDirectory()); + List data = readFromFile(fs, path); + assertTrue("data size is:" + data.size(), data.size() == 3); + String d = data.get(0); + // confirm the contents same as what was written + assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity)); + + + String metricToString = data.get(1); + // confirm the contents same as what was written + assertEquals(metricToString, + TimelineUtils.dumpTimelineRecordtoJSON(entity2)); + } finally { + if (fsi != null) { + fsi.close(); + } + } + } + + @Test + public void testWriteEntitiesWithEmptyFlowName() throws Exception { + String id = "appId"; + String type = "app"; + + TimelineEntities te = new TimelineEntities(); + TimelineEntity entity = new TimelineEntity(); + entity.setId(id); + entity.setType(type); + entity.setCreatedTime(1425016501000L); + te.addEntity(entity); + + FileSystemTimelineWriterImpl fsi = null; + try { + fsi = new FileSystemTimelineWriterImpl(); + Configuration conf = new YarnConfiguration(); + String outputRoot = tmpFolder.newFolder().getAbsolutePath(); + conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT, + outputRoot); + fsi.init(conf); + fsi.start(); + fsi.write( + new TimelineCollectorContext("cluster_id", "user_id", "", + "flow_version", 12345678L, "app_id"), + te, UserGroupInformation.createRemoteUser("user_id")); + + String fileName = outputRoot + File.separator + "entities" + + File.separator + "cluster_id" + File.separator + "user_id" + + File.separator + "" + File.separator + "flow_version" + + File.separator + "12345678" + File.separator + "app_id" + + File.separator + type + File.separator + id + + FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; + Path path = new Path(fileName); + FileSystem fs = FileSystem.get(conf); + assertTrue("Specified path(" + fileName + ") should exist: ", + fs.exists(path)); + FileStatus fileStatus = fs.getFileStatus(path); + assertTrue("Specified path should be a file", + !fileStatus.isDirectory()); + List data = readFromFile(fs, path); + assertTrue("data size is:" + data.size(), data.size() == 2); + String d = data.get(0); + // confirm the contents same as what was written + assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity)); + } finally { + if (fsi != null) { + fsi.close(); + } + } + } + private List readFromFile(FileSystem fs, Path path) throws IOException { BufferedReader br = new BufferedReader(