HBASE-15727 Canary Tool for Zookeeper (churro morales)

This commit is contained in:
tedyu 2016-06-02 10:15:08 -07:00
parent cd2588001c
commit 7e5d530870
2 changed files with 160 additions and 6 deletions

View File

@ -19,8 +19,14 @@
package org.apache.hadoop.hbase.tool; package org.apache.hadoop.hbase.tool;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT;
import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT;
import com.google.common.collect.Lists;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
@ -32,12 +38,12 @@ import java.util.Map;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -78,20 +84,29 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.RegionSplitter; import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.hbase.zookeeper.EmptyWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.util.GenericOptionsParser; import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.client.ConnectStringParser;
import org.apache.zookeeper.data.Stat;
/** /**
* HBase Canary Tool, that that can be used to do * HBase Canary Tool, that that can be used to do
* "canary monitoring" of a running HBase cluster. * "canary monitoring" of a running HBase cluster.
* *
* Here are two modes * Here are three modes
* 1. region mode - Foreach region tries to get one row per column family * 1. region mode - Foreach region tries to get one row per column family
* and outputs some information about failure or latency. * and outputs some information about failure or latency.
* *
* 2. regionserver mode - Foreach regionserver tries to get one row from one table * 2. regionserver mode - Foreach regionserver tries to get one row from one table
* selected randomly and outputs some information about failure or latency. * selected randomly and outputs some information about failure or latency.
*
* 3. zookeeper mode - for each zookeeper instance, selects a zNode and
* outputs some information about failure or latency.
*/ */
public final class Canary implements Tool { public final class Canary implements Tool {
// Sink interface used by the canary to outputs information // Sink interface used by the canary to outputs information
@ -188,6 +203,55 @@ public final class Canary implements Tool {
} }
} }
public static class ZookeeperStdOutSink extends StdOutSink implements ExtendedSink {
@Override public void publishReadFailure(String zNode, String server) {
incReadFailureCount();
LOG.error(String.format("Read from zNode:%s on zookeeper instance:%s", zNode, server));
}
@Override public void publishReadTiming(String znode, String server, long msTime) {
LOG.info(String.format("Read from zNode:%s on zookeeper instance:%s in %dms",
znode, server, msTime));
}
}
static class ZookeeperTask implements Callable<Void> {
private final Connection connection;
private final String host;
private String znode;
private final int timeout;
private ZookeeperStdOutSink sink;
public ZookeeperTask(Connection connection, String host, String znode, int timeout,
ZookeeperStdOutSink sink) {
this.connection = connection;
this.host = host;
this.znode = znode;
this.timeout = timeout;
this.sink = sink;
}
@Override public Void call() throws Exception {
ZooKeeper zooKeeper = null;
try {
zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance);
Stat exists = zooKeeper.exists(znode, false);
StopWatch stopwatch = new StopWatch();
stopwatch.start();
zooKeeper.getData(znode, false, exists);
stopwatch.stop();
sink.publishReadTiming(znode, host, stopwatch.getTime());
} catch (KeeperException | InterruptedException e) {
sink.publishReadFailure(znode, host);
} finally {
if (zooKeeper != null) {
zooKeeper.close();
}
}
return null;
}
}
/** /**
* For each column family of the region tries to get one row and outputs the latency, or the * For each column family of the region tries to get one row and outputs the latency, or the
* failure. * failure.
@ -462,6 +526,7 @@ public final class Canary implements Tool {
private long timeout = DEFAULT_TIMEOUT; private long timeout = DEFAULT_TIMEOUT;
private boolean failOnError = true; private boolean failOnError = true;
private boolean regionServerMode = false; private boolean regionServerMode = false;
private boolean zookeeperMode = false;
private boolean regionServerAllRegions = false; private boolean regionServerAllRegions = false;
private boolean writeSniffing = false; private boolean writeSniffing = false;
private boolean treatFailureAsError = false; private boolean treatFailureAsError = false;
@ -522,6 +587,8 @@ public final class Canary implements Tool {
System.err.println("-interval needs a numeric value argument."); System.err.println("-interval needs a numeric value argument.");
printUsageAndExit(); printUsageAndExit();
} }
} else if (cmd.equals("-zookeeper")) {
this.zookeeperMode = true;
} else if(cmd.equals("-regionserver")) { } else if(cmd.equals("-regionserver")) {
this.regionServerMode = true; this.regionServerMode = true;
} else if(cmd.equals("-allRegions")) { } else if(cmd.equals("-allRegions")) {
@ -578,6 +645,13 @@ public final class Canary implements Tool {
System.err.println("-allRegions can only be specified in regionserver mode."); System.err.println("-allRegions can only be specified in regionserver mode.");
printUsageAndExit(); printUsageAndExit();
} }
if (this.zookeeperMode) {
if (this.regionServerMode || this.regionServerAllRegions || this.writeSniffing) {
System.err.println("-zookeeper is exclusive and cannot be combined with "
+ "other modes.");
printUsageAndExit();
}
}
return index; return index;
} }
@ -662,6 +736,8 @@ public final class Canary implements Tool {
System.err.println(" which means to enable regionserver mode"); System.err.println(" which means to enable regionserver mode");
System.err.println(" -allRegions Tries all regions on a regionserver,"); System.err.println(" -allRegions Tries all regions on a regionserver,");
System.err.println(" only works in regionserver mode."); System.err.println(" only works in regionserver mode.");
System.err.println(" -zookeeper Tries to grab zookeeper.znode.parent ");
System.err.println(" on each zookeeper instance");
System.err.println(" -daemon Continuous check at defined intervals."); System.err.println(" -daemon Continuous check at defined intervals.");
System.err.println(" -interval <N> Interval between checks (sec)"); System.err.println(" -interval <N> Interval between checks (sec)");
System.err.println(" -e Use table/regionserver as regular expression"); System.err.println(" -e Use table/regionserver as regular expression");
@ -700,6 +776,10 @@ public final class Canary implements Tool {
new RegionServerMonitor(connection, monitorTargets, this.useRegExp, new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
(ExtendedSink) this.sink, this.executor, this.regionServerAllRegions, (ExtendedSink) this.sink, this.executor, this.regionServerAllRegions,
this.treatFailureAsError); this.treatFailureAsError);
} else if (this.zookeeperMode) {
monitor =
new ZookeeperMonitor(connection, monitorTargets, this.useRegExp,
(ZookeeperStdOutSink) this.sink, this.executor, this.treatFailureAsError);
} else { } else {
monitor = monitor =
new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor, new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor,
@ -1040,6 +1120,62 @@ public final class Canary implements Tool {
} }
return executor.invokeAll(tasks); return executor.invokeAll(tasks);
} }
// monitor for zookeeper mode
private static class ZookeeperMonitor extends Monitor {
private List<String> hosts;
private final String znode;
private final int timeout;
protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
ExtendedSink sink, ExecutorService executor, boolean treatFailureAsError) {
super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
Configuration configuration = connection.getConfiguration();
znode =
configuration.get(ZOOKEEPER_ZNODE_PARENT,
DEFAULT_ZOOKEEPER_ZNODE_PARENT);
timeout = configuration
.getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT);
ConnectStringParser parser =
new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration));
hosts = Lists.newArrayList();
for (InetSocketAddress server : parser.getServerAddresses()) {
hosts.add(server.toString());
}
}
@Override public void run() {
List<ZookeeperTask> tasks = Lists.newArrayList();
for (final String host : hosts) {
tasks.add(new ZookeeperTask(connection, host, znode, timeout, getSink()));
}
try {
for (Future<Void> future : this.executor.invokeAll(tasks)) {
try {
future.get();
} catch (ExecutionException e) {
LOG.error("Sniff zookeeper failed!", e);
this.errorCode = ERROR_EXIT_CODE;
}
}
} catch (InterruptedException e) {
this.errorCode = ERROR_EXIT_CODE;
Thread.currentThread().interrupt();
LOG.error("Sniff zookeeper interrupted!", e);
}
this.done = true;
}
private ZookeeperStdOutSink getSink() {
if (!(sink instanceof ZookeeperStdOutSink)) {
throw new RuntimeException("Can only write to zookeeper sink");
}
return ((ZookeeperStdOutSink) sink);
}
}
// a monitor for regionserver mode // a monitor for regionserver mode
private static class RegionServerMonitor extends Monitor { private static class RegionServerMonitor extends Monitor {
@ -1255,7 +1391,7 @@ public final class Canary implements Tool {
new GenericOptionsParser(conf, args); new GenericOptionsParser(conf, args);
int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM); int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
LOG.info("Number of exection threads " + numThreads); LOG.info("Number of execution threads " + numThreads);
ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads); ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);

View File

@ -31,6 +31,8 @@ import org.apache.hadoop.util.ToolRunner;
import org.apache.log4j.Appender; import org.apache.log4j.Appender;
import org.apache.log4j.LogManager; import org.apache.log4j.LogManager;
import org.apache.log4j.spi.LoggingEvent; import org.apache.log4j.spi.LoggingEvent;
import com.google.common.collect.Iterables;
import org.apache.hadoop.hbase.HConstants;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -40,13 +42,11 @@ import org.mockito.ArgumentMatcher;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import java.util.List;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ScheduledThreadPoolExecutor;
import static org.junit.Assert.*;
import static org.mockito.Matchers.anyLong; import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA; import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
@ -78,6 +78,24 @@ public class TestCanaryTool {
@Mock @Mock
Appender mockAppender; Appender mockAppender;
@Test
public void testBasicZookeeperCanaryWorks() throws Exception {
Integer port =
Iterables.getOnlyElement(testingUtility.getZkCluster().getClientPortList(), null);
testingUtility.getConfiguration().set(HConstants.ZOOKEEPER_QUORUM,
"localhost:" + port + "/hbase");
ExecutorService executor = new ScheduledThreadPoolExecutor(2);
Canary.ZookeeperStdOutSink sink = spy(new Canary.ZookeeperStdOutSink());
Canary canary = new Canary(executor, sink);
String[] args = { "-t", "10000", "-zookeeper" };
ToolRunner.run(testingUtility.getConfiguration(), canary, args);
String baseZnode = testingUtility.getConfiguration()
.get(HConstants.ZOOKEEPER_ZNODE_PARENT, HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT);
verify(sink, atLeastOnce())
.publishReadTiming(eq(baseZnode), eq("localhost:" + port), anyLong());
}
@Test @Test
public void testBasicCanaryWorks() throws Exception { public void testBasicCanaryWorks() throws Exception {
TableName tableName = TableName.valueOf("testTable"); TableName tableName = TableName.valueOf("testTable");