HBASE-18054 log when we add/remove failed servers in client (Ali)
This commit is contained in:
parent
746d1b1819
commit
2fd8e824d5
|
@ -488,7 +488,7 @@ class BlockingRpcConnection extends RpcConnection implements Runnable {
|
||||||
closeSocket();
|
closeSocket();
|
||||||
IOException e = ExceptionUtil.asInterrupt(t);
|
IOException e = ExceptionUtil.asInterrupt(t);
|
||||||
if (e == null) {
|
if (e == null) {
|
||||||
this.rpcClient.failedServers.addToFailedServers(remoteId.address);
|
this.rpcClient.failedServers.addToFailedServers(remoteId.address, t);
|
||||||
if (t instanceof LinkageError) {
|
if (t instanceof LinkageError) {
|
||||||
// probably the hbase hadoop version does not match the running hadoop version
|
// probably the hbase hadoop version does not match the running hadoop version
|
||||||
e = new DoNotRetryIOException(t);
|
e = new DoNotRetryIOException(t);
|
||||||
|
|
|
@ -23,6 +23,8 @@ import java.util.Iterator;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
@ -36,6 +38,7 @@ public class FailedServers {
|
||||||
private final Map<String, Long> failedServers = new HashMap<String, Long>();
|
private final Map<String, Long> failedServers = new HashMap<String, Long>();
|
||||||
private long latestExpiry = 0;
|
private long latestExpiry = 0;
|
||||||
private final int recheckServersTimeout;
|
private final int recheckServersTimeout;
|
||||||
|
private static final Log LOG = LogFactory.getLog(FailedServers.class);
|
||||||
|
|
||||||
public FailedServers(Configuration conf) {
|
public FailedServers(Configuration conf) {
|
||||||
this.recheckServersTimeout = conf.getInt(
|
this.recheckServersTimeout = conf.getInt(
|
||||||
|
@ -45,10 +48,15 @@ public class FailedServers {
|
||||||
/**
|
/**
|
||||||
* Add an address to the list of the failed servers list.
|
* Add an address to the list of the failed servers list.
|
||||||
*/
|
*/
|
||||||
public synchronized void addToFailedServers(InetSocketAddress address) {
|
public synchronized void addToFailedServers(InetSocketAddress address, Throwable throwable) {
|
||||||
final long expiry = EnvironmentEdgeManager.currentTime() + recheckServersTimeout;
|
final long expiry = EnvironmentEdgeManager.currentTime() + recheckServersTimeout;
|
||||||
this.failedServers.put(address.toString(), expiry);
|
this.failedServers.put(address.toString(), expiry);
|
||||||
this.latestExpiry = expiry;
|
this.latestExpiry = expiry;
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug(
|
||||||
|
"Added failed server with address " + address.toString() + " to list caused by "
|
||||||
|
+ throwable.toString());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -266,7 +266,7 @@ class NettyRpcConnection extends RpcConnection {
|
||||||
Channel ch = future.channel();
|
Channel ch = future.channel();
|
||||||
if (!future.isSuccess()) {
|
if (!future.isSuccess()) {
|
||||||
failInit(ch, toIOE(future.cause()));
|
failInit(ch, toIOE(future.cause()));
|
||||||
rpcClient.failedServers.addToFailedServers(remoteId.address);
|
rpcClient.failedServers.addToFailedServers(remoteId.address, future.cause());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
|
ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate());
|
||||||
|
|
|
@ -0,0 +1,77 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
|
||||||
|
* agreements. See the NOTICE file distributed with this work for additional information regarding
|
||||||
|
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance with the License. You may obtain a
|
||||||
|
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
|
||||||
|
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
|
||||||
|
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
|
||||||
|
* for the specific language governing permissions and limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.apache.log4j.Appender;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
|
import org.apache.log4j.LogManager;
|
||||||
|
import org.apache.log4j.spi.LoggingEvent;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.mockito.ArgumentCaptor;
|
||||||
|
import org.mockito.Captor;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.runners.MockitoJUnitRunner;
|
||||||
|
|
||||||
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
|
@Category({ ClientTests.class, SmallTests.class })
|
||||||
|
public class TestFailedServersLog {
|
||||||
|
static final int TEST_PORT = 9999;
|
||||||
|
private InetSocketAddress addr;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private Appender mockAppender;
|
||||||
|
|
||||||
|
@Captor
|
||||||
|
private ArgumentCaptor captorLoggingEvent;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
LogManager.getRootLogger().addAppender(mockAppender);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void teardown() {
|
||||||
|
LogManager.getRootLogger().removeAppender(mockAppender);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddToFailedServersLogging() {
|
||||||
|
Throwable nullException = new NullPointerException();
|
||||||
|
|
||||||
|
FailedServers fs = new FailedServers(new Configuration());
|
||||||
|
addr = new InetSocketAddress(TEST_PORT);
|
||||||
|
|
||||||
|
fs.addToFailedServers(addr, nullException);
|
||||||
|
|
||||||
|
Mockito.verify(mockAppender).doAppend((LoggingEvent) captorLoggingEvent.capture());
|
||||||
|
LoggingEvent loggingEvent = (LoggingEvent) captorLoggingEvent.getValue();
|
||||||
|
assertThat(loggingEvent.getLevel(), is(Level.DEBUG));
|
||||||
|
assertEquals("Added failed server with address " + addr.toString() + " to list caused by "
|
||||||
|
+ nullException.toString(),
|
||||||
|
loggingEvent.getRenderedMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -37,6 +37,7 @@ public class TestHBaseClient {
|
||||||
ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
|
ManualEnvironmentEdge ee = new ManualEnvironmentEdge();
|
||||||
EnvironmentEdgeManager.injectEdge( ee );
|
EnvironmentEdgeManager.injectEdge( ee );
|
||||||
FailedServers fs = new FailedServers(new Configuration());
|
FailedServers fs = new FailedServers(new Configuration());
|
||||||
|
Throwable testThrowable = new Throwable();//throwable already tested in TestFailedServers.java
|
||||||
|
|
||||||
InetSocketAddress ia = InetSocketAddress.createUnresolved("bad", 12);
|
InetSocketAddress ia = InetSocketAddress.createUnresolved("bad", 12);
|
||||||
InetSocketAddress ia2 = InetSocketAddress.createUnresolved("bad", 12); // same server as ia
|
InetSocketAddress ia2 = InetSocketAddress.createUnresolved("bad", 12); // same server as ia
|
||||||
|
@ -46,7 +47,7 @@ public class TestHBaseClient {
|
||||||
|
|
||||||
Assert.assertFalse( fs.isFailedServer(ia) );
|
Assert.assertFalse( fs.isFailedServer(ia) );
|
||||||
|
|
||||||
fs.addToFailedServers(ia);
|
fs.addToFailedServers(ia,testThrowable);
|
||||||
Assert.assertTrue( fs.isFailedServer(ia) );
|
Assert.assertTrue( fs.isFailedServer(ia) );
|
||||||
Assert.assertTrue( fs.isFailedServer(ia2) );
|
Assert.assertTrue( fs.isFailedServer(ia2) );
|
||||||
|
|
||||||
|
@ -58,9 +59,9 @@ public class TestHBaseClient {
|
||||||
Assert.assertFalse( fs.isFailedServer(ia) );
|
Assert.assertFalse( fs.isFailedServer(ia) );
|
||||||
Assert.assertFalse( fs.isFailedServer(ia2) );
|
Assert.assertFalse( fs.isFailedServer(ia2) );
|
||||||
|
|
||||||
fs.addToFailedServers(ia);
|
fs.addToFailedServers(ia,testThrowable);
|
||||||
fs.addToFailedServers(ia3);
|
fs.addToFailedServers(ia3,testThrowable);
|
||||||
fs.addToFailedServers(ia4);
|
fs.addToFailedServers(ia4,testThrowable);
|
||||||
|
|
||||||
Assert.assertTrue( fs.isFailedServer(ia) );
|
Assert.assertTrue( fs.isFailedServer(ia) );
|
||||||
Assert.assertTrue( fs.isFailedServer(ia2) );
|
Assert.assertTrue( fs.isFailedServer(ia2) );
|
||||||
|
@ -74,7 +75,7 @@ public class TestHBaseClient {
|
||||||
Assert.assertFalse( fs.isFailedServer(ia4) );
|
Assert.assertFalse( fs.isFailedServer(ia4) );
|
||||||
|
|
||||||
|
|
||||||
fs.addToFailedServers(ia3);
|
fs.addToFailedServers(ia3,testThrowable);
|
||||||
Assert.assertFalse( fs.isFailedServer(ia4) );
|
Assert.assertFalse( fs.isFailedServer(ia4) );
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue