HBASE-15727 Canary Tool for Zookeeper (churro morales)
This commit is contained in:
parent
72dc6fe5d5
commit
a8c8bfd5ee
|
@ -19,8 +19,14 @@
|
|||
|
||||
package org.apache.hadoop.hbase.tool;
|
||||
|
||||
import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
|
||||
import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
@ -32,12 +38,12 @@ import java.util.Map;
|
|||
import java.util.Random;
|
||||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
|
@ -76,20 +82,29 @@ import org.apache.hadoop.hbase.util.Bytes;
|
|||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
import org.apache.hadoop.hbase.util.RegionSplitter;
|
||||
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.apache.zookeeper.client.ConnectStringParser;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
|
||||
/**
|
||||
* HBase Canary Tool, that that can be used to do
|
||||
* "canary monitoring" of a running HBase cluster.
|
||||
*
|
||||
* Here are two modes
|
||||
* Here are three modes
|
||||
* 1. region mode - Foreach region tries to get one row per column family
|
||||
* and outputs some information about failure or latency.
|
||||
*
|
||||
* 2. regionserver mode - Foreach regionserver tries to get one row from one table
|
||||
* selected randomly and outputs some information about failure or latency.
|
||||
*
|
||||
* 3. zookeeper mode - for each zookeeper instance, selects a zNode and
|
||||
* outputs some information about failure or latency.
|
||||
*/
|
||||
public final class Canary implements Tool {
|
||||
// Sink interface used by the canary to outputs information
|
||||
|
@ -186,6 +201,55 @@ public final class Canary implements Tool {
|
|||
}
|
||||
}
|
||||
|
||||
public static class ZookeeperStdOutSink extends StdOutSink implements ExtendedSink {
|
||||
@Override public void publishReadFailure(String zNode, String server) {
|
||||
incReadFailureCount();
|
||||
LOG.error(String.format("Read from zNode:%s on zookeeper instance:%s", zNode, server));
|
||||
}
|
||||
|
||||
@Override public void publishReadTiming(String znode, String server, long msTime) {
|
||||
LOG.info(String.format("Read from zNode:%s on zookeeper instance:%s in %dms",
|
||||
znode, server, msTime));
|
||||
}
|
||||
}
|
||||
|
||||
static class ZookeeperTask implements Callable<Void> {
|
||||
private final Connection connection;
|
||||
private final String host;
|
||||
private String znode;
|
||||
private final int timeout;
|
||||
private ZookeeperStdOutSink sink;
|
||||
|
||||
public ZookeeperTask(Connection connection, String host, String znode, int timeout,
|
||||
ZookeeperStdOutSink sink) {
|
||||
this.connection = connection;
|
||||
this.host = host;
|
||||
this.znode = znode;
|
||||
this.timeout = timeout;
|
||||
this.sink = sink;
|
||||
}
|
||||
|
||||
@Override public Void call() throws Exception {
|
||||
ZooKeeper zooKeeper = null;
|
||||
try {
|
||||
zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
|
||||
Stat exists = zooKeeper.exists(znode, false);
|
||||
StopWatch stopwatch = new StopWatch();
|
||||
stopwatch.start();
|
||||
zooKeeper.getData(znode, false, exists);
|
||||
stopwatch.stop();
|
||||
sink.publishReadTiming(znode, host, stopwatch.getTime());
|
||||
} catch (KeeperException | InterruptedException e) {
|
||||
sink.publishReadFailure(znode, host);
|
||||
} finally {
|
||||
if (zooKeeper != null) {
|
||||
zooKeeper.close();
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* For each column family of the region tries to get one row and outputs the latency, or the
|
||||
* failure.
|
||||
|
@ -459,6 +523,7 @@ public final class Canary implements Tool {
|
|||
private long timeout = DEFAULT_TIMEOUT;
|
||||
private boolean failOnError = true;
|
||||
private boolean regionServerMode = false;
|
||||
private boolean zookeeperMode = false;
|
||||
private boolean regionServerAllRegions = false;
|
||||
private boolean writeSniffing = false;
|
||||
private boolean treatFailureAsError = false;
|
||||
|
@ -519,6 +584,8 @@ public final class Canary implements Tool {
|
|||
System.err.println("-interval needs a numeric value argument.");
|
||||
printUsageAndExit();
|
||||
}
|
||||
} else if (cmd.equals("-zookeeper")) {
|
||||
this.zookeeperMode = true;
|
||||
} else if(cmd.equals("-regionserver")) {
|
||||
this.regionServerMode = true;
|
||||
} else if(cmd.equals("-allRegions")) {
|
||||
|
@ -575,6 +642,13 @@ public final class Canary implements Tool {
|
|||
System.err.println("-allRegions can only be specified in regionserver mode.");
|
||||
printUsageAndExit();
|
||||
}
|
||||
if (this.zookeeperMode) {
|
||||
if (this.regionServerMode || this.regionServerAllRegions || this.writeSniffing) {
|
||||
System.err.println("-zookeeper is exclusive and cannot be combined with "
|
||||
+ "other modes.");
|
||||
printUsageAndExit();
|
||||
}
|
||||
}
|
||||
return index;
|
||||
}
|
||||
|
||||
|
@ -659,6 +733,8 @@ public final class Canary implements Tool {
|
|||
System.err.println(" which means to enable regionserver mode");
|
||||
System.err.println(" -allRegions Tries all regions on a regionserver,");
|
||||
System.err.println(" only works in regionserver mode.");
|
||||
System.err.println(" -zookeeper Tries to grab zookeeper.znode.parent ");
|
||||
System.err.println(" on each zookeeper instance");
|
||||
System.err.println(" -daemon Continuous check at defined intervals.");
|
||||
System.err.println(" -interval <N> Interval between checks (sec)");
|
||||
System.err.println(" -e Use table/regionserver as regular expression");
|
||||
|
@ -697,6 +773,10 @@ public final class Canary implements Tool {
|
|||
new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
|
||||
(ExtendedSink) this.sink, this.executor, this.regionServerAllRegions,
|
||||
this.treatFailureAsError);
|
||||
} else if (this.zookeeperMode) {
|
||||
monitor =
|
||||
new ZookeeperMonitor(connection, monitorTargets, this.useRegExp,
|
||||
(ZookeeperStdOutSink) this.sink, this.executor, this.treatFailureAsError);
|
||||
} else {
|
||||
monitor =
|
||||
new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor,
|
||||
|
@ -1101,6 +1181,62 @@ public final class Canary implements Tool {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// monitor for zookeeper mode
|
||||
private static class ZookeeperMonitor extends Monitor {
|
||||
private List<String> hosts;
|
||||
private final String znode;
|
||||
private final int timeout;
|
||||
|
||||
protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
|
||||
ExtendedSink sink, ExecutorService executor, boolean treatFailureAsError) {
|
||||
super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
|
||||
Configuration configuration = connection.getConfiguration();
|
||||
znode =
|
||||
configuration.get(ZOOKEEPER_ZNODE_PARENT,
|
||||
DEFAULT_ZOOKEEPER_ZNODE_PARENT);
|
||||
timeout = configuration
|
||||
.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
|
||||
ConnectStringParser parser =
|
||||
new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
|
||||
hosts = Lists.newArrayList();
|
||||
for (InetSocketAddress server : parser.getServerAddresses()) {
|
||||
hosts.add(server.toString());
|
||||
}
|
||||
}
|
||||
|
||||
@Override public void run() {
|
||||
List<ZookeeperTask> tasks = Lists.newArrayList();
|
||||
for (final String host : hosts) {
|
||||
tasks.add(new ZookeeperTask(connection, host, znode, timeout, getSink()));
|
||||
}
|
||||
try {
|
||||
for (Future<Void> future : this.executor.invokeAll(tasks)) {
|
||||
try {
|
||||
future.get();
|
||||
} catch (ExecutionException e) {
|
||||
LOG.error("Sniff zookeeper failed!", e);
|
||||
this.errorCode = ERROR_EXIT_CODE;
|
||||
}
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
this.errorCode = ERROR_EXIT_CODE;
|
||||
Thread.currentThread().interrupt();
|
||||
LOG.error("Sniff zookeeper interrupted!", e);
|
||||
}
|
||||
this.done = true;
|
||||
}
|
||||
|
||||
|
||||
private ZookeeperStdOutSink getSink() {
|
||||
if (!(sink instanceof ZookeeperStdOutSink)) {
|
||||
throw new RuntimeException("Can only write to zookeeper sink");
|
||||
}
|
||||
return ((ZookeeperStdOutSink) sink);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// a monitor for regionserver mode
|
||||
private static class RegionServerMonitor extends Monitor {
|
||||
|
||||
|
@ -1321,7 +1457,7 @@ public final class Canary implements Tool {
|
|||
new GenericOptionsParser(conf, args);
|
||||
|
||||
int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
|
||||
LOG.info("Number of exection threads " + numThreads);
|
||||
LOG.info("Number of execution threads " + numThreads);
|
||||
|
||||
ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
|
||||
|
||||
|
|
|
@ -37,6 +37,8 @@ import org.apache.hadoop.util.ToolRunner;
|
|||
import org.apache.log4j.Appender;
|
||||
import org.apache.log4j.LogManager;
|
||||
import org.apache.log4j.spi.LoggingEvent;
|
||||
import com.google.common.collect.Iterables;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -46,13 +48,11 @@ import org.mockito.ArgumentMatcher;
|
|||
import org.mockito.Mock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Matchers.anyLong;
|
||||
import static org.mockito.Matchers.eq;
|
||||
import static org.mockito.Matchers.isA;
|
||||
import static org.mockito.Mockito.atLeastOnce;
|
||||
import static org.mockito.Mockito.spy;
|
||||
|
@ -84,6 +84,24 @@ public class TestCanaryTool {
|
|||
@Mock
|
||||
Appender mockAppender;
|
||||
|
||||
@Test
|
||||
public void testBasicZookeeperCanaryWorks() throws Exception {
|
||||
Integer port =
|
||||
Iterables.getOnlyElement(testingUtility.getZkCluster().getClientPortList(), null);
|
||||
testingUtility.getConfiguration().set(HConstants.ZOOKEEPER_QUORUM,
|
||||
"localhost:" + port + "/hbase");
|
||||
ExecutorService executor = new ScheduledThreadPoolExecutor(2);
|
||||
Canary.ZookeeperStdOutSink sink = spy(new Canary.ZookeeperStdOutSink());
|
||||
Canary canary = new Canary(executor, sink);
|
||||
String[] args = { "-t", "10000", "-zookeeper" };
|
||||
ToolRunner.run(testingUtility.getConfiguration(), canary, args);
|
||||
|
||||
String baseZnode = testingUtility.getConfiguration()
|
||||
.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
|
||||
verify(sink, atLeastOnce())
|
||||
.publishReadTiming(eq(baseZnode), eq("localhost:" + port), anyLong());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBasicCanaryWorks() throws Exception {
|
||||
TableName tableName = TableName.valueOf("testTable");
|
||||
|
|
Loading…
Reference in New Issue