HADOOP-7788. Add simple HealthMonitor class to watch an HAService. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1303207 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2012-03-20 23:27:42 +00:00
parent d31bade781
commit 164fb303f8
5 changed files with 535 additions and 1 deletion

CHANGES.txt

@@ -96,6 +96,8 @@ Release 0.23.3 - UNRELEASED
HADOOP-8183. Stop using "mapred.used.genericoptions.parser" (harsh)
HADOOP-7788. Add simple HealthMonitor class to watch an HAService (todd)
OPTIMIZATIONS
BUG FIXES

org/apache/hadoop/fs/CommonConfigurationKeys.java

@@ -121,5 +121,29 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
"hadoop.security.token.service.use_ip"; "hadoop.security.token.service.use_ip";
public static final boolean HADOOP_SECURITY_TOKEN_SERVICE_USE_IP_DEFAULT = public static final boolean HADOOP_SECURITY_TOKEN_SERVICE_USE_IP_DEFAULT =
true; true;
/**
* HA health monitor and failover controller.
*/
/** How often to retry connecting to the service. */
public static final String HA_HM_CONNECT_RETRY_INTERVAL_KEY =
"ha.health-monitor.connect-retry-interval.ms";
public static final long HA_HM_CONNECT_RETRY_INTERVAL_DEFAULT = 1000;
/** How often to check the service. */
public static final String HA_HM_CHECK_INTERVAL_KEY =
"ha.health-monitor.check-interval.ms";
public static final long HA_HM_CHECK_INTERVAL_DEFAULT = 1000;
/** How long to sleep after an unexpected RPC error. */
public static final String HA_HM_SLEEP_AFTER_DISCONNECT_KEY =
"ha.health-monitor.sleep-after-disconnect.ms";
public static final long HA_HM_SLEEP_AFTER_DISCONNECT_DEFAULT = 1000;
/** Timeout for the actual monitorHealth() calls. */
public static final String HA_HM_RPC_TIMEOUT_KEY =
"ha.health-monitor.rpc-timeout.ms";
public static final int HA_HM_RPC_TIMEOUT_DEFAULT = 45000;
}
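As a quick illustration of these settings, here is a minimal sketch (the helper class is hypothetical, not part of this patch) that tightens the intervals programmatically using the key constants added above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;

// Hypothetical helper, not part of this patch: builds a Configuration
// with tighter health-monitor intervals than the 1000ms/45000ms defaults.
public class HealthMonitorConfigExample {
  public static Configuration tightIntervals() {
    Configuration conf = new Configuration();
    // Check health and retry connections twice per second.
    conf.setLong(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 500);
    conf.setLong(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 500);
    conf.setLong(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 500);
    // Give up on a hung monitorHealth() RPC after 10s instead of 45s.
    conf.setInt(CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_KEY, 10000);
    return conf;
  }
}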

org/apache/hadoop/ha/HealthMonitor.java (new file)

@@ -0,0 +1,319 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import static org.apache.hadoop.fs.CommonConfigurationKeys.*;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Daemon;
import com.google.common.base.Preconditions;
/**
* This class is a daemon which runs in a loop, periodically heartbeating
* with an HA service. It is responsible for keeping track of that service's
* health and exposing callbacks to the failover controller when the health
* status changes.
*
* Classes which need callbacks should implement the {@link Callback}
* interface.
*/
class HealthMonitor {
private static final Log LOG = LogFactory.getLog(
HealthMonitor.class);
private Daemon daemon;
private long connectRetryInterval;
private long checkIntervalMillis;
private long sleepAfterDisconnectMillis;
private int rpcTimeout;
private volatile boolean shouldRun = true;
/** The connected proxy. */
private HAServiceProtocol proxy;
/** The address of the HA service being monitored. */
private final InetSocketAddress addrToMonitor;
private final Configuration conf;
private State state = State.INITIALIZING;
/**
* Listeners for state changes
*/
private List<Callback> callbacks = Collections.synchronizedList(
new LinkedList<Callback>());
private HAServiceState lastServiceState = HAServiceState.INITIALIZING;
enum State {
/**
* The health monitor is still starting up.
*/
INITIALIZING,
/**
* The service is not responding to health check RPCs.
*/
SERVICE_NOT_RESPONDING,
/**
* The service is connected and healthy.
*/
SERVICE_HEALTHY,
/**
* The service is running but unhealthy.
*/
SERVICE_UNHEALTHY,
/**
* The health monitor itself failed unrecoverably and can
* no longer provide accurate information.
*/
HEALTH_MONITOR_FAILED;
}
HealthMonitor(Configuration conf, InetSocketAddress addrToMonitor) {
this.conf = conf;
this.addrToMonitor = addrToMonitor;
this.sleepAfterDisconnectMillis = conf.getLong(
HA_HM_SLEEP_AFTER_DISCONNECT_KEY,
HA_HM_SLEEP_AFTER_DISCONNECT_DEFAULT);
this.checkIntervalMillis = conf.getLong(
HA_HM_CHECK_INTERVAL_KEY,
HA_HM_CHECK_INTERVAL_DEFAULT);
this.connectRetryInterval = conf.getLong(
HA_HM_CONNECT_RETRY_INTERVAL_KEY,
HA_HM_CONNECT_RETRY_INTERVAL_DEFAULT);
this.rpcTimeout = conf.getInt(
HA_HM_RPC_TIMEOUT_KEY,
HA_HM_RPC_TIMEOUT_DEFAULT);
this.daemon = new MonitorDaemon();
}
public void addCallback(Callback cb) {
this.callbacks.add(cb);
}
public void removeCallback(Callback cb) {
callbacks.remove(cb);
}
public void shutdown() {
LOG.info("Stopping HealthMonitor thread");
shouldRun = false;
daemon.interrupt();
}
/**
* @return the current proxy object to the underlying service.
* Note that this may return null in the case that the service
* is not responding. Also note that, even if the last indicated
* state is healthy, the service may have gone down in the meantime.
*/
public synchronized HAServiceProtocol getProxy() {
return proxy;
}
private void loopUntilConnected() throws InterruptedException {
tryConnect();
while (proxy == null) {
Thread.sleep(connectRetryInterval);
tryConnect();
}
assert proxy != null;
}
private void tryConnect() {
Preconditions.checkState(proxy == null);
try {
synchronized (this) {
proxy = createProxy();
}
} catch (IOException e) {
LOG.warn("Could not connect to local service at " + addrToMonitor +
": " + e.getMessage());
proxy = null;
enterState(State.SERVICE_NOT_RESPONDING);
}
}
/**
* Connect to the service to be monitored. Stubbed out for easier testing.
*/
protected HAServiceProtocol createProxy() throws IOException {
SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(conf);
return new HAServiceProtocolClientSideTranslatorPB(
addrToMonitor,
conf, socketFactory, rpcTimeout);
}
private void doHealthChecks() throws InterruptedException {
while (shouldRun) {
HAServiceState state = null;
boolean healthy = false;
try {
state = proxy.getServiceState();
proxy.monitorHealth();
healthy = true;
} catch (HealthCheckFailedException e) {
LOG.warn("Service health check failed: " + e.getMessage());
enterState(State.SERVICE_UNHEALTHY);
} catch (Throwable t) {
LOG.warn("Transport-level exception trying to monitor health of " +
addrToMonitor + ": " + t.getLocalizedMessage());
RPC.stopProxy(proxy);
proxy = null;
enterState(State.SERVICE_NOT_RESPONDING);
Thread.sleep(sleepAfterDisconnectMillis);
return;
}
if (state != null) {
setLastServiceState(state);
}
if (healthy) {
enterState(State.SERVICE_HEALTHY);
}
Thread.sleep(checkIntervalMillis);
}
}
private synchronized void setLastServiceState(HAServiceState serviceState) {
this.lastServiceState = serviceState;
}
private synchronized void enterState(State newState) {
if (newState != state) {
LOG.info("Entering state " + newState);
state = newState;
synchronized (callbacks) {
for (Callback cb : callbacks) {
cb.enteredState(newState);
}
}
}
}
synchronized State getHealthState() {
return state;
}
synchronized HAServiceState getLastServiceState() {
return lastServiceState;
}
boolean isAlive() {
return daemon.isAlive();
}
void join() throws InterruptedException {
daemon.join();
}
void start() {
daemon.start();
}
private class MonitorDaemon extends Daemon {
private MonitorDaemon() {
super();
setName("Health Monitor for " + addrToMonitor);
setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
LOG.fatal("Health monitor failed", e);
enterState(HealthMonitor.State.HEALTH_MONITOR_FAILED);
}
});
}
@Override
public void run() {
while (shouldRun) {
try {
loopUntilConnected();
doHealthChecks();
} catch (InterruptedException ie) {
Preconditions.checkState(!shouldRun,
"Interrupted but still supposed to run");
}
}
}
}
/**
* Callback interface for state change events.
*
* This interface is called from a single thread which also performs
* the health monitoring. If the callback processing takes a long time,
* no further health checks will be made during this period, nor will
* other registered callbacks be called.
*
* If the callback itself throws an unchecked exception, no other
* callbacks following it will be called, and the health monitor
* will terminate, entering HEALTH_MONITOR_FAILED state.
*/
static interface Callback {
void enteredState(State newState);
}
/**
* Simple main() for testing.
*/
public static void main(String[] args) throws InterruptedException {
if (args.length != 1) {
System.err.println("Usage: " + HealthMonitor.class.getName() +
" <addr to monitor>");
System.exit(1);
}
Configuration conf = new Configuration();
String target = args[0];
InetSocketAddress addr = NetUtils.createSocketAddr(target);
HealthMonitor hm = new HealthMonitor(conf, addr);
hm.start();
hm.join();
}
}
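To make the Callback contract above concrete, here is a minimal usage sketch (hypothetical, not part of this patch). It must live in the org.apache.hadoop.ha package because HealthMonitor and Callback are package-private, and it hands work off to an executor since a callback that blocks would delay further health checks:

package org.apache.hadoop.ha;

import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;

// Hypothetical example, not part of this patch.
public class HealthMonitorUsageExample {
  public static void main(String[] args) throws InterruptedException {
    Configuration conf = new Configuration();
    InetSocketAddress addr = NetUtils.createSocketAddr("localhost:8020");
    final ExecutorService executor = Executors.newSingleThreadExecutor();

    HealthMonitor hm = new HealthMonitor(conf, addr);
    hm.addCallback(new HealthMonitor.Callback() {
      @Override
      public void enteredState(final HealthMonitor.State newState) {
        // React on a separate thread: blocking here would stall the
        // monitor thread, delaying health checks and other callbacks.
        executor.submit(new Runnable() {
          @Override
          public void run() {
            System.out.println("Monitored service entered " + newState);
          }
        });
      }
    });
    hm.start();
    hm.join();
  }
}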

org/apache/hadoop/ha/protocolPB/HAServiceProtocolClientSideTranslatorPB.java

@@ -21,6 +21,8 @@ import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import javax.net.SocketFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -34,7 +36,9 @@ import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.TransitionToStandbyReq
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.UserGroupInformation;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@@ -47,7 +51,7 @@ import com.google.protobuf.ServiceException;
@InterfaceAudience.Private
@InterfaceStability.Stable
public class HAServiceProtocolClientSideTranslatorPB implements
HAServiceProtocol, Closeable, ProtocolTranslator {
/** RpcController is not used and hence is set to null */
private final static RpcController NULL_CONTROLLER = null;
private final static MonitorHealthRequestProto MONITOR_HEALTH_REQ =
@@ -71,6 +75,16 @@ public class HAServiceProtocolClientSideTranslatorPB implements
RPC.getProtocolVersion(HAServiceProtocolPB.class), addr, conf);
}
public HAServiceProtocolClientSideTranslatorPB(
InetSocketAddress addr, Configuration conf,
SocketFactory socketFactory, int timeout) throws IOException {
RPC.setProtocolEngine(conf, HAServiceProtocolPB.class,
ProtobufRpcEngine.class);
rpcProxy = RPC.getProxy(HAServiceProtocolPB.class,
RPC.getProtocolVersion(HAServiceProtocolPB.class), addr,
UserGroupInformation.getCurrentUser(), conf, socketFactory, timeout);
}
@Override
public void monitorHealth() throws IOException {
try {
@@ -132,4 +146,9 @@ public class HAServiceProtocolClientSideTranslatorPB implements
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;
}
}
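Implementing ProtocolTranslator lets RPC.stopProxy() unwrap the translator and shut down the real RPC proxy, which HealthMonitor relies on when it drops a dead connection. Here is a minimal sketch of the new timeout-aware constructor in use (the class name, address, and 10s timeout are hypothetical):

import java.io.IOException;
import java.net.InetSocketAddress;

import javax.net.SocketFactory;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
import org.apache.hadoop.net.NetUtils;

// Hypothetical example, not part of this patch.
public class TimeoutProxyExample {
  public static HAServiceProtocol connect(String hostPort) throws IOException {
    Configuration conf = new Configuration();
    InetSocketAddress addr = NetUtils.createSocketAddr(hostPort);
    SocketFactory factory = NetUtils.getDefaultSocketFactory(conf);
    // With a 10s RPC timeout, a hung monitorHealth() call fails fast
    // instead of blocking the health monitor indefinitely.
    return new HAServiceProtocolClientSideTranslatorPB(addr, conf, factory, 10000);
  }
}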

org/apache/hadoop/ha/TestHealthMonitor.java (new file)

@@ -0,0 +1,170 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ha;
import static org.junit.Assert.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.HealthMonitor.Callback;
import org.apache.hadoop.ha.HealthMonitor.State;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
public class TestHealthMonitor {
private static final Log LOG = LogFactory.getLog(
TestHealthMonitor.class);
/* bogus address to pass to constructor - never used */
private static final InetSocketAddress BOGUS_ADDR =
new InetSocketAddress(1);
private HAServiceProtocol mockProxy;
/** Latch counted down each time createProxy() is called. */
private volatile CountDownLatch createProxyLatch;
/** Whether createProxy() should throw an IOException to simulate a failed connection. */
private volatile boolean shouldThrowOnCreateProxy = false;
private HealthMonitor hm;
@Before
public void setupHM() throws InterruptedException, IOException {
Configuration conf = new Configuration();
conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 1);
conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
mockProxy = Mockito.mock(HAServiceProtocol.class);
Mockito.doReturn(HAServiceState.ACTIVE)
.when(mockProxy).getServiceState();
hm = new HealthMonitor(conf, BOGUS_ADDR) {
@Override
protected HAServiceProtocol createProxy() throws IOException {
createProxyLatch.countDown();
if (shouldThrowOnCreateProxy) {
throw new IOException("can't connect");
}
return mockProxy;
}
};
createProxyLatch = new CountDownLatch(1);
LOG.info("Starting health monitor");
hm.start();
LOG.info("Waiting for proxy to be created");
assertTrue(createProxyLatch.await(2000, TimeUnit.MILLISECONDS));
createProxyLatch = null;
LOG.info("Waiting for HEALTHY signal");
waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
}
@Test(timeout=15000)
public void testMonitor() throws Exception {
LOG.info("Mocking bad health check, waiting for UNHEALTHY");
Mockito.doThrow(new HealthCheckFailedException("Fake health check failure"))
.when(mockProxy).monitorHealth();
waitForState(hm, HealthMonitor.State.SERVICE_UNHEALTHY);
LOG.info("Returning to healthy state, waiting for HEALTHY");
Mockito.doNothing().when(mockProxy).monitorHealth();
waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
LOG.info("Returning an IOException, as if node went down");
// should expect many rapid retries
createProxyLatch = new CountDownLatch(3);
shouldThrowOnCreateProxy = true;
Mockito.doThrow(new IOException("Connection lost (fake)"))
.when(mockProxy).monitorHealth();
waitForState(hm, HealthMonitor.State.SERVICE_NOT_RESPONDING);
assertTrue("Monitor should retry if createProxy throws an IOE",
createProxyLatch.await(1000, TimeUnit.MILLISECONDS));
LOG.info("Returning to healthy state, waiting for HEALTHY");
shouldThrowOnCreateProxy = false;
Mockito.doNothing().when(mockProxy).monitorHealth();
waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
hm.shutdown();
hm.join();
assertFalse(hm.isAlive());
}
/**
* Test that the proper state is propagated when the health monitor
* sees an uncaught exception in its thread.
*/
@Test(timeout=15000)
public void testHealthMonitorDies() throws Exception {
LOG.info("Mocking RTE in health monitor, waiting for FAILED");
Mockito.doThrow(new OutOfMemoryError())
.when(mockProxy).monitorHealth();
waitForState(hm, HealthMonitor.State.HEALTH_MONITOR_FAILED);
hm.shutdown();
hm.join();
assertFalse(hm.isAlive());
}
/**
* Test that, if a callback throws a RuntimeException, the health
* monitor terminates and enters the HEALTH_MONITOR_FAILED state.
* @throws Exception
*/
@Test(timeout=15000)
public void testCallbackThrowsRTE() throws Exception {
hm.addCallback(new Callback() {
@Override
public void enteredState(State newState) {
throw new RuntimeException("Injected RTE");
}
});
LOG.info("Mocking bad health check, waiting for UNHEALTHY");
Mockito.doThrow(new HealthCheckFailedException("Fake health check failure"))
.when(mockProxy).monitorHealth();
waitForState(hm, HealthMonitor.State.HEALTH_MONITOR_FAILED);
}
private void waitForState(HealthMonitor hm, State state)
throws InterruptedException {
long st = System.currentTimeMillis();
while (System.currentTimeMillis() - st < 2000) {
if (hm.getHealthState() == state) {
return;
}
Thread.sleep(50);
}
assertEquals(state, hm.getHealthState());
}
}