YARN-3267. Timelineserver applies the ACL rules after applying the limit on the number of records (Chang Li via jeagles)

(cherry picked from commit 8180e676ab)
This commit is contained in:
Jonathan Eagles 2015-03-13 12:04:30 -05:00
parent 146abadb96
commit 44aedad5dd
11 changed files with 126 additions and 65 deletions

View File

@ -464,7 +464,7 @@ public void testTimelineEventHandling() throws Exception {
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000),
currentTime - 10));
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
null, null, null, null, null, null);
null, null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size());
TimelineEntity tEntity = entities.getEntities().get(0);
Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
@ -480,7 +480,7 @@ public void testTimelineEventHandling() throws Exception {
new HashMap<JobACL, AccessControlList>(), "default"),
currentTime + 10));
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null);
null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size());
tEntity = entities.getEntities().get(0);
Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
@ -498,7 +498,7 @@ public void testTimelineEventHandling() throws Exception {
new JobQueueChangeEvent(TypeConverter.fromYarn(t.jobId), "q2"),
currentTime - 20));
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null);
null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size());
tEntity = entities.getEntities().get(0);
Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
@ -520,7 +520,7 @@ public void testTimelineEventHandling() throws Exception {
new JobFinishedEvent(TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0,
0, new Counters(), new Counters(), new Counters()), currentTime));
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null);
null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size());
tEntity = entities.getEntities().get(0);
Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
@ -546,7 +546,7 @@ public void testTimelineEventHandling() throws Exception {
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId),
0, 0, 0, JobStateInternal.KILLED.toString()), currentTime + 20));
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
null, null, null, null, null);
null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size());
tEntity = entities.getEntities().get(0);
Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
@ -575,7 +575,7 @@ public void testTimelineEventHandling() throws Exception {
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new TaskStartedEvent(t.taskID, 0, TaskType.MAP, "")));
entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
null, null, null, null, null);
null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size());
tEntity = entities.getEntities().get(0);
Assert.assertEquals(t.taskID.toString(), tEntity.getEntityId());
@ -588,7 +588,7 @@ public void testTimelineEventHandling() throws Exception {
handleEvent(jheh, new JobHistoryEvent(t.jobId,
new TaskStartedEvent(t.taskID, 0, TaskType.REDUCE, "")));
entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
null, null, null, null, null);
null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size());
tEntity = entities.getEntities().get(0);
Assert.assertEquals(t.taskID.toString(), tEntity.getEntityId());

View File

@ -55,7 +55,7 @@ public void testMRTimelineEventHandling() throws Exception {
Assert.assertEquals(JobStatus.SUCCEEDED,
job.getJobStatus().getState().getValue());
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
null, null, null, null, null, null);
null, null, null, null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size());
TimelineEntity tEntity = entities.getEntities().get(0);
Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
@ -70,7 +70,7 @@ public void testMRTimelineEventHandling() throws Exception {
Assert.assertEquals(JobStatus.FAILED,
job.getJobStatus().getState().getValue());
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
null, null, null);
null, null, null, null);
Assert.assertEquals(2, entities.getEntities().size());
tEntity = entities.getEntities().get(0);
Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
@ -109,7 +109,7 @@ public void testMapreduceJobTimelineServiceEnabled()
Assert.assertEquals(JobStatus.SUCCEEDED,
job.getJobStatus().getState().getValue());
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
null, null, null, null, null, null);
null, null, null, null, null, null, null);
Assert.assertEquals(0, entities.getEntities().size());
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
@ -117,7 +117,7 @@ public void testMapreduceJobTimelineServiceEnabled()
Assert.assertEquals(JobStatus.SUCCEEDED,
job.getJobStatus().getState().getValue());
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
null, null, null);
null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size());
TimelineEntity tEntity = entities.getEntities().get(0);
Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
@ -148,7 +148,7 @@ public void testMapreduceJobTimelineServiceEnabled()
Assert.assertEquals(JobStatus.SUCCEEDED,
job.getJobStatus().getState().getValue());
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
null, null, null, null, null, null);
null, null, null, null, null, null, null);
Assert.assertEquals(0, entities.getEntities().size());
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
@ -156,7 +156,7 @@ public void testMapreduceJobTimelineServiceEnabled()
Assert.assertEquals(JobStatus.SUCCEEDED,
job.getJobStatus().getState().getValue());
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
null, null, null);
null, null, null, null);
Assert.assertEquals(1, entities.getEntities().size());
TimelineEntity tEntity = entities.getEntities().get(0);
Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());

View File

