YARN-3841 [atsv2 Storage implementation] Adding retry semantics to HDFS backing storage. Contributed by Abhishek Modi.
This commit is contained in:
parent
66e1599761
commit
d451ff7534
|
@ -18,16 +18,16 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||||
|
|
||||||
import java.io.BufferedWriter;
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileOutputStream;
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStreamWriter;
|
|
||||||
import java.io.PrintWriter;
|
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.service.AbstractService;
|
import org.apache.hadoop.service.AbstractService;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
|
||||||
|
@ -35,14 +35,17 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError;
|
||||||
|
import org.apache.hadoop.yarn.client.api.impl.FileSystemTimelineWriter;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
|
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
|
||||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This implements a local file based backend for storing application timeline
|
* This implements a FileSystem based backend for storing application timeline
|
||||||
* information. This implementation may not provide a complete implementation of
|
* information. This implementation may not provide a complete implementation of
|
||||||
* all the necessary features. This implementation is provided solely for basic
|
* all the necessary features. This implementation is provided solely for basic
|
||||||
* testing purposes, and should not be used in a non-test situation.
|
* testing purposes, and should not be used in a non-test situation.
|
||||||
|
@ -52,20 +55,36 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
public class FileSystemTimelineWriterImpl extends AbstractService
|
public class FileSystemTimelineWriterImpl extends AbstractService
|
||||||
implements TimelineWriter {
|
implements TimelineWriter {
|
||||||
|
|
||||||
private String outputRoot;
|
|
||||||
|
|
||||||
/** Config param for timeline service storage tmp root for FILE YARN-3264. */
|
/** Config param for timeline service storage tmp root for FILE YARN-3264. */
|
||||||
public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT
|
public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT =
|
||||||
= YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
|
YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
|
||||||
|
|
||||||
|
public static final String TIMELINE_FS_WRITER_NUM_RETRIES =
|
||||||
|
YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.num-retries";
|
||||||
|
public static final int DEFAULT_TIMELINE_FS_WRITER_NUM_RETRIES = 0;
|
||||||
|
|
||||||
|
public static final String TIMELINE_FS_WRITER_RETRY_INTERVAL_MS =
|
||||||
|
YarnConfiguration.TIMELINE_SERVICE_PREFIX +
|
||||||
|
"fs-writer.retry-interval-ms";
|
||||||
|
public static final long DEFAULT_TIMELINE_FS_WRITER_RETRY_INTERVAL_MS = 1000L;
|
||||||
|
|
||||||
public static final String ENTITIES_DIR = "entities";
|
public static final String ENTITIES_DIR = "entities";
|
||||||
|
|
||||||
/** Default extension for output files. */
|
/** Default extension for output files. */
|
||||||
public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
|
public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
|
||||||
|
|
||||||
|
private FileSystem fs;
|
||||||
|
private Path rootPath;
|
||||||
|
private int fsNumRetries;
|
||||||
|
private long fsRetryInterval;
|
||||||
|
private Path entitiesPath;
|
||||||
|
|
||||||
/** default value for storage location on local disk. */
|
/** default value for storage location on local disk. */
|
||||||
private static final String STORAGE_DIR_ROOT = "timeline_service_data";
|
private static final String STORAGE_DIR_ROOT = "timeline_service_data";
|
||||||
|
|
||||||
|
private static final Logger LOG =
|
||||||
|
LoggerFactory.getLogger(FileSystemTimelineWriter.class);
|
||||||
|
|
||||||
FileSystemTimelineWriterImpl() {
|
FileSystemTimelineWriterImpl() {
|
||||||
super((FileSystemTimelineWriterImpl.class.getName()));
|
super((FileSystemTimelineWriterImpl.class.getName()));
|
||||||
}
|
}
|
||||||
|
@ -83,8 +102,8 @@ public class FileSystemTimelineWriterImpl extends AbstractService
|
||||||
String appId = context.getAppId();
|
String appId = context.getAppId();
|
||||||
|
|
||||||
for (TimelineEntity entity : entities.getEntities()) {
|
for (TimelineEntity entity : entities.getEntities()) {
|
||||||
write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity,
|
writeInternal(clusterId, userId, flowName, flowVersion,
|
||||||
response);
|
flowRunId, appId, entity, response);
|
||||||
}
|
}
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
@ -97,59 +116,78 @@ public class FileSystemTimelineWriterImpl extends AbstractService
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void write(String clusterId, String userId,
|
private synchronized void writeInternal(String clusterId, String userId,
|
||||||
String flowName, String flowVersion, long flowRun, String appId,
|
String flowName, String flowVersion,
|
||||||
TimelineEntity entity, TimelineWriteResponse response)
|
long flowRun, String appId,
|
||||||
|
TimelineEntity entity,
|
||||||
|
TimelineWriteResponse response)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
PrintWriter out = null;
|
Path clusterIdPath = new Path(entitiesPath, clusterId);
|
||||||
|
Path userIdPath = new Path(clusterIdPath, userId);
|
||||||
|
Path flowNamePath = new Path(userIdPath, escape(flowName));
|
||||||
|
Path flowVersionPath = new Path(flowNamePath, escape(flowVersion));
|
||||||
|
Path flowRunPath = new Path(flowVersionPath, String.valueOf(flowRun));
|
||||||
|
Path appIdPath = new Path(flowRunPath, appId);
|
||||||
|
Path entityTypePath = new Path(appIdPath, entity.getType());
|
||||||
try {
|
try {
|
||||||
String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId,
|
mkdirs(rootPath, entitiesPath, clusterIdPath, userIdPath,
|
||||||
escape(flowName), escape(flowVersion), String.valueOf(flowRun), appId,
|
flowNamePath, flowVersionPath, flowRunPath, appIdPath,
|
||||||
entity.getType());
|
entityTypePath);
|
||||||
String fileName = dir + entity.getId() +
|
Path filePath =
|
||||||
TIMELINE_SERVICE_STORAGE_EXTENSION;
|
new Path(entityTypePath,
|
||||||
out =
|
entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
|
||||||
new PrintWriter(new BufferedWriter(new OutputStreamWriter(
|
createFileWithRetries(filePath);
|
||||||
new FileOutputStream(fileName, true), "UTF-8")));
|
|
||||||
out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
|
byte[] record = new StringBuilder()
|
||||||
out.write("\n");
|
.append(TimelineUtils.dumpTimelineRecordtoJSON(entity))
|
||||||
} catch (IOException ioe) {
|
.append("\n").toString().getBytes("UTF-8");
|
||||||
TimelineWriteError error = new TimelineWriteError();
|
writeFileWithRetries(filePath, record);
|
||||||
error.setEntityId(entity.getId());
|
} catch (Exception ioe) {
|
||||||
error.setEntityType(entity.getType());
|
LOG.warn("Interrupted operation:" + ioe.getMessage());
|
||||||
|
TimelineWriteError error = createTimelineWriteError(entity);
|
||||||
/*
|
/*
|
||||||
* TODO: set an appropriate error code after PoC could possibly be:
|
* TODO: set an appropriate error code after PoC could possibly be:
|
||||||
* error.setErrorCode(TimelineWriteError.IO_EXCEPTION);
|
* error.setErrorCode(TimelineWriteError.IO_EXCEPTION);
|
||||||
*/
|
*/
|
||||||
response.addError(error);
|
response.addError(error);
|
||||||
} finally {
|
|
||||||
if (out != null) {
|
|
||||||
out.close();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private TimelineWriteError createTimelineWriteError(TimelineEntity entity) {
|
||||||
|
TimelineWriteError error = new TimelineWriteError();
|
||||||
|
error.setEntityId(entity.getId());
|
||||||
|
error.setEntityType(entity.getType());
|
||||||
|
return error;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TimelineWriteResponse aggregate(TimelineEntity data,
|
public TimelineWriteResponse aggregate(TimelineEntity data,
|
||||||
TimelineAggregationTrack track) throws IOException {
|
TimelineAggregationTrack track) throws IOException {
|
||||||
return null;
|
return null;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
String getOutputRoot() {
|
String getOutputRoot() {
|
||||||
return outputRoot;
|
return rootPath.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void serviceInit(Configuration conf) throws Exception {
|
public void serviceInit(Configuration conf) throws Exception {
|
||||||
outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
|
String outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
|
||||||
conf.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT);
|
conf.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT);
|
||||||
|
rootPath = new Path(outputRoot);
|
||||||
|
entitiesPath = new Path(rootPath, ENTITIES_DIR);
|
||||||
|
fsNumRetries = conf.getInt(TIMELINE_FS_WRITER_NUM_RETRIES,
|
||||||
|
DEFAULT_TIMELINE_FS_WRITER_NUM_RETRIES);
|
||||||
|
fsRetryInterval = conf.getLong(TIMELINE_FS_WRITER_RETRY_INTERVAL_MS,
|
||||||
|
DEFAULT_TIMELINE_FS_WRITER_RETRY_INTERVAL_MS);
|
||||||
|
fs = rootPath.getFileSystem(getConfig());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void serviceStart() throws Exception {
|
public void serviceStart() throws Exception {
|
||||||
mkdirs(outputRoot, ENTITIES_DIR);
|
mkdirsWithRetries(rootPath);
|
||||||
|
mkdirsWithRetries(entitiesPath);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -157,18 +195,103 @@ public class FileSystemTimelineWriterImpl extends AbstractService
|
||||||
// no op
|
// no op
|
||||||
}
|
}
|
||||||
|
|
||||||
private static String mkdirs(String... dirStrs) throws IOException {
|
private void mkdirs(Path... paths) throws IOException, InterruptedException {
|
||||||
StringBuilder path = new StringBuilder();
|
for (Path path: paths) {
|
||||||
for (String dirStr : dirStrs) {
|
if (!existsWithRetries(path)) {
|
||||||
path.append(dirStr).append(File.separatorChar);
|
mkdirsWithRetries(path);
|
||||||
File dir = new File(path.toString());
|
|
||||||
if (!dir.exists()) {
|
|
||||||
if (!dir.mkdirs()) {
|
|
||||||
throw new IOException("Could not create directories for " + dir);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return path.toString();
|
|
||||||
|
// Code from FSRMStateStore.
|
||||||
|
private void mkdirsWithRetries(final Path dirPath)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
new FSAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws IOException {
|
||||||
|
fs.mkdirs(dirPath);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}.runWithRetries();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void writeFileWithRetries(final Path outputPath, final byte[] data)
|
||||||
|
throws Exception {
|
||||||
|
new FSAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws IOException {
|
||||||
|
writeFile(outputPath, data);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}.runWithRetries();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean createFileWithRetries(final Path newFile)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
return new FSAction<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean run() throws IOException {
|
||||||
|
return createFile(newFile);
|
||||||
|
}
|
||||||
|
}.runWithRetries();
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean existsWithRetries(final Path path)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
return new FSAction<Boolean>() {
|
||||||
|
@Override
|
||||||
|
public Boolean run() throws IOException {
|
||||||
|
return fs.exists(path);
|
||||||
|
}
|
||||||
|
}.runWithRetries();
|
||||||
|
}
|
||||||
|
|
||||||
|
private abstract class FSAction<T> {
|
||||||
|
abstract T run() throws IOException;
|
||||||
|
|
||||||
|
T runWithRetries() throws IOException, InterruptedException {
|
||||||
|
int retry = 0;
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
return run();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.info("Exception while executing a FS operation.", e);
|
||||||
|
if (++retry > fsNumRetries) {
|
||||||
|
LOG.info("Maxed out FS retries. Giving up!");
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
LOG.info("Will retry operation on FS. Retry no. " + retry +
|
||||||
|
" after sleeping for " + fsRetryInterval + " seconds");
|
||||||
|
Thread.sleep(fsRetryInterval);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean createFile(Path newFile) throws IOException {
|
||||||
|
return fs.createNewFile(newFile);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* In order to make this writeInternal atomic as a part of writeInternal
|
||||||
|
* we will first writeInternal data to .tmp file and then rename it.
|
||||||
|
* Here we are assuming that rename is atomic for underlying file system.
|
||||||
|
*/
|
||||||
|
protected void writeFile(Path outputPath, byte[] data) throws IOException {
|
||||||
|
Path tempPath =
|
||||||
|
new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
|
||||||
|
FSDataOutputStream fsOut = null;
|
||||||
|
// This file will be overwritten when app/attempt finishes for saving the
|
||||||
|
// final status.
|
||||||
|
try {
|
||||||
|
fsOut = fs.create(tempPath, true);
|
||||||
|
fsOut.write(data);
|
||||||
|
fsOut.close();
|
||||||
|
fsOut = null;
|
||||||
|
fs.rename(tempPath, outputPath);
|
||||||
|
} finally {
|
||||||
|
IOUtils.cleanupWithLogger(LOG, fsOut);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// specifically escape the separator character
|
// specifically escape the separator character
|
||||||
|
|
|
@ -20,16 +20,19 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.BufferedReader;
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.io.IOException;
|
||||||
import java.nio.file.Files;
|
import java.io.InputStreamReader;
|
||||||
import java.nio.file.Path;
|
import java.util.ArrayList;
|
||||||
import java.nio.file.Paths;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
|
@ -96,16 +99,20 @@ public class TestFileSystemTimelineWriterImpl {
|
||||||
"flow_version", 12345678L, "app_id"),
|
"flow_version", 12345678L, "app_id"),
|
||||||
te, UserGroupInformation.createRemoteUser("user_id"));
|
te, UserGroupInformation.createRemoteUser("user_id"));
|
||||||
|
|
||||||
String fileName = fsi.getOutputRoot() + File.separator + "entities" +
|
String fileName = outputRoot + File.separator + "entities" +
|
||||||
File.separator + "cluster_id" + File.separator + "user_id" +
|
File.separator + "cluster_id" + File.separator + "user_id" +
|
||||||
File.separator + "flow_name" + File.separator + "flow_version" +
|
File.separator + "flow_name" + File.separator + "flow_version" +
|
||||||
File.separator + "12345678" + File.separator + "app_id" +
|
File.separator + "12345678" + File.separator + "app_id" +
|
||||||
File.separator + type + File.separator + id +
|
File.separator + type + File.separator + id +
|
||||||
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
|
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
|
||||||
Path path = Paths.get(fileName);
|
Path path = new Path(fileName);
|
||||||
File f = new File(fileName);
|
FileSystem fs = FileSystem.get(conf);
|
||||||
assertTrue(f.exists() && !f.isDirectory());
|
assertTrue("Specified path(" + fileName + ") should exist: ",
|
||||||
List<String> data = Files.readAllLines(path, StandardCharsets.UTF_8);
|
fs.exists(path));
|
||||||
|
FileStatus fileStatus = fs.getFileStatus(path);
|
||||||
|
assertTrue("Specified path should be a file",
|
||||||
|
!fileStatus.isDirectory());
|
||||||
|
List<String> data = readFromFile(fs, path);
|
||||||
// ensure there's only one entity + 1 new line
|
// ensure there's only one entity + 1 new line
|
||||||
assertTrue("data size is:" + data.size(), data.size() == 2);
|
assertTrue("data size is:" + data.size(), data.size() == 2);
|
||||||
String d = data.get(0);
|
String d = data.get(0);
|
||||||
|
@ -119,12 +126,15 @@ public class TestFileSystemTimelineWriterImpl {
|
||||||
File.separator + "12345678" + File.separator + "app_id" +
|
File.separator + "12345678" + File.separator + "app_id" +
|
||||||
File.separator + type2 + File.separator + id2 +
|
File.separator + type2 + File.separator + id2 +
|
||||||
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
|
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
|
||||||
Path path2 = Paths.get(fileName2);
|
Path path2 = new Path(fileName2);
|
||||||
File file = new File(fileName2);
|
assertTrue("Specified path(" + fileName + ") should exist: ",
|
||||||
assertTrue(file.exists() && !file.isDirectory());
|
fs.exists(path2));
|
||||||
List<String> data2 = Files.readAllLines(path2, StandardCharsets.UTF_8);
|
FileStatus fileStatus2 = fs.getFileStatus(path2);
|
||||||
|
assertTrue("Specified path should be a file",
|
||||||
|
!fileStatus2.isDirectory());
|
||||||
|
List<String> data2 = readFromFile(fs, path2);
|
||||||
// ensure there's only one entity + 1 new line
|
// ensure there's only one entity + 1 new line
|
||||||
assertTrue("data size is:" + data.size(), data2.size() == 2);
|
assertTrue("data size is:" + data2.size(), data2.size() == 2);
|
||||||
String metricToString = data2.get(0);
|
String metricToString = data2.get(0);
|
||||||
// confirm the contents same as what was written
|
// confirm the contents same as what was written
|
||||||
assertEquals(metricToString,
|
assertEquals(metricToString,
|
||||||
|
@ -136,4 +146,17 @@ public class TestFileSystemTimelineWriterImpl {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private List<String> readFromFile(FileSystem fs, Path path)
|
||||||
|
throws IOException {
|
||||||
|
BufferedReader br = new BufferedReader(
|
||||||
|
new InputStreamReader(fs.open(path)));
|
||||||
|
List<String> data = new ArrayList<>();
|
||||||
|
String line = br.readLine();
|
||||||
|
data.add(line);
|
||||||
|
while(line != null) {
|
||||||
|
line = br.readLine();
|
||||||
|
data.add(line);
|
||||||
|
}
|
||||||
|
return data;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue