HBASE-13199 Some small improvements on canary tool (Shaohui Liu)

This commit is contained in:
Liu Shaohui 2015-03-19 10:44:35 +08:00
parent 602e11cc74
commit cf7ef936d2
1 changed files with 266 additions and 157 deletions

View File

@ -24,10 +24,17 @@ import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.TreeSet; import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.regex.Matcher; import java.util.regex.Matcher;
import java.util.regex.Pattern; import java.util.regex.Pattern;
@ -56,6 +63,8 @@ import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
@ -119,6 +128,174 @@ public final class Canary implements Tool {
} }
} }
/**
* For each column family of the region tries to get one row and outputs the latency, or the
* failure.
*/
static class RegionTask implements Callable<Void> {
private Connection connection;
private HRegionInfo region;
private Sink sink;
RegionTask(Connection connection, HRegionInfo region, Sink sink) {
this.connection = connection;
this.region = region;
this.sink = sink;
}
@Override
public Void call() {
Table table = null;
HTableDescriptor tableDesc = null;
try {
table = connection.getTable(region.getTable());
tableDesc = table.getTableDescriptor();
} catch (IOException e) {
LOG.debug("sniffRegion failed", e);
sink.publishReadFailure(region, e);
return null;
} finally {
if (table != null) {
try {
table.close();
} catch (IOException e) {
}
}
}
byte[] startKey = null;
Get get = null;
Scan scan = null;
ResultScanner rs = null;
StopWatch stopWatch = new StopWatch();
for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
stopWatch.reset();
startKey = region.getStartKey();
// Can't do a get on empty start row so do a Scan of first element if any instead.
if (startKey.length > 0) {
get = new Get(startKey);
get.setCacheBlocks(false);
get.setFilter(new FirstKeyOnlyFilter());
get.addFamily(column.getName());
} else {
scan = new Scan();
scan.setCaching(1);
scan.setCacheBlocks(false);
scan.setFilter(new FirstKeyOnlyFilter());
scan.addFamily(column.getName());
scan.setMaxResultSize(1L);
}
try {
if (startKey.length > 0) {
stopWatch.start();
table.get(get);
stopWatch.stop();
sink.publishReadTiming(region, column, stopWatch.getTime());
} else {
stopWatch.start();
rs = table.getScanner(scan);
stopWatch.stop();
sink.publishReadTiming(region, column, stopWatch.getTime());
}
} catch (Exception e) {
sink.publishReadFailure(region, column, e);
} finally {
if (rs != null) {
rs.close();
}
if (table != null) {
try {
table.close();
} catch (IOException e) {
}
}
scan = null;
get = null;
startKey = null;
}
}
return null;
}
}
/**
* Get one row from a region on the regionserver and outputs the latency, or the failure.
*/
static class RegionServerTask implements Callable<Void> {
private Connection connection;
private String serverName;
private HRegionInfo region;
private ExtendedSink sink;
RegionServerTask(Connection connection, String serverName, HRegionInfo region,
ExtendedSink sink) {
this.connection = connection;
this.serverName = serverName;
this.region = region;
this.sink = sink;
}
@Override
public Void call() {
TableName tableName = null;
Table table = null;
Get get = null;
byte[] startKey = null;
Scan scan = null;
StopWatch stopWatch = new StopWatch();
// monitor one region on every region server
stopWatch.reset();
try {
tableName = region.getTable();
table = connection.getTable(tableName);
startKey = region.getStartKey();
// Can't do a get on empty start row so do a Scan of first element if any instead.
if (startKey.length > 0) {
get = new Get(startKey);
get.setCacheBlocks(false);
get.setFilter(new FirstKeyOnlyFilter());
stopWatch.start();
table.get(get);
stopWatch.stop();
} else {
scan = new Scan();
scan.setCacheBlocks(false);
scan.setFilter(new FirstKeyOnlyFilter());
scan.setCaching(1);
scan.setMaxResultSize(1L);
stopWatch.start();
ResultScanner s = table.getScanner(scan);
s.close();
stopWatch.stop();
}
sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
} catch (TableNotFoundException tnfe) {
// This is ignored because it doesn't imply that the regionserver is dead
} catch (TableNotEnabledException tnee) {
// This is considered a success since we got a response.
LOG.debug("The targeted table was disabled. Assuming success.");
} catch (DoNotRetryIOException dnrioe) {
sink.publishReadFailure(tableName.getNameAsString(), serverName);
LOG.error(dnrioe);
} catch (IOException e) {
sink.publishReadFailure(tableName.getNameAsString(), serverName);
LOG.error(e);
} finally {
if (table != null) {
try {
table.close();
} catch (IOException e) {/* DO NOTHING */
}
}
scan = null;
get = null;
startKey = null;
}
return null;
}
}
private static final int USAGE_EXIT_CODE = 1; private static final int USAGE_EXIT_CODE = 1;
private static final int INIT_ERROR_EXIT_CODE = 2; private static final int INIT_ERROR_EXIT_CODE = 2;
private static final int TIMEOUT_ERROR_EXIT_CODE = 3; private static final int TIMEOUT_ERROR_EXIT_CODE = 3;
@ -128,6 +305,8 @@ public final class Canary implements Tool {
private static final long DEFAULT_TIMEOUT = 600000; // 10 mins private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
private static final int MAX_THREADS_NUM = 16; // #threads to contact regions
private static final Log LOG = LogFactory.getLog(Canary.class); private static final Log LOG = LogFactory.getLog(Canary.class);
private Configuration conf = null; private Configuration conf = null;
@ -138,12 +317,14 @@ public final class Canary implements Tool {
private long timeout = DEFAULT_TIMEOUT; private long timeout = DEFAULT_TIMEOUT;
private boolean failOnError = true; private boolean failOnError = true;
private boolean regionServerMode = false; private boolean regionServerMode = false;
private ExecutorService executor; // threads to retrieve data from regionservers
public Canary() { public Canary() {
this(new RegionServerStdOutSink()); this(new ScheduledThreadPoolExecutor(1), new RegionServerStdOutSink());
} }
public Canary(Sink sink) { public Canary(ExecutorService executor, Sink sink) {
this.executor = executor;
this.sink = sink; this.sink = sink;
} }
@ -326,13 +507,12 @@ public final class Canary implements Tool {
} }
if (this.regionServerMode) { if (this.regionServerMode) {
monitor = new RegionServerMonitor( monitor =
connection, new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
monitorTargets, (ExtendedSink) this.sink, this.executor);
this.useRegExp,
(ExtendedSink)this.sink);
} else { } else {
monitor = new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink); monitor =
new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor);
} }
return monitor; return monitor;
} }
@ -349,6 +529,7 @@ public final class Canary implements Tool {
protected boolean done = false; protected boolean done = false;
protected int errorCode = 0; protected int errorCode = 0;
protected Sink sink; protected Sink sink;
protected ExecutorService executor;
public boolean isDone() { public boolean isDone() {
return done; return done;
@ -363,14 +544,15 @@ public final class Canary implements Tool {
if (this.admin != null) this.admin.close(); if (this.admin != null) this.admin.close();
} }
protected Monitor(Connection connection, String[] monitorTargets, protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink,
boolean useRegExp, Sink sink) { ExecutorService executor) {
if (null == connection) throw new IllegalArgumentException("connection shall not be null"); if (null == connection) throw new IllegalArgumentException("connection shall not be null");
this.connection = connection; this.connection = connection;
this.targets = monitorTargets; this.targets = monitorTargets;
this.useRegExp = useRegExp; this.useRegExp = useRegExp;
this.sink = sink; this.sink = sink;
this.executor = executor;
} }
public abstract void run(); public abstract void run();
@ -394,23 +576,31 @@ public final class Canary implements Tool {
// a monitor for region mode // a monitor for region mode
private static class RegionMonitor extends Monitor { private static class RegionMonitor extends Monitor {
public RegionMonitor(Connection connection, String[] monitorTargets, public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
boolean useRegExp, Sink sink) { Sink sink, ExecutorService executor) {
super(connection, monitorTargets, useRegExp, sink); super(connection, monitorTargets, useRegExp, sink, executor);
} }
@Override @Override
public void run() { public void run() {
if (this.initAdmin()) { if (this.initAdmin()) {
try { try {
List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
if (this.targets != null && this.targets.length > 0) { if (this.targets != null && this.targets.length > 0) {
String[] tables = generateMonitorTables(this.targets); String[] tables = generateMonitorTables(this.targets);
this.initialized = true; this.initialized = true;
for (String table : tables) { for (String table : tables) {
Canary.sniff(admin, sink, table); taskFutures.addAll(Canary.sniff(admin, sink, table, executor));
} }
} else { } else {
sniff(); taskFutures.addAll(sniff());
}
for (Future<Void> future : taskFutures) {
try {
future.get();
} catch (ExecutionException e) {
LOG.error("Sniff region failed!", e);
}
} }
} catch (Exception e) { } catch (Exception e) {
LOG.error("Run regionMonitor failed", e); LOG.error("Run regionMonitor failed", e);
@ -445,8 +635,7 @@ public final class Canary implements Tool {
if (tmpTables.size() > 0) { if (tmpTables.size() > 0) {
returnTables = tmpTables.toArray(new String[tmpTables.size()]); returnTables = tmpTables.toArray(new String[tmpTables.size()]);
} else { } else {
String msg = "No HTable found, tablePattern:" String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets);
+ Arrays.toString(monitorTargets);
LOG.error(msg); LOG.error(msg);
this.errorCode = INIT_ERROR_EXIT_CODE; this.errorCode = INIT_ERROR_EXIT_CODE;
throw new TableNotFoundException(msg); throw new TableNotFoundException(msg);
@ -461,12 +650,15 @@ public final class Canary implements Tool {
/* /*
* canary entry point to monitor all the tables. * canary entry point to monitor all the tables.
*/ */
private void sniff() throws Exception { private List<Future<Void>> sniff() throws Exception {
List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
for (HTableDescriptor table : admin.listTables()) { for (HTableDescriptor table : admin.listTables()) {
Canary.sniff(admin, sink, table); if (admin.isTableEnabled(table.getTableName())) {
taskFutures.addAll(Canary.sniff(admin, sink, table, executor));
} }
} }
return taskFutures;
}
} }
/** /**
@ -474,108 +666,56 @@ public final class Canary implements Tool {
* @throws Exception * @throws Exception
*/ */
public static void sniff(final Admin admin, TableName tableName) throws Exception { public static void sniff(final Admin admin, TableName tableName) throws Exception {
sniff(admin, new StdOutSink(), tableName.getNameAsString()); List<Future<Void>> taskFutures =
Canary.sniff(admin, new StdOutSink(), tableName.getNameAsString(),
new ScheduledThreadPoolExecutor(1));
for (Future<Void> future : taskFutures) {
future.get();
}
} }
/** /**
* Canary entry point for specified table. * Canary entry point for specified table.
* @throws Exception * @throws Exception
*/ */
private static void sniff(final Admin admin, final Sink sink, String tableName) private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
throws Exception { ExecutorService executor) throws Exception {
if (admin.isTableAvailable(TableName.valueOf(tableName))) { if (admin.isTableEnabled(TableName.valueOf(tableName))) {
sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName))); return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)),
executor);
} else { } else {
LOG.warn(String.format("Table %s is not available", tableName)); LOG.warn(String.format("Table %s is not enabled", tableName));
} }
return new LinkedList<Future<Void>>();
} }
/* /*
* Loops over regions that owns this table, and output some information abouts the state. * Loops over regions that owns this table, and output some information abouts the state.
*/ */
private static void sniff(final Admin admin, final Sink sink, HTableDescriptor tableDesc) private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
throws Exception { HTableDescriptor tableDesc, ExecutorService executor) throws Exception {
Table table = null; Table table = null;
try { try {
table = admin.getConnection().getTable(tableDesc.getTableName()); table = admin.getConnection().getTable(tableDesc.getTableName());
} catch (TableNotFoundException e) { } catch (TableNotFoundException e) {
return; return new ArrayList<Future<Void>>();
} }
List<RegionTask> tasks = new ArrayList<RegionTask>();
try { try {
for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) { for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) {
try { tasks.add(new RegionTask(admin.getConnection(), region, sink));
sniffRegion(admin, sink, region, table);
} catch (Exception e) {
sink.publishReadFailure(region, e);
LOG.debug("sniffRegion failed", e);
}
} }
} finally { } finally {
table.close(); table.close();
} }
} return executor.invokeAll(tasks);
/*
* For each column family of the region tries to get one row and outputs the latency, or the
* failure.
*/
private static void sniffRegion(
final Admin admin,
final Sink sink,
HRegionInfo region,
Table table) throws Exception {
HTableDescriptor tableDesc = table.getTableDescriptor();
byte[] startKey = null;
Get get = null;
Scan scan = null;
ResultScanner rs = null;
StopWatch stopWatch = new StopWatch();
for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
stopWatch.reset();
startKey = region.getStartKey();
// Can't do a get on empty start row so do a Scan of first element if any instead.
if (startKey.length > 0) {
get = new Get(startKey);
get.addFamily(column.getName());
} else {
scan = new Scan();
scan.setCaching(1);
scan.addFamily(column.getName());
scan.setMaxResultSize(1L);
}
try {
if (startKey.length > 0) {
stopWatch.start();
table.get(get);
stopWatch.stop();
sink.publishReadTiming(region, column, stopWatch.getTime());
} else {
stopWatch.start();
rs = table.getScanner(scan);
stopWatch.stop();
sink.publishReadTiming(region, column, stopWatch.getTime());
}
} catch (Exception e) {
sink.publishReadFailure(region, column, e);
} finally {
if (rs != null) {
rs.close();
}
scan = null;
get = null;
startKey = null;
}
}
} }
// a monitor for regionserver mode // a monitor for regionserver mode
private static class RegionServerMonitor extends Monitor { private static class RegionServerMonitor extends Monitor {
public RegionServerMonitor(Connection connection, String[] monitorTargets, public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
boolean useRegExp, ExtendedSink sink) { ExtendedSink sink, ExecutorService executor) {
super(connection, monitorTargets, useRegExp, sink); super(connection, monitorTargets, useRegExp, sink, executor);
} }
private ExtendedSink getSink() { private ExtendedSink getSink() {
@ -623,64 +763,27 @@ public final class Canary implements Tool {
} }
private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap) { private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap) {
String serverName = null; List<RegionServerTask> tasks = new ArrayList<RegionServerTask>();
TableName tableName = null; Random rand =new Random();
HRegionInfo region = null;
Table table = null;
Get get = null;
byte[] startKey = null;
Scan scan = null;
StopWatch stopWatch = new StopWatch();
// monitor one region on every region server // monitor one region on every region server
for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) { for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
stopWatch.reset(); String serverName = entry.getKey();
serverName = entry.getKey(); // random select a region
// always get the first region HRegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size()));
region = entry.getValue().get(0); tasks.add(new RegionServerTask(this.connection, serverName, region, getSink()));
try {
tableName = region.getTable();
table = admin.getConnection().getTable(tableName);
startKey = region.getStartKey();
// Can't do a get on empty start row so do a Scan of first element if any instead.
if(startKey.length > 0) {
get = new Get(startKey);
stopWatch.start();
table.get(get);
stopWatch.stop();
} else {
scan = new Scan();
scan.setCaching(1);
scan.setMaxResultSize(1L);
stopWatch.start();
ResultScanner s = table.getScanner(scan);
s.close();
stopWatch.stop();
} }
this.getSink().publishReadTiming(tableName.getNameAsString(), try {
serverName, stopWatch.getTime()); for (Future<Void> future : this.executor.invokeAll(tasks)) {
} catch (TableNotFoundException tnfe) { try {
// This is ignored because it doesn't imply that the regionserver is dead future.get();
} catch (TableNotEnabledException tnee) { } catch (ExecutionException e) {
// This is considered a success since we got a response. LOG.error("Sniff regionserver failed!", e);
LOG.debug("The targeted table was disabled. Assuming success.");
} catch (DoNotRetryIOException dnrioe) {
this.getSink().publishReadFailure(tableName.getNameAsString(), serverName);
LOG.error(dnrioe);
} catch (IOException e) {
this.getSink().publishReadFailure(tableName.getNameAsString(), serverName);
LOG.error(e);
this.errorCode = ERROR_EXIT_CODE; this.errorCode = ERROR_EXIT_CODE;
} finally {
if (table != null) {
try {
table.close();
} catch (IOException e) {/* DO NOTHING */
} }
} }
scan = null; } catch (InterruptedException e) {
get = null; this.errorCode = ERROR_EXIT_CODE;
startKey = null; LOG.error("Sniff regionserver failed!", e);
}
} }
} }
@ -780,10 +883,16 @@ public final class Canary implements Tool {
if (authChore != null) { if (authChore != null) {
choreService.scheduleChore(authChore); choreService.scheduleChore(authChore);
} }
int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM);
ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads);
int exitCode = ToolRunner.run(conf, new Canary(), args); Class<? extends Sink> sinkClass =
conf.getClass("hbase.canary.sink.class", StdOutSink.class, Sink.class);
Sink sink = ReflectionUtils.newInstance(sinkClass);
int exitCode = ToolRunner.run(conf, new Canary(executor, sink), args);
choreService.shutdown(); choreService.shutdown();
executor.shutdown();
System.exit(exitCode); System.exit(exitCode);
} }
} }