From 7a241aee90f042672fc30ba402ed9dff912bc535 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Tue, 29 Apr 2014 19:49:44 +0000 Subject: [PATCH] YARN-1929. Fixed a deadlock in ResourceManager that occurs when failover happens right at the time of shutdown. Contributed by Karthik Kambatla. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1591071 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/service/CompositeService.java | 3 +- hadoop-yarn-project/CHANGES.txt | 3 + .../server/resourcemanager/AdminService.java | 6 +- .../EmbeddedElectorService.java | 16 +- .../TestRMEmbeddedElector.java | 138 ++++++++++++++++++ 5 files changed, 153 insertions(+), 13 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java index ca667e2bcf4..51cb4a336d6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/service/CompositeService.java @@ -141,8 +141,7 @@ public class CompositeService extends AbstractService { * @throws RuntimeException the first exception raised during the * stop process -after all services are stopped */ - private synchronized void stop(int numOfServicesStarted, - boolean stopOnlyStartedServices) { + private void stop(int numOfServicesStarted, boolean stopOnlyStartedServices) { // stop in reverse order of start Exception firstException = null; List services = getServices(); diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 7a9474c5991..e900c7a53f1 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -164,6 +164,9 @@ Release 2.4.1 - UNRELEASED YARN-1975. Used resources shows escaped html in CapacityScheduler and FairScheduler page (Mit Desai via jlowe) + YARN-1929. Fixed a deadlock in ResourceManager that occurs when failover + happens right at the time of shutdown. (Karthik Kambatla via vinodkv) + Release 2.4.0 - 2014-04-07 INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java index c5b26513192..6d521d479b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/AdminService.java @@ -101,7 +101,7 @@ public class AdminService extends CompositeService implements } @Override - public synchronized void serviceInit(Configuration conf) throws Exception { + public void serviceInit(Configuration conf) throws Exception { if (rmContext.isHAEnabled()) { autoFailoverEnabled = HAUtil.isAutomaticFailoverEnabled(conf); if (autoFailoverEnabled) { @@ -123,13 +123,13 @@ public class AdminService extends CompositeService implements } @Override - protected synchronized void serviceStart() throws Exception { + protected void serviceStart() throws Exception { startServer(); super.serviceStart(); } @Override - protected synchronized void serviceStop() throws Exception { + protected void serviceStop() throws Exception { stopServer(); super.serviceStop(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java index 3cd986cf34d..618f83dd02d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/EmbeddedElectorService.java @@ -61,7 +61,7 @@ public class EmbeddedElectorService extends AbstractService } @Override - protected synchronized void serviceInit(Configuration conf) + protected void serviceInit(Configuration conf) throws Exception { conf = conf instanceof YarnConfiguration ? conf : new YarnConfiguration(conf); @@ -102,20 +102,20 @@ public class EmbeddedElectorService extends AbstractService } @Override - protected synchronized void serviceStart() throws Exception { + protected void serviceStart() throws Exception { elector.joinElection(localActiveNodeInfo); super.serviceStart(); } @Override - protected synchronized void serviceStop() throws Exception { + protected void serviceStop() throws Exception { elector.quitElection(false); elector.terminateConnection(); super.serviceStop(); } @Override - public synchronized void becomeActive() throws ServiceFailedException { + public void becomeActive() throws ServiceFailedException { try { rmContext.getRMAdminService().transitionToActive(req); } catch (Exception e) { @@ -124,7 +124,7 @@ public class EmbeddedElectorService extends AbstractService } @Override - public synchronized void becomeStandby() { + public void becomeStandby() { try { rmContext.getRMAdminService().transitionToStandby(req); } catch (Exception e) { @@ -143,13 +143,13 @@ public class EmbeddedElectorService extends AbstractService @SuppressWarnings(value = "unchecked") @Override - public synchronized void notifyFatalError(String errorMessage) { + public void notifyFatalError(String errorMessage) { rmContext.getDispatcher().getEventHandler().handle( new RMFatalEvent(RMFatalEventType.EMBEDDED_ELECTOR_FAILED, errorMessage)); } @Override - public synchronized void fenceOldActive(byte[] oldActiveData) { + public void fenceOldActive(byte[] oldActiveData) { if (LOG.isDebugEnabled()) { LOG.debug("Request to fence old active being ignored, " + "as embedded leader election doesn't support fencing"); @@ -166,7 +166,7 @@ public class EmbeddedElectorService extends AbstractService .toByteArray(); } - private synchronized boolean isParentZnodeSafe(String clusterId) + private boolean isParentZnodeSafe(String clusterId) throws InterruptedException, IOException, KeeperException { byte[] data; try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java new file mode 100644 index 00000000000..0d9ee6d0455 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMEmbeddedElector.java @@ -0,0 +1,138 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.ClientBaseWithFixes; +import org.apache.hadoop.ha.ServiceFailedException; +import org.apache.hadoop.yarn.conf.HAUtil; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicBoolean; + +public class TestRMEmbeddedElector extends ClientBaseWithFixes { + private static final Log LOG = + LogFactory.getLog(TestRMEmbeddedElector.class.getName()); + + private static final String RM1_NODE_ID = "rm1"; + private static final int RM1_PORT_BASE = 10000; + private static final String RM2_NODE_ID = "rm2"; + private static final int RM2_PORT_BASE = 20000; + + private Configuration conf; + private AtomicBoolean callbackCalled; + + private void setConfForRM(String rmId, String prefix, String value) { + conf.set(HAUtil.addSuffix(prefix, rmId), value); + } + + private void setRpcAddressForRM(String rmId, int base) { + setConfForRM(rmId, YarnConfiguration.RM_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_PORT)); + setConfForRM(rmId, YarnConfiguration.RM_SCHEDULER_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_SCHEDULER_PORT)); + setConfForRM(rmId, YarnConfiguration.RM_ADMIN_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_ADMIN_PORT)); + setConfForRM(rmId, YarnConfiguration.RM_RESOURCE_TRACKER_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_RESOURCE_TRACKER_PORT)); + setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_WEBAPP_PORT)); + setConfForRM(rmId, YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "0.0.0.0:" + + (base + YarnConfiguration.DEFAULT_RM_WEBAPP_HTTPS_PORT)); + } + + @Before + public void setup() throws IOException { + conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, true); + conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_ENABLED, true); + conf.setBoolean(YarnConfiguration.AUTO_FAILOVER_EMBEDDED, true); + conf.set(YarnConfiguration.RM_CLUSTER_ID, "yarn-test-cluster"); + conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); + conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, 2000); + + conf.set(YarnConfiguration.RM_HA_IDS, RM1_NODE_ID + "," + RM2_NODE_ID); + conf.set(YarnConfiguration.RM_HA_ID, RM1_NODE_ID); + setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE); + setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE); + + conf.setLong(YarnConfiguration.CLIENT_FAILOVER_SLEEPTIME_BASE_MS, 100L); + + callbackCalled = new AtomicBoolean(false); + } + + /** + * Test that tries to see if there is a deadlock between + * (a) the thread stopping the RM + * (b) thread processing the ZK event asking RM to transition to active + * + * The test times out if there is a deadlock. + */ + @Test (timeout = 10000) + public void testDeadlockShutdownBecomeActive() throws InterruptedException { + MockRM rm = new MockRMWithElector(conf, 1000); + rm.start(); + LOG.info("Waiting for callback"); + while (!callbackCalled.get()); + LOG.info("Stopping RM"); + rm.stop(); + LOG.info("Stopped RM"); + } + + private class MockRMWithElector extends MockRM { + private long delayMs = 0; + + MockRMWithElector(Configuration conf) { + super(conf); + } + + MockRMWithElector(Configuration conf, long delayMs) { + this(conf); + this.delayMs = delayMs; + } + + @Override + protected AdminService createAdminService() { + return new AdminService(MockRMWithElector.this, getRMContext()) { + @Override + protected EmbeddedElectorService createEmbeddedElectorService() { + return new EmbeddedElectorService(getRMContext()) { + @Override + public void becomeActive() throws + ServiceFailedException { + try { + callbackCalled.set(true); + LOG.info("Callback called. Sleeping now"); + Thread.sleep(delayMs); + LOG.info("Sleep done"); + } catch (InterruptedException e) { + e.printStackTrace(); + } + super.becomeActive(); + } + }; + } + }; + } + } +}