YARN-3830. AbstractYarnScheduler.createReleaseCache may try to clean a
null attempt. Contributed by nijel.
This commit is contained in:
parent
7405c59799
commit
80a68d6056
|
@ -583,6 +583,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
YARN-3823. Fix mismatch in default values for
|
YARN-3823. Fix mismatch in default values for
|
||||||
yarn.scheduler.maximum-allocation-vcores property. (Ray Chiang via devaraj)
|
yarn.scheduler.maximum-allocation-vcores property. (Ray Chiang via devaraj)
|
||||||
|
|
||||||
|
YARN-3830. AbstractYarnScheduler.createReleaseCache may try to clean a null
|
||||||
|
attempt. (nijel via devaraj)
|
||||||
|
|
||||||
Release 2.7.2 - UNRELEASED
|
Release 2.7.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -66,6 +66,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.util.concurrent.SettableFuture;
|
import com.google.common.util.concurrent.SettableFuture;
|
||||||
|
|
||||||
|
|
||||||
|
@ -451,27 +452,32 @@ public abstract class AbstractYarnScheduler
|
||||||
new Timer().schedule(new TimerTask() {
|
new Timer().schedule(new TimerTask() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
for (SchedulerApplication<T> app : applications.values()) {
|
clearPendingContainerCache();
|
||||||
|
|
||||||
T attempt = app.getCurrentAppAttempt();
|
|
||||||
synchronized (attempt) {
|
|
||||||
for (ContainerId containerId : attempt.getPendingRelease()) {
|
|
||||||
RMAuditLogger.logFailure(
|
|
||||||
app.getUser(),
|
|
||||||
AuditConstants.RELEASE_CONTAINER,
|
|
||||||
"Unauthorized access or invalid container",
|
|
||||||
"Scheduler",
|
|
||||||
"Trying to release container not owned by app or with invalid id.",
|
|
||||||
attempt.getApplicationId(), containerId);
|
|
||||||
}
|
|
||||||
attempt.getPendingRelease().clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
LOG.info("Release request cache is cleaned up");
|
LOG.info("Release request cache is cleaned up");
|
||||||
}
|
}
|
||||||
}, nmExpireInterval);
|
}, nmExpireInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Clears the pending-release container ids held by the current attempt of
 * every application in {@code applications}.
 *
 * For each application the current attempt is looked up; applications whose
 * current attempt is {@code null} are skipped. While holding the attempt's
 * monitor, an audit failure is logged for every container id still in the
 * attempt's pending-release collection, and that collection is then cleared.
 *
 * Called from the release-cache timer task and, being public and marked
 * {@code @VisibleForTesting}, directly from tests.
 */
