YARN-3264. Created backing storage write interface and a POC only FS based storage implementation. Contributed by Vrushali C.

(cherry picked from commit 821b68d05d246fd57d7b7286eb2ccc075ed1eae8)
This commit is contained in:
Zhijie Shen 2015-03-05 15:03:30 -08:00 committed by Sangjin Lee
parent bf54d32750
commit f0e752c14b
8 changed files with 555 additions and 20 deletions

View File

@ -0,0 +1,170 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records.timelineservice;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import java.util.ArrayList;
import java.util.List;
/**
* A class that holds a list of put errors. This is the response returned when a
* list of {@link TimelineEntity} objects is added to the timeline. If there are errors
* in storing individual entity objects, they will be indicated in the list of
* errors.
*/
@XmlRootElement(name = "response")
@XmlAccessorType(XmlAccessType.NONE)
@Public
@Unstable
public class TimelineWriteResponse {
private List<TimelineWriteError> errors = new ArrayList<TimelineWriteError>();
public TimelineWriteResponse() {
}
/**
* Get a list of {@link TimelineWriteError} instances
*
* @return a list of {@link TimelineWriteError} instances
*/
@XmlElement(name = "errors")
public List<TimelineWriteError> getErrors() {
return errors;
}
/**
* Add a single {@link TimelineWriteError} instance into the existing list
*
* @param error
* a single {@link TimelineWriteError} instance
*/
public void addError(TimelineWriteError error) {
errors.add(error);
}
/**
* Add a list of {@link TimelineWriteError} instances into the existing list
*
* @param errors
* a list of {@link TimelineWriteError} instances
*/
public void addErrors(List<TimelineWriteError> errors) {
this.errors.addAll(errors);
}
/**
* Set the list to the given list of {@link TimelineWriteError} instances
*
* @param errors
* a list of {@link TimelineWriteError} instances
*/
public void setErrors(List<TimelineWriteError> errors) {
this.errors.clear();
this.errors.addAll(errors);
}
/**
* A class that holds the error code for one entity.
*/
@XmlRootElement(name = "error")
@XmlAccessorType(XmlAccessType.NONE)
@Public
@Unstable
public static class TimelineWriteError {
/**
* Error code returned if an IOException is encountered when storing an
* entity.
*/
public static final int IO_EXCEPTION = 1;
private String entityId;
private String entityType;
private int errorCode;
/**
* Get the entity Id
*
* @return the entity Id
*/
@XmlElement(name = "entity")
public String getEntityId() {
return entityId;
}
/**
* Set the entity Id
*
* @param entityId
* the entity Id
*/
public void setEntityId(String entityId) {
this.entityId = entityId;
}
/**
* Get the entity type
*
* @return the entity type
*/
@XmlElement(name = "entitytype")
public String getEntityType() {
return entityType;
}
/**
* Set the entity type
*
* @param entityType
* the entity type
*/
public void setEntityType(String entityType) {
this.entityType = entityType;
}
/**
* Get the error code
*
* @return an error code
*/
@XmlElement(name = "errorcode")
public int getErrorCode() {
return errorCode;
}
/**
* Set the error code to the given error code
*
* @param errorCode
* an error code
*/
public void setErrorCode(int errorCode) {
this.errorCode = errorCode;
}
}
}

View File

