YARN-3267. Timelineserver applies the ACL rules after applying the limit on the number of records (Chang Li via jeagles)
(cherry picked from commit 8180e676ab
)
This commit is contained in:
parent
0cd9eb9987
commit
e4d8dddb49
|
@ -464,7 +464,7 @@ public class TestJobHistoryEventHandler {
|
||||||
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000),
|
t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000),
|
||||||
currentTime - 10));
|
currentTime - 10));
|
||||||
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
|
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
|
||||||
null, null, null, null, null, null);
|
null, null, null, null, null, null, null);
|
||||||
Assert.assertEquals(1, entities.getEntities().size());
|
Assert.assertEquals(1, entities.getEntities().size());
|
||||||
TimelineEntity tEntity = entities.getEntities().get(0);
|
TimelineEntity tEntity = entities.getEntities().get(0);
|
||||||
Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
|
Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
|
||||||
|
@ -480,7 +480,7 @@ public class TestJobHistoryEventHandler {
|
||||||
new HashMap<JobACL, AccessControlList>(), "default"),
|
new HashMap<JobACL, AccessControlList>(), "default"),
|
||||||
currentTime + 10));
|
currentTime + 10));
|
||||||
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
|
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
|
||||||
null, null, null, null, null);
|
null, null, null, null, null, null);
|
||||||
Assert.assertEquals(1, entities.getEntities().size());
|
Assert.assertEquals(1, entities.getEntities().size());
|
||||||
tEntity = entities.getEntities().get(0);
|
tEntity = entities.getEntities().get(0);
|
||||||
Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
|
Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
|
||||||
|
@ -498,7 +498,7 @@ public class TestJobHistoryEventHandler {
|
||||||
new JobQueueChangeEvent(TypeConverter.fromYarn(t.jobId), "q2"),
|
new JobQueueChangeEvent(TypeConverter.fromYarn(t.jobId), "q2"),
|
||||||
currentTime - 20));
|
currentTime - 20));
|
||||||
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
|
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
|
||||||
null, null, null, null, null);
|
null, null, null, null, null, null);
|
||||||
Assert.assertEquals(1, entities.getEntities().size());
|
Assert.assertEquals(1, entities.getEntities().size());
|
||||||
tEntity = entities.getEntities().get(0);
|
tEntity = entities.getEntities().get(0);
|
||||||
Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
|
Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
|
||||||
|
@ -520,7 +520,7 @@ public class TestJobHistoryEventHandler {
|
||||||
new JobFinishedEvent(TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0,
|
new JobFinishedEvent(TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0,
|
||||||
0, new Counters(), new Counters(), new Counters()), currentTime));
|
0, new Counters(), new Counters(), new Counters()), currentTime));
|
||||||
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
|
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
|
||||||
null, null, null, null, null);
|
null, null, null, null, null, null);
|
||||||
Assert.assertEquals(1, entities.getEntities().size());
|
Assert.assertEquals(1, entities.getEntities().size());
|
||||||
tEntity = entities.getEntities().get(0);
|
tEntity = entities.getEntities().get(0);
|
||||||
Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
|
Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
|
||||||
|
@ -546,7 +546,7 @@ public class TestJobHistoryEventHandler {
|
||||||
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId),
|
new JobUnsuccessfulCompletionEvent(TypeConverter.fromYarn(t.jobId),
|
||||||
0, 0, 0, JobStateInternal.KILLED.toString()), currentTime + 20));
|
0, 0, 0, JobStateInternal.KILLED.toString()), currentTime + 20));
|
||||||
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
|
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null,
|
||||||
null, null, null, null, null);
|
null, null, null, null, null, null);
|
||||||
Assert.assertEquals(1, entities.getEntities().size());
|
Assert.assertEquals(1, entities.getEntities().size());
|
||||||
tEntity = entities.getEntities().get(0);
|
tEntity = entities.getEntities().get(0);
|
||||||
Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
|
Assert.assertEquals(t.jobId.toString(), tEntity.getEntityId());
|
||||||
|
@ -575,7 +575,7 @@ public class TestJobHistoryEventHandler {
|
||||||
handleEvent(jheh, new JobHistoryEvent(t.jobId,
|
handleEvent(jheh, new JobHistoryEvent(t.jobId,
|
||||||
new TaskStartedEvent(t.taskID, 0, TaskType.MAP, "")));
|
new TaskStartedEvent(t.taskID, 0, TaskType.MAP, "")));
|
||||||
entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
|
entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
|
||||||
null, null, null, null, null);
|
null, null, null, null, null, null);
|
||||||
Assert.assertEquals(1, entities.getEntities().size());
|
Assert.assertEquals(1, entities.getEntities().size());
|
||||||
tEntity = entities.getEntities().get(0);
|
tEntity = entities.getEntities().get(0);
|
||||||
Assert.assertEquals(t.taskID.toString(), tEntity.getEntityId());
|
Assert.assertEquals(t.taskID.toString(), tEntity.getEntityId());
|
||||||
|
@ -588,7 +588,7 @@ public class TestJobHistoryEventHandler {
|
||||||
handleEvent(jheh, new JobHistoryEvent(t.jobId,
|
handleEvent(jheh, new JobHistoryEvent(t.jobId,
|
||||||
new TaskStartedEvent(t.taskID, 0, TaskType.REDUCE, "")));
|
new TaskStartedEvent(t.taskID, 0, TaskType.REDUCE, "")));
|
||||||
entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
|
entities = ts.getEntities("MAPREDUCE_TASK", null, null, null,
|
||||||
null, null, null, null, null);
|
null, null, null, null, null, null);
|
||||||
Assert.assertEquals(1, entities.getEntities().size());
|
Assert.assertEquals(1, entities.getEntities().size());
|
||||||
tEntity = entities.getEntities().get(0);
|
tEntity = entities.getEntities().get(0);
|
||||||
Assert.assertEquals(t.taskID.toString(), tEntity.getEntityId());
|
Assert.assertEquals(t.taskID.toString(), tEntity.getEntityId());
|
||||||
|
|
|
@ -55,7 +55,7 @@ public class TestMRTimelineEventHandling {
|
||||||
Assert.assertEquals(JobStatus.SUCCEEDED,
|
Assert.assertEquals(JobStatus.SUCCEEDED,
|
||||||
job.getJobStatus().getState().getValue());
|
job.getJobStatus().getState().getValue());
|
||||||
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
|
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
|
||||||
null, null, null, null, null, null);
|
null, null, null, null, null, null, null);
|
||||||
Assert.assertEquals(1, entities.getEntities().size());
|
Assert.assertEquals(1, entities.getEntities().size());
|
||||||
TimelineEntity tEntity = entities.getEntities().get(0);
|
TimelineEntity tEntity = entities.getEntities().get(0);
|
||||||
Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
|
Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
|
||||||
|
@ -70,7 +70,7 @@ public class TestMRTimelineEventHandling {
|
||||||
Assert.assertEquals(JobStatus.FAILED,
|
Assert.assertEquals(JobStatus.FAILED,
|
||||||
job.getJobStatus().getState().getValue());
|
job.getJobStatus().getState().getValue());
|
||||||
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
|
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
|
||||||
null, null, null);
|
null, null, null, null);
|
||||||
Assert.assertEquals(2, entities.getEntities().size());
|
Assert.assertEquals(2, entities.getEntities().size());
|
||||||
tEntity = entities.getEntities().get(0);
|
tEntity = entities.getEntities().get(0);
|
||||||
Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
|
Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
|
||||||
|
@ -109,7 +109,7 @@ public class TestMRTimelineEventHandling {
|
||||||
Assert.assertEquals(JobStatus.SUCCEEDED,
|
Assert.assertEquals(JobStatus.SUCCEEDED,
|
||||||
job.getJobStatus().getState().getValue());
|
job.getJobStatus().getState().getValue());
|
||||||
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
|
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
|
||||||
null, null, null, null, null, null);
|
null, null, null, null, null, null, null);
|
||||||
Assert.assertEquals(0, entities.getEntities().size());
|
Assert.assertEquals(0, entities.getEntities().size());
|
||||||
|
|
||||||
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
|
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
|
||||||
|
@ -117,7 +117,7 @@ public class TestMRTimelineEventHandling {
|
||||||
Assert.assertEquals(JobStatus.SUCCEEDED,
|
Assert.assertEquals(JobStatus.SUCCEEDED,
|
||||||
job.getJobStatus().getState().getValue());
|
job.getJobStatus().getState().getValue());
|
||||||
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
|
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
|
||||||
null, null, null);
|
null, null, null, null);
|
||||||
Assert.assertEquals(1, entities.getEntities().size());
|
Assert.assertEquals(1, entities.getEntities().size());
|
||||||
TimelineEntity tEntity = entities.getEntities().get(0);
|
TimelineEntity tEntity = entities.getEntities().get(0);
|
||||||
Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
|
Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
|
||||||
|
@ -148,7 +148,7 @@ public class TestMRTimelineEventHandling {
|
||||||
Assert.assertEquals(JobStatus.SUCCEEDED,
|
Assert.assertEquals(JobStatus.SUCCEEDED,
|
||||||
job.getJobStatus().getState().getValue());
|
job.getJobStatus().getState().getValue());
|
||||||
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
|
TimelineEntities entities = ts.getEntities("MAPREDUCE_JOB", null, null,
|
||||||
null, null, null, null, null, null);
|
null, null, null, null, null, null, null);
|
||||||
Assert.assertEquals(0, entities.getEntities().size());
|
Assert.assertEquals(0, entities.getEntities().size());
|
||||||
|
|
||||||
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
|
conf.setBoolean(MRJobConfig.MAPREDUCE_JOB_EMIT_TIMELINE_DATA, true);
|
||||||
|
@ -156,7 +156,7 @@ public class TestMRTimelineEventHandling {
|
||||||
Assert.assertEquals(JobStatus.SUCCEEDED,
|
Assert.assertEquals(JobStatus.SUCCEEDED,
|
||||||
job.getJobStatus().getState().getValue());
|
job.getJobStatus().getState().getValue());
|
||||||
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
|
entities = ts.getEntities("MAPREDUCE_JOB", null, null, null, null, null,
|
||||||
null, null, null);
|
null, null, null, null);
|
||||||
Assert.assertEquals(1, entities.getEntities().size());
|
Assert.assertEquals(1, entities.getEntities().size());
|
||||||
TimelineEntity tEntity = entities.getEntities().get(0);
|
TimelineEntity tEntity = entities.getEntities().get(0);
|
||||||
Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
|
Assert.assertEquals(job.getID().toString(), tEntity.getEntityId());
|
||||||
|
|
|
@ -703,6 +703,9 @@ Release 2.7.0 - UNRELEASED
|
||||||
YARN-3154. Added additional APIs in LogAggregationContext to avoid aggregating
|
YARN-3154. Added additional APIs in LogAggregationContext to avoid aggregating
|
||||||
running logs of application when rolling is enabled. (Xuan Gong via vinodkv)
|
running logs of application when rolling is enabled. (Xuan Gong via vinodkv)
|
||||||
|
|
||||||
|
YARN-3267. Timelineserver applies the ACL rules after applying the limit on
|
||||||
|
the number of records (Chang Li via jeagles)
|
||||||
|
|
||||||
Release 2.6.0 - 2014-11-18
|
Release 2.6.0 - 2014-11-18
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -243,7 +243,7 @@ public class TestDistributedShell {
|
||||||
.getApplicationHistoryServer()
|
.getApplicationHistoryServer()
|
||||||
.getTimelineStore()
|
.getTimelineStore()
|
||||||
.getEntities(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(),
|
.getEntities(ApplicationMaster.DSEntity.DS_APP_ATTEMPT.toString(),
|
||||||
null, null, null, null, null, null, null, null);
|
null, null, null, null, null, null, null, null, null);
|
||||||
Assert.assertNotNull(entitiesAttempts);
|
Assert.assertNotNull(entitiesAttempts);
|
||||||
Assert.assertEquals(1, entitiesAttempts.getEntities().size());
|
Assert.assertEquals(1, entitiesAttempts.getEntities().size());
|
||||||
Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents()
|
Assert.assertEquals(2, entitiesAttempts.getEntities().get(0).getEvents()
|
||||||
|
@ -261,7 +261,7 @@ public class TestDistributedShell {
|
||||||
.getApplicationHistoryServer()
|
.getApplicationHistoryServer()
|
||||||
.getTimelineStore()
|
.getTimelineStore()
|
||||||
.getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null,
|
.getEntities(ApplicationMaster.DSEntity.DS_CONTAINER.toString(), null,
|
||||||
null, null, null, null, null, null, null);
|
null, null, null, null, null, null, null, null);
|
||||||
Assert.assertNotNull(entities);
|
Assert.assertNotNull(entities);
|
||||||
Assert.assertEquals(2, entities.getEntities().size());
|
Assert.assertEquals(2, entities.getEntities().size());
|
||||||
Assert.assertEquals(entities.getEntities().get(0).getEntityType()
|
Assert.assertEquals(entities.getEntities().get(0).getEntityType()
|
||||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
|
||||||
import org.apache.hadoop.yarn.server.records.Version;
|
import org.apache.hadoop.yarn.server.records.Version;
|
||||||
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
|
||||||
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
|
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
|
||||||
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;
|
import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;
|
||||||
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
|
import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
|
||||||
|
@ -56,6 +57,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
|
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
|
||||||
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong;
|
import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong;
|
||||||
|
import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
|
||||||
import static org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.prefixMatches;
|
import static org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.prefixMatches;
|
||||||
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
|
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
|
||||||
|
|
||||||
|
@ -549,12 +551,13 @@ public class LeveldbTimelineStore extends AbstractService
|
||||||
public TimelineEntities getEntities(String entityType,
|
public TimelineEntities getEntities(String entityType,
|
||||||
Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs,
|
Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs,
|
||||||
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
|
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
|
||||||
EnumSet<Field> fields) throws IOException {
|
EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
|
||||||
if (primaryFilter == null) {
|
if (primaryFilter == null) {
|
||||||
// if no primary filter is specified, prefix the lookup with
|
// if no primary filter is specified, prefix the lookup with
|
||||||
// ENTITY_ENTRY_PREFIX
|
// ENTITY_ENTRY_PREFIX
|
||||||
return getEntityByTime(ENTITY_ENTRY_PREFIX, entityType, limit,
|
return getEntityByTime(ENTITY_ENTRY_PREFIX, entityType, limit,
|
||||||
windowStart, windowEnd, fromId, fromTs, secondaryFilters, fields);
|
windowStart, windowEnd, fromId, fromTs, secondaryFilters,
|
||||||
|
fields, checkAcl);
|
||||||
} else {
|
} else {
|
||||||
// if a primary filter is specified, prefix the lookup with
|
// if a primary filter is specified, prefix the lookup with
|
||||||
// INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
|
// INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
|
||||||
|
@ -564,7 +567,7 @@ public class LeveldbTimelineStore extends AbstractService
|
||||||
.add(GenericObjectMapper.write(primaryFilter.getValue()), true)
|
.add(GenericObjectMapper.write(primaryFilter.getValue()), true)
|
||||||
.add(ENTITY_ENTRY_PREFIX).getBytesForLookup();
|
.add(ENTITY_ENTRY_PREFIX).getBytesForLookup();
|
||||||
return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
|
return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
|
||||||
fromId, fromTs, secondaryFilters, fields);
|
fromId, fromTs, secondaryFilters, fields, checkAcl);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -586,7 +589,7 @@ public class LeveldbTimelineStore extends AbstractService
|
||||||
private TimelineEntities getEntityByTime(byte[] base,
|
private TimelineEntities getEntityByTime(byte[] base,
|
||||||
String entityType, Long limit, Long starttime, Long endtime,
|
String entityType, Long limit, Long starttime, Long endtime,
|
||||||
String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters,
|
String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters,
|
||||||
EnumSet<Field> fields) throws IOException {
|
EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
|
||||||
LeveldbIterator iterator = null;
|
LeveldbIterator iterator = null;
|
||||||
try {
|
try {
|
||||||
KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
|
KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
|
||||||
|
@ -683,9 +686,14 @@ public class LeveldbTimelineStore extends AbstractService
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (filterPassed) {
|
if (filterPassed) {
|
||||||
|
if (entity.getDomainId() == null) {
|
||||||
|
entity.setDomainId(DEFAULT_DOMAIN_ID);
|
||||||
|
}
|
||||||
|
if (checkAcl == null || checkAcl.check(entity)) {
|
||||||
entities.addEntity(entity);
|
entities.addEntity(entity);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return entities;
|
return entities;
|
||||||
} catch(DBException e) {
|
} catch(DBException e) {
|
||||||
throw new IOException(e);
|
throw new IOException(e);
|
||||||
|
|
|
@ -47,6 +47,9 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* In-memory implementation of {@link TimelineStore}. This
|
* In-memory implementation of {@link TimelineStore}. This
|
||||||
|
@ -79,7 +82,7 @@ public class MemoryTimelineStore
|
||||||
public synchronized TimelineEntities getEntities(String entityType, Long limit,
|
public synchronized TimelineEntities getEntities(String entityType, Long limit,
|
||||||
Long windowStart, Long windowEnd, String fromId, Long fromTs,
|
Long windowStart, Long windowEnd, String fromId, Long fromTs,
|
||||||
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
|
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
|
||||||
EnumSet<Field> fields) {
|
EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
|
||||||
if (limit == null) {
|
if (limit == null) {
|
||||||
limit = DEFAULT_LIMIT;
|
limit = DEFAULT_LIMIT;
|
||||||
}
|
}
|
||||||
|
@ -146,8 +149,13 @@ public class MemoryTimelineStore
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (entity.getDomainId() == null) {
|
||||||
|
entity.setDomainId(DEFAULT_DOMAIN_ID);
|
||||||
|
}
|
||||||
|
if (checkAcl == null || checkAcl.check(entity)) {
|
||||||
entitiesSelected.add(entity);
|
entitiesSelected.add(entity);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
|
List<TimelineEntity> entitiesToReturn = new ArrayList<TimelineEntity>();
|
||||||
for (TimelineEntity entitySelected : entitiesSelected) {
|
for (TimelineEntity entitySelected : entitiesSelected) {
|
||||||
entitiesToReturn.add(maskFields(entitySelected, fields));
|
entitiesToReturn.add(maskFields(entitySelected, fields));
|
||||||
|
|
|
@ -90,6 +90,31 @@ public class TimelineDataManager extends AbstractService {
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public interface CheckAcl {
|
||||||
|
boolean check(TimelineEntity entity) throws IOException;
|
||||||
|
}
|
||||||
|
|
||||||
|
class CheckAclImpl implements CheckAcl {
|
||||||
|
final UserGroupInformation ugi;
|
||||||
|
|
||||||
|
public CheckAclImpl(UserGroupInformation callerUGI) {
|
||||||
|
ugi = callerUGI;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean check(TimelineEntity entity) throws IOException {
|
||||||
|
try{
|
||||||
|
return timelineACLsManager.checkAccess(
|
||||||
|
ugi, ApplicationAccessType.VIEW_APP, entity);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
LOG.info("Error when verifying access for user " + ugi
|
||||||
|
+ " on the events of the timeline entity "
|
||||||
|
+ new EntityIdentifier(entity.getEntityId(),
|
||||||
|
entity.getEntityType()), e);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the timeline entities that the given user have access to. The meaning
|
* Get the timeline entities that the given user have access to. The meaning
|
||||||
* of each argument has been documented with
|
* of each argument has been documented with
|
||||||
|
@ -118,28 +143,9 @@ public class TimelineDataManager extends AbstractService {
|
||||||
fromTs,
|
fromTs,
|
||||||
primaryFilter,
|
primaryFilter,
|
||||||
secondaryFilter,
|
secondaryFilter,
|
||||||
fields);
|
fields,
|
||||||
if (entities != null) {
|
new CheckAclImpl(callerUGI));
|
||||||
Iterator<TimelineEntity> entitiesItr =
|
|
||||||
entities.getEntities().iterator();
|
|
||||||
while (entitiesItr.hasNext()) {
|
|
||||||
TimelineEntity entity = entitiesItr.next();
|
|
||||||
addDefaultDomainIdIfAbsent(entity);
|
|
||||||
try {
|
|
||||||
// check ACLs
|
|
||||||
if (!timelineACLsManager.checkAccess(
|
|
||||||
callerUGI, ApplicationAccessType.VIEW_APP, entity)) {
|
|
||||||
entitiesItr.remove();
|
|
||||||
}
|
|
||||||
} catch (YarnException e) {
|
|
||||||
LOG.error("Error when verifying access for user " + callerUGI
|
|
||||||
+ " on the events of the timeline entity "
|
|
||||||
+ new EntityIdentifier(entity.getEntityId(),
|
|
||||||
entity.getEntityType()), e);
|
|
||||||
entitiesItr.remove();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (entities == null) {
|
if (entities == null) {
|
||||||
return new TimelineEntities();
|
return new TimelineEntities();
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
|
||||||
|
import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This interface is for retrieving timeline information.
|
* This interface is for retrieving timeline information.
|
||||||
|
@ -106,7 +107,7 @@ public interface TimelineReader {
|
||||||
TimelineEntities getEntities(String entityType,
|
TimelineEntities getEntities(String entityType,
|
||||||
Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs,
|
Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs,
|
||||||
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
|
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
|
||||||
EnumSet<Field> fieldsToRetrieve) throws IOException;
|
EnumSet<Field> fieldsToRetrieve, CheckAcl checkAcl) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method retrieves the entity information for a given entity.
|
* This method retrieves the entity information for a given entity.
|
||||||
|
|
|
@ -164,13 +164,13 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
|
||||||
@Test
|
@Test
|
||||||
public void testGetEntityTypes() throws IOException {
|
public void testGetEntityTypes() throws IOException {
|
||||||
List<String> entityTypes = ((LeveldbTimelineStore)store).getEntityTypes();
|
List<String> entityTypes = ((LeveldbTimelineStore)store).getEntityTypes();
|
||||||
assertEquals(6, entityTypes.size());
|
assertEquals(7, entityTypes.size());
|
||||||
assertEquals("OLD_ENTITY_TYPE_1", entityTypes.get(0));
|
assertEquals("ACL_ENTITY_TYPE_1", entityTypes.get(0));
|
||||||
assertEquals(entityType1, entityTypes.get(1));
|
assertEquals("OLD_ENTITY_TYPE_1", entityTypes.get(1));
|
||||||
assertEquals(entityType2, entityTypes.get(2));
|
assertEquals(entityType1, entityTypes.get(2));
|
||||||
assertEquals(entityType4, entityTypes.get(3));
|
assertEquals(entityType2, entityTypes.get(3));
|
||||||
assertEquals(entityType5, entityTypes.get(4));
|
assertEquals(entityType4, entityTypes.get(4));
|
||||||
assertEquals(entityType7, entityTypes.get(5));
|
assertEquals(entityType5, entityTypes.get(5));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -201,7 +201,7 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
|
||||||
((LeveldbTimelineStore)store).discardOldEntities(-123l);
|
((LeveldbTimelineStore)store).discardOldEntities(-123l);
|
||||||
assertEquals(2, getEntities("type_1").size());
|
assertEquals(2, getEntities("type_1").size());
|
||||||
assertEquals(0, getEntities("type_2").size());
|
assertEquals(0, getEntities("type_2").size());
|
||||||
assertEquals(5, ((LeveldbTimelineStore)store).getEntityTypes().size());
|
assertEquals(6, ((LeveldbTimelineStore)store).getEntityTypes().size());
|
||||||
|
|
||||||
((LeveldbTimelineStore)store).discardOldEntities(123l);
|
((LeveldbTimelineStore)store).discardOldEntities(123l);
|
||||||
assertEquals(0, getEntities("type_1").size());
|
assertEquals(0, getEntities("type_1").size());
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.security.AdminACLsManager;
|
||||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -40,7 +41,8 @@ public class TestTimelineDataManager extends TimelineStoreTestUtils {
|
||||||
private FileContext fsContext;
|
private FileContext fsContext;
|
||||||
private File fsPath;
|
private File fsPath;
|
||||||
private TimelineDataManager dataManaer;
|
private TimelineDataManager dataManaer;
|
||||||
|
private static TimelineACLsManager aclsManager;
|
||||||
|
private static AdminACLsManager adminACLsManager;
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws Exception {
|
public void setup() throws Exception {
|
||||||
fsPath = new File("target", this.getClass().getSimpleName() +
|
fsPath = new File("target", this.getClass().getSimpleName() +
|
||||||
|
@ -58,8 +60,12 @@ public class TestTimelineDataManager extends TimelineStoreTestUtils {
|
||||||
loadVerificationEntityData();
|
loadVerificationEntityData();
|
||||||
loadTestDomainData();
|
loadTestDomainData();
|
||||||
|
|
||||||
TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
|
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, false);
|
||||||
|
aclsManager = new TimelineACLsManager(conf);
|
||||||
dataManaer = new TimelineDataManager(store, aclsManager);
|
dataManaer = new TimelineDataManager(store, aclsManager);
|
||||||
|
conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
|
||||||
|
conf.set(YarnConfiguration.YARN_ADMIN_ACL, "admin");
|
||||||
|
adminACLsManager = new AdminACLsManager(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
|
@ -84,6 +90,22 @@ public class TestTimelineDataManager extends TimelineStoreTestUtils {
|
||||||
TimelineDataManager.DEFAULT_DOMAIN_ID, entity.getDomainId());
|
TimelineDataManager.DEFAULT_DOMAIN_ID, entity.getDomainId());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testGetEntitiesAclEnabled() throws Exception {
|
||||||
|
AdminACLsManager oldAdminACLsManager =
|
||||||
|
aclsManager.setAdminACLsManager(adminACLsManager);
|
||||||
|
try {
|
||||||
|
TimelineEntities entities = dataManaer.getEntities(
|
||||||
|
"ACL_ENTITY_TYPE_1", null, null, null, null, null, null, 1l, null,
|
||||||
|
UserGroupInformation.createUserForTesting("owner_1", new String[] {"group1"}));
|
||||||
|
Assert.assertEquals(1, entities.getEntities().size());
|
||||||
|
Assert.assertEquals("ACL_ENTITY_ID_11",
|
||||||
|
entities.getEntities().get(0).getEntityId());
|
||||||
|
} finally {
|
||||||
|
aclsManager.setAdminACLsManager(oldAdminACLsManager);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testGetOldEntitiesWithOutDomainId() throws Exception {
|
public void testGetOldEntitiesWithOutDomainId() throws Exception {
|
||||||
TimelineEntities entities = dataManaer.getEntities(
|
TimelineEntities entities = dataManaer.getEntities(
|
||||||
|
|
|
@ -353,6 +353,19 @@ public class TimelineStoreTestUtils {
|
||||||
domain3.setReaders("reader_user_4 reader_group_4");
|
domain3.setReaders("reader_user_4 reader_group_4");
|
||||||
domain3.setWriters("writer_user_4 writer_group_4");
|
domain3.setWriters("writer_user_4 writer_group_4");
|
||||||
store.put(domain3);
|
store.put(domain3);
|
||||||
|
|
||||||
|
TimelineEntities entities = new TimelineEntities();
|
||||||
|
if (store instanceof LeveldbTimelineStore) {
|
||||||
|
LeveldbTimelineStore leveldb = (LeveldbTimelineStore) store;
|
||||||
|
entities.setEntities(Collections.singletonList(createEntity(
|
||||||
|
"ACL_ENTITY_ID_11", "ACL_ENTITY_TYPE_1", 63l, null, null, null, null,
|
||||||
|
"domain_id_4")));
|
||||||
|
leveldb.put(entities);
|
||||||
|
entities.setEntities(Collections.singletonList(createEntity(
|
||||||
|
"ACL_ENTITY_ID_22", "ACL_ENTITY_TYPE_1", 64l, null, null, null, null,
|
||||||
|
"domain_id_2")));
|
||||||
|
leveldb.put(entities);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testGetSingleEntity() throws IOException {
|
public void testGetSingleEntity() throws IOException {
|
||||||
|
@ -419,66 +432,66 @@ public class TimelineStoreTestUtils {
|
||||||
protected List<TimelineEntity> getEntities(String entityType)
|
protected List<TimelineEntity> getEntities(String entityType)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return store.getEntities(entityType, null, null, null, null, null,
|
return store.getEntities(entityType, null, null, null, null, null,
|
||||||
null, null, null).getEntities();
|
null, null, null, null).getEntities();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected List<TimelineEntity> getEntitiesWithPrimaryFilter(
|
protected List<TimelineEntity> getEntitiesWithPrimaryFilter(
|
||||||
String entityType, NameValuePair primaryFilter) throws IOException {
|
String entityType, NameValuePair primaryFilter) throws IOException {
|
||||||
return store.getEntities(entityType, null, null, null, null, null,
|
return store.getEntities(entityType, null, null, null, null, null,
|
||||||
primaryFilter, null, null).getEntities();
|
primaryFilter, null, null, null).getEntities();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected List<TimelineEntity> getEntitiesFromId(String entityType,
|
protected List<TimelineEntity> getEntitiesFromId(String entityType,
|
||||||
String fromId) throws IOException {
|
String fromId) throws IOException {
|
||||||
return store.getEntities(entityType, null, null, null, fromId, null,
|
return store.getEntities(entityType, null, null, null, fromId, null,
|
||||||
null, null, null).getEntities();
|
null, null, null, null).getEntities();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected List<TimelineEntity> getEntitiesFromTs(String entityType,
|
protected List<TimelineEntity> getEntitiesFromTs(String entityType,
|
||||||
long fromTs) throws IOException {
|
long fromTs) throws IOException {
|
||||||
return store.getEntities(entityType, null, null, null, null, fromTs,
|
return store.getEntities(entityType, null, null, null, null, fromTs,
|
||||||
null, null, null).getEntities();
|
null, null, null, null).getEntities();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected List<TimelineEntity> getEntitiesFromIdWithPrimaryFilter(
|
protected List<TimelineEntity> getEntitiesFromIdWithPrimaryFilter(
|
||||||
String entityType, NameValuePair primaryFilter, String fromId)
|
String entityType, NameValuePair primaryFilter, String fromId)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return store.getEntities(entityType, null, null, null, fromId, null,
|
return store.getEntities(entityType, null, null, null, fromId, null,
|
||||||
primaryFilter, null, null).getEntities();
|
primaryFilter, null, null, null).getEntities();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected List<TimelineEntity> getEntitiesFromTsWithPrimaryFilter(
|
protected List<TimelineEntity> getEntitiesFromTsWithPrimaryFilter(
|
||||||
String entityType, NameValuePair primaryFilter, long fromTs)
|
String entityType, NameValuePair primaryFilter, long fromTs)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return store.getEntities(entityType, null, null, null, null, fromTs,
|
return store.getEntities(entityType, null, null, null, null, fromTs,
|
||||||
primaryFilter, null, null).getEntities();
|
primaryFilter, null, null, null).getEntities();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected List<TimelineEntity> getEntitiesFromIdWithWindow(String entityType,
|
protected List<TimelineEntity> getEntitiesFromIdWithWindow(String entityType,
|
||||||
Long windowEnd, String fromId) throws IOException {
|
Long windowEnd, String fromId) throws IOException {
|
||||||
return store.getEntities(entityType, null, null, windowEnd, fromId, null,
|
return store.getEntities(entityType, null, null, windowEnd, fromId, null,
|
||||||
null, null, null).getEntities();
|
null, null, null, null).getEntities();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected List<TimelineEntity> getEntitiesFromIdWithPrimaryFilterAndWindow(
|
protected List<TimelineEntity> getEntitiesFromIdWithPrimaryFilterAndWindow(
|
||||||
String entityType, Long windowEnd, String fromId,
|
String entityType, Long windowEnd, String fromId,
|
||||||
NameValuePair primaryFilter) throws IOException {
|
NameValuePair primaryFilter) throws IOException {
|
||||||
return store.getEntities(entityType, null, null, windowEnd, fromId, null,
|
return store.getEntities(entityType, null, null, windowEnd, fromId, null,
|
||||||
primaryFilter, null, null).getEntities();
|
primaryFilter, null, null, null).getEntities();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected List<TimelineEntity> getEntitiesWithFilters(String entityType,
|
protected List<TimelineEntity> getEntitiesWithFilters(String entityType,
|
||||||
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters)
|
NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
return store.getEntities(entityType, null, null, null, null, null,
|
return store.getEntities(entityType, null, null, null, null, null,
|
||||||
primaryFilter, secondaryFilters, null).getEntities();
|
primaryFilter, secondaryFilters, null, null).getEntities();
|
||||||
}
|
}
|
||||||
|
|
||||||
protected List<TimelineEntity> getEntities(String entityType, Long limit,
|
protected List<TimelineEntity> getEntities(String entityType, Long limit,
|
||||||
Long windowStart, Long windowEnd, NameValuePair primaryFilter,
|
Long windowStart, Long windowEnd, NameValuePair primaryFilter,
|
||||||
EnumSet<Field> fields) throws IOException {
|
EnumSet<Field> fields) throws IOException {
|
||||||
return store.getEntities(entityType, limit, windowStart, windowEnd, null,
|
return store.getEntities(entityType, limit, windowStart, windowEnd, null,
|
||||||
null, primaryFilter, null, fields).getEntities();
|
null, primaryFilter, null, fields, null).getEntities();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testGetEntities() throws IOException {
|
public void testGetEntities() throws IOException {
|
||||||
|
|
Loading…
Reference in New Issue