@VisibleForTesting
|
||||||
|
public void clearPendingContainerCache() {
|
||||||
|
for (SchedulerApplication<T> app : applications.values()) {
|
||||||
|
T attempt = app.getCurrentAppAttempt();
|
||||||
|
// An application may have no current attempt; skip it rather than
// synchronizing on (and dereferencing) null.
if (attempt != null) {
|
||||||
|
synchronized (attempt) {
|
||||||
|
for (ContainerId containerId : attempt.getPendingRelease()) {
|
||||||
|
RMAuditLogger.logFailure(app.getUser(),
|
||||||
|
AuditConstants.RELEASE_CONTAINER,
|
||||||
|
"Unauthorized access or invalid container", "Scheduler",
|
||||||
|
"Trying to release container not owned by app "
|
||||||
|
+ "or with invalid id.", attempt.getApplicationId(),
|
||||||
|
containerId);
|
||||||
|
}
|
||||||
|
attempt.getPendingRelease().clear();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// clean up a completed container
|
// clean up a completed container
|
||||||
protected abstract void completedContainer(RMContainer rmContainer,
|
protected abstract void completedContainer(RMContainer rmContainer,
|
||||||
ContainerStatus containerStatus, RMContainerEventType event);
|
ContainerStatus containerStatus, RMContainerEventType event);
|
||||||
|
|
|
@ -18,26 +18,36 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
|
||||||
|
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
|
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.HashMap;
|
|
||||||
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
|
public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
|
||||||
|
|
||||||
|
@ -341,6 +351,58 @@ public class TestAbstractYarnScheduler extends ParameterizedSchedulerTestBase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Verifies that pending release requests are cleared from an application's
|
||||||
|
* current attempt even when another application in the scheduler's list has
|
||||||
|
* a null current attempt (no attempt).
|
||||||
|
*/
|
||||||
|
@SuppressWarnings({ "rawtypes" })
|
||||||
|
@Test(timeout = 10000)
|
||||||
|
public void testReleasedContainerIfAppAttemptisNull() throws Exception {
|
||||||
|
YarnConfiguration conf=getConf();
|
||||||
|
// Configure the RM to use the in-memory state store.
conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
|
||||||
|
MemoryRMStateStore memStore = new MemoryRMStateStore();
|
||||||
|
memStore.init(conf);
|
||||||
|
MockRM rm1 = new MockRM(conf, memStore);
|
||||||
|
try {
|
||||||
|
rm1.start();
|
||||||
|
MockNM nm1 =
|
||||||
|
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
|
||||||
|
nm1.registerNode();
|
||||||
|
|
||||||
|
AbstractYarnScheduler scheduler =
|
||||||
|
(AbstractYarnScheduler) rm1.getResourceScheduler();
|
||||||
|
// Mock App without attempt
|
||||||
|
RMApp mockAPp =
|
||||||
|
new MockRMApp(125, System.currentTimeMillis(), RMAppState.NEW);
|
||||||
|
SchedulerApplication<FiCaSchedulerApp> application =
|
||||||
|
new SchedulerApplication<FiCaSchedulerApp>(null, mockAPp.getUser());
|
||||||
|
|
||||||
|
// Second app with one app attempt
|
||||||
|
RMApp app = rm1.submitApp(200);
|
||||||
|
MockAM am1 = MockRM.launchAndRegisterAM(app, rm1, nm1);
|
||||||
|
final ContainerId runningContainer =
|
||||||
|
ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
|
||||||
|
// NOTE(review): presumably the second argument of MockAM.allocate is the
// list of container ids to release, which is what queues a pending
// release on the attempt -- confirm against MockAM.
am1.allocate(null, Arrays.asList(runningContainer));
|
||||||
|
|
||||||
|
Map schedulerApplications = scheduler.getSchedulerApplications();
|
||||||
|
SchedulerApplication schedulerApp =
|
||||||
|
(SchedulerApplication) scheduler.getSchedulerApplications().get(
|
||||||
|
app.getApplicationId());
|
||||||
|
// Insert the attempt-less application next to the real one.
// NOTE(review): relies on getSchedulerApplications() returning the
// scheduler's live map rather than a copy -- confirm.
schedulerApplications.put(mockAPp.getApplicationId(), application);
|
||||||
|
|
||||||
|
scheduler.clearPendingContainerCache();
|
||||||
|
|
||||||
|
// NOTE(review): JUnit's assertEquals(message, expected, actual) takes
// the expected value first; here the actual size is passed as
// "expected" and 0 as "actual", so a failure report would show them
// swapped. The pass/fail outcome itself is unaffected.
Assert.assertEquals("Pending containers are not released "
|
||||||
|
+ "when one of the application attempt is null !", schedulerApp
|
||||||
|
.getCurrentAppAttempt().getPendingRelease().size(), 0);
|
||||||
|
} finally {
|
||||||
|
// NOTE(review): rm1 is assigned before the try block, so this check is
// always true.
if (rm1 != null) {
|
||||||
|
rm1.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void verifyMaximumResourceCapability(
|
private void verifyMaximumResourceCapability(
|
||||||
Resource expectedMaximumResource, AbstractYarnScheduler scheduler) {
|
Resource expectedMaximumResource, AbstractYarnScheduler scheduler) {
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue