YARN-3087. Made the REST server of per-node aggregator work alone in NM daemon. Conntributed by Li Lu.

(cherry picked from commit 41a08ad404d4278fe598d6c222b2ae0e84bae0df)
This commit is contained in:
Zhijie Shen 2015-02-26 15:21:42 -08:00 committed by Sangjin Lee
parent 309592b9e5
commit 9d57c9c015
7 changed files with 147 additions and 56 deletions

View File

@ -33,7 +33,7 @@
@InterfaceStability.Unstable @InterfaceStability.Unstable
public abstract class HierarchicalTimelineEntity extends TimelineEntity { public abstract class HierarchicalTimelineEntity extends TimelineEntity {
private Identifier parent; private Identifier parent;
private Map<String, Set<String>> children = new HashMap<>(); private HashMap<String, Set<String>> children = new HashMap<>();
HierarchicalTimelineEntity(String type) { HierarchicalTimelineEntity(String type) {
super(type); super(type);
@ -56,14 +56,24 @@ public void setParent(String type, String id) {
parent.setId(id); parent.setId(id);
} }
// required by JAXB
@InterfaceAudience.Private
@XmlElement(name = "children") @XmlElement(name = "children")
public HashMap<String, Set<String>> getChildrenJAXB() {
return children;
}
public Map<String, Set<String>> getChildren() { public Map<String, Set<String>> getChildren() {
return children; return children;
} }
public void setChildren(Map<String, Set<String>> children) { public void setChildren(Map<String, Set<String>> children) {
validateChildren(children); validateChildren(children);
this.children = children; if (children != null && !(children instanceof HashMap)) {
this.children = new HashMap<String, Set<String>>(children);
} else {
this.children = (HashMap) children;
}
} }
public void addChildren(Map<String, Set<String>> children) { public void addChildren(Map<String, Set<String>> children) {

View File

@ -65,12 +65,12 @@ public void setId(String id) {
} }
private Identifier identifier; private Identifier identifier;
private Map<String, Object> info = new HashMap<>(); private HashMap<String, Object> info = new HashMap<>();
private Map<String, Object> configs = new HashMap<>(); private HashMap<String, Object> configs = new HashMap<>();
private Set<TimelineMetric> metrics = new HashSet<>(); private Set<TimelineMetric> metrics = new HashSet<>();
private Set<TimelineEvent> events = new HashSet<>(); private Set<TimelineEvent> events = new HashSet<>();
private Map<String, Set<String>> isRelatedToEntities = new HashMap<>(); private HashMap<String, Set<String>> isRelatedToEntities = new HashMap<>();
private Map<String, Set<String>> relatesToEntities = new HashMap<>(); private HashMap<String, Set<String>> relatesToEntities = new HashMap<>();
private long createdTime; private long createdTime;
private long modifiedTime; private long modifiedTime;
@ -109,13 +109,23 @@ public void setIdentifier(Identifier identifier) {
this.identifier = identifier; this.identifier = identifier;
} }
// required by JAXB
@InterfaceAudience.Private
@XmlElement(name = "info") @XmlElement(name = "info")
public HashMap<String, Object> getInfoJAXB() {
return info;
}
public Map<String, Object> getInfo() { public Map<String, Object> getInfo() {
return info; return info;
} }
public void setInfo(Map<String, Object> info) { public void setInfo(Map<String, Object> info) {
this.info = info; if (info != null && !(info instanceof HashMap)) {
this.info = new HashMap<String, Object>(info);
} else {
this.info = (HashMap<String, Object>) info;
}
} }
public void addInfo(Map<String, Object> info) { public void addInfo(Map<String, Object> info) {
@ -126,13 +136,23 @@ public void addInfo(String key, Object value) {
info.put(key, value); info.put(key, value);
} }
// required by JAXB
@InterfaceAudience.Private
@XmlElement(name = "configs") @XmlElement(name = "configs")
public HashMap<String, Object> getConfigsJAXB() {
return configs;
}
public Map<String, Object> getConfigs() { public Map<String, Object> getConfigs() {
return configs; return configs;
} }
public void setConfigs(Map<String, Object> configs) { public void setConfigs(Map<String, Object> configs) {
this.configs = configs; if (configs != null && !(configs instanceof HashMap)) {
this.configs = new HashMap<String, Object>(configs);
} else {
this.configs = (HashMap<String, Object>) configs;
}
} }
public void addConfigs(Map<String, Object> configs) { public void addConfigs(Map<String, Object> configs) {
@ -177,14 +197,24 @@ public void addEvent(TimelineEvent event) {
events.add(event); events.add(event);
} }
@XmlElement(name = "isrelatedto")
public Map<String, Set<String>> getIsRelatedToEntities() { public Map<String, Set<String>> getIsRelatedToEntities() {
return isRelatedToEntities; return isRelatedToEntities;
} }
// required by JAXB
@InterfaceAudience.Private
@XmlElement(name = "isrelatedto")
public HashMap<String, Set<String>> getIsRelatedToEntitiesJAXB() {
return isRelatedToEntities;
}
public void setIsRelatedToEntities( public void setIsRelatedToEntities(
Map<String, Set<String>> isRelatedToEntities) { Map<String, Set<String>> isRelatedToEntities) {
this.isRelatedToEntities = isRelatedToEntities; if (isRelatedToEntities != null && !(isRelatedToEntities instanceof HashMap)) {
this.isRelatedToEntities = new HashMap<String, Set<String>>(isRelatedToEntities);
} else {
this.isRelatedToEntities = (HashMap<String, Set<String>>) isRelatedToEntities;
}
} }
public void addIsRelatedToEntities( public void addIsRelatedToEntities(
@ -209,7 +239,13 @@ public void addIsRelatedToEntity(String type, String id) {
ids.add(id); ids.add(id);
} }
// required by JAXB
@InterfaceAudience.Private
@XmlElement(name = "relatesto") @XmlElement(name = "relatesto")
public HashMap<String, Set<String>> getRelatesToEntitiesJAXB() {
return relatesToEntities;
}
public Map<String, Set<String>> getRelatesToEntities() { public Map<String, Set<String>> getRelatesToEntities() {
return relatesToEntities; return relatesToEntities;
} }
@ -235,7 +271,11 @@ public void addRelatesToEntity(String type, String id) {
} }
public void setRelatesToEntities(Map<String, Set<String>> relatesToEntities) { public void setRelatesToEntities(Map<String, Set<String>> relatesToEntities) {
this.relatesToEntities = relatesToEntities; if (relatesToEntities != null && !(relatesToEntities instanceof HashMap)) {
this.relatesToEntities = new HashMap<String, Set<String>>(relatesToEntities);
} else {
this.relatesToEntities = (HashMap<String, Set<String>>) relatesToEntities;
}
} }
@XmlElement(name = "createdtime") @XmlElement(name = "createdtime")

View File

@ -33,7 +33,7 @@
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class TimelineEvent { public class TimelineEvent {
private String id; private String id;
private Map<String, Object> info = new HashMap<>(); private HashMap<String, Object> info = new HashMap<>();
private long timestamp; private long timestamp;
public TimelineEvent() { public TimelineEvent() {
@ -49,13 +49,23 @@ public void setId(String id) {
this.id = id; this.id = id;
} }
// required by JAXB
@InterfaceAudience.Private
@XmlElement(name = "info") @XmlElement(name = "info")
public HashMap<String, Object> getInfoJAXB() {
return info;
}
public Map<String, Object> getInfo() { public Map<String, Object> getInfo() {
return info; return info;
} }
public void setInfo(Map<String, Object> info) { public void setInfo(Map<String, Object> info) {
this.info = info; if (info != null && !(info instanceof HashMap)) {
this.info = new HashMap<String, Object>(info);
} else {
this.info = (HashMap<String, Object>) info;
}
} }
public void addInfo(Map<String, Object> info) { public void addInfo(Map<String, Object> info) {

View File

@ -34,9 +34,9 @@
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class TimelineMetric { public class TimelineMetric {
private String id; private String id;
private Map<String, Object> info = new HashMap<>(); private HashMap<String, Object> info = new HashMap<>();
private Object singleData; private Object singleData;
private Map<Long, Object> timeSeries = new LinkedHashMap<>(); private HashMap<Long, Object> timeSeries = new LinkedHashMap<>();
private long startTime; private long startTime;
private long endTime; private long endTime;
@ -53,13 +53,23 @@ public void setId(String id) {
this.id = id; this.id = id;
} }
// required by JAXB
@InterfaceAudience.Private
@XmlElement(name = "info") @XmlElement(name = "info")
public HashMap<String, Object> getInfoJAXB() {
return info;
}
public Map<String, Object> getInfo() { public Map<String, Object> getInfo() {
return info; return info;
} }
public void setInfo(Map<String, Object> info) { public void setInfo(Map<String, Object> info) {
this.info = info; if (info != null && !(info instanceof HashMap)) {
this.info = new HashMap<String, Object>(info);
} else {
this.info = (HashMap<String, Object>) info;
}
} }
public void addInfo(Map<String, Object> info) { public void addInfo(Map<String, Object> info) {
@ -79,13 +89,23 @@ public void setSingleData(Object singleData) {
this.singleData = singleData; this.singleData = singleData;
} }
// required by JAXB
@InterfaceAudience.Private
@XmlElement(name = "timeseries") @XmlElement(name = "timeseries")
public HashMap<Long, Object> getTimeSeriesJAXB() {
return timeSeries;
}
public Map<Long, Object> getTimeSeries() { public Map<Long, Object> getTimeSeries() {
return timeSeries; return timeSeries;
} }
public void setTimeSeries(Map<Long, Object> timeSeries) { public void setTimeSeries(Map<Long, Object> timeSeries) {
this.timeSeries = timeSeries; if (timeSeries != null && !(timeSeries instanceof LinkedHashMap)) {
this.timeSeries = new LinkedHashMap<Long, Object>(timeSeries);
} else {
this.timeSeries = (LinkedHashMap<Long, Object>) timeSeries;
}
} }
public void addTimeSeries(Map<Long, Object> timeSeries) { public void addTimeSeries(Map<Long, Object> timeSeries) {

View File

@ -126,12 +126,6 @@ public void setup() {
bind(NMWebServices.class); bind(NMWebServices.class);
bind(GenericExceptionHandler.class); bind(GenericExceptionHandler.class);
bind(JAXBContextResolver.class); bind(JAXBContextResolver.class);
// host the timeline service aggregator web service temporarily
// (see YARN-3087)
bind(PerNodeAggregatorWebService.class);
// bind to the global singleton instance
bind(AppLevelServiceManager.class).
toProvider(AppLevelServiceManagerProvider.class);
bind(ResourceView.class).toInstance(this.resourceView); bind(ResourceView.class).toInstance(this.resourceView);
bind(ApplicationACLsManager.class).toInstance(this.aclsManager); bind(ApplicationACLsManager.class).toInstance(this.aclsManager);
bind(LocalDirsHandlerService.class).toInstance(dirsHandler); bind(LocalDirsHandlerService.class).toInstance(dirsHandler);

View File

@ -18,14 +18,16 @@
package org.apache.hadoop.yarn.server.timelineservice.aggregator; package org.apache.hadoop.yarn.server.timelineservice.aggregator;
import java.net.URI;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.HashMap;
import com.google.inject.Inject;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.lib.StaticUserWebFilter;
import org.apache.hadoop.util.ExitUtil; import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -40,11 +42,16 @@
import org.apache.hadoop.yarn.server.api.ContainerContext; import org.apache.hadoop.yarn.server.api.ContainerContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext; import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext; import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.webapp.*; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils; import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
import org.apache.hadoop.http.HttpServer2;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER;
/** /**
* The top-level server for the per-node timeline aggregator service. Currently * The top-level server for the per-node timeline aggregator service. Currently
* it is defined as an auxiliary service to accommodate running within another * it is defined as an auxiliary service to accommodate running within another
@ -56,9 +63,10 @@ public class PerNodeAggregatorServer extends AuxiliaryService {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog(PerNodeAggregatorServer.class); LogFactory.getLog(PerNodeAggregatorServer.class);
private static final int SHUTDOWN_HOOK_PRIORITY = 30; private static final int SHUTDOWN_HOOK_PRIORITY = 30;
static final String AGGREGATOR_COLLECTION_ATTR_KEY = "aggregator.collection";
private final AppLevelServiceManager serviceManager; private final AppLevelServiceManager serviceManager;
private WebApp webApp; private HttpServer2 timelineRestServer;
public PerNodeAggregatorServer() { public PerNodeAggregatorServer() {
// use the same singleton // use the same singleton
@ -86,8 +94,8 @@ protected void serviceStart() throws Exception {
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {
if (webApp != null) { if (timelineRestServer != null) {
webApp.stop(); timelineRestServer.stop();
} }
// stop the service manager // stop the service manager
serviceManager.stop(); serviceManager.stop();
@ -103,11 +111,31 @@ private void startWebApp() {
WebAppUtils.getAHSWebAppURLWithoutScheme(conf)); WebAppUtils.getAHSWebAppURLWithoutScheme(conf));
LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress); LOG.info("Instantiating the per-node aggregator webapp at " + bindAddress);
try { try {
webApp = Configuration confForInfoServer = new Configuration(conf);
WebApps confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS, 10);
.$for("timeline", null, null, "ws") HttpServer2.Builder builder = new HttpServer2.Builder()
.with(conf).at(bindAddress).start( .setName("timeline")
new TimelineServiceWebApp()); .setConf(conf)
.addEndpoint(URI.create("http://" + bindAddress));
timelineRestServer = builder.build();
// TODO: replace this by an authentification filter in future.
HashMap<String, String> options = new HashMap<String, String>();
String username = conf.get(HADOOP_HTTP_STATIC_USER,
DEFAULT_HADOOP_HTTP_STATIC_USER);
options.put(HADOOP_HTTP_STATIC_USER, username);
HttpServer2.defineFilter(timelineRestServer.getWebAppContext(),
"static_user_filter_timeline",
StaticUserWebFilter.StaticUserFilter.class.getName(),
options, new String[] {"/*"});
timelineRestServer.addJerseyResourcePackage(
PerNodeAggregatorWebService.class.getPackage().getName() + ";"
+ GenericExceptionHandler.class.getPackage().getName() + ";"
+ YarnJacksonJaxbJsonProvider.class.getPackage().getName(),
"/*");
timelineRestServer.setAttribute(AGGREGATOR_COLLECTION_ATTR_KEY,
AppLevelServiceManager.getInstance());
timelineRestServer.start();
} catch (Exception e) { } catch (Exception e) {
String msg = "The per-node aggregator webapp failed to start."; String msg = "The per-node aggregator webapp failed to start.";
LOG.error(msg, e); LOG.error(msg, e);
@ -115,19 +143,6 @@ private void startWebApp() {
} }
} }
private static class TimelineServiceWebApp
extends WebApp implements YarnWebParams {
@Override
public void setup() {
bind(YarnJacksonJaxbJsonProvider.class);
bind(GenericExceptionHandler.class);
bind(PerNodeAggregatorWebService.class);
// bind to the global singleton
bind(AppLevelServiceManager.class).
toProvider(AppLevelServiceManagerProvider.class);
}
}
// these methods can be used as the basis for future service methods if the // these methods can be used as the basis for future service methods if the
// per-node aggregator runs separate from the node manager // per-node aggregator runs separate from the node manager
/** /**

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.timelineservice.aggregator; package org.apache.hadoop.yarn.server.timelineservice.aggregator;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.*; import javax.ws.rs.*;
@ -41,11 +42,8 @@
import org.apache.hadoop.yarn.webapp.ForbiddenException; import org.apache.hadoop.yarn.webapp.ForbiddenException;
import org.apache.hadoop.yarn.webapp.NotFoundException; import org.apache.hadoop.yarn.webapp.NotFoundException;
import com.google.inject.Inject;
import com.google.inject.Singleton; import com.google.inject.Singleton;
import java.net.URI;
/** /**
* The main per-node REST end point for timeline service writes. It is * The main per-node REST end point for timeline service writes. It is
* essentially a container service that routes requests to the appropriate * essentially a container service that routes requests to the appropriate
@ -59,12 +57,7 @@ public class PerNodeAggregatorWebService {
private static final Log LOG = private static final Log LOG =
LogFactory.getLog(PerNodeAggregatorWebService.class); LogFactory.getLog(PerNodeAggregatorWebService.class);
private final AppLevelServiceManager serviceManager; private @Context ServletContext context;
@Inject
public PerNodeAggregatorWebService(AppLevelServiceManager serviceManager) {
this.serviceManager = serviceManager;
}
@XmlRootElement(name = "about") @XmlRootElement(name = "about")
@XmlAccessorType(XmlAccessType.NONE) @XmlAccessorType(XmlAccessType.NONE)
@ -135,7 +128,7 @@ public Response putEntities(
if (appId == null) { if (appId == null) {
return Response.status(Response.Status.BAD_REQUEST).build(); return Response.status(Response.Status.BAD_REQUEST).build();
} }
AppLevelAggregatorService service = serviceManager.getService(appId); AppLevelAggregatorService service = getAggregatorService(req, appId);
if (service == null) { if (service == null) {
LOG.error("Application not found"); LOG.error("Application not found");
throw new NotFoundException(); // different exception? throw new NotFoundException(); // different exception?
@ -163,6 +156,15 @@ private String parseApplicationId(String appId) {
} }
} }
private AppLevelAggregatorService
getAggregatorService(HttpServletRequest req, String appIdToParse) {
String appIdString = parseApplicationId(appIdToParse);
final AppLevelServiceManager serviceManager =
(AppLevelServiceManager) context.getAttribute(
PerNodeAggregatorServer.AGGREGATOR_COLLECTION_ATTR_KEY);
return serviceManager.getService(appIdString);
}
private void init(HttpServletResponse response) { private void init(HttpServletResponse response) {
response.setContentType(null); response.setContentType(null);
} }