HADOOP-11000. HAServiceProtocol's health state is incorrectly transitioned to SERVICE_NOT_RESPONDING (Contributed by Ming Ma)
This commit is contained in:
parent
500e6a0f46
commit
cf4b7f506d
|
@ -934,6 +934,9 @@ Release 2.7.0 - UNRELEASED
|
|||
HADOOP-11467. KerberosAuthenticator can connect to a non-secure cluster.
|
||||
(yzhangal via rkanter)
|
||||
|
||||
HADOOP-11000. HAServiceProtocol's health state is incorrectly transitioned
|
||||
to SERVICE_NOT_RESPONDING (Ming Ma via vinayakumarb)
|
||||
|
||||
Release 2.6.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
|
|
@ -30,6 +30,7 @@ import static org.apache.hadoop.fs.CommonConfigurationKeys.*;
|
|||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.ha.HealthCheckFailedException;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
|
||||
|
@ -201,18 +202,20 @@ public class HealthMonitor {
|
|||
status = proxy.getServiceStatus();
|
||||
proxy.monitorHealth();
|
||||
healthy = true;
|
||||
} catch (HealthCheckFailedException e) {
|
||||
LOG.warn("Service health check failed for " + targetToMonitor
|
||||
+ ": " + e.getMessage());
|
||||
enterState(State.SERVICE_UNHEALTHY);
|
||||
} catch (Throwable t) {
|
||||
LOG.warn("Transport-level exception trying to monitor health of " +
|
||||
targetToMonitor + ": " + t.getLocalizedMessage());
|
||||
RPC.stopProxy(proxy);
|
||||
proxy = null;
|
||||
enterState(State.SERVICE_NOT_RESPONDING);
|
||||
Thread.sleep(sleepAfterDisconnectMillis);
|
||||
return;
|
||||
if (isHealthCheckFailedException(t)) {
|
||||
LOG.warn("Service health check failed for " + targetToMonitor
|
||||
+ ": " + t.getMessage());
|
||||
enterState(State.SERVICE_UNHEALTHY);
|
||||
} else {
|
||||
LOG.warn("Transport-level exception trying to monitor health of " +
|
||||
targetToMonitor + ": " + t.getCause() + " " + t.getLocalizedMessage());
|
||||
RPC.stopProxy(proxy);
|
||||
proxy = null;
|
||||
enterState(State.SERVICE_NOT_RESPONDING);
|
||||
Thread.sleep(sleepAfterDisconnectMillis);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (status != null) {
|
||||
|
@ -225,7 +228,15 @@ public class HealthMonitor {
|
|||
Thread.sleep(checkIntervalMillis);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
private boolean isHealthCheckFailedException(Throwable t) {
|
||||
return ((t instanceof HealthCheckFailedException) ||
|
||||
(t instanceof RemoteException &&
|
||||
((RemoteException)t).unwrapRemoteException(
|
||||
HealthCheckFailedException.class) instanceof
|
||||
HealthCheckFailedException));
|
||||
}
|
||||
|
||||
private synchronized void setLastServiceStatus(HAServiceStatus status) {
|
||||
this.lastServiceState = status;
|
||||
for (ServiceStateCallback cb : serviceStateCallbacks) {
|
||||
|
|
|
@ -22,15 +22,25 @@ import java.io.IOException;
|
|||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
|
||||
import com.google.protobuf.BlockingService;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolPB;
|
||||
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
|
||||
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceProtocolService;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import static org.apache.hadoop.fs.CommonConfigurationKeys.HA_HM_RPC_TIMEOUT_DEFAULT;
|
||||
|
||||
/**
|
||||
* Test-only implementation of {@link HAServiceTarget}, which returns
|
||||
* a mock implementation.
|
||||
|
@ -50,22 +60,33 @@ class DummyHAService extends HAServiceTarget {
|
|||
DummySharedResource sharedResource;
|
||||
public int fenceCount = 0;
|
||||
public int activeTransitionCount = 0;
|
||||
boolean testWithProtoBufRPC = false;
|
||||
|
||||
static ArrayList<DummyHAService> instances = Lists.newArrayList();
|
||||
int index;
|
||||
|
||||
DummyHAService(HAServiceState state, InetSocketAddress address) {
|
||||
this(state, address, false);
|
||||
}
|
||||
|
||||
DummyHAService(HAServiceState state, InetSocketAddress address,
|
||||
boolean testWithProtoBufRPC) {
|
||||
this.state = state;
|
||||
this.proxy = makeMock();
|
||||
this.testWithProtoBufRPC = testWithProtoBufRPC;
|
||||
if (testWithProtoBufRPC) {
|
||||
this.address = startAndGetRPCServerAddress(address);
|
||||
} else {
|
||||
this.address = address;
|
||||
}
|
||||
Configuration conf = new Configuration();
|
||||
this.proxy = makeMock(conf, HA_HM_RPC_TIMEOUT_DEFAULT);
|
||||
try {
|
||||
Configuration conf = new Configuration();
|
||||
conf.set(DUMMY_FENCE_KEY, DummyFencer.class.getName());
|
||||
conf.set(DUMMY_FENCE_KEY, DummyFencer.class.getName());
|
||||
this.fencer = Mockito.spy(
|
||||
NodeFencer.create(conf, DUMMY_FENCE_KEY));
|
||||
} catch (BadFencingConfigurationException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
this.address = address;
|
||||
synchronized (instances) {
|
||||
instances.add(this);
|
||||
this.index = instances.size();
|
||||
|
@ -75,9 +96,42 @@ class DummyHAService extends HAServiceTarget {
|
|||
public void setSharedResource(DummySharedResource rsrc) {
|
||||
this.sharedResource = rsrc;
|
||||
}
|
||||
|
||||
private HAServiceProtocol makeMock() {
|
||||
return Mockito.spy(new MockHAProtocolImpl());
|
||||
|
||||
private InetSocketAddress startAndGetRPCServerAddress(InetSocketAddress serverAddress) {
|
||||
Configuration conf = new Configuration();
|
||||
|
||||
try {
|
||||
RPC.setProtocolEngine(conf,
|
||||
HAServiceProtocolPB.class, ProtobufRpcEngine.class);
|
||||
HAServiceProtocolServerSideTranslatorPB haServiceProtocolXlator =
|
||||
new HAServiceProtocolServerSideTranslatorPB(new MockHAProtocolImpl());
|
||||
BlockingService haPbService = HAServiceProtocolService
|
||||
.newReflectiveBlockingService(haServiceProtocolXlator);
|
||||
|
||||
Server server = new RPC.Builder(conf)
|
||||
.setProtocol(HAServiceProtocolPB.class)
|
||||
.setInstance(haPbService)
|
||||
.setBindAddress(serverAddress.getHostName())
|
||||
.setPort(serverAddress.getPort()).build();
|
||||
server.start();
|
||||
return NetUtils.getConnectAddress(server);
|
||||
} catch (IOException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
private HAServiceProtocol makeMock(Configuration conf, int timeoutMs) {
|
||||
HAServiceProtocol service;
|
||||
if (!testWithProtoBufRPC) {
|
||||
service = new MockHAProtocolImpl();
|
||||
} else {
|
||||
try {
|
||||
service = super.getProxy(conf, timeoutMs);
|
||||
} catch (IOException e) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
return Mockito.spy(service);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -93,6 +147,9 @@ class DummyHAService extends HAServiceTarget {
|
|||
@Override
|
||||
public HAServiceProtocol getProxy(Configuration conf, int timeout)
|
||||
throws IOException {
|
||||
if (testWithProtoBufRPC) {
|
||||
proxy = makeMock(conf, timeout);
|
||||
}
|
||||
return proxy;
|
||||
}
|
||||
|
||||
|
@ -168,7 +225,7 @@ class DummyHAService extends HAServiceTarget {
|
|||
public HAServiceStatus getServiceStatus() throws IOException {
|
||||
checkUnreachable();
|
||||
HAServiceStatus ret = new HAServiceStatus(state);
|
||||
if (state == HAServiceState.STANDBY) {
|
||||
if (state == HAServiceState.STANDBY || state == HAServiceState.ACTIVE) {
|
||||
ret.setReadyToBecomeActive();
|
||||
}
|
||||
return ret;
|
||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.ha;
|
|||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -54,7 +55,8 @@ public class TestHealthMonitor {
|
|||
conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
|
||||
conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
|
||||
|
||||
svc = new DummyHAService(HAServiceState.ACTIVE, null);
|
||||
svc = new DummyHAService(HAServiceState.ACTIVE,
|
||||
new InetSocketAddress("0.0.0.0", 0), true);
|
||||
hm = new HealthMonitor(conf, svc) {
|
||||
@Override
|
||||
protected HAServiceProtocol createProxy() throws IOException {
|
||||
|
|
Loading…
Reference in New Issue