YARN-3841 [atsv2 Storage implementation] Adding retry semantics to HDFS backing storage. Contributed by Abhishek Modi.

This commit is contained in:
Vrushali C 2019-02-27 14:55:35 -08:00
parent 0ec962ac8f
commit ea3cdc60b3
2 changed files with 207 additions and 61 deletions

View File

@ -18,16 +18,16 @@
package org.apache.hadoop.yarn.server.timelineservice.storage; package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.BufferedWriter;
import java.io.File; import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineDomain;
@ -35,14 +35,17 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError;
import org.apache.hadoop.yarn.client.api.impl.FileSystemTimelineWriter;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils; import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/** /**
* This implements a local file based backend for storing application timeline * This implements a FileSystem based backend for storing application timeline
* information. This implementation may not provide a complete implementation of * information. This implementation may not provide a complete implementation of
* all the necessary features. This implementation is provided solely for basic * all the necessary features. This implementation is provided solely for basic
* testing purposes, and should not be used in a non-test situation. * testing purposes, and should not be used in a non-test situation.
@ -52,20 +55,36 @@ import com.google.common.annotations.VisibleForTesting;
public class FileSystemTimelineWriterImpl extends AbstractService public class FileSystemTimelineWriterImpl extends AbstractService
implements TimelineWriter { implements TimelineWriter {
private String outputRoot;
/** Config param for timeline service storage tmp root for FILE YARN-3264. */ /** Config param for timeline service storage tmp root for FILE YARN-3264. */
public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT =
= YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir"; YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
public static final String TIMELINE_FS_WRITER_NUM_RETRIES =
YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.num-retries";
public static final int DEFAULT_TIMELINE_FS_WRITER_NUM_RETRIES = 0;
public static final String TIMELINE_FS_WRITER_RETRY_INTERVAL_MS =
YarnConfiguration.TIMELINE_SERVICE_PREFIX +
"fs-writer.retry-interval-ms";
public static final long DEFAULT_TIMELINE_FS_WRITER_RETRY_INTERVAL_MS = 1000L;
public static final String ENTITIES_DIR = "entities"; public static final String ENTITIES_DIR = "entities";
/** Default extension for output files. */ /** Default extension for output files. */
public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist"; public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
private FileSystem fs;
private Path rootPath;
private int fsNumRetries;
private long fsRetryInterval;
private Path entitiesPath;
/** default value for storage location on local disk. */ /** default value for storage location on local disk. */
private static final String STORAGE_DIR_ROOT = "timeline_service_data"; private static final String STORAGE_DIR_ROOT = "timeline_service_data";
private static final Logger LOG =
LoggerFactory.getLogger(FileSystemTimelineWriter.class);
FileSystemTimelineWriterImpl() { FileSystemTimelineWriterImpl() {
super((FileSystemTimelineWriterImpl.class.getName())); super((FileSystemTimelineWriterImpl.class.getName()));
} }
@ -83,8 +102,8 @@ public class FileSystemTimelineWriterImpl extends AbstractService
String appId = context.getAppId(); String appId = context.getAppId();
for (TimelineEntity entity : entities.getEntities()) { for (TimelineEntity entity : entities.getEntities()) {
write(clusterId, userId, flowName, flowVersion, flowRunId, appId, entity, writeInternal(clusterId, userId, flowName, flowVersion,
response); flowRunId, appId, entity, response);
} }
return response; return response;
} }
@ -97,59 +116,78 @@ public class FileSystemTimelineWriterImpl extends AbstractService
return null; return null;
} }
private synchronized void write(String clusterId, String userId, private synchronized void writeInternal(String clusterId, String userId,
String flowName, String flowVersion, long flowRun, String appId, String flowName, String flowVersion,
TimelineEntity entity, TimelineWriteResponse response) long flowRun, String appId,
TimelineEntity entity,
TimelineWriteResponse response)
throws IOException { throws IOException {
PrintWriter out = null; Path clusterIdPath = new Path(entitiesPath, clusterId);
Path userIdPath = new Path(clusterIdPath, userId);
Path flowNamePath = new Path(userIdPath, escape(flowName));
Path flowVersionPath = new Path(flowNamePath, escape(flowVersion));
Path flowRunPath = new Path(flowVersionPath, String.valueOf(flowRun));
Path appIdPath = new Path(flowRunPath, appId);
Path entityTypePath = new Path(appIdPath, entity.getType());
try { try {
String dir = mkdirs(outputRoot, ENTITIES_DIR, clusterId, userId, mkdirs(rootPath, entitiesPath, clusterIdPath, userIdPath,
escape(flowName), escape(flowVersion), String.valueOf(flowRun), appId, flowNamePath, flowVersionPath, flowRunPath, appIdPath,
entity.getType()); entityTypePath);
String fileName = dir + entity.getId() + Path filePath =
TIMELINE_SERVICE_STORAGE_EXTENSION; new Path(entityTypePath,
out = entity.getId() + TIMELINE_SERVICE_STORAGE_EXTENSION);
new PrintWriter(new BufferedWriter(new OutputStreamWriter( createFileWithRetries(filePath);
new FileOutputStream(fileName, true), "UTF-8")));
out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity)); byte[] record = new StringBuilder()
out.write("\n"); .append(TimelineUtils.dumpTimelineRecordtoJSON(entity))
} catch (IOException ioe) { .append("\n").toString().getBytes("UTF-8");
TimelineWriteError error = new TimelineWriteError(); writeFileWithRetries(filePath, record);
error.setEntityId(entity.getId()); } catch (Exception ioe) {
error.setEntityType(entity.getType()); LOG.warn("Interrupted operation:" + ioe.getMessage());
TimelineWriteError error = createTimelineWriteError(entity);
/* /*
* TODO: set an appropriate error code after PoC could possibly be: * TODO: set an appropriate error code after PoC could possibly be:
* error.setErrorCode(TimelineWriteError.IO_EXCEPTION); * error.setErrorCode(TimelineWriteError.IO_EXCEPTION);
*/ */
response.addError(error); response.addError(error);
} finally {
if (out != null) {
out.close();
} }
} }
private TimelineWriteError createTimelineWriteError(TimelineEntity entity) {
TimelineWriteError error = new TimelineWriteError();
error.setEntityId(entity.getId());
error.setEntityType(entity.getType());
return error;
} }
@Override @Override
public TimelineWriteResponse aggregate(TimelineEntity data, public TimelineWriteResponse aggregate(TimelineEntity data,
TimelineAggregationTrack track) throws IOException { TimelineAggregationTrack track) throws IOException {
return null; return null;
} }
@VisibleForTesting @VisibleForTesting
String getOutputRoot() { String getOutputRoot() {
return outputRoot; return rootPath.toString();
} }
@Override @Override
public void serviceInit(Configuration conf) throws Exception { public void serviceInit(Configuration conf) throws Exception {
outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT, String outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
conf.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT); conf.get("hadoop.tmp.dir") + File.separator + STORAGE_DIR_ROOT);
rootPath = new Path(outputRoot);
entitiesPath = new Path(rootPath, ENTITIES_DIR);
fsNumRetries = conf.getInt(TIMELINE_FS_WRITER_NUM_RETRIES,
DEFAULT_TIMELINE_FS_WRITER_NUM_RETRIES);
fsRetryInterval = conf.getLong(TIMELINE_FS_WRITER_RETRY_INTERVAL_MS,
DEFAULT_TIMELINE_FS_WRITER_RETRY_INTERVAL_MS);
fs = rootPath.getFileSystem(getConfig());
} }
@Override @Override
public void serviceStart() throws Exception { public void serviceStart() throws Exception {
mkdirs(outputRoot, ENTITIES_DIR); mkdirsWithRetries(rootPath);
mkdirsWithRetries(entitiesPath);
} }
@Override @Override
@ -157,18 +195,103 @@ public class FileSystemTimelineWriterImpl extends AbstractService
// no op // no op
} }
private static String mkdirs(String... dirStrs) throws IOException { private void mkdirs(Path... paths) throws IOException, InterruptedException {
StringBuilder path = new StringBuilder(); for (Path path: paths) {
for (String dirStr : dirStrs) { if (!existsWithRetries(path)) {
path.append(dirStr).append(File.separatorChar); mkdirsWithRetries(path);
File dir = new File(path.toString());
if (!dir.exists()) {
if (!dir.mkdirs()) {
throw new IOException("Could not create directories for " + dir);
} }
} }
} }
return path.toString();
// Code from FSRMStateStore.
private void mkdirsWithRetries(final Path dirPath)
throws IOException, InterruptedException {
new FSAction<Void>() {
@Override
public Void run() throws IOException {
fs.mkdirs(dirPath);
return null;
}
}.runWithRetries();
}
private void writeFileWithRetries(final Path outputPath, final byte[] data)
throws Exception {
new FSAction<Void>() {
@Override
public Void run() throws IOException {
writeFile(outputPath, data);
return null;
}
}.runWithRetries();
}
private boolean createFileWithRetries(final Path newFile)
throws IOException, InterruptedException {
return new FSAction<Boolean>() {
@Override
public Boolean run() throws IOException {
return createFile(newFile);
}
}.runWithRetries();
}
private boolean existsWithRetries(final Path path)
throws IOException, InterruptedException {
return new FSAction<Boolean>() {
@Override
public Boolean run() throws IOException {
return fs.exists(path);
}
}.runWithRetries();
}
private abstract class FSAction<T> {
abstract T run() throws IOException;
T runWithRetries() throws IOException, InterruptedException {
int retry = 0;
while (true) {
try {
return run();
} catch (IOException e) {
LOG.info("Exception while executing a FS operation.", e);
if (++retry > fsNumRetries) {
LOG.info("Maxed out FS retries. Giving up!");
throw e;
}
LOG.info("Will retry operation on FS. Retry no. " + retry +
" after sleeping for " + fsRetryInterval + " seconds");
Thread.sleep(fsRetryInterval);
}
}
}
}
private boolean createFile(Path newFile) throws IOException {
return fs.createNewFile(newFile);
}
/**
* In order to make this writeInternal atomic as a part of writeInternal
* we will first writeInternal data to .tmp file and then rename it.
* Here we are assuming that rename is atomic for underlying file system.
*/
protected void writeFile(Path outputPath, byte[] data) throws IOException {
Path tempPath =
new Path(outputPath.getParent(), outputPath.getName() + ".tmp");
FSDataOutputStream fsOut = null;
// This file will be overwritten when app/attempt finishes for saving the
// final status.
try {
fsOut = fs.create(tempPath, true);
fsOut.write(data);
fsOut.close();
fsOut = null;
fs.rename(tempPath, outputPath);
} finally {
IOUtils.cleanupWithLogger(LOG, fsOut);
}
} }
// specifically escape the separator character // specifically escape the separator character

