From 56142171b9528646f26072e022902549a16c8f27 Mon Sep 17 00:00:00 2001
From: Varun Saxena
Date: Wed, 13 Jul 2016 21:15:05 +0530
Subject: [PATCH] YARN-5359. FileSystemTimelineReader/Writer uses unix-specific
 default storage path (Sangjin Lee via Varun Saxena)

---
 .../TestTimelineServiceClientIntegration.java |  8 ++++
 .../storage/FileSystemTimelineReaderImpl.java | 28 +++++++------
 .../storage/FileSystemTimelineWriterImpl.java | 16 ++++----
 .../reader/TestTimelineReaderWebServices.java | 12 +++++-
 .../TestFileSystemTimelineReaderImpl.java     | 39 ++++++++++++-------
 .../TestFileSystemTimelineWriterImpl.java     | 36 ++++++++++-------
 6 files changed, 85 insertions(+), 54 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
index 5a63547e2f0..3ec222fe6ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/TestTimelineServiceClientIntegration.java
@@ -24,8 +24,10 @@
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
+import java.io.File;
 import java.io.IOException;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.ExitUtil;
@@ -56,6 +58,9 @@
 import org.junit.Test;
 
 public class TestTimelineServiceClientIntegration {
+  private static final String ROOT_DIR = new File("target",
+      TestTimelineServiceClientIntegration.class.getSimpleName()).
+          getAbsolutePath();
   private static NodeTimelineCollectorManager collectorManager;
   private static PerNodeTimelineCollectorsAuxService auxService;
   private static Configuration conf;
@@ -70,6 +75,8 @@ public static void setupClass() throws Exception {
     conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
     conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
         FileSystemTimelineWriterImpl.class, TimelineWriter.class);
+    conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+        ROOT_DIR);
     auxService =
         PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
             collectorManager, conf);
@@ -84,6 +91,7 @@ public static void tearDownClass() throws Exception {
     if (auxService != null) {
       auxService.stop();
     }
+    FileUtils.deleteDirectory(new File(ROOT_DIR));
   }
 
   @Test
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
index 047f401ab06..ebb73b3a573 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineReaderImpl.java
@@ -77,15 +77,12 @@ public class FileSystemTimelineReaderImpl extends AbstractService
   /** Default extension for output files. */
   static final String APP_FLOW_MAPPING_FILE = "app_flow_mapping.csv";
 
-  @VisibleForTesting
   /** Config param for timeline service file system storage root. */
-  static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT =
+  public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT =
       YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
 
-  @VisibleForTesting
   /** Default value for storage location on local disk. */
-  static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT =
-      "/tmp/timeline_service_data";
+  private static final String STORAGE_DIR_ROOT = "timeline_service_data";
 
   private final CSVFormat csvFormat =
       CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN");
@@ -159,13 +156,13 @@ private static void fillFields(TimelineEntity finalEntity,
   private String getFlowRunPath(String userId, String clusterId,
       String flowName, Long flowRunId, String appId) throws IOException {
     if (userId != null && flowName != null && flowRunId != null) {
-      return userId + "/" + flowName + "/" + flowRunId;
+      return userId + File.separator + flowName + File.separator + flowRunId;
     }
     if (clusterId == null || appId == null) {
       throw new IOException("Unable to get flow info");
     }
-    String appFlowMappingFile = rootPath + "/" + ENTITIES_DIR + "/" +
-        clusterId + "/" + APP_FLOW_MAPPING_FILE;
+    String appFlowMappingFile = rootPath + File.separator + ENTITIES_DIR +
+        File.separator + clusterId + File.separator + APP_FLOW_MAPPING_FILE;
     try (BufferedReader reader =
              new BufferedReader(new InputStreamReader(
                  new FileInputStream(
@@ -180,8 +177,8 @@ private String getFlowRunPath(String userId, String clusterId,
             !applicationId.trim().equals(appId)) {
           continue;
         }
-        return record.get(1).trim() + "/" + record.get(2).trim() + "/" +
-            record.get(3).trim();
+        return record.get(1).trim() + File.separator + record.get(2).trim() +
+            File.separator + record.get(3).trim();
       }
       parser.close();
     }
@@ -364,7 +361,7 @@ public int compare(Long l1, Long l2) {
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     rootPath = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
-        DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
+        conf.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT);
     super.serviceInit(conf);
   }
 
@@ -375,8 +372,8 @@ public TimelineEntity getEntity(TimelineReaderContext context,
         context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
         context.getAppId());
     File dir = new File(new File(rootPath, ENTITIES_DIR),
-        context.getClusterId() + "/" + flowRunPath + "/" + context.getAppId() +
-        "/" + context.getEntityType());
+        context.getClusterId() + File.separator + flowRunPath + File.separator +
+        context.getAppId() + File.separator + context.getEntityType());
     File entityFile = new File(
         dir, context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
     try (BufferedReader reader =
@@ -401,8 +398,9 @@ public Set<TimelineEntity> getEntities(TimelineReaderContext context,
         context.getAppId());
     File dir =
         new File(new File(rootPath, ENTITIES_DIR),
-            context.getClusterId() + "/" + flowRunPath + "/" +
-            context.getAppId() + "/" + context.getEntityType());
+            context.getClusterId() + File.separator + flowRunPath +
+            File.separator + context.getAppId() + File.separator +
+            context.getEntityType());
     return getEntities(dir, context.getEntityType(), filters, dataToRetrieve);
   }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
index 1bb77a0ead9..1f527f24508 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/FileSystemTimelineWriterImpl.java
@@ -36,6 +36,8 @@
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * This implements a local file based backend for storing application timeline
  * information. This implementation may not provide a complete implementation of
@@ -53,15 +55,14 @@ public class FileSystemTimelineWriterImpl extends AbstractService
   public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT =
       YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
 
-  /** default value for storage location on local disk. */
-  public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
-      = "/tmp/timeline_service_data";
-
   public static final String ENTITIES_DIR = "entities";
 
   /** Default extension for output files. */
   public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
 
+  /** default value for storage location on local disk. */
+  private static final String STORAGE_DIR_ROOT = "timeline_service_data";
+
   FileSystemTimelineWriterImpl() {
     super((FileSystemTimelineWriterImpl.class.getName()));
   }
@@ -117,14 +118,15 @@ public TimelineWriteResponse aggregate(TimelineEntity data,
 
   }
 
-  public String getOutputRoot() {
+  @VisibleForTesting
+  String getOutputRoot() {
     return outputRoot;
   }
 
   @Override
   public void serviceInit(Configuration conf) throws Exception {
     outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
-        DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
+        conf.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT);
   }
 
   @Override
@@ -140,7 +142,7 @@ public void flush() throws IOException {
   private static String mkdirs(String... dirStrs) throws IOException {
     StringBuilder path = new StringBuilder();
     for (String dirStr : dirStrs) {
-      path.append(dirStr).append('/');
+      path.append(dirStr).append(File.separatorChar);
       File dir = new File(path.toString());
       if (!dir.exists()) {
         if (!dir.mkdirs()) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
index 4ade024382a..a5ef66cb2a6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/reader/TestTimelineReaderWebServices.java
@@ -23,6 +23,7 @@
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.net.HttpURLConnection;
@@ -32,6 +33,7 @@
 
 import javax.ws.rs.core.MediaType;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -57,17 +59,21 @@
 import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
 
 public class TestTimelineReaderWebServices {
+
+  private static final String ROOT_DIR = new File("target",
+      TestTimelineReaderWebServices.class.getSimpleName()).getAbsolutePath();
+
   private int serverPort;
   private TimelineReaderServer server;
 
   @BeforeClass
   public static void setup() throws Exception {
-    TestFileSystemTimelineReaderImpl.setup();
+    TestFileSystemTimelineReaderImpl.initializeDataDirectory(ROOT_DIR);
   }
 
   @AfterClass
   public static void tearDown() throws Exception {
-    TestFileSystemTimelineReaderImpl.tearDown();
+    FileUtils.deleteDirectory(new File(ROOT_DIR));
   }
 
   @Before
@@ -81,6 +87,8 @@ public void init() throws Exception {
     config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
     config.setClass(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
         FileSystemTimelineReaderImpl.class, TimelineReader.class);
+    config.set(FileSystemTimelineReaderImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+        ROOT_DIR);
     server = new TimelineReaderServer();
     server.init(config);
     server.start();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
index b58bbe358d6..90f11a50a23 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineReaderImpl.java
@@ -43,11 +43,11 @@
 import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
-import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
 import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
+import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.junit.AfterClass;
@@ -58,17 +58,22 @@
 public class TestFileSystemTimelineReaderImpl {
 
-  private static final String ROOT_DIR =
-      FileSystemTimelineReaderImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
+  private static final String ROOT_DIR = new File("target",
+      TestFileSystemTimelineReaderImpl.class.getSimpleName()).getAbsolutePath();
   private FileSystemTimelineReaderImpl reader;
 
   @BeforeClass
   public static void setup() throws Exception {
-    loadEntityData();
+    initializeDataDirectory(ROOT_DIR);
+  }
+
+  public static void initializeDataDirectory(String rootDir) throws Exception {
+    loadEntityData(rootDir);
     // Create app flow mapping file.
     CSVFormat format = CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW",
        "FLOWRUN");
-    String appFlowMappingFile = ROOT_DIR + "/entities/cluster1/" +
+    String appFlowMappingFile = rootDir + File.separator + "entities" +
+        File.separator + "cluster1" + File.separator +
        FileSystemTimelineReaderImpl.APP_FLOW_MAPPING_FILE;
     try (PrintWriter out = new PrintWriter(new BufferedWriter(
@@ -78,7 +83,7 @@ public static void setup() throws Exception {
       printer.printRecord("app2", "user1", "flow1,flow", 1);
       printer.close();
     }
-    (new File(ROOT_DIR)).deleteOnExit();
+    (new File(rootDir)).deleteOnExit();
   }
 
   @AfterClass
@@ -102,7 +107,8 @@ private static void writeEntityFile(TimelineEntity entity, File dir)
         throw new IOException("Could not create directories for " + dir);
       }
     }
-    String fileName = dir.getAbsolutePath() + "/" + entity.getId() + ".thist";
+    String fileName = dir.getAbsolutePath() + File.separator + entity.getId() +
+        ".thist";
     try (PrintWriter out = new PrintWriter(
         new BufferedWriter(new FileWriter(fileName, true)))){
       out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
@@ -111,9 +117,8 @@ private static void writeEntityFile(TimelineEntity entity, File dir)
     }
   }
 
-  private static void loadEntityData() throws Exception {
-    File appDir = new File(ROOT_DIR +
-        "/entities/cluster1/user1/flow1/1/app1/app/");
+  private static void loadEntityData(String rootDir) throws Exception {
+    File appDir = getAppDir(rootDir, "cluster1", "user1", "flow1", "1", "app1");
     TimelineEntity entity11 = new TimelineEntity();
     entity11.setId("id_1");
     entity11.setType("app");
@@ -254,8 +259,8 @@ private static void loadEntityData() throws Exception {
     entity4.addEvent(event44);
     writeEntityFile(entity4, appDir);
 
-    File appDir2 = new File(ROOT_DIR +
-        "/entities/cluster1/user1/flow1,flow/1/app2/app/");
+    File appDir2 =
+        getAppDir(rootDir, "cluster1", "user1", "flow1,flow", "1", "app2");
     TimelineEntity entity5 = new TimelineEntity();
     entity5.setId("id_5");
     entity5.setType("app");
@@ -263,8 +268,12 @@ private static void loadEntityData() throws Exception {
     writeEntityFile(entity5, appDir2);
   }
 
-  public TimelineReader getTimelineReader() {
-    return reader;
+  private static File getAppDir(String rootDir, String cluster, String user,
+      String flowName, String flowRunId, String appId) {
+    return new File(rootDir + File.separator + "entities" + File.separator +
+        cluster + File.separator + user + File.separator + flowName +
+        File.separator + flowRunId + File.separator + appId + File.separator +
+        "app" + File.separator);
   }
 
   @Test
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
index 15be494984a..4f12c57f7bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestFileSystemTimelineWriterImpl.java
@@ -29,16 +29,20 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 public class TestFileSystemTimelineWriterImpl {
+  @Rule
+  public TemporaryFolder tmpFolder = new TemporaryFolder();
 
   /**
    * Unit test for PoC YARN 3264.
@@ -79,14 +83,20 @@ public void testWriteEntityToFile() throws Exception {
     FileSystemTimelineWriterImpl fsi = null;
     try {
       fsi = new FileSystemTimelineWriterImpl();
-      fsi.init(new YarnConfiguration());
+      Configuration conf = new YarnConfiguration();
+      String outputRoot = tmpFolder.newFolder().getAbsolutePath();
+      conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+          outputRoot);
+      fsi.init(conf);
       fsi.start();
       fsi.write("cluster_id", "user_id", "flow_name", "flow_version", 12345678L,
           "app_id", te);
 
-      String fileName = fsi.getOutputRoot() +
-          "/entities/cluster_id/user_id/flow_name/flow_version/12345678/" +
-          "app_id/" + type + "/" + id +
+      String fileName = fsi.getOutputRoot() + File.separator + "entities" +
+          File.separator + "cluster_id" + File.separator + "user_id" +
+          File.separator + "flow_name" + File.separator + "flow_version" +
+          File.separator + "12345678" + File.separator + "app_id" +
+          File.separator + type + File.separator + id +
           FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
       Path path = Paths.get(fileName);
       File f = new File(fileName);
@@ -99,9 +109,11 @@ public void testWriteEntityToFile() throws Exception {
       assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
 
       // verify aggregated metrics
-      String fileName2 = fsi.getOutputRoot() +
-          "/entities/cluster_id/user_id/flow_name/flow_version/12345678/app_id/"
-          + type2 + "/" + id2 +
+      String fileName2 = fsi.getOutputRoot() + File.separator + "entities" +
+          File.separator + "cluster_id" + File.separator + "user_id" +
+          File.separator + "flow_name" + File.separator + "flow_version" +
+          File.separator + "12345678" + File.separator + "app_id" +
+          File.separator + type2 + File.separator + id2 +
           FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
       Path path2 = Paths.get(fileName2);
       File file = new File(fileName2);
@@ -113,15 +125,9 @@ public void testWriteEntityToFile() throws Exception {
       // confirm the contents same as what was written
       assertEquals(metricToString,
           TimelineUtils.dumpTimelineRecordtoJSON(entity2));
-
-      // delete the directory
-      File outputDir = new File(fsi.getOutputRoot());
-      FileUtils.deleteDirectory(outputDir);
-      assertTrue(!(f.exists()));
     } finally {
       if (fsi != null) {
         fsi.close();
-        FileUtils.deleteDirectory(new File(fsi.getOutputRoot()));
       }
     }
   }