YARN-2819. NPE in ATS Timeline Domains when upgrading from 2.4 to 2.6. Contributed by Zhijie Shen

This commit is contained in:
Xuan 2014-11-07 16:11:30 -08:00
parent 1e215e8ba2
commit 4a114dd67a
6 changed files with 293 additions and 21 deletions

View File

@ -911,6 +911,9 @@ Release 2.6.0 - UNRELEASED
YARN-2825. Container leak on NM (Jian He via jlowe) YARN-2825. Container leak on NM (Jian He via jlowe)
YARN-2819. NPE in ATS Timeline Domains when upgrading from 2.4 to 2.6.
(Zhijie Shen via xgong)
Release 2.5.2 - UNRELEASED Release 2.5.2 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -792,7 +792,8 @@ public class LeveldbTimelineStore extends AbstractService
* Put a single entity. If there is an error, add a TimelinePutError to the * Put a single entity. If there is an error, add a TimelinePutError to the
* given response. * given response.
*/ */
private void put(TimelineEntity entity, TimelinePutResponse response) { private void put(TimelineEntity entity, TimelinePutResponse response,
boolean allowEmptyDomainId) {
LockMap.CountingReentrantLock<EntityIdentifier> lock = LockMap.CountingReentrantLock<EntityIdentifier> lock =
writeLocks.getLock(new EntityIdentifier(entity.getEntityId(), writeLocks.getLock(new EntityIdentifier(entity.getEntityId(),
entity.getEntityType())); entity.getEntityType()));
@ -867,10 +868,18 @@ public class LeveldbTimelineStore extends AbstractService
new EntityIdentifier(relatedEntityId, relatedEntityType)); new EntityIdentifier(relatedEntityId, relatedEntityType));
continue; continue;
} else { } else {
// This is the existing entity
byte[] domainIdBytes = db.get(createDomainIdKey( byte[] domainIdBytes = db.get(createDomainIdKey(
relatedEntityId, relatedEntityType, relatedEntityStartTime)); relatedEntityId, relatedEntityType, relatedEntityStartTime));
// This is the existing entity // The timeline data created by the server before 2.6 won't have
String domainId = new String(domainIdBytes); // the domain field. We assume this timeline data is in the
// default timeline domain.
String domainId = null;
if (domainIdBytes == null) {
domainId = TimelineDataManager.DEFAULT_DOMAIN_ID;
} else {
domainId = new String(domainIdBytes);
}
if (!domainId.equals(entity.getDomainId())) { if (!domainId.equals(entity.getDomainId())) {
// in this case the entity will be put, but the relation will be // in this case the entity will be put, but the relation will be
// ignored // ignored
@ -923,12 +932,14 @@ public class LeveldbTimelineStore extends AbstractService
entity.getEntityType(), revStartTime); entity.getEntityType(), revStartTime);
if (entity.getDomainId() == null || if (entity.getDomainId() == null ||
entity.getDomainId().length() == 0) { entity.getDomainId().length() == 0) {
TimelinePutError error = new TimelinePutError(); if (!allowEmptyDomainId) {
error.setEntityId(entity.getEntityId()); TimelinePutError error = new TimelinePutError();
error.setEntityType(entity.getEntityType()); error.setEntityId(entity.getEntityId());
error.setErrorCode(TimelinePutError.NO_DOMAIN); error.setEntityType(entity.getEntityType());
response.addError(error); error.setErrorCode(TimelinePutError.NO_DOMAIN);
return; response.addError(error);
return;
}
} else { } else {
writeBatch.put(key, entity.getDomainId().getBytes()); writeBatch.put(key, entity.getDomainId().getBytes());
writePrimaryFilterEntries(writeBatch, primaryFilters, key, writePrimaryFilterEntries(writeBatch, primaryFilters, key,
@ -1011,7 +1022,22 @@ public class LeveldbTimelineStore extends AbstractService
deleteLock.readLock().lock(); deleteLock.readLock().lock();
TimelinePutResponse response = new TimelinePutResponse(); TimelinePutResponse response = new TimelinePutResponse();
for (TimelineEntity entity : entities.getEntities()) { for (TimelineEntity entity : entities.getEntities()) {
put(entity, response); put(entity, response, false);
}
return response;
} finally {
deleteLock.readLock().unlock();
}
}
@Private
@VisibleForTesting
public TimelinePutResponse putWithNoDomainId(TimelineEntities entities) {
try {
deleteLock.readLock().lock();
TimelinePutResponse response = new TimelinePutResponse();
for (TimelineEntity entity : entities.getEntities()) {
put(entity, response, true);
} }
return response; return response;
} finally { } finally {

View File

@ -124,6 +124,7 @@ public class TimelineDataManager extends AbstractService {
entities.getEntities().iterator(); entities.getEntities().iterator();
while (entitiesItr.hasNext()) { while (entitiesItr.hasNext()) {
TimelineEntity entity = entitiesItr.next(); TimelineEntity entity = entitiesItr.next();
addDefaultDomainIdIfAbsent(entity);
try { try {
// check ACLs // check ACLs
if (!timelineACLsManager.checkAccess( if (!timelineACLsManager.checkAccess(
@ -161,6 +162,7 @@ public class TimelineDataManager extends AbstractService {
entity = entity =
store.getEntity(entityId, entityType, fields); store.getEntity(entityId, entityType, fields);
if (entity != null) { if (entity != null) {
addDefaultDomainIdIfAbsent(entity);
// check ACLs // check ACLs
if (!timelineACLsManager.checkAccess( if (!timelineACLsManager.checkAccess(
callerUGI, ApplicationAccessType.VIEW_APP, entity)) { callerUGI, ApplicationAccessType.VIEW_APP, entity)) {
@ -203,6 +205,7 @@ public class TimelineDataManager extends AbstractService {
eventsOfOneEntity.getEntityId(), eventsOfOneEntity.getEntityId(),
eventsOfOneEntity.getEntityType(), eventsOfOneEntity.getEntityType(),
EnumSet.of(Field.PRIMARY_FILTERS)); EnumSet.of(Field.PRIMARY_FILTERS));
addDefaultDomainIdIfAbsent(entity);
// check ACLs // check ACLs
if (!timelineACLsManager.checkAccess( if (!timelineACLsManager.checkAccess(
callerUGI, ApplicationAccessType.VIEW_APP, entity)) { callerUGI, ApplicationAccessType.VIEW_APP, entity)) {
@ -254,10 +257,12 @@ public class TimelineDataManager extends AbstractService {
existingEntity = existingEntity =
store.getEntity(entityID.getId(), entityID.getType(), store.getEntity(entityID.getId(), entityID.getType(),
EnumSet.of(Field.PRIMARY_FILTERS)); EnumSet.of(Field.PRIMARY_FILTERS));
if (existingEntity != null && if (existingEntity != null) {
!existingEntity.getDomainId().equals(entity.getDomainId())) { addDefaultDomainIdIfAbsent(existingEntity);
throw new YarnException("The domain of the timeline entity " if (!existingEntity.getDomainId().equals(entity.getDomainId())) {
+ entityID + " is not allowed to be changed."); throw new YarnException("The domain of the timeline entity "
+ entityID + " is not allowed to be changed.");
}
} }
if (!timelineACLsManager.checkAccess( if (!timelineACLsManager.checkAccess(
callerUGI, ApplicationAccessType.MODIFY_APP, entity)) { callerUGI, ApplicationAccessType.MODIFY_APP, entity)) {
@ -355,4 +360,11 @@ public class TimelineDataManager extends AbstractService {
} }
} }
/**
 * Fills in the default timeline domain on an entity that carries no domain
 * ID, keeping the server compatible with timeline data created before 2.6.
 * An entity that already has a domain ID is left untouched.
 *
 * @param entity the entity to inspect and, if needed, update in place
 */
private static void addDefaultDomainIdIfAbsent(TimelineEntity entity) {
  if (entity.getDomainId() != null) {
    // The entity already names its domain; nothing to do.
    return;
  }
  entity.setDomainId(DEFAULT_DOMAIN_ID);
}
} }

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.service.ServiceStateException;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.Version; import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore; import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
@ -160,12 +161,13 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
@Test @Test
public void testGetEntityTypes() throws IOException { public void testGetEntityTypes() throws IOException {
List<String> entityTypes = ((LeveldbTimelineStore)store).getEntityTypes(); List<String> entityTypes = ((LeveldbTimelineStore)store).getEntityTypes();
assertEquals(5, entityTypes.size()); assertEquals(6, entityTypes.size());
assertEquals(entityType1, entityTypes.get(0)); assertEquals("OLD_ENTITY_TYPE_1", entityTypes.get(0));
assertEquals(entityType2, entityTypes.get(1)); assertEquals(entityType1, entityTypes.get(1));
assertEquals(entityType4, entityTypes.get(2)); assertEquals(entityType2, entityTypes.get(2));
assertEquals(entityType5, entityTypes.get(3)); assertEquals(entityType4, entityTypes.get(3));
assertEquals(entityType7, entityTypes.get(4)); assertEquals(entityType5, entityTypes.get(4));
assertEquals(entityType7, entityTypes.get(5));
} }
@Test @Test
@ -196,7 +198,7 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
((LeveldbTimelineStore)store).discardOldEntities(-123l); ((LeveldbTimelineStore)store).discardOldEntities(-123l);
assertEquals(2, getEntities("type_1").size()); assertEquals(2, getEntities("type_1").size());
assertEquals(0, getEntities("type_2").size()); assertEquals(0, getEntities("type_2").size());
assertEquals(4, ((LeveldbTimelineStore)store).getEntityTypes().size()); assertEquals(5, ((LeveldbTimelineStore)store).getEntityTypes().size());
((LeveldbTimelineStore)store).discardOldEntities(123l); ((LeveldbTimelineStore)store).discardOldEntities(123l);
assertEquals(0, getEntities("type_1").size()); assertEquals(0, getEntities("type_1").size());
@ -327,4 +329,69 @@ public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
super.testGetDomains(); super.testGetDomains();
} }
/**
 * Puts an entity in the default domain that relates to a second entity
 * (which, per the test name, is not in the store beforehand), then verifies
 * that the second entity can be fetched, reports the "DEFAULT" domain and
 * lists the first entity among its related entities.
 */
@Test
public void testRelatingToNonExistingEntity() throws IOException {
  // Build and store the entity that declares the relation.
  TimelineEntity relatingEntity = new TimelineEntity();
  relatingEntity.setEntityId("TEST_ENTITY_ID_1");
  relatingEntity.setEntityType("TEST_ENTITY_TYPE_1");
  relatingEntity.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID);
  relatingEntity.addRelatedEntity("TEST_ENTITY_TYPE_2", "TEST_ENTITY_ID_2");
  TimelineEntities batch = new TimelineEntities();
  batch.addEntity(relatingEntity);
  store.put(batch);

  // Fetch the entity on the other side of the relation.
  TimelineEntity relatedEntity =
      store.getEntity("TEST_ENTITY_ID_2", "TEST_ENTITY_TYPE_2", null);
  Assert.assertNotNull(relatedEntity);
  Assert.assertEquals("DEFAULT", relatedEntity.getDomainId());

  // The first related type and its first ID must point back at the
  // entity that was stored above.
  String firstRelatedType =
      relatedEntity.getRelatedEntities().keySet().iterator().next();
  Assert.assertEquals("TEST_ENTITY_TYPE_1", firstRelatedType);
  String firstRelatedId =
      relatedEntity.getRelatedEntities().values().iterator().next()
          .iterator().next();
  Assert.assertEquals("TEST_ENTITY_ID_1", firstRelatedId);
}
/**
 * Relates new entities to OLD_ENTITY_ID_1, an entity expected to have no
 * stored domain ID. A new entity in the default domain may relate to it;
 * a new entity in another domain is rejected with FORBIDDEN_RELATION and
 * the old entity keeps exactly one related entity. In both cases the old
 * entity read back from the store still has a null domain ID.
 */
@Test
public void testRelatingToOldEntityWithoutDomainId() throws IOException {
  // Case 1: the new entity lives in the default domain.
  TimelineEntity defaultDomainEntity = new TimelineEntity();
  defaultDomainEntity.setEntityId("NEW_ENTITY_ID_1");
  defaultDomainEntity.setEntityType("NEW_ENTITY_TYPE_1");
  defaultDomainEntity.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID);
  defaultDomainEntity.addRelatedEntity(
      "OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1");
  TimelineEntities firstBatch = new TimelineEntities();
  firstBatch.addEntity(defaultDomainEntity);
  store.put(firstBatch);

  TimelineEntity oldEntity =
      store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null);
  Assert.assertNotNull(oldEntity);
  Assert.assertNull(oldEntity.getDomainId());
  String relatedType =
      oldEntity.getRelatedEntities().keySet().iterator().next();
  Assert.assertEquals("NEW_ENTITY_TYPE_1", relatedType);
  String relatedId =
      oldEntity.getRelatedEntities().values().iterator().next()
          .iterator().next();
  Assert.assertEquals("NEW_ENTITY_ID_1", relatedId);

  // Case 2: the new entity lives outside the default domain.
  TimelineEntity otherDomainEntity = new TimelineEntity();
  otherDomainEntity.setEntityId("NEW_ENTITY_ID_2");
  otherDomainEntity.setEntityType("NEW_ENTITY_TYPE_2");
  otherDomainEntity.setDomainId("NON_DEFAULT");
  otherDomainEntity.addRelatedEntity(
      "OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1");
  TimelineEntities secondBatch = new TimelineEntities();
  secondBatch.addEntity(otherDomainEntity);
  TimelinePutResponse putResponse = store.put(secondBatch);
  Assert.assertEquals(1, putResponse.getErrors().size());
  Assert.assertEquals(TimelinePutError.FORBIDDEN_RELATION,
      putResponse.getErrors().get(0).getErrorCode());

  TimelineEntity oldEntityAfterReject =
      store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null);
  Assert.assertNotNull(oldEntityAfterReject);
  Assert.assertNull(oldEntityAfterReject.getDomainId());
  // Still have one related entity
  Assert.assertEquals(
      1, oldEntityAfterReject.getRelatedEntities().keySet().size());
  Assert.assertEquals(1, oldEntityAfterReject.getRelatedEntities().values()
      .iterator().next().size());
}
} }

View File

@ -0,0 +1,152 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timeline;
import java.io.File;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.timeline.security.TimelineACLsManager;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
 * Tests how {@link TimelineDataManager} handles entities that have no domain
 * ID stored in the underlying {@link LeveldbTimelineStore}, i.e. timeline
 * data shaped like that written before 2.6.
 *
 * The "OLD_ENTITY_*" entities used here are expected to be loaded by the
 * inherited test-data helpers called from {@link #setup()} (presumably
 * {@code loadTestEntityData()}; confirm against TimelineStoreTestUtils).
 */
public class TestTimelineDataManager extends TimelineStoreTestUtils {

  private FileContext fsContext;
  private File fsPath;
  private TimelineDataManager dataManager;

  /**
   * Creates a fresh leveldb-backed store under {@code target/}, loads the
   * shared test data into it and wraps it in a {@link TimelineDataManager}.
   */
  @Before
  public void setup() throws Exception {
    fsPath = new File("target", this.getClass().getSimpleName() +
        "-tmpDir").getAbsoluteFile();
    fsContext = FileContext.getLocalFSFileContext();
    // Remove anything left behind by an earlier run.
    fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
    Configuration conf = new YarnConfiguration();
    conf.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
        fsPath.getAbsolutePath());
    conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
    store = new LeveldbTimelineStore();
    store.init(conf);
    store.start();
    loadTestEntityData();
    loadVerificationEntityData();
    loadTestDomainData();

    TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
    dataManager = new TimelineDataManager(store, aclsManager);
  }

  /**
   * Stops the store and deletes the temporary leveldb directory. The
   * directory is removed even when stopping the store throws, so a failed
   * shutdown does not leave stale data on disk.
   */
  @After
  public void tearDown() throws Exception {
    try {
      if (store != null) {
        store.stop();
      }
    } finally {
      if (fsContext != null) {
        fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
      }
    }
  }

  /**
   * Reading a single old entity through the data manager must report the
   * default domain ID.
   */
  @Test
  public void testGetOldEntityWithOutDomainId() throws Exception {
    TimelineEntity entity = dataManager.getEntity(
        "OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1", null,
        UserGroupInformation.getCurrentUser());
    Assert.assertNotNull(entity);
    Assert.assertEquals("OLD_ENTITY_ID_1", entity.getEntityId());
    Assert.assertEquals("OLD_ENTITY_TYPE_1", entity.getEntityType());
    Assert.assertEquals(
        TimelineDataManager.DEFAULT_DOMAIN_ID, entity.getDomainId());
  }

  /**
   * Listing old entities through the data manager must report the default
   * domain ID on every returned entity. OLD_ENTITY_ID_2 is expected ahead of
   * OLD_ENTITY_ID_1.
   */
  @Test
  public void testGetOldEntitiesWithOutDomainId() throws Exception {
    TimelineEntities entities = dataManager.getEntities(
        "OLD_ENTITY_TYPE_1", null, null, null, null, null, null, null, null,
        UserGroupInformation.getCurrentUser());
    Assert.assertEquals(2, entities.getEntities().size());
    Assert.assertEquals("OLD_ENTITY_ID_2",
        entities.getEntities().get(0).getEntityId());
    Assert.assertEquals("OLD_ENTITY_TYPE_1",
        entities.getEntities().get(0).getEntityType());
    Assert.assertEquals(TimelineDataManager.DEFAULT_DOMAIN_ID,
        entities.getEntities().get(0).getDomainId());
    Assert.assertEquals("OLD_ENTITY_ID_1",
        entities.getEntities().get(1).getEntityId());
    Assert.assertEquals("OLD_ENTITY_TYPE_1",
        entities.getEntities().get(1).getEntityType());
    Assert.assertEquals(TimelineDataManager.DEFAULT_DOMAIN_ID,
        entities.getEntities().get(1).getDomainId());
  }

  /**
   * Updating an old entity succeeds when the update names the default
   * domain, and is rejected with ACCESS_DENIED when it names another domain.
   */
  @Test
  public void testUpdatingOldEntityWithoutDomainId() throws Exception {
    // Set the domain to the default domain when updating
    TimelineEntity entity = new TimelineEntity();
    entity.setEntityType("OLD_ENTITY_TYPE_1");
    entity.setEntityId("OLD_ENTITY_ID_1");
    entity.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID);
    entity.addOtherInfo("NEW_OTHER_INFO_KEY", "NEW_OTHER_INFO_VALUE");
    TimelineEntities entities = new TimelineEntities();
    entities.addEntity(entity);
    TimelinePutResponse response = dataManager.postEntities(
        entities, UserGroupInformation.getCurrentUser());
    Assert.assertEquals(0, response.getErrors().size());
    // Read straight from the store to see what was persisted.
    entity = store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null);
    Assert.assertNotNull(entity);
    // Even in leveldb, the domain is updated to the default domain Id
    Assert.assertEquals(
        TimelineDataManager.DEFAULT_DOMAIN_ID, entity.getDomainId());
    Assert.assertEquals(1, entity.getOtherInfo().size());
    Assert.assertEquals("NEW_OTHER_INFO_KEY",
        entity.getOtherInfo().keySet().iterator().next());
    Assert.assertEquals("NEW_OTHER_INFO_VALUE",
        entity.getOtherInfo().values().iterator().next());

    // Set the domain to the non-default domain when updating
    entity = new TimelineEntity();
    entity.setEntityType("OLD_ENTITY_TYPE_1");
    entity.setEntityId("OLD_ENTITY_ID_2");
    entity.setDomainId("NON_DEFAULT");
    entity.addOtherInfo("NEW_OTHER_INFO_KEY", "NEW_OTHER_INFO_VALUE");
    entities = new TimelineEntities();
    entities.addEntity(entity);
    response = dataManager.postEntities(
        entities, UserGroupInformation.getCurrentUser());
    Assert.assertEquals(1, response.getErrors().size());
    Assert.assertEquals(TimelinePutResponse.TimelinePutError.ACCESS_DENIED,
        response.getErrors().get(0).getErrorCode());
    entity = store.getEntity("OLD_ENTITY_ID_2", "OLD_ENTITY_TYPE_1", null);
    Assert.assertNotNull(entity);
    // In leveldb, the domain Id is still null
    Assert.assertNull(entity.getDomainId());
    // Updating is not executed
    Assert.assertEquals(0, entity.getOtherInfo().size());
  }

}

View File

@ -210,6 +210,18 @@ public class TimelineStoreTestUtils {
assertEquals(entityId7, response.getErrors().get(0).getEntityId()); assertEquals(entityId7, response.getErrors().get(0).getEntityId());
assertEquals(TimelinePutError.FORBIDDEN_RELATION, assertEquals(TimelinePutError.FORBIDDEN_RELATION,
response.getErrors().get(0).getErrorCode()); response.getErrors().get(0).getErrorCode());
if (store instanceof LeveldbTimelineStore) {
LeveldbTimelineStore leveldb = (LeveldbTimelineStore) store;
entities.setEntities(Collections.singletonList(createEntity(
"OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", 63l, null, null, null, null,
null)));
leveldb.putWithNoDomainId(entities);
entities.setEntities(Collections.singletonList(createEntity(
"OLD_ENTITY_ID_2", "OLD_ENTITY_TYPE_1", 64l, null, null, null, null,
null)));
leveldb.putWithNoDomainId(entities);
}
} }
/** /**