From 005e1df540aeccbeb371f3bab834b140d85f0ec5 Mon Sep 17 00:00:00 2001 From: Vinayakumar B Date: Tue, 17 Feb 2015 14:55:56 +0530 Subject: [PATCH] HADOOP-11000. HAServiceProtocol's health state is incorrectly transitioned to SERVICE_NOT_RESPONDING (Contributed by Ming Ma) (cherry picked from commit cf4b7f506dd338ecf2ed4c643b6a6a334e070fca) --- .../hadoop-common/CHANGES.txt | 3 + .../org/apache/hadoop/ha/HealthMonitor.java | 35 ++++++--- .../org/apache/hadoop/ha/DummyHAService.java | 73 +++++++++++++++++-- .../apache/hadoop/ha/TestHealthMonitor.java | 4 +- 4 files changed, 94 insertions(+), 21 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index ff711621212..f3ab6f782e6 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -537,6 +537,9 @@ Release 2.7.0 - UNRELEASED HADOOP-11467. KerberosAuthenticator can connect to a non-secure cluster. (yzhangal via rkanter) + HADOOP-11000. HAServiceProtocol's health state is incorrectly transitioned + to SERVICE_NOT_RESPONDING (Ming Ma via vinayakumarb) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java index 0d144442504..8c8762938e4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java @@ -30,6 +30,7 @@ import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HealthCheckFailedException; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.util.Daemon; @@ -201,18 +202,20 @@ private void doHealthChecks() throws InterruptedException { status = proxy.getServiceStatus(); proxy.monitorHealth(); healthy = true; - } catch (HealthCheckFailedException e) { - LOG.warn("Service health check failed for " + targetToMonitor - + ": " + e.getMessage()); - enterState(State.SERVICE_UNHEALTHY); } catch (Throwable t) { - LOG.warn("Transport-level exception trying to monitor health of " + - targetToMonitor + ": " + t.getLocalizedMessage()); - RPC.stopProxy(proxy); - proxy = null; - enterState(State.SERVICE_NOT_RESPONDING); - Thread.sleep(sleepAfterDisconnectMillis); - return; + if (isHealthCheckFailedException(t)) { + LOG.warn("Service health check failed for " + targetToMonitor + + ": " + t.getMessage()); + enterState(State.SERVICE_UNHEALTHY); + } else { + LOG.warn("Transport-level exception trying to monitor health of " + + targetToMonitor + ": " + t.getCause() + " " + t.getLocalizedMessage()); + RPC.stopProxy(proxy); + proxy = null; + enterState(State.SERVICE_NOT_RESPONDING); + Thread.sleep(sleepAfterDisconnectMillis); + return; + } } if (status != null) { @@ -225,7 +228,15 @@ private void doHealthChecks() throws InterruptedException { Thread.sleep(checkIntervalMillis); } } - + + private boolean isHealthCheckFailedException(Throwable t) { + return ((t instanceof HealthCheckFailedException) || + (t instanceof RemoteException && + ((RemoteException)t).unwrapRemoteException( + HealthCheckFailedException.class) instanceof + HealthCheckFailedException)); + } + private synchronized void setLastServiceStatus(HAServiceStatus status) { this.lastServiceState = status; for (ServiceStateCallback cb : serviceStateCallbacks) { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java index e9189e27243..aef6c4da282 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java @@ -22,15 +22,25 @@ import java.net.InetSocketAddress; import java.util.ArrayList; +import com.google.protobuf.BlockingService; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB; +import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB; +import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; import org.mockito.Mockito; import com.google.common.collect.Lists; +import static org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_DEFAULT; + /** * Test-only implementation of {@link HAServiceTarget}, which returns * a mock implementation. @@ -50,22 +60,33 @@ class DummyHAService extends HAServiceTarget { DummySharedResource sharedResource; public int fenceCount = 0; public int activeTransitionCount = 0; + boolean testWithProtoBufRPC = false; static ArrayList instances = Lists.newArrayList(); int index; DummyHAService(HAServiceState state, InetSocketAddress address) { + this(state, address, false); + } + + DummyHAService(HAServiceState state, InetSocketAddress address, + boolean testWithProtoBufRPC) { this.state = state; - this.proxy = makeMock(); + this.testWithProtoBufRPC = testWithProtoBufRPC; + if (testWithProtoBufRPC) { + this.address = startAndGetRPCServerAddress(address); + } else { + this.address = address; + } + Configuration conf = new Configuration(); + this.proxy = makeMock(conf, HA_HM_RPC_TIMEOUT_DEFAULT); try { - Configuration conf = new Configuration(); - conf.set(DUMMY_FENCE_KEY, DummyFencer.class.getName()); + conf.set(DUMMY_FENCE_KEY, DummyFencer.class.getName()); this.fencer = Mockito.spy( NodeFencer.create(conf, DUMMY_FENCE_KEY)); } catch (BadFencingConfigurationException e) { throw new RuntimeException(e); } - this.address = address; synchronized (instances) { instances.add(this); this.index = instances.size(); @@ -75,9 +96,42 @@ class DummyHAService extends HAServiceTarget { public void setSharedResource(DummySharedResource rsrc) { this.sharedResource = rsrc; } - - private HAServiceProtocol makeMock() { - return Mockito.spy(new MockHAProtocolImpl()); + + private InetSocketAddress startAndGetRPCServerAddress(InetSocketAddress serverAddress) { + Configuration conf = new Configuration(); + + try { + RPC.setProtocolEngine(conf, + HAServiceProtocolPB.class, ProtobufRpcEngine.class); + HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator = + new HAServiceProtocolServerSideTranslatorPB(new MockHAProtocolImpl()); + BlockingService haPbService = HAServiceProtocolService + .newReflectiveBlockingService(haServiceProtocolXlator); + + Server server = new RPC.Builder(conf) + .setProtocol(HAServiceProtocolPB.class) + .setInstance(haPbService) + .setBindAddress(serverAddress.getHostName()) + .setPort(serverAddress.getPort()).build(); + server.start(); + return NetUtils.getConnectAddress(server); + } catch (IOException e) { + return null; + } + } + + private HAServiceProtocol makeMock(Configuration conf, int timeoutMs) { + HAServiceProtocol service; + if (!testWithProtoBufRPC) { + service = new MockHAProtocolImpl(); + } else { + try { + service = super.getProxy(conf, timeoutMs); + } catch (IOException e) { + return null; + } + } + return Mockito.spy(service); } @Override @@ -93,6 +147,9 @@ public InetSocketAddress getZKFCAddress() { @Override public HAServiceProtocol getProxy(Configuration conf, int timeout) throws IOException { + if (testWithProtoBufRPC) { + proxy = makeMock(conf, timeout); + } return proxy; } @@ -168,7 +225,7 @@ public void transitionToStandby(StateChangeRequestInfo req) throws ServiceFailed public HAServiceStatus getServiceStatus() throws IOException { checkUnreachable(); HAServiceStatus ret = new HAServiceStatus(state); - if (state == HAServiceState.STANDBY) { + if (state == HAServiceState.STANDBY || state == HAServiceState.ACTIVE) { ret.setReadyToBecomeActive(); } return ret; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java index db534dec5bb..b58793feae4 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java @@ -20,6 +20,7 @@ import static org.junit.Assert.*; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -54,7 +55,8 @@ public void setupHM() throws InterruptedException, IOException { conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50); conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50); - svc = new DummyHAService(HAServiceState.ACTIVE, null); + svc = new DummyHAService(HAServiceState.ACTIVE, + new InetSocketAddress("0.0.0.0", 0), true); hm = new HealthMonitor(conf, svc) { @Override protected HAServiceProtocol createProxy() throws IOException {