@ -1927,6 +1927,12 @@ public class YarnConfiguration extends Configuration {
= TIMELINE_SERVICE_PREFIX
+ "entity-file.fs-support-append";
/**
* Settings for timeline service v2.0
*/
public static final String TIMELINE_SERVICE_WRITER_CLASS =
TIMELINE_SERVICE_PREFIX + "writer.class";
// mark app-history related configs @Private as application history is going
// to be integrated into the timeline service
@Private

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.JarFinder;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
@ -79,6 +80,7 @@ import org.apache.hadoop.yarn.server.timeline.TimelineVersionWatcher;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.server.timelineservice.aggregator.PerNodeTimelineAggregatorsAuxService;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -324,6 +326,7 @@ public class TestDistributedShell {
boolean verified = false;
String errorMessage = "";
ApplicationId appId = null;
while(!verified) {
List<ApplicationReport> apps = yarnClient.getApplications();
if (apps.size() == 0 ) {
@ -331,6 +334,7 @@ public class TestDistributedShell {
continue;
}
ApplicationReport appReport = apps.get(0);
appId = appReport.getApplicationId();
if(appReport.getHost().equals("N/A")) {
Thread.sleep(10);
continue;
@ -373,7 +377,7 @@ public class TestDistributedShell {
if (!isTestingTimelineV2) {
checkTimelineV1(haveDomain);
} else {
checkTimelineV2(haveDomain);
checkTimelineV2(haveDomain, appId);
}
}
@ -429,8 +433,54 @@ public class TestDistributedShell {
}
}
private void checkTimelineV2(boolean haveDomain) {
// TODO check timeline V2 here after we have a storage layer
private void checkTimelineV2(boolean haveDomain, ApplicationId appId) {
// For PoC check in /tmp/ YARN-3264
String tmpRoot = FileSystemTimelineWriterImpl.DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT;
File tmpRootFolder = new File(tmpRoot);
Assert.assertTrue(tmpRootFolder.isDirectory());
// for this test, we expect DS_APP_ATTEMPT AND DS_CONTAINER dirs
String outputDirApp = tmpRoot + "/DS_APP_ATTEMPT/";
File entityFolder = new File(outputDirApp);
Assert.assertTrue(entityFolder.isDirectory());
// there will be at least one attempt, look for that file
String appTimestampFileName = "appattempt_" + appId.getClusterTimestamp()
+ "_000" + appId.getId() + "_000001"
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
String appAttemptFileName = outputDirApp + appTimestampFileName;
File appAttemptFile = new File(appAttemptFileName);
Assert.assertTrue(appAttemptFile.exists());
String outputDirContainer = tmpRoot + "/DS_CONTAINER/";
File containerFolder = new File(outputDirContainer);
Assert.assertTrue(containerFolder.isDirectory());
String containerTimestampFileName = "container_"
+ appId.getClusterTimestamp() + "_000" + appId.getId()
+ "_01_000002.thist";
String containerFileName = outputDirContainer + containerTimestampFileName;
File containerFile = new File(containerFileName);
Assert.assertTrue(containerFile.exists());
String appTimeStamp = appId.getClusterTimestamp() + "_" + appId.getId()
+ "_";
deleteAppFiles(new File(outputDirApp), appTimeStamp);
deleteAppFiles(new File(outputDirContainer), appTimeStamp);
tmpRootFolder.delete();
}
private void deleteAppFiles(File rootDir, String appTimeStamp) {
boolean deleted = false;
File[] listOfFiles = rootDir.listFiles();
for (File f1 : listOfFiles) {
// list all attempts for this app and delete them
if (f1.getName().contains(appTimeStamp)){
deleted = f1.delete();
Assert.assertTrue(deleted);
}
}
}
/**

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.yarn.server.timelineservice.aggregator;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -26,12 +28,16 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
import org.apache.hadoop.util.ReflectionUtils;
/**
* Service that handles writes to the timeline service and writes them to the
* backing storage.
*
* Classes that extend this can putIfAbsent their own lifecycle management or
* Classes that extend this can add their own lifecycle management or
* customization of request handling.
*/
@Private
@ -39,6 +45,8 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
public abstract class TimelineAggregator extends CompositeService {
private static final Log LOG = LogFactory.getLog(TimelineAggregator.class);
private TimelineWriter writer;
public TimelineAggregator(String name) {
super(name);
}
@ -46,6 +54,11 @@ public abstract class TimelineAggregator extends CompositeService {
@Override
protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
writer = ReflectionUtils.newInstance(conf.getClass(
YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
FileSystemTimelineWriterImpl.class,
TimelineWriter.class), conf);
writer.init(conf);
}
@Override
@ -56,6 +69,11 @@ public abstract class TimelineAggregator extends CompositeService {
@Override
protected void serviceStop() throws Exception {
super.serviceStop();
writer.stop();
}
public TimelineWriter getWriter() {
return writer;
}
/**
@ -69,20 +87,17 @@ public abstract class TimelineAggregator extends CompositeService {
*
* @param entities entities to post
* @param callerUgi the caller UGI
* @return the response that contains the result of the post.
*/
public void postEntities(TimelineEntities entities,
UserGroupInformation callerUgi) {
// Add this output temporarily for our prototype
// TODO remove this after we have an actual implementation
LOG.info("SUCCESS - TIMELINE V2 PROTOTYPE");
LOG.info("postEntities(entities=" + entities + ", callerUgi=" +
callerUgi + ")");
// TODO implement
public TimelineWriteResponse postEntities(TimelineEntities entities,
UserGroupInformation callerUgi) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("postEntities(entities=" + entities + ", callerUgi=" +
callerUgi + ")");
LOG.debug("SUCCESS - TIMELINE V2 PROTOTYPE");
LOG.debug("postEntities(entities=" + entities + ", callerUgi="
+ callerUgi + ")");
}
return writer.write(entities);
}
/**
@ -104,4 +119,4 @@ public abstract class TimelineAggregator extends CompositeService {
callerUgi + ")");
}
}
}
}

View File

@ -0,0 +1,144 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse.TimelineWriteError;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
/**
* This implements a local file based backend for storing application timeline
* information.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class FileSystemTimelineWriterImpl extends AbstractService
implements TimelineWriter {
private String outputRoot;
/** Config param for timeline service storage tmp root for FILE YARN-3264 */
public static final String TIMELINE_SERVICE_STORAGE_DIR_ROOT
= YarnConfiguration.TIMELINE_SERVICE_PREFIX + "fs-writer.root-dir";
/** default value for storage location on local disk */
public static final String DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT
= "/tmp/timeline_service_data/";
/** Default extension for output files */
public static final String TIMELINE_SERVICE_STORAGE_EXTENSION = ".thist";
FileSystemTimelineWriterImpl() {
super((FileSystemTimelineWriterImpl.class.getName()));
}
/**
* Stores the entire information in {@link TimelineEntity} to the
* timeline store. Any errors occurring for individual write request objects
* will be reported in the response.
*
* @param data
* a {@link TimelineEntity} object
* @return {@link TimelineWriteResponse} object.
* @throws IOException
*/
@Override
public TimelineWriteResponse write(TimelineEntities entities)
throws IOException {
TimelineWriteResponse response = new TimelineWriteResponse();
for (TimelineEntity entity : entities.getEntities()) {
write(entity, response);
}
return response;
}
private void write(TimelineEntity entity,
TimelineWriteResponse response) throws IOException {
PrintWriter out = null;
try {
File outputDir = new File(outputRoot + entity.getType());
String fileName = outputDir + "/" + entity.getId()
+ TIMELINE_SERVICE_STORAGE_EXTENSION;
if (!outputDir.exists()) {
if (!outputDir.mkdirs()) {
throw new IOException("Could not create directories for " + fileName);
}
}
out = new PrintWriter(new BufferedWriter(new FileWriter(fileName, true)));
out.println(TimelineUtils.dumpTimelineRecordtoJSON(entity));
out.write("\n");
} catch (IOException ioe) {
TimelineWriteError error = new TimelineWriteError();
error.setEntityId(entity.getId());
error.setEntityType(entity.getType());
/*
* TODO: set an appropriate error code after PoC could possibly be:
* error.setErrorCode(TimelineWriteError.IO_EXCEPTION);
*/
response.addError(error);
} finally {
if (out != null) {
out.close();
}
}
}
/**
* Aggregates the entity information to the timeline store based on which
* track this entity is to be rolled up to The tracks along which aggregations
* are to be done are given by {@link TimelineAggregationTrack}
*
* Any errors occurring for individual write request objects will be reported
* in the response.
*
* @param data
* a {@link TimelineEntity} object
* a {@link TimelineAggregationTrack} enum value
* @return a {@link TimelineWriteResponse} object.
* @throws IOException
*/
public TimelineWriteResponse aggregate(TimelineEntity data,
TimelineAggregationTrack track) throws IOException {
return null;
}
public String getOutputRoot() {
return outputRoot;
}
@Override
public void serviceInit(Configuration conf) throws Exception {
outputRoot = conf.get(TIMELINE_SERVICE_STORAGE_DIR_ROOT,
DEFAULT_TIMELINE_SERVICE_STORAGE_DIR_ROOT);
}
}

View File

@ -16,8 +16,13 @@
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.aggregator;
public class TestTimelineAggregator {
package org.apache.hadoop.yarn.server.timelineservice.storage;
/**
* specifies the tracks along which an entity
* info is to be aggregated on
*
*/
public enum TimelineAggregationTrack {
FLOW, USER, QUEUE
}

View File

@ -0,0 +1,66 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
import org.apache.hadoop.service.Service;
/**
* This interface is for storing application timeline information.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public interface TimelineWriter extends Service {
/**
* Stores the entire information in {@link TimelineEntities} to the
* timeline store. Any errors occurring for individual write request objects
* will be reported in the response.
*
* @param data
* a {@link TimelineEntities} object.
* @return a {@link TimelineWriteResponse} object.
* @throws IOException
*/
TimelineWriteResponse write(TimelineEntities data) throws IOException;
/**
* Aggregates the entity information to the timeline store based on which
* track this entity is to be rolled up to The tracks along which aggregations
* are to be done are given by {@link TimelineAggregationTrack}
*
* Any errors occurring for individual write request objects will be reported
* in the response.
*
* @param data
* a {@link TimelineEntity} object
* a {@link TimelineAggregationTrack} enum
* value.
* @return a {@link TimelineWriteResponse} object.
* @throws IOException
*/
TimelineWriteResponse aggregate(TimelineEntity data,
TimelineAggregationTrack track) throws IOException;
}

View File

@ -0,0 +1,79 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
import org.junit.Test;
import org.apache.commons.io.FileUtils;
public class TestFileSystemTimelineWriterImpl {
/**
* Unit test for PoC YARN 3264
* @throws Exception
*/
@Test
public void testWriteEntityToFile() throws Exception {
String name = "unit_test_BaseAggregator_testWriteEntityToFile_"
+ Long.toString(System.currentTimeMillis());
TimelineEntities te = new TimelineEntities();
TimelineEntity entity = new TimelineEntity();
String id = "hello";
String type = "world";
entity.setId(id);
entity.setType(type);
entity.setCreatedTime(1425016501000L);
entity.setModifiedTime(1425016502000L);
te.addEntity(entity);
FileSystemTimelineWriterImpl fsi = new FileSystemTimelineWriterImpl();
fsi.serviceInit(new Configuration());
fsi.write(te);
String fileName = fsi.getOutputRoot() + "/" + type + "/" + id
+ FileSystemTimelineWriterImpl.TIMELINE_SERVICE_STORAGE_EXTENSION;
Path path = Paths.get(fileName);
File f = new File(fileName);
assertTrue(f.exists() && !f.isDirectory());
List<String> data = Files.readAllLines(path, StandardCharsets.UTF_8);
// ensure there's only one entity + 1 new line
assertTrue(data.size() == 2);
String d = data.get(0);
// confirm the contents same as what was written
assertEquals(d, TimelineUtils.dumpTimelineRecordtoJSON(entity));
// delete the directory
File outputDir = new File(fsi.getOutputRoot());
FileUtils.deleteDirectory(outputDir);
assertTrue(!(f.exists()));
}
}