View File

@ -20,16 +20,19 @@ package org.apache.hadoop.yarn.server.timelineservice.storage;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.io.BufferedReader;
import java.io.File; import java.io.File;
import java.nio.charset.StandardCharsets; import java.io.IOException;
import java.nio.file.Files; import java.io.InputStreamReader;
import java.nio.file.Path; import java.util.ArrayList;
import java.nio.file.Paths;
import java.util.HashMap; import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@ -96,16 +99,20 @@ public class TestFileSystemTimelineWriterImpl {
"flow_version", 12345678L, "app_id"), "flow_version", 12345678L, "app_id"),
te, UserGroupInformation.createRemoteUser("user_id")); te, UserGroupInformation.createRemoteUser("user_id"));
String fileName = fsi.getOutputRoot() + File.separator + "entities" + String fileName = outputRoot + File.separator + "entities" +
File.separator + "cluster_id" + File.separator + "user_id" + File.separator + "cluster_id" + File.separator + "user_id" +
File.separator + "flow_name" + File.separator + "flow_version" + File.separator + "flow_name" + File.separator + "flow_version" +
File.separator + "12345678" + File.separator + "app_id" + File.separator + "12345678" + File.separator + "app_id" +
File.separator + type + File.separator + id + File.separator + type + File.separator + id +
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
Path path = Paths.get(fileName); Path path = new Path(fileName);
File f = new File(fileName); FileSystem fs = FileSystem.get(conf);
assertTrue(f.exists() && !f.isDirectory()); assertTrue("Specified path(" + fileName + ") should exist: ",
List<String> data = Files.readAllLines(path, StandardCharsets.UTF_8); fs.exists(path));
FileStatus fileStatus = fs.getFileStatus(path);
assertTrue("Specified path should be a file",
!fileStatus.isDirectory());
List<String> data = readFromFile(fs, path);
// ensure there's only one entity + 1 new line // ensure there's only one entity + 1 new line
assertTrue("data size is:" + data.size(), data.size() == 2); assertTrue("data size is:" + data.size(), data.size() == 2);
String d = data.get(0); String d = data.get(0);
@ -119,12 +126,15 @@ public class TestFileSystemTimelineWriterImpl {
File.separator + "12345678" + File.separator + "app_id" + File.separator + "12345678" + File.separator + "app_id" +
File.separator + type2 + File.separator + id2 + File.separator + type2 + File.separator + id2 +
FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION; FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
Path path2 = Paths.get(fileName2); Path path2 = new Path(fileName2);
File file = new File(fileName2); assertTrue("Specified path(" + fileName + ") should exist: ",
assertTrue(file.exists() && !file.isDirectory()); fs.exists(path2));
List<String> data2 = Files.readAllLines(path2, StandardCharsets.UTF_8); FileStatus fileStatus2 = fs.getFileStatus(path2);
assertTrue("Specified path should be a file",
!fileStatus2.isDirectory());
List<String> data2 = readFromFile(fs, path2);
// ensure there's only one entity + 1 new line // ensure there's only one entity + 1 new line
assertTrue("data size is:" + data.size(), data2.size() == 2); assertTrue("data size is:" + data2.size(), data2.size() == 2);
String metricToString = data2.get(0); String metricToString = data2.get(0);
// confirm the contents same as what was written // confirm the contents same as what was written
assertEquals(metricToString, assertEquals(metricToString,
@ -136,4 +146,17 @@ public class TestFileSystemTimelineWriterImpl {
} }
} }
private List<String> readFromFile(FileSystem fs, Path path)
throws IOException {
BufferedReader br = new BufferedReader(
new InputStreamReader(fs.open(path)));
List<String> data = new ArrayList<>();
String line = br.readLine();
data.add(line);
while(line != null) {
line = br.readLine();
data.add(line);
}
return data;
}
} }