YARN-5359. FileSystemTimelineReader/Writer uses unix-specific default storage path (Sangjin Lee via Varun Saxena)

This commit is contained in:
Varun Saxena 2016-07-13 21:15:05 +05:30
parent d6d41e820a
commit 56142171b9
6 changed files with 85 additions and 54 deletions

View File

@@ -24,8 +24,10 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ExitUtil;
@@ -56,6 +58,9 @@
import org.junit.Test;
public class TestTimelineServiceClientIntegration {
private static final String ROOT_DIR = new File("target",
TestTimelineServiceClientIntegration.class.getSimpleName()).
getAbsolutePath();
private static NodeTimelineCollectorManager collectorManager;
private static PerNodeTimelineCollectorsAuxService auxService;
private static Configuration conf;
@@ -70,6 +75,8 @@ public static void setupClass() throws Exception {
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
FileSystemTimelineWriterImpl.class, TimelineWriter.class);
conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
ROOT_DIR);
auxService =
PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
collectorManager, conf);
@@ -84,6 +91,7 @@ public static void tearDownClass() throws Exception {
if (auxService != null) {
auxService.stop();
}
FileUtils.deleteDirectory(new File(ROOT_DIR));
}
@Test

View File

@@ -77,15 +77,12 @@ public class FileSystemTimelineReaderImpl extends AbstractService
/** Default extension for output files. */
static final String APP_FLOW_MAPPING_FILE = "app_flow_mapping.csv";
@VisibleForTesting
/** Config param for timeline service file system storage root. */
static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT =
public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT =
YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
@VisibleForTesting
/** Default value for storage location on local disk. */
static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT =
"/tmp/timeline_service_data";
private static final String STORAGE_DIR_ROOT = "timeline_service_data";
private final CSVFormat csvFormat =
CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN");
@@ -159,13 +156,13 @@ private static void fillFields(TimelineEntity finalEntity,
private String getFlowRunPath(String userId, String clusterId,
String flowName, Long flowRunId, String appId) throws IOException {
if (userId != null && flowName != null && flowRunId != null) {
return userId + "/" + flowName + "/" + flowRunId;
return userId + File.separator + flowName + File.separator + flowRunId;
}
if (clusterId == null || appId == null) {
throw new IOException("Unable to get flow info");
}
String appFlowMappingFile = rootPath + "/" + ENTITIES_DIR + "/" +
clusterId + "/" + APP_FLOW_MAPPING_FILE;
String appFlowMappingFile = rootPath + File.separator + ENTITIES_DIR +
File.separator + clusterId + File.separator + APP_FLOW_MAPPING_FILE;
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(
new FileInputStream(
@@ -180,8 +177,8 @@ private String getFlowRunPath(String userId, String clusterId,
!applicationId.trim().equals(appId)) {
continue;
}
return record.get(1).trim() + "/" + record.get(2).trim() + "/" +
record.get(3).trim();
return record.get(1).trim() + File.separator + record.get(2).trim() +
File.separator + record.get(3).trim();
}
parser.close();
}
@@ -364,7 +361,7 @@ public int compare(Long l1, Long l2) {
/**
 * Initializes the reader's storage root from configuration.
 * NOTE(review): this is a diff hunk with the -/+ markers stripped — the
 * DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT line below is the REMOVED
 * default (/tmp-based, unix-only) and the hadoop.tmp.dir line is its ADDED
 * replacement, giving a platform-independent default of
 * ${hadoop.tmp.dir}/timeline_service_data.
 */
@Override
public void serviceInit(Configuration conf) throws Exception {
rootPath = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
conf.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT);
super.serviceInit(conf);
}
@@ -375,8 +372,8 @@ public TimelineEntity getEntity(TimelineReaderContext context,
context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
context.getAppId());
File dir = new File(new File(rootPath, ENTITIES_DIR),
context.getClusterId() + "/" + flowRunPath + "/" + context.getAppId() +
"/" + context.getEntityType());
context.getClusterId() + File.separator + flowRunPath + File.separator +
context.getAppId() + File.separator + context.getEntityType());
File entityFile = new File(
dir, context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
try (BufferedReader reader =
@@ -401,8 +398,9 @@ public Set<TimelineEntity> getEntities(TimelineReaderContext context,
context.getAppId());
File dir =
new File(new File(rootPath, ENTITIES_DIR),
context.getClusterId() + "/" + flowRunPath + "/" +
context.getAppId() + "/" + context.getEntityType());
context.getClusterId() + File.separator + flowRunPath +
File.separator + context.getAppId() + File.separator +
context.getEntityType());
return getEntities(dir, context.getEntityType(), filters, dataToRetrieve);
}
}

View File

@@ -36,6 +36,8 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.common.annotations.VisibleForTesting;
/**
* This implements a local file based backend for storing application timeline
* information. This implementation may not provide a complete implementation of
@@ -53,15 +55,14 @@ public class FileSystemTimelineWriterImpl extends AbstractService
public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT
= YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
/** default value for storage location on local disk. */
public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
= "/tmp/timeline_service_data";
public static final String ENTITIES_DIR = "entities";
/** Default extension for output files. */
public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
/** default value for storage location on local disk. */
private static final String STORAGE_DIR_ROOT = "timeline_service_data";
/**
 * Package-private constructor; registers the service under this class's
 * fully-qualified name with the AbstractService superclass.
 */
FileSystemTimelineWriterImpl() {
super((FileSystemTimelineWriterImpl.class.getName()));
}
@@ -117,14 +118,15 @@ public TimelineWriteResponse aggregate(TimelineEntity data,
}
public String getOutputRoot() {
@VisibleForTesting
String getOutputRoot() {
return outputRoot;
}
/**
 * Initializes the writer's output root from configuration.
 * NOTE(review): diff hunk with -/+ markers stripped — the
 * DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT line below is the REMOVED
 * unix-specific default and the hadoop.tmp.dir line is its ADDED
 * replacement (${hadoop.tmp.dir}/timeline_service_data), mirroring the
 * same change in FileSystemTimelineReaderImpl.
 */
@Override
public void serviceInit(Configuration conf) throws Exception {
outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
conf.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT);
}
@Override
@@ -140,7 +142,7 @@ public void flush() throws IOException {
private static String mkdirs(String... dirStrs) throws IOException {
StringBuilder path = new StringBuilder();
for (String dirStr : dirStrs) {
path.append(dirStr).append('/');
path.append(dirStr).append(File.separatorChar);
File dir = new File(path.toString());
if (!dir.exists()) {
if (!dir.mkdirs()) {

View File

@@ -23,6 +23,7 @@
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.HttpURLConnection;
@@ -32,6 +33,7 @@
import javax.ws.rs.core.MediaType;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -57,17 +59,21 @@
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
public class TestTimelineReaderWebServices {
private static final String ROOT_DIR = new File("target",
TestTimelineReaderWebServices.class.getSimpleName()).getAbsolutePath();
private int serverPort;
private TimelineReaderServer server;
@BeforeClass
public static void setup() throws Exception {
TestFileSystemTimelineReaderImpl.setup();
TestFileSystemTimelineReaderImpl.initializeDataDirectory(ROOT_DIR);
}
@AfterClass
public static void tearDown() throws Exception {
TestFileSystemTimelineReaderImpl.tearDown();
FileUtils.deleteDirectory(new File(ROOT_DIR));
}
@Before
@@ -81,6 +87,8 @@ public void init() throws Exception {
config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
config.setClass(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
FileSystemTimelineReaderImpl.class, TimelineReader.class);
config.set(FileSystemTimelineReaderImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
ROOT_DIR);
server = new TimelineReaderServer();
server.init(config);
server.start();

View File

@@ -43,11 +43,11 @@
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.AfterClass;
@@ -58,17 +58,22 @@
public class TestFileSystemTimelineReaderImpl {
private static final String ROOT_DIR =
FileSystemTimelineReaderImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
private static final String ROOT_DIR = new File("target",
TestFileSystemTimelineReaderImpl.class.getSimpleName()).getAbsolutePath();
private FileSystemTimelineReaderImpl reader;
@BeforeClass
public static void setup() throws Exception {
loadEntityData();
initializeDataDirectory(ROOT_DIR);
}
public static void initializeDataDirectory(String rootDir) throws Exception {
loadEntityData(rootDir);
// Create app flow mapping file.
CSVFormat format =
CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN");
String appFlowMappingFile = ROOT_DIR + "/entities/cluster1/" +
String appFlowMappingFile = rootDir + File.separator + "entities" +
File.separator + "cluster1" + File.separator +
FileSystemTimelineReaderImpl.APP_FLOW_MAPPING_FILE;
try (PrintWriter out =
new PrintWriter(new BufferedWriter(
@@ -78,7 +83,7 @@ public static void setup() throws Exception {
printer.printRecord("app2", "user1", "flow1,flow", 1);
printer.close();
}
(new File(ROOT_DIR)).deleteOnExit();
(new File(rootDir)).deleteOnExit();
}
@AfterClass
@@ -102,7 +107,8 @@ private static void writeEntityFile(TimelineEntity entity, File dir)
throw new IOException("Could not create directories for " + dir);
}
}
String fileName = dir.getAbsolutePath() + "/" + entity.getId() + ".thist";
String fileName = dir.getAbsolutePath() + File.separator + entity.getId() +
".thist";
try (PrintWriter out =
new PrintWriter(new BufferedWriter(new FileWriter(fileName, true)))){
out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
@@ -111,9 +117,8 @@ private static void writeEntityFile(TimelineEntity entity, File dir)
}
}
private static void loadEntityData() throws Exception {
File appDir = new File(ROOT_DIR +
"/entities/cluster1/user1/flow1/1/app1/app/");
private static void loadEntityData(String rootDir) throws Exception {
File appDir = getAppDir(rootDir, "cluster1", "user1", "flow1", "1", "app1");
TimelineEntity entity11 = new TimelineEntity();
entity11.setId("id_1");
entity11.setType("app");
@@ -254,8 +259,8 @@ private static void loadEntityData() throws Exception {
entity4.addEvent(event44);
writeEntityFile(entity4, appDir);
File appDir2 = new File(ROOT_DIR +
"/entities/cluster1/user1/flow1,flow/1/app2/app/");
File appDir2 =
getAppDir(rootDir, "cluster1", "user1", "flow1,flow", "1", "app2");
TimelineEntity entity5 = new TimelineEntity();
entity5.setId("id_5");
entity5.setType("app");
@@ -263,8 +268,12 @@ private static void loadEntityData() throws Exception {
writeEntityFile(entity5, appDir2);
}
public TimelineReader getTimelineReader() {
return reader;
/**
 * Builds the per-application entity directory used by the file system
 * timeline storage:
 * {@code <rootDir>/entities/<cluster>/<user>/<flowName>/<flowRunId>/<appId>/app/}.
 * Uses File.separator throughout so the path is platform-independent
 * (the point of this patch); the trailing separator is harmless since
 * java.io.File normalizes the path on construction.
 */
private static File getAppDir(String rootDir, String cluster, String user,
String flowName, String flowRunId, String appId) {
return new File(rootDir + File.separator + "entities" + File.separator +
cluster + File.separator + user + File.separator + flowName +
File.separator + flowRunId + File.separator + appId + File.separator +
"app" + File.separator);
}
@Test

View File

@@ -29,16 +29,20 @@
import java.util.List;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class TestFileSystemTimelineWriterImpl {
@Rule
public TemporaryFolder tmpFolder = new TemporaryFolder();
/**
* Unit test for PoC YARN 3264.
@@ -79,14 +83,20 @@ public void testWriteEntityToFile() throws Exception {
FileSystemTimelineWriterImpl fsi = null;
try {
fsi = new FileSystemTimelineWriterImpl();
fsi.init(new YarnConfiguration());
Configuration conf = new YarnConfiguration();
String outputRoot = tmpFolder.newFolder().getAbsolutePath();
conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
outputRoot);
fsi.init(conf);
fsi.start();
fsi.write("cluster_id", "user_id", "flow_name", "flow_version", 12345678L,
"app_id", te);
String fileName = fsi.getOutputRoot() +
"/entities/cluster_id/user_id/flow_name/flow_version/12345678/" +
"app_id/" + type + "/" + id +
String fileName = fsi.getOutputRoot() + File.separator + "entities" +
File.separator + "cluster_id" + File.separator + "user_id" +
File.separator + "flow_name" + File.separator + "flow_version" +
File.separator + "12345678" + File.separator + "app_id" +
File.separator + type + File.separator + id +
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
Path path = Paths.get(fileName);
File f = new File(fileName);
@@ -99,9 +109,11 @@ public void testWriteEntityToFile() throws Exception {
assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
// verify aggregated metrics
String fileName2 = fsi.getOutputRoot() +
"/entities/cluster_id/user_id/flow_name/flow_version/12345678/app_id/"
+ type2 + "/" + id2 +
String fileName2 = fsi.getOutputRoot() + File.separator + "entities" +
File.separator + "cluster_id" + File.separator + "user_id" +
File.separator + "flow_name" + File.separator + "flow_version" +
File.separator + "12345678" + File.separator + "app_id" +
File.separator + type2 + File.separator + id2 +
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
Path path2 = Paths.get(fileName2);
File file = new File(fileName2);
@@ -113,15 +125,9 @@ public void testWriteEntityToFile() throws Exception {
// confirm the contents same as what was written
assertEquals(metricToString,
TimelineUtils.dumpTimelineRecordtoJSON(entity2));
// delete the directory
File outputDir = new File(fsi.getOutputRoot());
FileUtils.deleteDirectory(outputDir);
assertTrue(!(f.exists()));
} finally {
if (fsi != null) {
fsi.close();
FileUtils.deleteDirectory(new File(fsi.getOutputRoot()));
}
}
}