YARN-2302. Refactor TimelineWebServices. (Contributed by Zhijie Shen)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1617055 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
743f7f30da
commit
e91d099c4a
|
@ -105,6 +105,8 @@ Release 2.6.0 - UNRELEASED
|
||||||
|
|
||||||
YARN-1954. Added waitFor to AMRMClient(Async). (Tsuyoshi Ozawa via zjshen)
|
YARN-1954. Added waitFor to AMRMClient(Async). (Tsuyoshi Ozawa via zjshen)
|
||||||
|
|
||||||
|
YARN-2302. Refactor TimelineWebServices. (Zhijie Shen via junping_du)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.webapp.AHSWebApp;
|
||||||
import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
|
import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
|
||||||
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
||||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
|
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
|
||||||
|
@ -59,12 +60,12 @@ public class ApplicationHistoryServer extends CompositeService {
|
||||||
private static final Log LOG = LogFactory
|
private static final Log LOG = LogFactory
|
||||||
.getLog(ApplicationHistoryServer.class);
|
.getLog(ApplicationHistoryServer.class);
|
||||||
|
|
||||||
protected ApplicationHistoryClientService ahsClientService;
|
private ApplicationHistoryClientService ahsClientService;
|
||||||
protected ApplicationHistoryManager historyManager;
|
private ApplicationHistoryManager historyManager;
|
||||||
protected TimelineStore timelineStore;
|
private TimelineStore timelineStore;
|
||||||
protected TimelineDelegationTokenSecretManagerService secretManagerService;
|
private TimelineDelegationTokenSecretManagerService secretManagerService;
|
||||||
protected TimelineACLsManager timelineACLsManager;
|
private TimelineDataManager timelineDataManager;
|
||||||
protected WebApp webApp;
|
private WebApp webApp;
|
||||||
|
|
||||||
public ApplicationHistoryServer() {
|
public ApplicationHistoryServer() {
|
||||||
super(ApplicationHistoryServer.class.getName());
|
super(ApplicationHistoryServer.class.getName());
|
||||||
|
@ -72,15 +73,18 @@ public class ApplicationHistoryServer extends CompositeService {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
historyManager = createApplicationHistory();
|
// init timeline services first
|
||||||
ahsClientService = createApplicationHistoryClientService(historyManager);
|
|
||||||
addService(ahsClientService);
|
|
||||||
addService((Service) historyManager);
|
|
||||||
timelineStore = createTimelineStore(conf);
|
timelineStore = createTimelineStore(conf);
|
||||||
addIfService(timelineStore);
|
addIfService(timelineStore);
|
||||||
secretManagerService = createTimelineDelegationTokenSecretManagerService(conf);
|
secretManagerService = createTimelineDelegationTokenSecretManagerService(conf);
|
||||||
addService(secretManagerService);
|
addService(secretManagerService);
|
||||||
timelineACLsManager = createTimelineACLsManager(conf);
|
timelineDataManager = createTimelineDataManager(conf);
|
||||||
|
|
||||||
|
// init generic history service afterwards
|
||||||
|
historyManager = createApplicationHistoryManager(conf);
|
||||||
|
ahsClientService = createApplicationHistoryClientService(historyManager);
|
||||||
|
addService(ahsClientService);
|
||||||
|
addService((Service) historyManager);
|
||||||
|
|
||||||
DefaultMetricsSystem.initialize("ApplicationHistoryServer");
|
DefaultMetricsSystem.initialize("ApplicationHistoryServer");
|
||||||
JvmMetrics.initSingleton("ApplicationHistoryServer", null);
|
JvmMetrics.initSingleton("ApplicationHistoryServer", null);
|
||||||
|
@ -111,21 +115,22 @@ public class ApplicationHistoryServer extends CompositeService {
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public ApplicationHistoryClientService getClientService() {
|
ApplicationHistoryClientService getClientService() {
|
||||||
return this.ahsClientService;
|
return this.ahsClientService;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ApplicationHistoryClientService
|
/**
|
||||||
createApplicationHistoryClientService(
|
* @return ApplicationTimelineStore
|
||||||
ApplicationHistoryManager historyManager) {
|
*/
|
||||||
return new ApplicationHistoryClientService(historyManager);
|
@Private
|
||||||
|
@VisibleForTesting
|
||||||
|
public TimelineStore getTimelineStore() {
|
||||||
|
return timelineStore;
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ApplicationHistoryManager createApplicationHistory() {
|
@Private
|
||||||
return new ApplicationHistoryManagerImpl();
|
@VisibleForTesting
|
||||||
}
|
ApplicationHistoryManager getApplicationHistoryManager() {
|
||||||
|
|
||||||
protected ApplicationHistoryManager getApplicationHistory() {
|
|
||||||
return this.historyManager;
|
return this.historyManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,28 +159,35 @@ public class ApplicationHistoryServer extends CompositeService {
|
||||||
launchAppHistoryServer(args);
|
launchAppHistoryServer(args);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected ApplicationHistoryManager createApplicationHistoryManager(
|
private ApplicationHistoryClientService
|
||||||
|
createApplicationHistoryClientService(
|
||||||
|
ApplicationHistoryManager historyManager) {
|
||||||
|
return new ApplicationHistoryClientService(historyManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ApplicationHistoryManager createApplicationHistoryManager(
|
||||||
Configuration conf) {
|
Configuration conf) {
|
||||||
return new ApplicationHistoryManagerImpl();
|
return new ApplicationHistoryManagerImpl();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected TimelineStore createTimelineStore(
|
private TimelineStore createTimelineStore(
|
||||||
Configuration conf) {
|
Configuration conf) {
|
||||||
return ReflectionUtils.newInstance(conf.getClass(
|
return ReflectionUtils.newInstance(conf.getClass(
|
||||||
YarnConfiguration.TIMELINE_SERVICE_STORE, LeveldbTimelineStore.class,
|
YarnConfiguration.TIMELINE_SERVICE_STORE, LeveldbTimelineStore.class,
|
||||||
TimelineStore.class), conf);
|
TimelineStore.class), conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected TimelineDelegationTokenSecretManagerService
|
private TimelineDelegationTokenSecretManagerService
|
||||||
createTimelineDelegationTokenSecretManagerService(Configuration conf) {
|
createTimelineDelegationTokenSecretManagerService(Configuration conf) {
|
||||||
return new TimelineDelegationTokenSecretManagerService();
|
return new TimelineDelegationTokenSecretManagerService();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected TimelineACLsManager createTimelineACLsManager(Configuration conf) {
|
private TimelineDataManager createTimelineDataManager(Configuration conf) {
|
||||||
return new TimelineACLsManager(conf);
|
return new TimelineDataManager(
|
||||||
|
timelineStore, new TimelineACLsManager(conf));
|
||||||
}
|
}
|
||||||
|
|
||||||
protected void startWebApp() {
|
private void startWebApp() {
|
||||||
Configuration conf = getConfig();
|
Configuration conf = getConfig();
|
||||||
// Always load pseudo authentication filter to parse "user.name" in an URL
|
// Always load pseudo authentication filter to parse "user.name" in an URL
|
||||||
// to identify a HTTP request's user in insecure mode.
|
// to identify a HTTP request's user in insecure mode.
|
||||||
|
@ -199,9 +211,8 @@ public class ApplicationHistoryServer extends CompositeService {
|
||||||
try {
|
try {
|
||||||
AHSWebApp ahsWebApp = AHSWebApp.getInstance();
|
AHSWebApp ahsWebApp = AHSWebApp.getInstance();
|
||||||
ahsWebApp.setApplicationHistoryManager(historyManager);
|
ahsWebApp.setApplicationHistoryManager(historyManager);
|
||||||
ahsWebApp.setTimelineStore(timelineStore);
|
|
||||||
ahsWebApp.setTimelineDelegationTokenSecretManagerService(secretManagerService);
|
ahsWebApp.setTimelineDelegationTokenSecretManagerService(secretManagerService);
|
||||||
ahsWebApp.setTimelineACLsManager(timelineACLsManager);
|
ahsWebApp.setTimelineDataManager(timelineDataManager);
|
||||||
webApp =
|
webApp =
|
||||||
WebApps
|
WebApps
|
||||||
.$for("applicationhistory", ApplicationHistoryClientService.class,
|
.$for("applicationhistory", ApplicationHistoryClientService.class,
|
||||||
|
@ -213,14 +224,6 @@ public class ApplicationHistoryServer extends CompositeService {
|
||||||
throw new YarnRuntimeException(msg, e);
|
throw new YarnRuntimeException(msg, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/**
|
|
||||||
* @return ApplicationTimelineStore
|
|
||||||
*/
|
|
||||||
@Private
|
|
||||||
@VisibleForTesting
|
|
||||||
public TimelineStore getTimelineStore() {
|
|
||||||
return timelineStore;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void doSecureLogin(Configuration conf) throws IOException {
|
private void doSecureLogin(Configuration conf) throws IOException {
|
||||||
InetSocketAddress socAddr = getBindAddress(conf);
|
InetSocketAddress socAddr = getBindAddress(conf);
|
||||||
|
|
|
@ -22,8 +22,7 @@ import static org.apache.hadoop.yarn.util.StringHelper.pajoin;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.yarn.server.api.ApplicationContext;
|
import org.apache.hadoop.yarn.server.api.ApplicationContext;
|
||||||
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManager;
|
import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManager;
|
||||||
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
|
||||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
|
||||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineDelegationTokenSecretManagerService;
|
import org.apache.hadoop.yarn.server.timeline.security.TimelineDelegationTokenSecretManagerService;
|
||||||
import org.apache.hadoop.yarn.server.timeline.webapp.TimelineWebServices;
|
import org.apache.hadoop.yarn.server.timeline.webapp.TimelineWebServices;
|
||||||
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||||
|
@ -36,9 +35,8 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
public class AHSWebApp extends WebApp implements YarnWebParams {
|
public class AHSWebApp extends WebApp implements YarnWebParams {
|
||||||
|
|
||||||
private ApplicationHistoryManager applicationHistoryManager;
|
private ApplicationHistoryManager applicationHistoryManager;
|
||||||
private TimelineStore timelineStore;
|
|
||||||
private TimelineDelegationTokenSecretManagerService secretManagerService;
|
private TimelineDelegationTokenSecretManagerService secretManagerService;
|
||||||
private TimelineACLsManager timelineACLsManager;
|
private TimelineDataManager timelineDataManager;
|
||||||
|
|
||||||
private static AHSWebApp instance = null;
|
private static AHSWebApp instance = null;
|
||||||
|
|
||||||
|
@ -68,14 +66,6 @@ public class AHSWebApp extends WebApp implements YarnWebParams {
|
||||||
this.applicationHistoryManager = applicationHistoryManager;
|
this.applicationHistoryManager = applicationHistoryManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
public TimelineStore getTimelineStore() {
|
|
||||||
return timelineStore;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setTimelineStore(TimelineStore timelineStore) {
|
|
||||||
this.timelineStore = timelineStore;
|
|
||||||
}
|
|
||||||
|
|
||||||
public TimelineDelegationTokenSecretManagerService
|
public TimelineDelegationTokenSecretManagerService
|
||||||
getTimelineDelegationTokenSecretManagerService() {
|
getTimelineDelegationTokenSecretManagerService() {
|
||||||
return secretManagerService;
|
return secretManagerService;
|
||||||
|
@ -86,12 +76,12 @@ public class AHSWebApp extends WebApp implements YarnWebParams {
|
||||||
this.secretManagerService = secretManagerService;
|
this.secretManagerService = secretManagerService;
|
||||||
}
|
}
|
||||||
|
|
||||||
public TimelineACLsManager getTimelineACLsManager() {
|
public TimelineDataManager getTimelineDataManager() {
|
||||||
return timelineACLsManager;
|
return timelineDataManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setTimelineACLsManager(TimelineACLsManager timelineACLsManager) {
|
public void setTimelineDataManager(TimelineDataManager timelineDataManager) {
|
||||||
this.timelineACLsManager = timelineACLsManager;
|
this.timelineDataManager = timelineDataManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -101,10 +91,9 @@ public class AHSWebApp extends WebApp implements YarnWebParams {
|
||||||
bind(TimelineWebServices.class);
|
bind(TimelineWebServices.class);
|
||||||
bind(GenericExceptionHandler.class);
|
bind(GenericExceptionHandler.class);
|
||||||
bind(ApplicationContext.class).toInstance(applicationHistoryManager);
|
bind(ApplicationContext.class).toInstance(applicationHistoryManager);
|
||||||
bind(TimelineStore.class).toInstance(timelineStore);
|
|
||||||
bind(TimelineDelegationTokenSecretManagerService.class).toInstance(
|
bind(TimelineDelegationTokenSecretManagerService.class).toInstance(
|
||||||
secretManagerService);
|
secretManagerService);
|
||||||
bind(TimelineACLsManager.class).toInstance(timelineACLsManager);
|
bind(TimelineDataManager.class).toInstance(timelineDataManager);
|
||||||
route("/", AHSController.class);
|
route("/", AHSController.class);
|
||||||
route(pajoin("/apps", APP_STATE), AHSController.class);
|
route(pajoin("/apps", APP_STATE), AHSController.class);
|
||||||
route(pajoin("/app", APPLICATION_ID), AHSController.class, "app");
|
route(pajoin("/app", APPLICATION_ID), AHSController.class, "app");
|
||||||
|
|
|
@ -0,0 +1,319 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.timeline;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.EnumSet;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.SortedSet;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
||||||
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
||||||
|
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The class wrap over the timeline store and the ACLs manager. It does some non
|
||||||
|
* trivial manipulation of the timeline data before putting or after getting it
|
||||||
|
* from the timeline store, and checks the user's access to it.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class TimelineDataManager {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(TimelineDataManager.class);
|
||||||
|
|
||||||
|
private TimelineStore store;
|
||||||
|
private TimelineACLsManager timelineACLsManager;
|
||||||
|
|
||||||
|
public TimelineDataManager(TimelineStore store,
|
||||||
|
TimelineACLsManager timelineACLsManager) {
|
||||||
|
this.store = store;
|
||||||
|
this.timelineACLsManager = timelineACLsManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the timeline entities that the given user have access to. The meaning
|
||||||
|
* of each argument has been documented with
|
||||||
|
* {@link TimelineReader#getEntities}.
|
||||||
|
*
|
||||||
|
* @see TimelineReader#getEntities
|
||||||
|
*/
|
||||||
|
public TimelineEntities getEntities(
|
||||||
|
String entityType,
|
||||||
|
NameValuePair primaryFilter,
|
||||||
|
Collection<NameValuePair> secondaryFilter,
|
||||||
|
Long windowStart,
|
||||||
|
Long windowEnd,
|
||||||
|
String fromId,
|
||||||
|
Long fromTs,
|
||||||
|
Long limit,
|
||||||
|
EnumSet<Field> fields,
|
||||||
|
UserGroupInformation callerUGI) throws YarnException, IOException {
|
||||||
|
TimelineEntities entities = null;
|
||||||
|
boolean modified = extendFields(fields);
|
||||||
|
entities = store.getEntities(
|
||||||
|
entityType,
|
||||||
|
limit,
|
||||||
|
windowStart,
|
||||||
|
windowEnd,
|
||||||
|
fromId,
|
||||||
|
fromTs,
|
||||||
|
primaryFilter,
|
||||||
|
secondaryFilter,
|
||||||
|
fields);
|
||||||
|
if (entities != null) {
|
||||||
|
Iterator<TimelineEntity> entitiesItr =
|
||||||
|
entities.getEntities().iterator();
|
||||||
|
while (entitiesItr.hasNext()) {
|
||||||
|
TimelineEntity entity = entitiesItr.next();
|
||||||
|
try {
|
||||||
|
// check ACLs
|
||||||
|
if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
|
||||||
|
entitiesItr.remove();
|
||||||
|
} else {
|
||||||
|
// clean up system data
|
||||||
|
if (modified) {
|
||||||
|
entity.setPrimaryFilters(null);
|
||||||
|
} else {
|
||||||
|
cleanupOwnerInfo(entity);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (YarnException e) {
|
||||||
|
LOG.error("Error when verifying access for user " + callerUGI
|
||||||
|
+ " on the events of the timeline entity "
|
||||||
|
+ new EntityIdentifier(entity.getEntityId(),
|
||||||
|
entity.getEntityType()), e);
|
||||||
|
entitiesItr.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (entities == null) {
|
||||||
|
return new TimelineEntities();
|
||||||
|
}
|
||||||
|
return entities;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the single timeline entity that the given user has access to. The
|
||||||
|
* meaning of each argument has been documented with
|
||||||
|
* {@link TimelineReader#getEntity}.
|
||||||
|
*
|
||||||
|
* @see TimelineReader#getEntity
|
||||||
|
*/
|
||||||
|
public TimelineEntity getEntity(
|
||||||
|
String entityType,
|
||||||
|
String entityId,
|
||||||
|
EnumSet<Field> fields,
|
||||||
|
UserGroupInformation callerUGI) throws YarnException, IOException {
|
||||||
|
TimelineEntity entity = null;
|
||||||
|
boolean modified = extendFields(fields);
|
||||||
|
entity =
|
||||||
|
store.getEntity(entityId, entityType, fields);
|
||||||
|
if (entity != null) {
|
||||||
|
// check ACLs
|
||||||
|
if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
|
||||||
|
entity = null;
|
||||||
|
} else {
|
||||||
|
// clean up the system data
|
||||||
|
if (modified) {
|
||||||
|
entity.setPrimaryFilters(null);
|
||||||
|
} else {
|
||||||
|
cleanupOwnerInfo(entity);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return entity;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the events whose entities the given user has access to. The meaning of
|
||||||
|
* each argument has been documented with
|
||||||
|
* {@link TimelineReader#getEntityTimelines}.
|
||||||
|
*
|
||||||
|
* @see TimelineReader#getEntityTimelines
|
||||||
|
*/
|
||||||
|
public TimelineEvents getEvents(
|
||||||
|
String entityType,
|
||||||
|
SortedSet<String> entityIds,
|
||||||
|
SortedSet<String> eventTypes,
|
||||||
|
Long windowStart,
|
||||||
|
Long windowEnd,
|
||||||
|
Long limit,
|
||||||
|
UserGroupInformation callerUGI) throws YarnException, IOException {
|
||||||
|
TimelineEvents events = null;
|
||||||
|
events = store.getEntityTimelines(
|
||||||
|
entityType,
|
||||||
|
entityIds,
|
||||||
|
limit,
|
||||||
|
windowStart,
|
||||||
|
windowEnd,
|
||||||
|
eventTypes);
|
||||||
|
if (events != null) {
|
||||||
|
Iterator<TimelineEvents.EventsOfOneEntity> eventsItr =
|
||||||
|
events.getAllEvents().iterator();
|
||||||
|
while (eventsItr.hasNext()) {
|
||||||
|
TimelineEvents.EventsOfOneEntity eventsOfOneEntity = eventsItr.next();
|
||||||
|
try {
|
||||||
|
TimelineEntity entity = store.getEntity(
|
||||||
|
eventsOfOneEntity.getEntityId(),
|
||||||
|
eventsOfOneEntity.getEntityType(),
|
||||||
|
EnumSet.of(Field.PRIMARY_FILTERS));
|
||||||
|
// check ACLs
|
||||||
|
if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
|
||||||
|
eventsItr.remove();
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.error("Error when verifying access for user " + callerUGI
|
||||||
|
+ " on the events of the timeline entity "
|
||||||
|
+ new EntityIdentifier(eventsOfOneEntity.getEntityId(),
|
||||||
|
eventsOfOneEntity.getEntityType()), e);
|
||||||
|
eventsItr.remove();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (events == null) {
|
||||||
|
return new TimelineEvents();
|
||||||
|
}
|
||||||
|
return events;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Store the timeline entities into the store and set the owner of them to the
|
||||||
|
* given user.
|
||||||
|
*/
|
||||||
|
public TimelinePutResponse postEntities(
|
||||||
|
TimelineEntities entities,
|
||||||
|
UserGroupInformation callerUGI) throws YarnException, IOException {
|
||||||
|
if (entities == null) {
|
||||||
|
return new TimelinePutResponse();
|
||||||
|
}
|
||||||
|
List<EntityIdentifier> entityIDs = new ArrayList<EntityIdentifier>();
|
||||||
|
TimelineEntities entitiesToPut = new TimelineEntities();
|
||||||
|
List<TimelinePutResponse.TimelinePutError> errors =
|
||||||
|
new ArrayList<TimelinePutResponse.TimelinePutError>();
|
||||||
|
for (TimelineEntity entity : entities.getEntities()) {
|
||||||
|
EntityIdentifier entityID =
|
||||||
|
new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
|
||||||
|
|
||||||
|
// check if there is existing entity
|
||||||
|
TimelineEntity existingEntity = null;
|
||||||
|
try {
|
||||||
|
existingEntity =
|
||||||
|
store.getEntity(entityID.getId(), entityID.getType(),
|
||||||
|
EnumSet.of(Field.PRIMARY_FILTERS));
|
||||||
|
if (existingEntity != null
|
||||||
|
&& !timelineACLsManager.checkAccess(callerUGI, existingEntity)) {
|
||||||
|
throw new YarnException("The timeline entity " + entityID
|
||||||
|
+ " was not put by " + callerUGI + " before");
|
||||||
|
}
|
||||||
|
} catch (Exception e) {
|
||||||
|
// Skip the entity which already exists and was put by others
|
||||||
|
LOG.error("Skip the timeline entity: " + entityID + ", because "
|
||||||
|
+ e.getMessage());
|
||||||
|
TimelinePutResponse.TimelinePutError error =
|
||||||
|
new TimelinePutResponse.TimelinePutError();
|
||||||
|
error.setEntityId(entityID.getId());
|
||||||
|
error.setEntityType(entityID.getType());
|
||||||
|
error.setErrorCode(
|
||||||
|
TimelinePutResponse.TimelinePutError.ACCESS_DENIED);
|
||||||
|
errors.add(error);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// inject owner information for the access check if this is the first
|
||||||
|
// time to post the entity, in case it's the admin who is updating
|
||||||
|
// the timeline data.
|
||||||
|
try {
|
||||||
|
if (existingEntity == null) {
|
||||||
|
injectOwnerInfo(entity, callerUGI.getShortUserName());
|
||||||
|
}
|
||||||
|
} catch (YarnException e) {
|
||||||
|
// Skip the entity which messes up the primary filter and record the
|
||||||
|
// error
|
||||||
|
LOG.error("Skip the timeline entity: " + entityID + ", because "
|
||||||
|
+ e.getMessage());
|
||||||
|
TimelinePutResponse.TimelinePutError error =
|
||||||
|
new TimelinePutResponse.TimelinePutError();
|
||||||
|
error.setEntityId(entityID.getId());
|
||||||
|
error.setEntityType(entityID.getType());
|
||||||
|
error.setErrorCode(
|
||||||
|
TimelinePutResponse.TimelinePutError.SYSTEM_FILTER_CONFLICT);
|
||||||
|
errors.add(error);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
entityIDs.add(entityID);
|
||||||
|
entitiesToPut.addEntity(entity);
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Storing the entity " + entityID + ", JSON-style content: "
|
||||||
|
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs));
|
||||||
|
}
|
||||||
|
TimelinePutResponse response = store.put(entitiesToPut);
|
||||||
|
// add the errors of timeline system filter key conflict
|
||||||
|
response.addErrors(errors);
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean extendFields(EnumSet<Field> fieldEnums) {
|
||||||
|
boolean modified = false;
|
||||||
|
if (fieldEnums != null && !fieldEnums.contains(Field.PRIMARY_FILTERS)) {
|
||||||
|
fieldEnums.add(Field.PRIMARY_FILTERS);
|
||||||
|
modified = true;
|
||||||
|
}
|
||||||
|
return modified;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void injectOwnerInfo(TimelineEntity timelineEntity,
|
||||||
|
String owner) throws YarnException {
|
||||||
|
if (timelineEntity.getPrimaryFilters() != null &&
|
||||||
|
timelineEntity.getPrimaryFilters().containsKey(
|
||||||
|
TimelineStore.SystemFilter.ENTITY_OWNER.toString())) {
|
||||||
|
throw new YarnException(
|
||||||
|
"User should not use the timeline system filter key: "
|
||||||
|
+ TimelineStore.SystemFilter.ENTITY_OWNER);
|
||||||
|
}
|
||||||
|
timelineEntity.addPrimaryFilter(
|
||||||
|
TimelineStore.SystemFilter.ENTITY_OWNER
|
||||||
|
.toString(), owner);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void cleanupOwnerInfo(TimelineEntity timelineEntity) {
|
||||||
|
if (timelineEntity.getPrimaryFilters() != null) {
|
||||||
|
timelineEntity.getPrimaryFilters().remove(
|
||||||
|
TimelineStore.SystemFilter.ENTITY_OWNER.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -18,14 +18,10 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.timeline.webapp;
|
package org.apache.hadoop.yarn.server.timeline.webapp;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.util.StringHelper.CSV_JOINER;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.SortedSet;
|
import java.util.SortedSet;
|
||||||
|
@ -58,14 +54,11 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
||||||
import org.apache.hadoop.yarn.server.timeline.EntityIdentifier;
|
import org.apache.hadoop.yarn.server.timeline.EntityIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
|
import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
|
||||||
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
|
import org.apache.hadoop.yarn.server.timeline.NameValuePair;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
|
||||||
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
|
import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
|
||||||
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
|
||||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
|
||||||
import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
|
|
||||||
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
import org.apache.hadoop.yarn.webapp.BadRequestException;
|
||||||
import org.apache.hadoop.yarn.webapp.ForbiddenException;
|
import org.apache.hadoop.yarn.webapp.ForbiddenException;
|
||||||
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
import org.apache.hadoop.yarn.webapp.NotFoundException;
|
||||||
|
@ -80,14 +73,11 @@ public class TimelineWebServices {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(TimelineWebServices.class);
|
private static final Log LOG = LogFactory.getLog(TimelineWebServices.class);
|
||||||
|
|
||||||
private TimelineStore store;
|
private TimelineDataManager timelineDataManager;
|
||||||
private TimelineACLsManager timelineACLsManager;
|
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
public TimelineWebServices(TimelineStore store,
|
public TimelineWebServices(TimelineDataManager timelineDataManager) {
|
||||||
TimelineACLsManager timelineACLsManager) {
|
this.timelineDataManager = timelineDataManager;
|
||||||
this.store = store;
|
|
||||||
this.timelineACLsManager = timelineACLsManager;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@XmlRootElement(name = "about")
|
@XmlRootElement(name = "about")
|
||||||
|
@ -148,61 +138,28 @@ public class TimelineWebServices {
|
||||||
@QueryParam("limit") String limit,
|
@QueryParam("limit") String limit,
|
||||||
@QueryParam("fields") String fields) {
|
@QueryParam("fields") String fields) {
|
||||||
init(res);
|
init(res);
|
||||||
TimelineEntities entities = null;
|
|
||||||
try {
|
try {
|
||||||
EnumSet<Field> fieldEnums = parseFieldsStr(fields, ",");
|
return timelineDataManager.getEntities(
|
||||||
boolean modified = extendFields(fieldEnums);
|
|
||||||
UserGroupInformation callerUGI = getUser(req);
|
|
||||||
entities = store.getEntities(
|
|
||||||
parseStr(entityType),
|
parseStr(entityType),
|
||||||
parseLongStr(limit),
|
parsePairStr(primaryFilter, ":"),
|
||||||
|
parsePairsStr(secondaryFilter, ",", ":"),
|
||||||
parseLongStr(windowStart),
|
parseLongStr(windowStart),
|
||||||
parseLongStr(windowEnd),
|
parseLongStr(windowEnd),
|
||||||
parseStr(fromId),
|
parseStr(fromId),
|
||||||
parseLongStr(fromTs),
|
parseLongStr(fromTs),
|
||||||
parsePairStr(primaryFilter, ":"),
|
parseLongStr(limit),
|
||||||
parsePairsStr(secondaryFilter, ",", ":"),
|
parseFieldsStr(fields, ","),
|
||||||
fieldEnums);
|
getUser(req));
|
||||||
if (entities != null) {
|
|
||||||
Iterator<TimelineEntity> entitiesItr =
|
|
||||||
entities.getEntities().iterator();
|
|
||||||
while (entitiesItr.hasNext()) {
|
|
||||||
TimelineEntity entity = entitiesItr.next();
|
|
||||||
try {
|
|
||||||
// check ACLs
|
|
||||||
if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
|
|
||||||
entitiesItr.remove();
|
|
||||||
} else {
|
|
||||||
// clean up system data
|
|
||||||
if (modified) {
|
|
||||||
entity.setPrimaryFilters(null);
|
|
||||||
} else {
|
|
||||||
cleanupOwnerInfo(entity);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (YarnException e) {
|
|
||||||
LOG.error("Error when verifying access for user " + callerUGI
|
|
||||||
+ " on the events of the timeline entity "
|
|
||||||
+ new EntityIdentifier(entity.getEntityId(),
|
|
||||||
entity.getEntityType()), e);
|
|
||||||
entitiesItr.remove();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (NumberFormatException e) {
|
} catch (NumberFormatException e) {
|
||||||
throw new BadRequestException(
|
throw new BadRequestException(
|
||||||
"windowStart, windowEnd or limit is not a numeric value.");
|
"windowStart, windowEnd or limit is not a numeric value.");
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
throw new BadRequestException("requested invalid field.");
|
throw new BadRequestException("requested invalid field.");
|
||||||
} catch (IOException e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error getting entities", e);
|
LOG.error("Error getting entities", e);
|
||||||
throw new WebApplicationException(e,
|
throw new WebApplicationException(e,
|
||||||
Response.Status.INTERNAL_SERVER_ERROR);
|
Response.Status.INTERNAL_SERVER_ERROR);
|
||||||
}
|
}
|
||||||
if (entities == null) {
|
|
||||||
return new TimelineEntities();
|
|
||||||
}
|
|
||||||
return entities;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -220,33 +177,15 @@ public class TimelineWebServices {
|
||||||
init(res);
|
init(res);
|
||||||
TimelineEntity entity = null;
|
TimelineEntity entity = null;
|
||||||
try {
|
try {
|
||||||
EnumSet<Field> fieldEnums = parseFieldsStr(fields, ",");
|
entity = timelineDataManager.getEntity(
|
||||||
boolean modified = extendFields(fieldEnums);
|
parseStr(entityType),
|
||||||
entity =
|
parseStr(entityId),
|
||||||
store.getEntity(parseStr(entityId), parseStr(entityType),
|
parseFieldsStr(fields, ","),
|
||||||
fieldEnums);
|
getUser(req));
|
||||||
if (entity != null) {
|
|
||||||
// check ACLs
|
|
||||||
UserGroupInformation callerUGI = getUser(req);
|
|
||||||
if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
|
|
||||||
entity = null;
|
|
||||||
} else {
|
|
||||||
// clean up the system data
|
|
||||||
if (modified) {
|
|
||||||
entity.setPrimaryFilters(null);
|
|
||||||
} else {
|
|
||||||
cleanupOwnerInfo(entity);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
throw new BadRequestException(
|
throw new BadRequestException(
|
||||||
"requested invalid field.");
|
"requested invalid field.");
|
||||||
} catch (IOException e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error getting entity", e);
|
|
||||||
throw new WebApplicationException(e,
|
|
||||||
Response.Status.INTERNAL_SERVER_ERROR);
|
|
||||||
} catch (YarnException e) {
|
|
||||||
LOG.error("Error getting entity", e);
|
LOG.error("Error getting entity", e);
|
||||||
throw new WebApplicationException(e,
|
throw new WebApplicationException(e,
|
||||||
Response.Status.INTERNAL_SERVER_ERROR);
|
Response.Status.INTERNAL_SERVER_ERROR);
|
||||||
|
@ -275,51 +214,23 @@ public class TimelineWebServices {
|
||||||
@QueryParam("windowEnd") String windowEnd,
|
@QueryParam("windowEnd") String windowEnd,
|
||||||
@QueryParam("limit") String limit) {
|
@QueryParam("limit") String limit) {
|
||||||
init(res);
|
init(res);
|
||||||
TimelineEvents events = null;
|
|
||||||
try {
|
try {
|
||||||
UserGroupInformation callerUGI = getUser(req);
|
return timelineDataManager.getEvents(
|
||||||
events = store.getEntityTimelines(
|
|
||||||
parseStr(entityType),
|
parseStr(entityType),
|
||||||
parseArrayStr(entityId, ","),
|
parseArrayStr(entityId, ","),
|
||||||
parseLongStr(limit),
|
parseArrayStr(eventType, ","),
|
||||||
parseLongStr(windowStart),
|
parseLongStr(windowStart),
|
||||||
parseLongStr(windowEnd),
|
parseLongStr(windowEnd),
|
||||||
parseArrayStr(eventType, ","));
|
parseLongStr(limit),
|
||||||
if (events != null) {
|
getUser(req));
|
||||||
Iterator<TimelineEvents.EventsOfOneEntity> eventsItr =
|
|
||||||
events.getAllEvents().iterator();
|
|
||||||
while (eventsItr.hasNext()) {
|
|
||||||
TimelineEvents.EventsOfOneEntity eventsOfOneEntity = eventsItr.next();
|
|
||||||
try {
|
|
||||||
TimelineEntity entity = store.getEntity(
|
|
||||||
eventsOfOneEntity.getEntityId(),
|
|
||||||
eventsOfOneEntity.getEntityType(),
|
|
||||||
EnumSet.of(Field.PRIMARY_FILTERS));
|
|
||||||
// check ACLs
|
|
||||||
if (!timelineACLsManager.checkAccess(callerUGI, entity)) {
|
|
||||||
eventsItr.remove();
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
LOG.error("Error when verifying access for user " + callerUGI
|
|
||||||
+ " on the events of the timeline entity "
|
|
||||||
+ new EntityIdentifier(eventsOfOneEntity.getEntityId(),
|
|
||||||
eventsOfOneEntity.getEntityType()), e);
|
|
||||||
eventsItr.remove();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (NumberFormatException e) {
|
} catch (NumberFormatException e) {
|
||||||
throw new BadRequestException(
|
throw new BadRequestException(
|
||||||
"windowStart, windowEnd or limit is not a numeric value.");
|
"windowStart, windowEnd or limit is not a numeric value.");
|
||||||
} catch (IOException e) {
|
} catch (Exception e) {
|
||||||
LOG.error("Error getting entity timelines", e);
|
LOG.error("Error getting entity timelines", e);
|
||||||
throw new WebApplicationException(e,
|
throw new WebApplicationException(e,
|
||||||
Response.Status.INTERNAL_SERVER_ERROR);
|
Response.Status.INTERNAL_SERVER_ERROR);
|
||||||
}
|
}
|
||||||
if (events == null) {
|
|
||||||
return new TimelineEvents();
|
|
||||||
}
|
|
||||||
return events;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -333,9 +244,6 @@ public class TimelineWebServices {
|
||||||
@Context HttpServletResponse res,
|
@Context HttpServletResponse res,
|
||||||
TimelineEntities entities) {
|
TimelineEntities entities) {
|
||||||
init(res);
|
init(res);
|
||||||
if (entities == null) {
|
|
||||||
return new TimelinePutResponse();
|
|
||||||
}
|
|
||||||
UserGroupInformation callerUGI = getUser(req);
|
UserGroupInformation callerUGI = getUser(req);
|
||||||
if (callerUGI == null) {
|
if (callerUGI == null) {
|
||||||
String msg = "The owner of the posted timeline entities is not set";
|
String msg = "The owner of the posted timeline entities is not set";
|
||||||
|
@ -343,76 +251,8 @@ public class TimelineWebServices {
|
||||||
throw new ForbiddenException(msg);
|
throw new ForbiddenException(msg);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
List<EntityIdentifier> entityIDs = new ArrayList<EntityIdentifier>();
|
return timelineDataManager.postEntities(entities, callerUGI);
|
||||||
TimelineEntities entitiesToPut = new TimelineEntities();
|
} catch (Exception e) {
|
||||||
List<TimelinePutResponse.TimelinePutError> errors =
|
|
||||||
new ArrayList<TimelinePutResponse.TimelinePutError>();
|
|
||||||
for (TimelineEntity entity : entities.getEntities()) {
|
|
||||||
EntityIdentifier entityID =
|
|
||||||
new EntityIdentifier(entity.getEntityId(), entity.getEntityType());
|
|
||||||
|
|
||||||
// check if there is existing entity
|
|
||||||
TimelineEntity existingEntity = null;
|
|
||||||
try {
|
|
||||||
existingEntity =
|
|
||||||
store.getEntity(entityID.getId(), entityID.getType(),
|
|
||||||
EnumSet.of(Field.PRIMARY_FILTERS));
|
|
||||||
if (existingEntity != null
|
|
||||||
&& !timelineACLsManager.checkAccess(callerUGI, existingEntity)) {
|
|
||||||
throw new YarnException("The timeline entity " + entityID
|
|
||||||
+ " was not put by " + callerUGI + " before");
|
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
// Skip the entity which already exists and was put by others
|
|
||||||
LOG.warn("Skip the timeline entity: " + entityID + ", because "
|
|
||||||
+ e.getMessage());
|
|
||||||
TimelinePutResponse.TimelinePutError error =
|
|
||||||
new TimelinePutResponse.TimelinePutError();
|
|
||||||
error.setEntityId(entityID.getId());
|
|
||||||
error.setEntityType(entityID.getType());
|
|
||||||
error.setErrorCode(
|
|
||||||
TimelinePutResponse.TimelinePutError.ACCESS_DENIED);
|
|
||||||
errors.add(error);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// inject owner information for the access check if this is the first
|
|
||||||
// time to post the entity, in case it's the admin who is updating
|
|
||||||
// the timeline data.
|
|
||||||
try {
|
|
||||||
if (existingEntity == null) {
|
|
||||||
injectOwnerInfo(entity, callerUGI.getShortUserName());
|
|
||||||
}
|
|
||||||
} catch (YarnException e) {
|
|
||||||
// Skip the entity which messes up the primary filter and record the
|
|
||||||
// error
|
|
||||||
LOG.warn("Skip the timeline entity: " + entityID + ", because "
|
|
||||||
+ e.getMessage());
|
|
||||||
TimelinePutResponse.TimelinePutError error =
|
|
||||||
new TimelinePutResponse.TimelinePutError();
|
|
||||||
error.setEntityId(entityID.getId());
|
|
||||||
error.setEntityType(entityID.getType());
|
|
||||||
error.setErrorCode(
|
|
||||||
TimelinePutResponse.TimelinePutError.SYSTEM_FILTER_CONFLICT);
|
|
||||||
errors.add(error);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
entityIDs.add(entityID);
|
|
||||||
entitiesToPut.addEntity(entity);
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Storing the entity " + entityID + ", JSON-style content: "
|
|
||||||
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Storing entities: " + CSV_JOINER.join(entityIDs));
|
|
||||||
}
|
|
||||||
TimelinePutResponse response = store.put(entitiesToPut);
|
|
||||||
// add the errors of timeline system filter key conflict
|
|
||||||
response.addErrors(errors);
|
|
||||||
return response;
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error("Error putting entities", e);
|
LOG.error("Error putting entities", e);
|
||||||
throw new WebApplicationException(e,
|
throw new WebApplicationException(e,
|
||||||
Response.Status.INTERNAL_SERVER_ERROR);
|
Response.Status.INTERNAL_SERVER_ERROR);
|
||||||
|
@ -423,6 +263,15 @@ public class TimelineWebServices {
|
||||||
response.setContentType(null);
|
response.setContentType(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static UserGroupInformation getUser(HttpServletRequest req) {
|
||||||
|
String remoteUser = req.getRemoteUser();
|
||||||
|
UserGroupInformation callerUGI = null;
|
||||||
|
if (remoteUser != null) {
|
||||||
|
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
|
||||||
|
}
|
||||||
|
return callerUGI;
|
||||||
|
}
|
||||||
|
|
||||||
private static SortedSet<String> parseArrayStr(String str, String delimiter) {
|
private static SortedSet<String> parseArrayStr(String str, String delimiter) {
|
||||||
if (str == null) {
|
if (str == null) {
|
||||||
return null;
|
return null;
|
||||||
|
@ -495,14 +344,6 @@ public class TimelineWebServices {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static boolean extendFields(EnumSet<Field> fieldEnums) {
|
|
||||||
boolean modified = false;
|
|
||||||
if (fieldEnums != null && !fieldEnums.contains(Field.PRIMARY_FILTERS)) {
|
|
||||||
fieldEnums.add(Field.PRIMARY_FILTERS);
|
|
||||||
modified = true;
|
|
||||||
}
|
|
||||||
return modified;
|
|
||||||
}
|
|
||||||
private static Long parseLongStr(String str) {
|
private static Long parseLongStr(String str) {
|
||||||
return str == null ? null : Long.parseLong(str.trim());
|
return str == null ? null : Long.parseLong(str.trim());
|
||||||
}
|
}
|
||||||
|
@ -511,34 +352,4 @@ public class TimelineWebServices {
|
||||||
return str == null ? null : str.trim();
|
return str == null ? null : str.trim();
|
||||||
}
|
}
|
||||||
|
|
||||||
private static UserGroupInformation getUser(HttpServletRequest req) {
|
|
||||||
String remoteUser = req.getRemoteUser();
|
|
||||||
UserGroupInformation callerUGI = null;
|
|
||||||
if (remoteUser != null) {
|
|
||||||
callerUGI = UserGroupInformation.createRemoteUser(remoteUser);
|
|
||||||
}
|
|
||||||
return callerUGI;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void injectOwnerInfo(TimelineEntity timelineEntity,
|
|
||||||
String owner) throws YarnException {
|
|
||||||
if (timelineEntity.getPrimaryFilters() != null &&
|
|
||||||
timelineEntity.getPrimaryFilters().containsKey(
|
|
||||||
TimelineStore.SystemFilter.ENTITY_OWNER.toString())) {
|
|
||||||
throw new YarnException(
|
|
||||||
"User should not use the timeline system filter key: "
|
|
||||||
+ TimelineStore.SystemFilter.ENTITY_OWNER);
|
|
||||||
}
|
|
||||||
timelineEntity.addPrimaryFilter(
|
|
||||||
TimelineStore.SystemFilter.ENTITY_OWNER
|
|
||||||
.toString(), owner);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void cleanupOwnerInfo(TimelineEntity timelineEntity) {
|
|
||||||
if (timelineEntity.getPrimaryFilters() != null) {
|
|
||||||
timelineEntity.getPrimaryFilters().remove(
|
|
||||||
TimelineStore.SystemFilter.ENTITY_OWNER.toString());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -69,7 +69,7 @@ public class TestApplicationHistoryClientService extends
|
||||||
historyServer.init(config);
|
historyServer.init(config);
|
||||||
historyServer.start();
|
historyServer.start();
|
||||||
store =
|
store =
|
||||||
((ApplicationHistoryManagerImpl) historyServer.getApplicationHistory())
|
((ApplicationHistoryManagerImpl) historyServer.getApplicationHistoryManager())
|
||||||
.getHistoryStore();
|
.getHistoryStore();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelineP
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.security.AdminACLsManager;
|
import org.apache.hadoop.yarn.security.AdminACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.timeline.TestMemoryTimelineStore;
|
import org.apache.hadoop.yarn.server.timeline.TestMemoryTimelineStore;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager;
|
||||||
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
import org.apache.hadoop.yarn.server.timeline.TimelineStore;
|
||||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilter;
|
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilter;
|
||||||
|
@ -89,14 +90,15 @@ public class TestTimelineWebServices extends JerseyTest {
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
Assert.fail();
|
Assert.fail();
|
||||||
}
|
}
|
||||||
bind(TimelineStore.class).toInstance(store);
|
|
||||||
Configuration conf = new YarnConfiguration();
|
Configuration conf = new YarnConfiguration();
|
||||||
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, false);
|
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, false);
|
||||||
timelineACLsManager = new TimelineACLsManager(conf);
|
timelineACLsManager = new TimelineACLsManager(conf);
|
||||||
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||||
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
|
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
|
||||||
adminACLsManager = new AdminACLsManager(conf);
|
adminACLsManager = new AdminACLsManager(conf);
|
||||||
bind(TimelineACLsManager.class).toInstance(timelineACLsManager);
|
TimelineDataManager timelineDataManager =
|
||||||
|
new TimelineDataManager(store, timelineACLsManager);
|
||||||
|
bind(TimelineDataManager.class).toInstance(timelineDataManager);
|
||||||
serve("/*").with(GuiceContainer.class);
|
serve("/*").with(GuiceContainer.class);
|
||||||
TimelineAuthenticationFilter taFilter = new TimelineAuthenticationFilter();
|
TimelineAuthenticationFilter taFilter = new TimelineAuthenticationFilter();
|
||||||
FilterConfig filterConfig = mock(FilterConfig.class);
|
FilterConfig filterConfig = mock(FilterConfig.class);
|
||||||
|
|
Loading…
Reference in New Issue