YARN-9338 Timeline related testcases are failing. Contributed by Abhishek Modi.
commit 17a3e14d25
parent 67cc24a7a4
@@ -44,7 +44,9 @@
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.RawLocalFileSystem;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.minikdc.MiniKdc;
@@ -144,6 +146,8 @@ public static void setup() {
     // Setup timeline service v2.
     try {
       conf = new Configuration(false);
+      conf.setClass("fs.file.impl", RawLocalFileSystem.class,
+          FileSystem.class);
       conf.setStrings(TimelineAuthenticationFilterInitializer.PREFIX + "type",
           "kerberos");
       conf.set(TimelineAuthenticationFilterInitializer.PREFIX +
@@ -24,6 +24,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -78,6 +79,7 @@ public class FileSystemTimelineWriterImpl extends AbstractService
   private int fsNumRetries;
   private long fsRetryInterval;
   private Path entitiesPath;
+  private Configuration config;

   /** default value for storage location on local disk. */
   private static final String STORAGE_DIR_ROOT = "timeline_service_data";
@@ -122,17 +124,13 @@ private synchronized void writeInternal(String clusterId, String userId,
       TimelineEntity entity,
       TimelineWriteResponse response)
       throws IOException {
-    Path clusterIdPath = new Path(entitiesPath, clusterId);
-    Path userIdPath = new Path(clusterIdPath, userId);
-    Path flowNamePath = new Path(userIdPath, escape(flowName));
-    Path flowVersionPath = new Path(flowNamePath, escape(flowVersion));
-    Path flowRunPath = new Path(flowVersionPath, String.valueOf(flowRun));
-    Path appIdPath = new Path(flowRunPath, appId);
-    Path entityTypePath = new Path(appIdPath, entity.getType());
+    String entityTypePathStr = clusterId + File.separator + userId +
+        File.separator + escape(flowName) + File.separator +
+        escape(flowVersion) + File.separator + flowRun + File.separator + appId
+        + File.separator + entity.getType();
+    Path entityTypePath = new Path(entitiesPath, entityTypePathStr);
     try {
-      mkdirs(rootPath, entitiesPath, clusterIdPath, userIdPath,
-          flowNamePath, flowVersionPath, flowRunPath, appIdPath,
-          entityTypePath);
+      mkdirs(entityTypePath);
       Path filePath =
           new Path(entityTypePath,
               entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
@@ -181,7 +179,8 @@ public void serviceInit(Configuration conf) throws Exception {
         DEFAULT_TIMELINE_FS_WRITER_NUM_RETRIES);
     fsRetryInterval = conf.getLong(TIMELINE_FS_WRITER_RETRY_INTERVAL_MS,
         DEFAULT_TIMELINE_FS_WRITER_RETRY_INTERVAL_MS);
-    fs = rootPath.getFileSystem(getConfig());
+    config = conf;
+    fs = rootPath.getFileSystem(config);
   }

   @Override
@@ -285,12 +284,15 @@ protected void writeFile(Path outputPath, byte[] data) throws IOException {
     // final status.
     try {
       fsOut = fs.create(tempPath, true);
+      FSDataInputStream fsIn = fs.open(outputPath);
+      IOUtils.copyBytes(fsIn, fsOut, config, false);
+      fsIn.close();
+      fs.delete(outputPath, false);
       fsOut.write(data);
       fsOut.close();
-      fsOut = null;
       fs.rename(tempPath, outputPath);
-    } finally {
-      IOUtils.cleanupWithLogger(LOG, fsOut);
+    } catch (IOException ie) {
+      LOG.error("Got an exception while writing file", ie);
     }
   }

@@ -146,6 +146,125 @@ public void testWriteEntityToFile() throws Exception {
     }
   }

+  @Test
+  public void testWriteMultipleEntities() throws Exception {
+    String id = "appId";
+    String type = "app";
+
+    TimelineEntities te1 = new TimelineEntities();
+    TimelineEntity entity = new TimelineEntity();
+    entity.setId(id);
+    entity.setType(type);
+    entity.setCreatedTime(1425016501000L);
+    te1.addEntity(entity);
+
+    TimelineEntities te2 = new TimelineEntities();
+    TimelineEntity entity2 = new TimelineEntity();
+    entity2.setId(id);
+    entity2.setType(type);
+    entity2.setCreatedTime(1425016503000L);
+    te2.addEntity(entity2);
+
+    FileSystemTimelineWriterImpl fsi = null;
+    try {
+      fsi = new FileSystemTimelineWriterImpl();
+      Configuration conf = new YarnConfiguration();
+      String outputRoot = tmpFolder.newFolder().getAbsolutePath();
+      conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+          outputRoot);
+      fsi.init(conf);
+      fsi.start();
+      fsi.write(
+          new TimelineCollectorContext("cluster_id", "user_id", "flow_name",
+              "flow_version", 12345678L, "app_id"),
+          te1, UserGroupInformation.createRemoteUser("user_id"));
+      fsi.write(
+          new TimelineCollectorContext("cluster_id", "user_id", "flow_name",
+              "flow_version", 12345678L, "app_id"),
+          te2, UserGroupInformation.createRemoteUser("user_id"));
+
+      String fileName = outputRoot + File.separator + "entities" +
+          File.separator + "cluster_id" + File.separator + "user_id" +
+          File.separator + "flow_name" + File.separator + "flow_version" +
+          File.separator + "12345678" + File.separator + "app_id" +
+          File.separator + type + File.separator + id +
+          FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      Path path = new Path(fileName);
+      FileSystem fs = FileSystem.get(conf);
+      assertTrue("Specified path(" + fileName + ") should exist: ",
+          fs.exists(path));
+      FileStatus fileStatus = fs.getFileStatus(path);
+      assertTrue("Specified path should be a file",
+          !fileStatus.isDirectory());
+      List<String> data = readFromFile(fs, path);
+      assertTrue("data size is:" + data.size(), data.size() == 3);
+      String d = data.get(0);
+      // confirm the contents same as what was written
+      assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
+
+
+      String metricToString = data.get(1);
+      // confirm the contents same as what was written
+      assertEquals(metricToString,
+          TimelineUtils.dumpTimelineRecordtoJSON(entity2));
+    } finally {
+      if (fsi != null) {
+        fsi.close();
+      }
+    }
+  }
+
+  @Test
+  public void testWriteEntitiesWithEmptyFlowName() throws Exception {
+    String id = "appId";
+    String type = "app";
+
+    TimelineEntities te = new TimelineEntities();
+    TimelineEntity entity = new TimelineEntity();
+    entity.setId(id);
+    entity.setType(type);
+    entity.setCreatedTime(1425016501000L);
+    te.addEntity(entity);
+
+    FileSystemTimelineWriterImpl fsi = null;
+    try {
+      fsi = new FileSystemTimelineWriterImpl();
+      Configuration conf = new YarnConfiguration();
+      String outputRoot = tmpFolder.newFolder().getAbsolutePath();
+      conf.set(FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_DIR_ROOT,
+          outputRoot);
+      fsi.init(conf);
+      fsi.start();
+      fsi.write(
+          new TimelineCollectorContext("cluster_id", "user_id", "",
+              "flow_version", 12345678L, "app_id"),
+          te, UserGroupInformation.createRemoteUser("user_id"));
+
+      String fileName = outputRoot + File.separator + "entities" +
+          File.separator + "cluster_id" + File.separator + "user_id" +
+          File.separator + "" + File.separator + "flow_version" +
+          File.separator + "12345678" + File.separator + "app_id" +
+          File.separator + type + File.separator + id +
+          FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
+      Path path = new Path(fileName);
+      FileSystem fs = FileSystem.get(conf);
+      assertTrue("Specified path(" + fileName + ") should exist: ",
+          fs.exists(path));
+      FileStatus fileStatus = fs.getFileStatus(path);
+      assertTrue("Specified path should be a file",
+          !fileStatus.isDirectory());
+      List<String> data = readFromFile(fs, path);
+      assertTrue("data size is:" + data.size(), data.size() == 2);
+      String d = data.get(0);
+      // confirm the contents same as what was written
+      assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
+    } finally {
+      if (fsi != null) {
+        fsi.close();
+      }
+    }
+  }
+
   private List<String> readFromFile(FileSystem fs, Path path)
       throws IOException {
     BufferedReader br = new BufferedReader(
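For reference, and not part of the commit itself: a minimal standalone sketch of the flattened entity path that the reworked writeInternal concatenates and that the new tests assert on. The identifier values are copied from testWriteMultipleEntities; the storage root "/tmp/timeline", the class name EntityPathSketch, and the ".thist" suffix standing in for TIMELINE_SERVICE_STORAGE_EXTENSION are assumptions for this illustration, and the escape() applied to flow name and version in the real writer is omitted.

import java.io.File;

// Sketch of the storage layout used by FileSystemTimelineWriterImpl after
// this change: one relative path string under <root>/entities instead of
// seven nested Path objects. Values mirror testWriteMultipleEntities;
// "/tmp/timeline" and the ".thist" suffix are assumptions for illustration.
public final class EntityPathSketch {

  public static void main(String[] args) {
    String clusterId = "cluster_id";
    String userId = "user_id";
    String flowName = "flow_name";       // real writer applies escape()
    String flowVersion = "flow_version"; // real writer applies escape()
    long flowRun = 12345678L;
    String appId = "app_id";
    String entityType = "app";
    String entityId = "appId";

    // Mirrors entityTypePathStr in writeInternal above.
    String entityTypePathStr = clusterId + File.separator + userId
        + File.separator + flowName + File.separator + flowVersion
        + File.separator + flowRun + File.separator + appId
        + File.separator + entityType;

    // Assumed storage root; the real root comes from
    // TIMELINE_SERVICE_STORAGE_DIR_ROOT, and ".thist" stands in for
    // TIMELINE_SERVICE_STORAGE_EXTENSION.
    String file = "/tmp/timeline" + File.separator + "entities"
        + File.separator + entityTypePathStr
        + File.separator + entityId + ".thist";

    // On a Unix-like filesystem this prints:
    // /tmp/timeline/entities/cluster_id/user_id/flow_name/flow_version/12345678/app_id/app/appId.thist
    System.out.println(file);
  }
}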