YARN-5359. FileSystemTimelineReader/Writer uses unix-specific default storage path (Sangjin Lee via Varun Saxena)
(cherry picked from commit 56142171b9
)
This commit is contained in:
parent
0820df611d
commit
057a4728ff
|
@ -24,8 +24,10 @@ import static org.mockito.Matchers.any;
|
|||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.util.ExitUtil;
|
||||
|
@ -56,6 +58,9 @@ import org.junit.BeforeClass;
|
|||
import org.junit.Test;
|
||||
|
||||
public class TestTimelineServiceClientIntegration {
|
||||
private static final String ROOT_DIR = new File("target",
|
||||
TestTimelineServiceClientIntegration.class.getSimpleName()).
|
||||
getAbsolutePath();
|
||||
private static NodeTimelineCollectorManager collectorManager;
|
||||
private static PerNodeTimelineCollectorsAuxService auxService;
|
||||
private static Configuration conf;
|
||||
|
@ -70,6 +75,8 @@ public class TestTimelineServiceClientIntegration {
|
|||
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
||||
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
|
||||
FileSystemTimelineWriterImpl.class, TimelineWriter.class);
|
||||
conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
|
||||
ROOT_DIR);
|
||||
auxService =
|
||||
PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
|
||||
collectorManager, conf);
|
||||
|
@ -84,6 +91,7 @@ public class TestTimelineServiceClientIntegration {
|
|||
if (auxService != null) {
|
||||
auxService.stop();
|
||||
}
|
||||
FileUtils.deleteDirectory(new File(ROOT_DIR));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -77,15 +77,12 @@ public class FileSystemTimelineReaderImpl extends AbstractService
|
|||
/** Default extension for output files. */
|
||||
static final String APP_FLOW_MAPPING_FILE = "app_flow_mapping.csv";
|
||||
|
||||
@VisibleForTesting
|
||||
/** Config param for timeline service file system storage root. */
|
||||
static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT =
|
||||
public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT =
|
||||
YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
|
||||
|
||||
@VisibleForTesting
|
||||
/** Default value for storage location on local disk. */
|
||||
static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT =
|
||||
"/tmp/timeline_service_data";
|
||||
private static final String STORAGE_DIR_ROOT = "timeline_service_data";
|
||||
|
||||
private final CSVFormat csvFormat =
|
||||
CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN");
|
||||
|
@ -159,13 +156,13 @@ public class FileSystemTimelineReaderImpl extends AbstractService
|
|||
private String getFlowRunPath(String userId, String clusterId,
|
||||
String flowName, Long flowRunId, String appId) throws IOException {
|
||||
if (userId != null && flowName != null && flowRunId != null) {
|
||||
return userId + "/" + flowName + "/" + flowRunId;
|
||||
return userId + File.separator + flowName + File.separator + flowRunId;
|
||||
}
|
||||
if (clusterId == null || appId == null) {
|
||||
throw new IOException("Unable to get flow info");
|
||||
}
|
||||
String appFlowMappingFile = rootPath + "/" + ENTITIES_DIR + "/" +
|
||||
clusterId + "/" + APP_FLOW_MAPPING_FILE;
|
||||
String appFlowMappingFile = rootPath + File.separator + ENTITIES_DIR +
|
||||
File.separator + clusterId + File.separator + APP_FLOW_MAPPING_FILE;
|
||||
try (BufferedReader reader =
|
||||
new BufferedReader(new InputStreamReader(
|
||||
new FileInputStream(
|
||||
|
@ -180,8 +177,8 @@ public class FileSystemTimelineReaderImpl extends AbstractService
|
|||
!applicationId.trim().equals(appId)) {
|
||||
continue;
|
||||
}
|
||||
return record.get(1).trim() + "/" + record.get(2).trim() + "/" +
|
||||
record.get(3).trim();
|
||||
return record.get(1).trim() + File.separator + record.get(2).trim() +
|
||||
File.separator + record.get(3).trim();
|
||||
}
|
||||
parser.close();
|
||||
}
|
||||
|
@ -364,7 +361,7 @@ public class FileSystemTimelineReaderImpl extends AbstractService
|
|||
@Override
|
||||
public void serviceInit(Configuration conf) throws Exception {
|
||||
rootPath = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
|
||||
DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
|
||||
conf.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT);
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
|
@ -375,8 +372,8 @@ public class FileSystemTimelineReaderImpl extends AbstractService
|
|||
context.getClusterId(), context.getFlowName(), context.getFlowRunId(),
|
||||
context.getAppId());
|
||||
File dir = new File(new File(rootPath, ENTITIES_DIR),
|
||||
context.getClusterId() + "/" + flowRunPath + "/" + context.getAppId() +
|
||||
"/" + context.getEntityType());
|
||||
context.getClusterId() + File.separator + flowRunPath + File.separator +
|
||||
context.getAppId() + File.separator + context.getEntityType());
|
||||
File entityFile = new File(
|
||||
dir, context.getEntityId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
|
||||
try (BufferedReader reader =
|
||||
|
@ -401,8 +398,9 @@ public class FileSystemTimelineReaderImpl extends AbstractService
|
|||
context.getAppId());
|
||||
File dir =
|
||||
new File(new File(rootPath, ENTITIES_DIR),
|
||||
context.getClusterId() + "/" + flowRunPath + "/" +
|
||||
context.getAppId() + "/" + context.getEntityType());
|
||||
context.getClusterId() + File.separator + flowRunPath +
|
||||
File.separator + context.getAppId() + File.separator +
|
||||
context.getEntityType());
|
||||
return getEntities(dir, context.getEntityType(), filters, dataToRetrieve);
|
||||
}
|
||||
}
|
|
@ -36,6 +36,8 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.
|
|||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* This implements a local file based backend for storing application timeline
|
||||
* information. This implementation may not provide a complete implementation of
|
||||
|
@ -53,15 +55,14 @@ public class FileSystemTimelineWriterImpl extends AbstractService
|
|||
public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT
|
||||
= YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
|
||||
|
||||
/** default value for storage location on local disk. */
|
||||
public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
|
||||
= "/tmp/timeline_service_data";
|
||||
|
||||
public static final String ENTITIES_DIR = "entities";
|
||||
|
||||
/** Default extension for output files. */
|
||||
public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
|
||||
|
||||
/** default value for storage location on local disk. */
|
||||
private static final String STORAGE_DIR_ROOT = "timeline_service_data";
|
||||
|
||||
FileSystemTimelineWriterImpl() {
|
||||
super((FileSystemTimelineWriterImpl.class.getName()));
|
||||
}
|
||||
|
@ -117,14 +118,15 @@ public class FileSystemTimelineWriterImpl extends AbstractService
|
|||
|
||||
}
|
||||
|
||||
public String getOutputRoot() {
|
||||
@VisibleForTesting
|
||||
String getOutputRoot() {
|
||||
return outputRoot;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void serviceInit(Configuration conf) throws Exception {
|
||||
outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
|
||||
DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
|
||||
conf.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -140,7 +142,7 @@ public class FileSystemTimelineWriterImpl extends AbstractService
|
|||
private static String mkdirs(String... dirStrs) throws IOException {
|
||||
StringBuilder path = new StringBuilder();
|
||||
for (String dirStr : dirStrs) {
|
||||
path.append(dirStr).append('/');
|
||||
path.append(dirStr).append(File.separatorChar);
|
||||
File dir = new File(path.toString());
|
||||
if (!dir.exists()) {
|
||||
if (!dir.mkdirs()) {
|
||||
|
|
|
@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
|
|||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.net.HttpURLConnection;
|
||||
|
@ -32,6 +33,7 @@ import java.util.Set;
|
|||
|
||||
import javax.ws.rs.core.MediaType;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
|
@ -57,17 +59,21 @@ import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
|
|||
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
|
||||
|
||||
public class TestTimelineReaderWebServices {
|
||||
|
||||
private static final String ROOT_DIR = new File("target",
|
||||
TestTimelineReaderWebServices.class.getSimpleName()).getAbsolutePath();
|
||||
|
||||
private int serverPort;
|
||||
private TimelineReaderServer server;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws Exception {
|
||||
TestFileSystemTimelineReaderImpl.setup();
|
||||
TestFileSystemTimelineReaderImpl.initializeDataDirectory(ROOT_DIR);
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void tearDown() throws Exception {
|
||||
TestFileSystemTimelineReaderImpl.tearDown();
|
||||
FileUtils.deleteDirectory(new File(ROOT_DIR));
|
||||
}
|
||||
|
||||
@Before
|
||||
|
@ -81,6 +87,8 @@ public class TestTimelineReaderWebServices {
|
|||
config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1");
|
||||
config.setClass(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS,
|
||||
FileSystemTimelineReaderImpl.class, TimelineReader.class);
|
||||
config.set(FileSystemTimelineReaderImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
|
||||
ROOT_DIR);
|
||||
server = new TimelineReaderServer();
|
||||
server.init(config);
|
||||
server.start();
|
||||
|
|
|
@ -43,11 +43,11 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilter
|
|||
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineExistsFilter;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilterList.Operator;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValueFilter;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter;
|
||||
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
|
||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||
import org.junit.AfterClass;
|
||||
|
@ -58,17 +58,22 @@ import org.junit.Test;
|
|||
|
||||
public class TestFileSystemTimelineReaderImpl {
|
||||
|
||||
private static final String ROOT_DIR =
|
||||
FileSystemTimelineReaderImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
|
||||
private static final String ROOT_DIR = new File("target",
|
||||
TestFileSystemTimelineReaderImpl.class.getSimpleName()).getAbsolutePath();
|
||||
private FileSystemTimelineReaderImpl reader;
|
||||
|
||||
@BeforeClass
|
||||
public static void setup() throws Exception {
|
||||
loadEntityData();
|
||||
initializeDataDirectory(ROOT_DIR);
|
||||
}
|
||||
|
||||
public static void initializeDataDirectory(String rootDir) throws Exception {
|
||||
loadEntityData(rootDir);
|
||||
// Create app flow mapping file.
|
||||
CSVFormat format =
|
||||
CSVFormat.DEFAULT.withHeader("APP", "USER", "FLOW", "FLOWRUN");
|
||||
String appFlowMappingFile = ROOT_DIR + "/entities/cluster1/" +
|
||||
String appFlowMappingFile = rootDir + File.separator + "entities" +
|
||||
File.separator + "cluster1" + File.separator +
|
||||
FileSystemTimelineReaderImpl.APP_FLOW_MAPPING_FILE;
|
||||
try (PrintWriter out =
|
||||
new PrintWriter(new BufferedWriter(
|
||||
|
@ -78,7 +83,7 @@ public class TestFileSystemTimelineReaderImpl {
|
|||
printer.printRecord("app2", "user1", "flow1,flow", 1);
|
||||
printer.close();
|
||||
}
|
||||
(new File(ROOT_DIR)).deleteOnExit();
|
||||
(new File(rootDir)).deleteOnExit();
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
|
@ -102,7 +107,8 @@ public class TestFileSystemTimelineReaderImpl {
|
|||
throw new IOException("Could not create directories for " + dir);
|
||||
}
|
||||
}
|
||||
String fileName = dir.getAbsolutePath() + "/" + entity.getId() + ".thist";
|
||||
String fileName = dir.getAbsolutePath() + File.separator + entity.getId() +
|
||||
".thist";
|
||||
try (PrintWriter out =
|
||||
new PrintWriter(new BufferedWriter(new FileWriter(fileName, true)))){
|
||||
out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
|
||||
|
@ -111,9 +117,8 @@ public class TestFileSystemTimelineReaderImpl {
|
|||
}
|
||||
}
|
||||
|
||||
private static void loadEntityData() throws Exception {
|
||||
File appDir = new File(ROOT_DIR +
|
||||
"/entities/cluster1/user1/flow1/1/app1/app/");
|
||||
private static void loadEntityData(String rootDir) throws Exception {
|
||||
File appDir = getAppDir(rootDir, "cluster1", "user1", "flow1", "1", "app1");
|
||||
TimelineEntity entity11 = new TimelineEntity();
|
||||
entity11.setId("id_1");
|
||||
entity11.setType("app");
|
||||
|
@ -254,8 +259,8 @@ public class TestFileSystemTimelineReaderImpl {
|
|||
entity4.addEvent(event44);
|
||||
writeEntityFile(entity4, appDir);
|
||||
|
||||
File appDir2 = new File(ROOT_DIR +
|
||||
"/entities/cluster1/user1/flow1,flow/1/app2/app/");
|
||||
File appDir2 =
|
||||
getAppDir(rootDir, "cluster1", "user1", "flow1,flow", "1", "app2");
|
||||
TimelineEntity entity5 = new TimelineEntity();
|
||||
entity5.setId("id_5");
|
||||
entity5.setType("app");
|
||||
|
@ -263,8 +268,12 @@ public class TestFileSystemTimelineReaderImpl {
|
|||
writeEntityFile(entity5, appDir2);
|
||||
}
|
||||
|
||||
public TimelineReader getTimelineReader() {
|
||||
return reader;
|
||||
private static File getAppDir(String rootDir, String cluster, String user,
|
||||
String flowName, String flowRunId, String appId) {
|
||||
return new File(rootDir + File.separator + "entities" + File.separator +
|
||||
cluster + File.separator + user + File.separator + flowName +
|
||||
File.separator + flowRunId + File.separator + appId + File.separator +
|
||||
"app" + File.separator);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -29,16 +29,20 @@ import java.util.HashMap;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.TemporaryFolder;
|
||||
|
||||
public class TestFileSystemTimelineWriterImpl {
|
||||
@Rule
|
||||
public TemporaryFolder tmpFolder = new TemporaryFolder();
|
||||
|
||||
/**
|
||||
* Unit test for PoC YARN 3264.
|
||||
|
@ -79,14 +83,20 @@ public class TestFileSystemTimelineWriterImpl {
|
|||
FileSystemTimelineWriterImpl fsi = null;
|
||||
try {
|
||||
fsi = new FileSystemTimelineWriterImpl();
|
||||
fsi.init(new YarnConfiguration());
|
||||
Configuration conf = new YarnConfiguration();
|
||||
String outputRoot = tmpFolder.newFolder().getAbsolutePath();
|
||||
conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
|
||||
outputRoot);
|
||||
fsi.init(conf);
|
||||
fsi.start();
|
||||
fsi.write("cluster_id", "user_id", "flow_name", "flow_version", 12345678L,
|
||||
"app_id", te);
|
||||
|
||||
String fileName = fsi.getOutputRoot() +
|
||||
"/entities/cluster_id/user_id/flow_name/flow_version/12345678/" +
|
||||
"app_id/" + type + "/" + id +
|
||||
String fileName = fsi.getOutputRoot() + File.separator + "entities" +
|
||||
File.separator + "cluster_id" + File.separator + "user_id" +
|
||||
File.separator + "flow_name" + File.separator + "flow_version" +
|
||||
File.separator + "12345678" + File.separator + "app_id" +
|
||||
File.separator + type + File.separator + id +
|
||||
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
|
||||
Path path = Paths.get(fileName);
|
||||
File f = new File(fileName);
|
||||
|
@ -99,9 +109,11 @@ public class TestFileSystemTimelineWriterImpl {
|
|||
assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
|
||||
|
||||
// verify aggregated metrics
|
||||
String fileName2 = fsi.getOutputRoot() +
|
||||
"/entities/cluster_id/user_id/flow_name/flow_version/12345678/app_id/"
|
||||
+ type2 + "/" + id2 +
|
||||
String fileName2 = fsi.getOutputRoot() + File.separator + "entities" +
|
||||
File.separator + "cluster_id" + File.separator + "user_id" +
|
||||
File.separator + "flow_name" + File.separator + "flow_version" +
|
||||
File.separator + "12345678" + File.separator + "app_id" +
|
||||
File.separator + type2 + File.separator + id2 +
|
||||
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
|
||||
Path path2 = Paths.get(fileName2);
|
||||
File file = new File(fileName2);
|
||||
|
@ -113,15 +125,9 @@ public class TestFileSystemTimelineWriterImpl {
|
|||
// confirm the contents same as what was written
|
||||
assertEquals(metricToString,
|
||||
TimelineUtils.dumpTimelineRecordtoJSON(entity2));
|
||||
|
||||
// delete the directory
|
||||
File outputDir = new File(fsi.getOutputRoot());
|
||||
FileUtils.deleteDirectory(outputDir);
|
||||
assertTrue(!(f.exists()));
|
||||
} finally {
|
||||
if (fsi != null) {
|
||||
fsi.close();
|
||||
FileUtils.deleteDirectory(new File(fsi.getOutputRoot()));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue