YARN-5556. CapacityScheduler: Support deleting queues without requiring a RM restart. (Naganarasimha G R via wangda)

(cherry picked from commit 72054a817d)
This commit is contained in:
Wangda Tan 2017-01-18 13:26:48 -08:00
parent e2b3eff641
commit 8d6fa4569b
4 changed files with 277 additions and 62 deletions

View File

@ -144,7 +144,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@LimitedPrivate("yarn")
@Evolving
@SuppressWarnings("unchecked")
@ -415,7 +414,7 @@ public void reinitialize(Configuration newConf, RMContext rmContext)
} catch (Throwable t) {
this.conf = oldConf;
refreshMaximumAllocation(this.conf.getMaximumAllocation());
throw new IOException("Failed to re-init queues", t);
throw new IOException("Failed to re-init queues : "+ t.getMessage(), t);
}
// update lazy preemption

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
@ -27,6 +26,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -34,6 +34,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.Permission;
@ -45,6 +46,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
import com.google.common.annotations.VisibleForTesting;
/**
*
* Context of the Queues in Capacity Scheduler.
@ -164,11 +167,11 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf)
CSQueue newRoot = parseQueue(this.csContext, newConf, null,
CapacitySchedulerConfiguration.ROOT, newQueues, queues, NOOP);
// Ensure all existing queues are still present
validateExistingQueues(queues, newQueues);
// Ensure queue hiearchy in the new XML file is proper.
validateQueueHierarchy(queues, newQueues);
// Add new queues
addNewQueues(queues, newQueues);
// Add new queues and delete OldQeueus only after validation.
updateQueues(queues, newQueues);
// Re-configure queues
root.reinitialize(newRoot, this.csContext.getClusterResource());
@ -261,13 +264,14 @@ static CSQueue parseQueue(
}
/**
* Ensure all existing queues are present. Queues cannot be deleted
* Ensure all existing queues are present. Queues cannot be deleted if its not
* in Stopped state, Queue's cannot be moved from one hierarchy to other also.
*
* @param queues existing queues
* @param newQueues new queues
*/
private void validateExistingQueues(
Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
throws IOException {
private void validateQueueHierarchy(Map<String, CSQueue> queues,
Map<String, CSQueue> newQueues) throws IOException {
// check that all static queues are included in the newQueues list
for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
if (!(e.getValue() instanceof ReservationQueue)) {
@ -275,8 +279,18 @@ private void validateExistingQueues(
CSQueue oldQueue = e.getValue();
CSQueue newQueue = newQueues.get(queueName);
if (null == newQueue) {
throw new IOException(queueName + " cannot be found during refresh!");
// old queue doesn't exist in the new XML
if (oldQueue.getState() == QueueState.STOPPED) {
LOG.info("Deleting Queue " + queueName + ", as it is not"
+ " present in the modified capacity configuration xml");
} else {
throw new IOException(oldQueue.getQueuePath() + " is deleted from"
+ " the new capacity scheduler configuration, but the"
+ " queue is not yet in stopped state. "
+ "Current State : " + oldQueue.getState());
}
} else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
//Queue's cannot be moved from one hierarchy to other
throw new IOException(queueName + " is moved from:"
+ oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath()
+ " after refresh, which is not allowed.");
@ -286,18 +300,25 @@ private void validateExistingQueues(
}
/**
* Add the new queues (only) to our list of queues...
* ... be careful, do not overwrite existing queues.
* @param queues the existing queues
* @param newQueues the new queues
* Updates to our list of queues: Adds the new queues and deletes the removed
* ones... be careful, do not overwrite existing queues.
*
* @param existingQueues, the existing queues
* @param newQueues the new queues based on new XML
*/
private void addNewQueues(
Map<String, CSQueue> queues, Map<String, CSQueue> newQueues) {
private void updateQueues(Map<String, CSQueue> existingQueues,
Map<String, CSQueue> newQueues) {
for (Map.Entry<String, CSQueue> e : newQueues.entrySet()) {
String queueName = e.getKey();
CSQueue queue = e.getValue();
if (!queues.containsKey(queueName)) {
queues.put(queueName, queue);
if (!existingQueues.containsKey(queueName)) {
existingQueues.put(queueName, queue);
}
}
for (Map.Entry<String, CSQueue> e : existingQueues.entrySet()) {
String queueName = e.getKey();
if (!newQueues.containsKey(queueName)) {
existingQueues.remove(queueName);
}
}
}

View File

@ -18,6 +18,18 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -42,9 +54,14 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.AllocationState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
@ -56,18 +73,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSetUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
@Private
@Evolving
public class ParentQueue extends AbstractCSQueue {
@ -317,6 +322,14 @@ public void reinitialize(CSQueue newlyParsedQueue,
}
}
// remove the deleted queue in the refreshed xml.
for (Map.Entry<String, CSQueue> e : currentChildQueues.entrySet()) {
String queueName = e.getKey();
if (!newChildQueues.containsKey(queueName)) {
currentChildQueues.remove(queueName);
}
}
// Re-sort all queues
childQueues.clear();
childQueues.addAll(currentChildQueues.values());

View File

@ -18,26 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -114,13 +97,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeResourceUpdateEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.
ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNodeReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestSchedulerUtils;
@ -130,6 +109,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@ -143,6 +123,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerLeafQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
@ -153,9 +134,26 @@
import org.junit.Test;
import org.mockito.Mockito;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class TestCapacityScheduler {
private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
@ -391,7 +389,17 @@ private void nodeUpdate(
NodeUpdateSchedulerEvent nodeUpdate = new NodeUpdateSchedulerEvent(node);
resourceManager.getResourceScheduler().handle(nodeUpdate);
}
/**
* @param conf
* @return
* root
* / \
* a b
* / \ / | \
* a1 a2 b1 b2 b3
*
*/
private CapacitySchedulerConfiguration setupQueueConfiguration(
CapacitySchedulerConfiguration conf) {
@ -420,6 +428,67 @@ private CapacitySchedulerConfiguration setupQueueConfiguration(
return conf;
}
/**
* @param conf, to be modified
* @return, CS configuration which has deleted a queue(b1)
* root
* / \
* a b
* / \ | \
* a1 a2 b2 b3
*/
private CapacitySchedulerConfiguration setupQueueConfigurationWithOutB1(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] { "a", "b" });
conf.setCapacity(A, A_CAPACITY);
conf.setCapacity(B, B_CAPACITY);
// Define 2nd-level queues
conf.setQueues(A, new String[] { "a1", "a2" });
conf.setCapacity(A1, A1_CAPACITY);
conf.setUserLimitFactor(A1, 100.0f);
conf.setCapacity(A2, A2_CAPACITY);
conf.setUserLimitFactor(A2, 100.0f);
conf.setQueues(B, new String[] { "b2", "b3" });
conf.setCapacity(B2, B2_CAPACITY + B1_CAPACITY); //as B1 is deleted
conf.setUserLimitFactor(B2, 100.0f);
conf.setCapacity(B3, B3_CAPACITY);
conf.setUserLimitFactor(B3, 100.0f);
LOG.info("Setup top-level queues a and b (without b3)");
return conf;
}
/**
* @param conf, to be modified
* @return, CS configuration which has deleted a
* Parent queue(b)
*/
private CapacitySchedulerConfiguration setupQueueConfigurationWithOutB(
CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a" });
conf.setCapacity(A, A_CAPACITY + B_CAPACITY);
// Define 2nd-level queues
conf.setQueues(A, new String[] { "a1", "a2" });
conf.setCapacity(A1, A1_CAPACITY);
conf.setUserLimitFactor(A1, 100.0f);
conf.setCapacity(A2, A2_CAPACITY);
conf.setUserLimitFactor(A2, 100.0f);
LOG.info("Setup top-level queues a");
return conf;
}
private CapacitySchedulerConfiguration setupBlockedQueueConfiguration(
CapacitySchedulerConfiguration conf) {
@ -3753,4 +3822,117 @@ protected RMNodeLabelsManager createNodeLabelManager() {
Assert.assertArrayEquals(new int[][] { { 1, 0, 0 }, { 0, 1, 0 }, { 0, 0, 1 } },
attemptMetrics.getLocalityStatistics());
}
/**
* Test for queue deletion.
* @throws Exception
*/
@Test
public void testRefreshQueuesWithQueueDelete() throws Exception {
CapacityScheduler cs = new CapacityScheduler();
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
RMContextImpl rmContext = new RMContextImpl(null, null, null, null, null,
null, new RMContainerTokenSecretManager(conf),
new NMTokenSecretManagerInRM(conf),
new ClientToAMTokenSecretManagerInRM(), null);
setupQueueConfiguration(conf);
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
cs.init(conf);
cs.start();
cs.reinitialize(conf, rmContext);
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
// test delete leaf queue when there is application running.
Map<String, CSQueue> queues =
cs.getCapacitySchedulerQueueManager().getQueues();
String b1QTobeDeleted = "b1";
LeafQueue csB1Queue = Mockito.spy((LeafQueue) queues.get(b1QTobeDeleted));
when(csB1Queue.getState()).thenReturn(QueueState.DRAINING)
.thenReturn(QueueState.STOPPED);
queues.put(b1QTobeDeleted, csB1Queue);
conf = new CapacitySchedulerConfiguration();
setupQueueConfigurationWithOutB1(conf);
try {
cs.reinitialize(conf, mockContext);
fail("Expected to throw exception when refresh queue tries to delete a"
+ " queue with running apps");
} catch (IOException e) {
// ignore
}
// test delete leaf queue(root.b.b1) when there is no application running.
conf = new CapacitySchedulerConfiguration();
setupQueueConfigurationWithOutB1(conf);
try {
cs.reinitialize(conf, mockContext);
} catch (IOException e) {
fail("Expected to NOT throw exception when refresh queue tries to delete"
+ " a queue WITHOUT running apps");
}
CSQueue rootQueue = cs.getRootQueue();
CSQueue queueB = findQueue(rootQueue, B);
CSQueue queueB3 = findQueue(queueB, B1);
assertNull("Refresh needs to support delete of leaf queue ", queueB3);
// reset back to default configuration for testing parent queue delete
conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
cs.reinitialize(conf, rmContext);
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
// set the configurations such that it fails once but should be successfull
// next time
queues = cs.getCapacitySchedulerQueueManager().getQueues();
CSQueue bQueue = Mockito.spy((ParentQueue) queues.get("b"));
when(bQueue.getState()).thenReturn(QueueState.DRAINING)
.thenReturn(QueueState.STOPPED);
queues.put("b", bQueue);
bQueue = Mockito.spy((LeafQueue) queues.get("b1"));
when(bQueue.getState()).thenReturn(QueueState.STOPPED);
queues.put("b1", bQueue);
bQueue = Mockito.spy((LeafQueue) queues.get("b2"));
when(bQueue.getState()).thenReturn(QueueState.STOPPED);
queues.put("b2", bQueue);
bQueue = Mockito.spy((LeafQueue) queues.get("b3"));
when(bQueue.getState()).thenReturn(QueueState.STOPPED);
queues.put("b3", bQueue);
// test delete Parent queue when there is application running.
conf = new CapacitySchedulerConfiguration();
setupQueueConfigurationWithOutB(conf);
try {
cs.reinitialize(conf, mockContext);
fail("Expected to throw exception when refresh queue tries to delete a"
+ " parent queue with running apps in children queue");
} catch (IOException e) {
// ignore
}
// test delete Parent queue when there is no application running.
conf = new CapacitySchedulerConfiguration();
setupQueueConfigurationWithOutB(conf);
try {
cs.reinitialize(conf, mockContext);
} catch (IOException e) {
fail("Expected to not throw exception when refresh queue tries to delete"
+ " a queue without running apps");
}
rootQueue = cs.getRootQueue();
queueB = findQueue(rootQueue, B);
String message =
"Refresh needs to support delete of Parent queue and its children.";
assertNull(message, queueB);
assertNull(message,
cs.getCapacitySchedulerQueueManager().getQueues().get("b"));
assertNull(message,
cs.getCapacitySchedulerQueueManager().getQueues().get("b1"));
assertNull(message,
cs.getCapacitySchedulerQueueManager().getQueues().get("b2"));
cs.stop();
}
}