diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 9ab0273ebd9..6a4932da132 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -96,6 +96,8 @@ Release 0.23.3 - UNRELEASED
 
     HADOOP-8183. Stop using "mapred.used.genericoptions.parser" (harsh)
 
+    HADOOP-7788. Add simple HealthMonitor class to watch an HAService (todd)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index c2a6479dd20..54478c87f8a 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -121,5 +121,29 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
       "hadoop.security.token.service.use_ip";
   public static final boolean HADOOP_SECURITY_TOKEN_SERVICE_USE_IP_DEFAULT =
       true;
+
+  /*
+   * HA health monitor and failover controller.
+   */
+
+  /** How often to retry connecting to the service. */
+  public static final String HA_HM_CONNECT_RETRY_INTERVAL_KEY =
+    "ha.health-monitor.connect-retry-interval.ms";
+  public static final long HA_HM_CONNECT_RETRY_INTERVAL_DEFAULT = 1000;
+
+  /** How often to check the service. */
+  public static final String HA_HM_CHECK_INTERVAL_KEY =
+    "ha.health-monitor.check-interval.ms";
+  public static final long HA_HM_CHECK_INTERVAL_DEFAULT = 1000;
+
+  /** How long to sleep after an unexpected RPC error. */
+  public static final String HA_HM_SLEEP_AFTER_DISCONNECT_KEY =
+    "ha.health-monitor.sleep-after-disconnect.ms";
+  public static final long HA_HM_SLEEP_AFTER_DISCONNECT_DEFAULT = 1000;
+
+  /** Timeout for the actual monitorHealth() calls. */
+  public static final String HA_HM_RPC_TIMEOUT_KEY =
+    "ha.health-monitor.rpc-timeout.ms";
+  public static final int HA_HM_RPC_TIMEOUT_DEFAULT = 45000;
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
new file mode 100644
index 00000000000..9d42d970df1
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java
@@ -0,0 +1,319 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import javax.net.SocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import static org.apache.hadoop.fs.CommonConfigurationKeys.*;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HealthCheckFailedException;
+import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.util.Daemon;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class is a daemon which runs in a loop, periodically heartbeating
+ * with an HA service. It is responsible for keeping track of that service's
+ * health and exposing callbacks to the failover controller when the health
+ * status changes.
+ *
+ * Classes which need callbacks should implement the {@link Callback}
+ * interface.
+ */
+class HealthMonitor {
+  private static final Log LOG = LogFactory.getLog(
+      HealthMonitor.class);
+
+  private Daemon daemon;
+  private long connectRetryInterval;
+  private long checkIntervalMillis;
+  private long sleepAfterDisconnectMillis;
+
+  private int rpcTimeout;
+
+  private volatile boolean shouldRun = true;
+
+  /** The connected proxy */
+  private HAServiceProtocol proxy;
+
+  /** The address running the HA Service */
+  private final InetSocketAddress addrToMonitor;
+
+  private final Configuration conf;
+
+  private State state = State.INITIALIZING;
+
+  /**
+   * Listeners for state changes
+   */
+  private List<Callback> callbacks = Collections.synchronizedList(
+      new LinkedList<Callback>());
+
+  private HAServiceState lastServiceState = HAServiceState.INITIALIZING;
+
+  enum State {
+    /**
+     * The health monitor is still starting up.
+     */
+    INITIALIZING,
+
+    /**
+     * The service is not responding to health check RPCs.
+     */
+    SERVICE_NOT_RESPONDING,
+
+    /**
+     * The service is connected and healthy.
+     */
+    SERVICE_HEALTHY,
+
+    /**
+     * The service is running but unhealthy.
+     */
+    SERVICE_UNHEALTHY,
+
+    /**
+     * The health monitor itself failed unrecoverably and can
+     * no longer provide accurate information.
+     */
+    HEALTH_MONITOR_FAILED;
+  }
+
+
+  HealthMonitor(Configuration conf, InetSocketAddress addrToMonitor) {
+    this.conf = conf;
+    this.addrToMonitor = addrToMonitor;
+
+    this.sleepAfterDisconnectMillis = conf.getLong(
+        HA_HM_SLEEP_AFTER_DISCONNECT_KEY,
+        HA_HM_SLEEP_AFTER_DISCONNECT_DEFAULT);
+    this.checkIntervalMillis = conf.getLong(
+        HA_HM_CHECK_INTERVAL_KEY,
+        HA_HM_CHECK_INTERVAL_DEFAULT);
+    this.connectRetryInterval = conf.getLong(
+        HA_HM_CONNECT_RETRY_INTERVAL_KEY,
+        HA_HM_CONNECT_RETRY_INTERVAL_DEFAULT);
+    this.rpcTimeout = conf.getInt(
+        HA_HM_RPC_TIMEOUT_KEY,
+        HA_HM_RPC_TIMEOUT_DEFAULT);
+
+    this.daemon = new MonitorDaemon();
+  }
+
+  public void addCallback(Callback cb) {
+    this.callbacks.add(cb);
+  }
+
+  public void removeCallback(Callback cb) {
+    callbacks.remove(cb);
+  }
+
+  public void shutdown() {
+    LOG.info("Stopping HealthMonitor thread");
+    shouldRun = false;
+    daemon.interrupt();
+  }
+
+  /**
+   * @return the current proxy object to the underlying service.
+   * Note that this may return null in the case that the service
+   * is not responding. Also note that, even if the last indicated
+   * state is healthy, the service may have gone down in the meantime.
+   */
+  public synchronized HAServiceProtocol getProxy() {
+    return proxy;
+  }
+
+  private void loopUntilConnected() throws InterruptedException {
+    tryConnect();
+    while (proxy == null) {
+      Thread.sleep(connectRetryInterval);
+      tryConnect();
+    }
+    assert proxy != null;
+  }
+
+  private void tryConnect() {
+    Preconditions.checkState(proxy == null);
+
+    try {
+      synchronized (this) {
+        proxy = createProxy();
+      }
+    } catch (IOException e) {
+      LOG.warn("Could not connect to local service at " + addrToMonitor +
+          ": " + e.getMessage());
+      proxy = null;
+      enterState(State.SERVICE_NOT_RESPONDING);
+    }
+  }
+
+  /**
+   * Connect to the service to be monitored. Stubbed out for easier testing.
+   */
+  protected HAServiceProtocol createProxy() throws IOException {
+    SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(conf);
+    return new HAServiceProtocolClientSideTranslatorPB(
+        addrToMonitor,
+        conf, socketFactory, rpcTimeout);
+  }
+
+  private void doHealthChecks() throws InterruptedException {
+    while (shouldRun) {
+      HAServiceState state = null;
+      boolean healthy = false;
+      try {
+        state = proxy.getServiceState();
+        proxy.monitorHealth();
+        healthy = true;
+      } catch (HealthCheckFailedException e) {
+        LOG.warn("Service health check failed: " + e.getMessage());
+        enterState(State.SERVICE_UNHEALTHY);
+      } catch (Throwable t) {
+        LOG.warn("Transport-level exception trying to monitor health of " +
+            addrToMonitor + ": " + t.getLocalizedMessage());
+        RPC.stopProxy(proxy);
+        proxy = null;
+        enterState(State.SERVICE_NOT_RESPONDING);
+        Thread.sleep(sleepAfterDisconnectMillis);
+        return;
+      }
+
+      if (state != null) {
+        setLastServiceState(state);
+      }
+      if (healthy) {
+        enterState(State.SERVICE_HEALTHY);
+      }
+
+      Thread.sleep(checkIntervalMillis);
+    }
+  }
+
+  private synchronized void setLastServiceState(HAServiceState serviceState) {
+    this.lastServiceState = serviceState;
+  }
+
+  private synchronized void enterState(State newState) {
+    if (newState != state) {
+      LOG.info("Entering state " + newState);
+      state = newState;
+      synchronized (callbacks) {
+        for (Callback cb : callbacks) {
+          cb.enteredState(newState);
+        }
+      }
+    }
+  }
+
+  synchronized State getHealthState() {
+    return state;
+  }
+
+  synchronized HAServiceState getLastServiceState() {
+    return lastServiceState;
+  }
+
+  boolean isAlive() {
+    return daemon.isAlive();
+  }
+
+  void join() throws InterruptedException {
+    daemon.join();
+  }
+
+  void start() {
+    daemon.start();
+  }
+
+  private class MonitorDaemon extends Daemon {
+    private MonitorDaemon() {
+      super();
+      setName("Health Monitor for " + addrToMonitor);
+      setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+        @Override
+        public void uncaughtException(Thread t, Throwable e) {
+          LOG.fatal("Health monitor failed", e);
+          enterState(HealthMonitor.State.HEALTH_MONITOR_FAILED);
+        }
+      });
+    }
+
+    @Override
+    public void run() {
+      while (shouldRun) {
+        try {
+          loopUntilConnected();
+          doHealthChecks();
+        } catch (InterruptedException ie) {
+          Preconditions.checkState(!shouldRun,
+              "Interrupted but still supposed to run");
+        }
+      }
+    }
+  }
+
+  /**
+   * Callback interface for state change events.
+   *
+   * This interface is called from a single thread which also performs
+   * the health monitoring. If the callback processing takes a long time,
+   * no further health checks will be made during this period, nor will
+   * other registered callbacks be called.
+   *
+   * If the callback itself throws an unchecked exception, no other
+   * callbacks following it will be called, and the health monitor
+   * will terminate, entering HEALTH_MONITOR_FAILED state.
+   */
+  static interface Callback {
+    void enteredState(State newState);
+  }
+
+  /**
+   * Simple main() for testing.
+   */
+  public static void main(String[] args) throws InterruptedException {
+    if (args.length != 1) {
+      System.err.println("Usage: " + HealthMonitor.class.getName() +
+          " <host:port>");
+      System.exit(1);
+    }
+    Configuration conf = new Configuration();
+
+    String target = args[0];
+    InetSocketAddress addr = NetUtils.createSocketAddr(target);
+
+    HealthMonitor hm = new HealthMonitor(conf, addr);
+    hm.start();
+    hm.join();
+  }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java
index 3bf4f6f0133..ca95f794b3c 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java
@@ -21,6 +21,8 @@
 import java.io.Closeable;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+
+import javax.net.SocketFactory;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -34,7 +36,9 @@ import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToStandbyReq
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.ProtocolSignature;
+import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.UserGroupInformation;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
@@ -47,7 +51,7 @@ import com.google.protobuf.ServiceException;
 @InterfaceAudience.Private
 @InterfaceStability.Stable
 public class HAServiceProtocolClientSideTranslatorPB implements
-    HAServiceProtocol, Closeable {
+    HAServiceProtocol, Closeable, ProtocolTranslator {
   /** RpcController is not used and hence is set to null */
   private final static RpcController NULL_CONTROLLER = null;
   private final static MonitorHealthRequestProto MONITOR_HEALTH_REQ =
@@ -71,6 +75,16 @@ public class HAServiceProtocolClientSideTranslatorPB implements
         RPC.getProtocolVersion(HAServiceProtocolPB.class), addr, conf);
   }
 
+  public HAServiceProtocolClientSideTranslatorPB(
+      InetSocketAddress addr, Configuration conf,
+      SocketFactory socketFactory, int timeout) throws IOException {
+    RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
+        ProtobufRpcEngine.class);
+    rpcProxy = RPC.getProxy(HAServiceProtocolPB.class,
+        RPC.getProtocolVersion(HAServiceProtocolPB.class), addr,
+        UserGroupInformation.getCurrentUser(), conf, socketFactory, timeout);
+  }
+
   @Override
   public void monitorHealth() throws IOException {
     try {
@@ -132,4 +146,9 @@
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  @Override
+  public Object getUnderlyingProxyObject() {
+    return rpcProxy;
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
new file mode 100644
index 00000000000..e625d6c339c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ha;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.ha.HAServiceProtocol;
+import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ha.HealthCheckFailedException;
+import org.apache.hadoop.ha.HealthMonitor.Callback;
+import org.apache.hadoop.ha.HealthMonitor.State;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestHealthMonitor {
+  private static final Log LOG = LogFactory.getLog(
+      TestHealthMonitor.class);
+
+  /* bogus address to pass to constructor - never used */
+  private static final InetSocketAddress BOGUS_ADDR =
+      new InetSocketAddress(1);
+
+  private HAServiceProtocol mockProxy;
+
+  /** How many times has createProxy been called */
+  private volatile CountDownLatch createProxyLatch;
+
+  /** Should throw an IOE when trying to connect */
+  private volatile boolean shouldThrowOnCreateProxy = false;
+
+  private HealthMonitor hm;
+
+  @Before
+  public void setupHM() throws InterruptedException, IOException {
+    Configuration conf = new Configuration();
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
+    conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
+    conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
+    conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
+    mockProxy = Mockito.mock(HAServiceProtocol.class);
+    Mockito.doReturn(HAServiceState.ACTIVE)
+        .when(mockProxy).getServiceState();
+
+    hm = new HealthMonitor(conf, BOGUS_ADDR) {
+      @Override
+      protected HAServiceProtocol createProxy() throws IOException {
+        createProxyLatch.countDown();
+        if (shouldThrowOnCreateProxy) {
+          throw new IOException("can't connect");
+        }
+        return mockProxy;
+      }
+    };
+
+    createProxyLatch = new CountDownLatch(1);
+
+    LOG.info("Starting health monitor");
+    hm.start();
+
+    LOG.info("Waiting for proxy to be created");
+    assertTrue(createProxyLatch.await(2000, TimeUnit.MILLISECONDS));
+    createProxyLatch = null;
+
+    LOG.info("Waiting for HEALTHY signal");
+    waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
+  }
+
+  @Test(timeout=15000)
+  public void testMonitor() throws Exception {
+    LOG.info("Mocking bad health check, waiting for UNHEALTHY");
+    Mockito.doThrow(new HealthCheckFailedException("Fake health check failure"))
+        .when(mockProxy).monitorHealth();
+    waitForState(hm, HealthMonitor.State.SERVICE_UNHEALTHY);
+
+    LOG.info("Returning to healthy state, waiting for HEALTHY");
+    Mockito.doNothing().when(mockProxy).monitorHealth();
+    waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
+
+    LOG.info("Returning an IOException, as if node went down");
+    // should expect many rapid retries
+    createProxyLatch = new CountDownLatch(3);
+    shouldThrowOnCreateProxy = true;
+    Mockito.doThrow(new IOException("Connection lost (fake)"))
+        .when(mockProxy).monitorHealth();
+    waitForState(hm, HealthMonitor.State.SERVICE_NOT_RESPONDING);
+    assertTrue("Monitor should retry if createProxy throws an IOE",
+        createProxyLatch.await(1000, TimeUnit.MILLISECONDS));
+
+    LOG.info("Returning to healthy state, waiting for HEALTHY");
+    shouldThrowOnCreateProxy = false;
+    Mockito.doNothing().when(mockProxy).monitorHealth();
+    waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
+
+    hm.shutdown();
+    hm.join();
+    assertFalse(hm.isAlive());
+  }
+
+  /**
+   * Test that the proper state is propagated when the health monitor
+   * sees an uncaught throwable in its thread.
+   */
+  @Test(timeout=15000)
+  public void testHealthMonitorDies() throws Exception {
+    LOG.info("Mocking OOME in health monitor, waiting for FAILED");
+    Mockito.doThrow(new OutOfMemoryError())
+        .when(mockProxy).monitorHealth();
+    waitForState(hm, HealthMonitor.State.HEALTH_MONITOR_FAILED);
+    hm.shutdown();
+    hm.join();
+    assertFalse(hm.isAlive());
+  }
+
+  /**
+   * Test that, if the callback throws an RTE, this will terminate the
+   * health monitor and thus change its state to FAILED.
+   * @throws Exception
+   */
+  @Test(timeout=15000)
+  public void testCallbackThrowsRTE() throws Exception {
+    hm.addCallback(new Callback() {
+      @Override
+      public void enteredState(State newState) {
+        throw new RuntimeException("Injected RTE");
+      }
+    });
+    LOG.info("Mocking bad health check, waiting for UNHEALTHY");
+    Mockito.doThrow(new HealthCheckFailedException("Fake health check failure"))
+        .when(mockProxy).monitorHealth();
+    waitForState(hm, HealthMonitor.State.HEALTH_MONITOR_FAILED);
+  }
+
+  private void waitForState(HealthMonitor hm, State state)
+      throws InterruptedException {
+    long st = System.currentTimeMillis();
+    while (System.currentTimeMillis() - st < 2000) {
+      if (hm.getHealthState() == state) {
+        return;
+      }
+      Thread.sleep(50);
+    }
+    assertEquals(state, hm.getHealthState());
+  }
+}
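
Reviewer note: below is a minimal sketch (not part of the patch) of how a failover-controller-style consumer might use the new HealthMonitor API. The WatcherMain class name and the localhost:8020 address are illustrative assumptions. Since HealthMonitor, its Callback interface, and its start()/join() methods are package-private in this revision, such a consumer would have to live in org.apache.hadoop.ha.

package org.apache.hadoop.ha;

import java.net.InetSocketAddress;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;

// Hypothetical consumer of the new API; for illustration only.
public class WatcherMain {
  public static void main(String[] args) throws InterruptedException {
    Configuration conf = new Configuration();
    // Poll twice per second instead of the 1000ms default.
    conf.setLong(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 500);

    // Assumed address of the HA service to watch.
    InetSocketAddress addr = new InetSocketAddress("localhost", 8020);

    HealthMonitor hm = new HealthMonitor(conf, addr);
    // Callbacks run on the monitor thread itself: keep them fast, and note
    // that a callback which throws drives the monitor into
    // HEALTH_MONITOR_FAILED, as TestHealthMonitor#testCallbackThrowsRTE shows.
    hm.addCallback(new HealthMonitor.Callback() {
      @Override
      public void enteredState(HealthMonitor.State newState) {
        System.out.println("Health state is now: " + newState);
      }
    });

    hm.start();
    hm.join();
  }
}

Because callbacks fire synchronously on the monitoring thread, a consumer sees state transitions in strict order without extra synchronization; the trade-off, as the Callback javadoc warns, is that a slow callback stalls further health checks.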