YARN-292. Fixed FifoScheduler and FairScheduler to make their applications data structures thread safe to avoid RM crashing with ArrayIndexOutOfBoundsException. Contributed by Zhijie Shen.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1521328 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-09-10 01:24:45 +00:00
parent 0f91d8485a
commit 59b5490989
8 changed files with 117 additions and 14 deletions

View File

@ -176,6 +176,10 @@ Release 2.1.1-beta - UNRELEASED
invalid client token key errors when an appliation is about to finish.
(Jason Lowe via vinodkv)
YARN-292. Fixed FifoScheduler and FairScheduler to make their applications
data structures thread safe to avoid RM crashing with
ArrayIndexOutOfBoundsException. (Zhijie Shen via vinodkv)
Release 2.1.0-beta - 2013-08-22
INCOMPATIBLE CHANGES

View File

@ -833,7 +833,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
Allocation amContainerAllocation = appAttempt.scheduler.allocate(
appAttempt.applicationAttemptId, EMPTY_CONTAINER_REQUEST_LIST,
EMPTY_CONTAINER_RELEASE_LIST, null, null);
// There must be at least one container allocated, because a
// CONTAINER_ALLOCATED is emitted after an RMContainer is constructed,
// and is put in SchedulerApplication#newlyAllocatedContainers. Then,
// YarnScheduler#allocate will fetch it.
assert amContainerAllocation.getContainers().size() != 0;
// Set the masterContainer
appAttempt.setMasterContainer(amContainerAllocation.getContainers().get(
0));

View File

@ -80,6 +80,8 @@ import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
@LimitedPrivate("yarn")
@Evolving
@SuppressWarnings("unchecked")
@ -179,7 +181,8 @@ public class CapacityScheduler
private Resource minimumAllocation;
private Resource maximumAllocation;
private Map<ApplicationAttemptId, FiCaSchedulerApp> applications =
@VisibleForTesting
protected Map<ApplicationAttemptId, FiCaSchedulerApp> applications =
new ConcurrentHashMap<ApplicationAttemptId, FiCaSchedulerApp>();
private boolean initialized = false;

View File

@ -37,7 +37,6 @@ import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
@ -50,6 +49,7 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
@ -154,8 +154,9 @@ public class FairScheduler implements ResourceScheduler {
// This stores per-application scheduling information, indexed by
// attempt ID's for fast lookup.
@VisibleForTesting
protected Map<ApplicationAttemptId, FSSchedulerApp> applications =
new HashMap<ApplicationAttemptId, FSSchedulerApp>();
new ConcurrentHashMap<ApplicationAttemptId, FSSchedulerApp>();
// Nodes in the cluster, indexed by NodeId
private Map<NodeId, FSSchedulerNode> nodes =

View File

@ -25,8 +25,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -36,12 +36,10 @@ import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
@ -50,7 +48,9 @@ import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.Token;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
@ -90,6 +90,8 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
@LimitedPrivate("yarn")
@Evolving
@SuppressWarnings("unchecked")
@ -113,8 +115,10 @@ public class FifoScheduler implements ResourceScheduler, Configurable {
private Resource maximumAllocation;
private boolean usePortForNodeName;
private Map<ApplicationAttemptId, FiCaSchedulerApp> applications
= new TreeMap<ApplicationAttemptId, FiCaSchedulerApp>();
// Use ConcurrentSkipListMap because applications need to be ordered
@VisibleForTesting
protected Map<ApplicationAttemptId, FiCaSchedulerApp> applications
= new ConcurrentSkipListMap<ApplicationAttemptId, FiCaSchedulerApp>();
private ActiveUsersManager activeUsersManager;

View File

@ -19,22 +19,27 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
@ -44,19 +49,24 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.AsyncDispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.Application;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Before;
@ -525,4 +535,63 @@ public class TestCapacityScheduler {
assertTrue(appComparator.compare(app2, app3) < 0);
}
@Test
public void testConcurrentAccessOnApplications() throws Exception {
CapacityScheduler cs = new CapacityScheduler();
verifyConcurrentAccessOnApplications(
cs.applications, FiCaSchedulerApp.class);
}
public static <T extends SchedulerApplication>
void verifyConcurrentAccessOnApplications(
final Map<ApplicationAttemptId, T> applications, Class<T> clazz)
throws Exception {
final int size = 10000;
final ApplicationId appId = ApplicationId.newInstance(0, 0);
final Constructor<T> ctor = clazz.getDeclaredConstructor(
ApplicationAttemptId.class, String.class, Queue.class,
ActiveUsersManager.class, RMContext.class);
ApplicationAttemptId appAttemptId0
= ApplicationAttemptId.newInstance(appId, 0);
applications.put(appAttemptId0, ctor.newInstance(
appAttemptId0, null, mock(Queue.class), null, null));
assertNotNull(applications.get(appAttemptId0));
// Imitating the thread of scheduler that will add and remove apps
final AtomicBoolean finished = new AtomicBoolean(false);
final AtomicBoolean failed = new AtomicBoolean(false);
Thread t = new Thread() {
@Override
public void run() {
for (int i = 1; i <= size; ++i) {
ApplicationAttemptId appAttemptId
= ApplicationAttemptId.newInstance(appId, i);
try {
applications.put(appAttemptId, ctor.newInstance(
appAttemptId, null, mock(Queue.class), null, null));
} catch (Exception e) {
failed.set(true);
finished.set(true);
return;
}
}
for (int i = 1; i <= size; ++i) {
ApplicationAttemptId appAttemptId
= ApplicationAttemptId.newInstance(appId, i);
applications.remove(appAttemptId);
}
finished.set(true);
}
};
t.start();
// Imitating the thread of rmappattempt that will get the app
while (!finished.get()) {
assertNotNull(applications.get(appAttemptId0));
}
assertFalse(failed.get());
}
}

View File

@ -72,6 +72,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
@ -79,6 +80,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemoved
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -2196,4 +2198,11 @@ public class TestFairScheduler {
assertEquals(1, app.getLiveContainers().size());
}
@Test
public void testConcurrentAccessOnApplications() throws Exception {
FairScheduler fs = new FairScheduler();
TestCapacityScheduler.verifyConcurrentAccessOnApplications(
fs.applications, FSSchedulerApp.class);
}
}

View File

@ -51,13 +51,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Before;
@ -415,6 +417,13 @@ public class TestFifoScheduler {
LOG.info("--- END: testFifoScheduler ---");
}
@Test
public void testConcurrentAccessOnApplications() throws Exception {
FifoScheduler fs = new FifoScheduler();
TestCapacityScheduler.verifyConcurrentAccessOnApplications(
fs.applications, FiCaSchedulerApp.class);
}
private void checkApplicationResourceUsage(int expected,
Application application) {
Assert.assertEquals(expected, application.getUsedResources().getMemory());