HADOOP-11000. HAServiceProtocol's health state is incorrectly transitioned to SERVICE_NOT_RESPONDING (Contributed by Ming Ma)

(cherry picked from commit cf4b7f506d)
This commit is contained in:
Vinayakumar B 2015-02-17 14:55:56 +05:30
parent 91a5d92916
commit 005e1df540
4 changed files with 94 additions and 21 deletions

View File

@ -537,6 +537,9 @@ Release 2.7.0 - UNRELEASED
HADOOP-11467. KerberosAuthenticator can connect to a non-secure cluster.
(yzhangal via rkanter)
HADOOP-11000. HAServiceProtocol's health state is incorrectly transitioned
to SERVICE_NOT_RESPONDING (Ming Ma via vinayakumarb)
Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.util.Daemon;
@ -201,18 +202,20 @@ private void doHealthChecks() throws InterruptedException {
status = proxy.getServiceStatus();
proxy.monitorHealth();
healthy = true;
} catch (HealthCheckFailedException e) {
LOG.warn("Service health check failed for " + targetToMonitor
+ ": " + e.getMessage());
enterState(State.SERVICE_UNHEALTHY);
} catch (Throwable t) {
LOG.warn("Transport-level exception trying to monitor health of " +
targetToMonitor + ": " + t.getLocalizedMessage());
RPC.stopProxy(proxy);
proxy = null;
enterState(State.SERVICE_NOT_RESPONDING);
Thread.sleep(sleepAfterDisconnectMillis);
return;
if (isHealthCheckFailedException(t)) {
LOG.warn("Service health check failed for " + targetToMonitor
+ ": " + t.getMessage());
enterState(State.SERVICE_UNHEALTHY);
} else {
LOG.warn("Transport-level exception trying to monitor health of " +
targetToMonitor + ": " + t.getCause() + " " + t.getLocalizedMessage());
RPC.stopProxy(proxy);
proxy = null;
enterState(State.SERVICE_NOT_RESPONDING);
Thread.sleep(sleepAfterDisconnectMillis);
return;
}
}
if (status != null) {
@ -226,6 +229,14 @@ private void doHealthChecks() throws InterruptedException {
}
}
private boolean isHealthCheckFailedException(Throwable t) {
return ((t instanceof HealthCheckFailedException) ||
(t instanceof RemoteException &&
((RemoteException)t).unwrapRemoteException(
HealthCheckFailedException.class) instanceof
HealthCheckFailedException));
}
private synchronized void setLastServiceStatus(HAServiceStatus status) {
this.lastServiceState = status;
for (ServiceStateCallback cb : serviceStateCallbacks) {

View File

@ -22,15 +22,25 @@
import java.net.InetSocketAddress;
import java.util.ArrayList;
import com.google.protobuf.BlockingService;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.mockito.Mockito;
import com.google.common.collect.Lists;
import static org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_DEFAULT;
/**
* Test-only implementation of {@link HAServiceTarget}, which returns
* a mock implementation.
@ -50,22 +60,33 @@ class DummyHAService extends HAServiceTarget {
DummySharedResource sharedResource;
public int fenceCount = 0;
public int activeTransitionCount = 0;
boolean testWithProtoBufRPC = false;
static ArrayList<DummyHAService> instances = Lists.newArrayList();
int index;
DummyHAService(HAServiceState state, InetSocketAddress address) {
this(state, address, false);
}
DummyHAService(HAServiceState state, InetSocketAddress address,
boolean testWithProtoBufRPC) {
this.state = state;
this.proxy = makeMock();
this.testWithProtoBufRPC = testWithProtoBufRPC;
if (testWithProtoBufRPC) {
this.address = startAndGetRPCServerAddress(address);
} else {
this.address = address;
}
Configuration conf = new Configuration();
this.proxy = makeMock(conf, HA_HM_RPC_TIMEOUT_DEFAULT);
try {
Configuration conf = new Configuration();
conf.set(DUMMY_FENCE_KEY, DummyFencer.class.getName());
this.fencer = Mockito.spy(
NodeFencer.create(conf, DUMMY_FENCE_KEY));
} catch (BadFencingConfigurationException e) {
throw new RuntimeException(e);
}
this.address = address;
synchronized (instances) {
instances.add(this);
this.index = instances.size();
@ -76,8 +97,41 @@ public void setSharedResource(DummySharedResource rsrc) {
this.sharedResource = rsrc;
}
private HAServiceProtocol makeMock() {
return Mockito.spy(new MockHAProtocolImpl());
private InetSocketAddress startAndGetRPCServerAddress(InetSocketAddress serverAddress) {
Configuration conf = new Configuration();
try {
RPC.setProtocolEngine(conf,
HAServiceProtocolPB.class, ProtobufRpcEngine.class);
HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
new HAServiceProtocolServerSideTranslatorPB(new MockHAProtocolImpl());
BlockingService haPbService = HAServiceProtocolService
.newReflectiveBlockingService(haServiceProtocolXlator);
Server server = new RPC.Builder(conf)
.setProtocol(HAServiceProtocolPB.class)
.setInstance(haPbService)
.setBindAddress(serverAddress.getHostName())
.setPort(serverAddress.getPort()).build();
server.start();
return NetUtils.getConnectAddress(server);
} catch (IOException e) {
return null;
}
}
private HAServiceProtocol makeMock(Configuration conf, int timeoutMs) {
HAServiceProtocol service;
if (!testWithProtoBufRPC) {
service = new MockHAProtocolImpl();
} else {
try {
service = super.getProxy(conf, timeoutMs);
} catch (IOException e) {
return null;
}
}
return Mockito.spy(service);
}
@Override
@ -93,6 +147,9 @@ public InetSocketAddress getZKFCAddress() {
@Override
public HAServiceProtocol getProxy(Configuration conf, int timeout)
throws IOException {
if (testWithProtoBufRPC) {
proxy = makeMock(conf, timeout);
}
return proxy;
}
@ -168,7 +225,7 @@ public void transitionToStandby(StateChangeRequestInfo req) throws ServiceFailed
public HAServiceStatus getServiceStatus() throws IOException {
checkUnreachable();
HAServiceStatus ret = new HAServiceStatus(state);
if (state == HAServiceState.STANDBY) {
if (state == HAServiceState.STANDBY || state == HAServiceState.ACTIVE) {
ret.setReadyToBecomeActive();
}
return ret;

View File

@ -20,6 +20,7 @@
import static org.junit.Assert.*;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@ -54,7 +55,8 @@ public void setupHM() throws InterruptedException, IOException {
conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
svc = new DummyHAService(HAServiceState.ACTIVE, null);
svc = new DummyHAService(HAServiceState.ACTIVE,
new InetSocketAddress("0.0.0.0", 0), true);
hm = new HealthMonitor(conf, svc) {
@Override
protected HAServiceProtocol createProxy() throws IOException {