@ -715,6 +715,9 @@ Release 2.7.0 - UNRELEASED
YARN-3154. Added additional APIs in LogAggregationContext to avoid aggregating
running logs of application when rolling is enabled. (Xuan Gong via vinodkv)
YARN-3267. Timelineserver applies the ACL rules after applying the limit on
the number of records (Chang Li via jeagles)
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES

View File

@ -243,7 +243,7 @@ public void run() {
.getApplicationHistoryServer()
.getTimelineStore()
.getEntities(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(),
null, null, null, null, null, null, null, null);
null, null, null, null, null, null, null, null, null);
Assert.assertNotNull(entitiesAttempts);
Assert.assertEquals(1, entitiesAttempts.getEntities().size());
Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents()
@ -261,7 +261,7 @@ public void run() {
.getApplicationHistoryServer()
.getTimelineStore()
.getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null,
null, null, null, null, null, null, null);
null, null, null, null, null, null, null, null);
Assert.assertNotNull(entities);
Assert.assertEquals(2, entities.getEntities().size());
Assert.assertEquals(entities.getEntities().get(0).getEntityType()

View File

@ -40,6 +40,7 @@
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
@ -56,6 +57,7 @@
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong;
import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
import static org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.prefixMatches;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
@ -549,12 +551,13 @@ public int compare(byte[] o1, byte[] o2) {
public TimelineEntities getEntities(String entityType,
Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fields) throws IOException {
EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
if (primaryFilter == null) {
// if no primary filter is specified, prefix the lookup with
// ENTITY_ENTRY_PREFIX
return getEntityByTime(ENTITY_ENTRY_PREFIX, entityType, limit,
windowStart, windowEnd, fromId, fromTs, secondaryFilters, fields);
windowStart, windowEnd, fromId, fromTs, secondaryFilters,
fields, checkAcl);
} else {
// if a primary filter is specified, prefix the lookup with
// INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
@ -564,7 +567,7 @@ public TimelineEntities getEntities(String entityType,
.add(GenericObjectMapper.write(primaryFilter.getValue()), true)
.add(ENTITY_ENTRY_PREFIX).getBytesForLookup();
return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
fromId, fromTs, secondaryFilters, fields);
fromId, fromTs, secondaryFilters, fields, checkAcl);
}
}
@ -586,7 +589,7 @@ public TimelineEntities getEntities(String entityType,
private TimelineEntities getEntityByTime(byte[] base,
String entityType, Long limit, Long starttime, Long endtime,
String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fields) throws IOException {
EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
LeveldbIterator iterator = null;
try {
KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
@ -683,7 +686,12 @@ private TimelineEntities getEntityByTime(byte[] base,
}
}
if (filterPassed) {
entities.addEntity(entity);
if (entity.getDomainId() == null) {
entity.setDomainId(DEFAULT_DOMAIN_ID);
}
if (checkAcl == null || checkAcl.check(entity)) {
entities.addEntity(entity);
}
}
}
return entities;

View File

@ -47,6 +47,9 @@
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
/**
* In-memory implementation of {@link TimelineStore}. This
@ -79,7 +82,7 @@ public MemoryTimelineStore() {
public synchronized TimelineEntities getEntities(String entityType, Long limit,
Long windowStart, Long windowEnd, String fromId, Long fromTs,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fields) {
EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
if (limit == null) {
limit = DEFAULT_LIMIT;
}
@ -146,7 +149,12 @@ public synchronized TimelineEntities getEntities(String entityType, Long limit,
continue;
}
}
entitiesSelected.add(entity);
if (entity.getDomainId() == null) {
entity.setDomainId(DEFAULT_DOMAIN_ID);
}
if (checkAcl == null || checkAcl.check(entity)) {
entitiesSelected.add(entity);
}
}
List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
for (TimelineEntity entitySelected : entitiesSelected) {

View File

@ -90,6 +90,31 @@ protected void serviceInit(Configuration conf) throws Exception {
super.serviceInit(conf);
}
public interface CheckAcl {
boolean check(TimelineEntity entity) throws IOException;
}
class CheckAclImpl implements CheckAcl {
final UserGroupInformation ugi;
public CheckAclImpl(UserGroupInformation callerUGI) {
ugi = callerUGI;
}
public boolean check(TimelineEntity entity) throws IOException {
try{
return timelineACLsManager.checkAccess(
ugi, ApplicationAccessType.VIEW_APP, entity);
} catch (YarnException e) {
LOG.info("Error when verifying access for user " + ugi
+ " on the events of the timeline entity "
+ new EntityIdentifier(entity.getEntityId(),
entity.getEntityType()), e);
return false;
}
}
}
/**
* Get the timeline entities that the given user have access to. The meaning
* of each argument has been documented with
@ -118,28 +143,9 @@ public TimelineEntities getEntities(
fromTs,
primaryFilter,
secondaryFilter,
fields);
if (entities != null) {
Iterator<TimelineEntity> entitiesItr =
entities.getEntities().iterator();
while (entitiesItr.hasNext()) {
TimelineEntity entity = entitiesItr.next();
addDefaultDomainIdIfAbsent(entity);
try {
// check ACLs
if (!timelineACLsManager.checkAccess(
callerUGI, ApplicationAccessType.VIEW_APP, entity)) {
entitiesItr.remove();
}
} catch (YarnException e) {
LOG.error("Error when verifying access for user " + callerUGI
+ " on the events of the timeline entity "
+ new EntityIdentifier(entity.getEntityId(),
entity.getEntityType()), e);
entitiesItr.remove();
}
}
}
fields,
new CheckAclImpl(callerUGI));
if (entities == null) {
return new TimelineEntities();
}

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
/**
* This interface is for retrieving timeline information.
@ -106,7 +107,7 @@ enum Field {
TimelineEntities getEntities(String entityType,
Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
EnumSet<Field> fieldsToRetrieve) throws IOException;
EnumSet<Field> fieldsToRetrieve, CheckAcl checkAcl) throws IOException;
/**
* This method retrieves the entity information for a given entity.

View File

@ -164,13 +164,13 @@ private boolean deleteNextEntity(String entityType, byte[] ts)
@Test
public void testGetEntityTypes() throws IOException {
List<String> entityTypes = ((LeveldbTimelineStore)store).getEntityTypes();
assertEquals(6, entityTypes.size());
assertEquals("OLD_ENTITY_TYPE_1", entityTypes.get(0));
assertEquals(entityType1, entityTypes.get(1));
assertEquals(entityType2, entityTypes.get(2));
assertEquals(entityType4, entityTypes.get(3));
assertEquals(entityType5, entityTypes.get(4));
assertEquals(entityType7, entityTypes.get(5));
assertEquals(7, entityTypes.size());
assertEquals("ACL_ENTITY_TYPE_1", entityTypes.get(0));
assertEquals("OLD_ENTITY_TYPE_1", entityTypes.get(1));
assertEquals(entityType1, entityTypes.get(2));
assertEquals(entityType2, entityTypes.get(3));
assertEquals(entityType4, entityTypes.get(4));
assertEquals(entityType5, entityTypes.get(5));
}
@Test
@ -201,7 +201,7 @@ public void testDeleteEntities() throws IOException, InterruptedException {
((LeveldbTimelineStore)store).discardOldEntities(-123l);
assertEquals(2, getEntities("type_1").size());
assertEquals(0, getEntities("type_2").size());
assertEquals(5, ((LeveldbTimelineStore)store).getEntityTypes().size());
assertEquals(6, ((LeveldbTimelineStore)store).getEntityTypes().size());
((LeveldbTimelineStore)store).discardOldEntities(123l);
assertEquals(0, getEntities("type_1").size());

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AdminACLsManager;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.junit.After;
import org.junit.Assert;
@ -40,7 +41,8 @@ public class TestTimelineDataManager extends TimelineStoreTestUtils {
private FileContext fsContext;
private File fsPath;
private TimelineDataManager dataManaer;
private static TimelineACLsManager aclsManager;
private static AdminACLsManager adminACLsManager;
@Before
public void setup() throws Exception {
fsPath = new File("target", this.getClass().getSimpleName() +
@ -58,8 +60,12 @@ public void setup() throws Exception {
loadVerificationEntityData();
loadTestDomainData();
TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, false);
aclsManager = new TimelineACLsManager(conf);
dataManaer = new TimelineDataManager(store, aclsManager);
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
adminACLsManager = new AdminACLsManager(conf);
}
@After
@ -84,6 +90,22 @@ public void testGetOldEntityWithOutDomainId() throws Exception {
TimelineDataManager.DEFAULT_DOMAIN_ID, entity.getDomainId());
}
@Test
public void testGetEntitiesAclEnabled() throws Exception {
AdminACLsManager oldAdminACLsManager =
aclsManager.setAdminACLsManager(adminACLsManager);
try {
TimelineEntities entities = dataManaer.getEntities(
"ACL_ENTITY_TYPE_1", null, null, null, null, null, null, 1l, null,
UserGroupInformation.createUserForTesting("owner_1", new String[] {"group1"}));
Assert.assertEquals(1, entities.getEntities().size());
Assert.assertEquals("ACL_ENTITY_ID_11",
entities.getEntities().get(0).getEntityId());
} finally {
aclsManager.setAdminACLsManager(oldAdminACLsManager);
}
}
@Test
public void testGetOldEntitiesWithOutDomainId() throws Exception {
TimelineEntities entities = dataManaer.getEntities(

View File

@ -353,6 +353,19 @@ protected void loadTestDomainData() throws IOException {
domain3.setReaders("reader_user_4 reader_group_4");
domain3.setWriters("writer_user_4 writer_group_4");
store.put(domain3);
TimelineEntities entities = new TimelineEntities();
if (store instanceof LeveldbTimelineStore) {
LeveldbTimelineStore leveldb = (LeveldbTimelineStore) store;
entities.setEntities(Collections.singletonList(createEntity(
"ACL_ENTITY_ID_11", "ACL_ENTITY_TYPE_1", 63l, null, null, null, null,
"domain_id_4")));
leveldb.put(entities);
entities.setEntities(Collections.singletonList(createEntity(
"ACL_ENTITY_ID_22", "ACL_ENTITY_TYPE_1", 64l, null, null, null, null,
"domain_id_2")));
leveldb.put(entities);
}
}
public void testGetSingleEntity() throws IOException {
@ -419,66 +432,66 @@ public void testGetSingleEntity() throws IOException {
protected List<TimelineEntity> getEntities(String entityType)
throws IOException {
return store.getEntities(entityType, null, null, null, null, null,
null, null, null).getEntities();
null, null, null, null).getEntities();
}
protected List<TimelineEntity> getEntitiesWithPrimaryFilter(
String entityType, NameValuePair primaryFilter) throws IOException {
return store.getEntities(entityType, null, null, null, null, null,
primaryFilter, null, null).getEntities();
primaryFilter, null, null, null).getEntities();
}
protected List<TimelineEntity> getEntitiesFromId(String entityType,
String fromId) throws IOException {
return store.getEntities(entityType, null, null, null, fromId, null,
null, null, null).getEntities();
null, null, null, null).getEntities();
}
protected List<TimelineEntity> getEntitiesFromTs(String entityType,
long fromTs) throws IOException {
return store.getEntities(entityType, null, null, null, null, fromTs,
null, null, null).getEntities();
null, null, null, null).getEntities();
}
protected List<TimelineEntity> getEntitiesFromIdWithPrimaryFilter(
String entityType, NameValuePair primaryFilter, String fromId)
throws IOException {
return store.getEntities(entityType, null, null, null, fromId, null,
primaryFilter, null, null).getEntities();
primaryFilter, null, null, null).getEntities();
}
protected List<TimelineEntity> getEntitiesFromTsWithPrimaryFilter(
String entityType, NameValuePair primaryFilter, long fromTs)
throws IOException {
return store.getEntities(entityType, null, null, null, null, fromTs,
primaryFilter, null, null).getEntities();
primaryFilter, null, null, null).getEntities();
}
protected List<TimelineEntity> getEntitiesFromIdWithWindow(String entityType,
Long windowEnd, String fromId) throws IOException {
return store.getEntities(entityType, null, null, windowEnd, fromId, null,
null, null, null).getEntities();
null, null, null, null).getEntities();
}
protected List<TimelineEntity> getEntitiesFromIdWithPrimaryFilterAndWindow(
String entityType, Long windowEnd, String fromId,
NameValuePair primaryFilter) throws IOException {
return store.getEntities(entityType, null, null, windowEnd, fromId, null,
primaryFilter, null, null).getEntities();
primaryFilter, null, null, null).getEntities();
}
protected List<TimelineEntity> getEntitiesWithFilters(String entityType,
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters)
throws IOException {
return store.getEntities(entityType, null, null, null, null, null,
primaryFilter, secondaryFilters, null).getEntities();
primaryFilter, secondaryFilters, null, null).getEntities();
}
protected List<TimelineEntity> getEntities(String entityType, Long limit,
Long windowStart, Long windowEnd, NameValuePair primaryFilter,
EnumSet<Field> fields) throws IOException {
return store.getEntities(entityType, limit, windowStart, windowEnd, null,
null, primaryFilter, null, fields).getEntities();
null, primaryFilter, null, fields, null).getEntities();
}
public void testGetEntities() throws IOException {