From cf7ef936d20b98482a48926a194772c66d3234c1 Mon Sep 17 00:00:00 2001 From: Liu Shaohui Date: Thu, 19 Mar 2015 10:44:35 +0800 Subject: [PATCH] HBASE-13199 Some small improvements on canary tool (Shaohui Liu) --- .../org/apache/hadoop/hbase/tool/Canary.java | 423 +++++++++++------- 1 file changed, 266 insertions(+), 157 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java index 309a1c2751d..8efb73db202 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java @@ -24,10 +24,17 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; +import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Random; import java.util.Set; import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -56,6 +63,8 @@ import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner; @@ -119,6 +128,174 @@ public final class Canary implements Tool { } } + /** + * For each column family of the region tries to get one row and outputs the latency, or the + * failure. + */ + static class RegionTask implements Callable { + private Connection connection; + private HRegionInfo region; + private Sink sink; + + RegionTask(Connection connection, HRegionInfo region, Sink sink) { + this.connection = connection; + this.region = region; + this.sink = sink; + } + + @Override + public Void call() { + Table table = null; + HTableDescriptor tableDesc = null; + try { + table = connection.getTable(region.getTable()); + tableDesc = table.getTableDescriptor(); + } catch (IOException e) { + LOG.debug("sniffRegion failed", e); + sink.publishReadFailure(region, e); + return null; + } finally { + if (table != null) { + try { + table.close(); + } catch (IOException e) { + } + } + } + + byte[] startKey = null; + Get get = null; + Scan scan = null; + ResultScanner rs = null; + StopWatch stopWatch = new StopWatch(); + for (HColumnDescriptor column : tableDesc.getColumnFamilies()) { + stopWatch.reset(); + startKey = region.getStartKey(); + // Can't do a get on empty start row so do a Scan of first element if any instead. + if (startKey.length > 0) { + get = new Get(startKey); + get.setCacheBlocks(false); + get.setFilter(new FirstKeyOnlyFilter()); + get.addFamily(column.getName()); + } else { + scan = new Scan(); + scan.setCaching(1); + scan.setCacheBlocks(false); + scan.setFilter(new FirstKeyOnlyFilter()); + scan.addFamily(column.getName()); + scan.setMaxResultSize(1L); + } + + try { + if (startKey.length > 0) { + stopWatch.start(); + table.get(get); + stopWatch.stop(); + sink.publishReadTiming(region, column, stopWatch.getTime()); + } else { + stopWatch.start(); + rs = table.getScanner(scan); + stopWatch.stop(); + sink.publishReadTiming(region, column, stopWatch.getTime()); + } + } catch (Exception e) { + sink.publishReadFailure(region, column, e); + } finally { + if (rs != null) { + rs.close(); + } + if (table != null) { + try { + table.close(); + } catch (IOException e) { + } + } + scan = null; + get = null; + startKey = null; + } + } + return null; + } + } + + /** + * Get one row from a region on the regionserver and outputs the latency, or the failure. + */ + static class RegionServerTask implements Callable { + private Connection connection; + private String serverName; + private HRegionInfo region; + private ExtendedSink sink; + + RegionServerTask(Connection connection, String serverName, HRegionInfo region, + ExtendedSink sink) { + this.connection = connection; + this.serverName = serverName; + this.region = region; + this.sink = sink; + } + + @Override + public Void call() { + TableName tableName = null; + Table table = null; + Get get = null; + byte[] startKey = null; + Scan scan = null; + StopWatch stopWatch = new StopWatch(); + // monitor one region on every region server + stopWatch.reset(); + try { + tableName = region.getTable(); + table = connection.getTable(tableName); + startKey = region.getStartKey(); + // Can't do a get on empty start row so do a Scan of first element if any instead. + if (startKey.length > 0) { + get = new Get(startKey); + get.setCacheBlocks(false); + get.setFilter(new FirstKeyOnlyFilter()); + stopWatch.start(); + table.get(get); + stopWatch.stop(); + } else { + scan = new Scan(); + scan.setCacheBlocks(false); + scan.setFilter(new FirstKeyOnlyFilter()); + scan.setCaching(1); + scan.setMaxResultSize(1L); + stopWatch.start(); + ResultScanner s = table.getScanner(scan); + s.close(); + stopWatch.stop(); + } + sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime()); + } catch (TableNotFoundException tnfe) { + // This is ignored because it doesn't imply that the regionserver is dead + } catch (TableNotEnabledException tnee) { + // This is considered a success since we got a response. + LOG.debug("The targeted table was disabled. Assuming success."); + } catch (DoNotRetryIOException dnrioe) { + sink.publishReadFailure(tableName.getNameAsString(), serverName); + LOG.error(dnrioe); + } catch (IOException e) { + sink.publishReadFailure(tableName.getNameAsString(), serverName); + LOG.error(e); + } finally { + if (table != null) { + try { + table.close(); + } catch (IOException e) {/* DO NOTHING */ + } + } + scan = null; + get = null; + startKey = null; + } + return null; + } + } + private static final int USAGE_EXIT_CODE = 1; private static final int INIT_ERROR_EXIT_CODE = 2; private static final int TIMEOUT_ERROR_EXIT_CODE = 3; @@ -128,6 +305,8 @@ public final class Canary implements Tool { private static final long DEFAULT_TIMEOUT = 600000; // 10 mins + private static final int MAX_THREADS_NUM = 16; // #threads to contact regions + private static final Log LOG = LogFactory.getLog(Canary.class); private Configuration conf = null; @@ -138,12 +317,14 @@ public final class Canary implements Tool { private long timeout = DEFAULT_TIMEOUT; private boolean failOnError = true; private boolean regionServerMode = false; + private ExecutorService executor; // threads to retrieve data from regionservers public Canary() { - this(new RegionServerStdOutSink()); + this(new ScheduledThreadPoolExecutor(1), new RegionServerStdOutSink()); } - public Canary(Sink sink) { + public Canary(ExecutorService executor, Sink sink) { + this.executor = executor; this.sink = sink; } @@ -325,14 +506,13 @@ public final class Canary implements Tool { System.arraycopy(args, index, monitorTargets, 0, length); } - if(this.regionServerMode) { - monitor = new RegionServerMonitor( - connection, - monitorTargets, - this.useRegExp, - (ExtendedSink)this.sink); + if (this.regionServerMode) { + monitor = + new RegionServerMonitor(connection, monitorTargets, this.useRegExp, + (ExtendedSink) this.sink, this.executor); } else { - monitor = new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink); + monitor = + new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor); } return monitor; } @@ -349,6 +529,7 @@ public final class Canary implements Tool { protected boolean done = false; protected int errorCode = 0; protected Sink sink; + protected ExecutorService executor; public boolean isDone() { return done; @@ -363,14 +544,15 @@ public final class Canary implements Tool { if (this.admin != null) this.admin.close(); } - protected Monitor(Connection connection, String[] monitorTargets, - boolean useRegExp, Sink sink) { + protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink, + ExecutorService executor) { if (null == connection) throw new IllegalArgumentException("connection shall not be null"); this.connection = connection; this.targets = monitorTargets; this.useRegExp = useRegExp; this.sink = sink; + this.executor = executor; } public abstract void run(); @@ -394,23 +576,31 @@ public final class Canary implements Tool { // a monitor for region mode private static class RegionMonitor extends Monitor { - public RegionMonitor(Connection connection, String[] monitorTargets, - boolean useRegExp, Sink sink) { - super(connection, monitorTargets, useRegExp, sink); + public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, + Sink sink, ExecutorService executor) { + super(connection, monitorTargets, useRegExp, sink, executor); } @Override public void run() { - if(this.initAdmin()) { + if (this.initAdmin()) { try { + List> taskFutures = new LinkedList>(); if (this.targets != null && this.targets.length > 0) { String[] tables = generateMonitorTables(this.targets); this.initialized = true; for (String table : tables) { - Canary.sniff(admin, sink, table); + taskFutures.addAll(Canary.sniff(admin, sink, table, executor)); } } else { - sniff(); + taskFutures.addAll(sniff()); + } + for (Future future : taskFutures) { + try { + future.get(); + } catch (ExecutionException e) { + LOG.error("Sniff region failed!", e); + } } } catch (Exception e) { LOG.error("Run regionMonitor failed", e); @@ -423,7 +613,7 @@ public final class Canary implements Tool { private String[] generateMonitorTables(String[] monitorTargets) throws IOException { String[] returnTables = null; - if(this.useRegExp) { + if (this.useRegExp) { Pattern pattern = null; HTableDescriptor[] tds = null; Set tmpTables = new TreeSet(); @@ -437,16 +627,15 @@ public final class Canary implements Tool { } } } - } catch(IOException e) { + } catch (IOException e) { LOG.error("Communicate with admin failed", e); throw e; } - if(tmpTables.size() > 0) { + if (tmpTables.size() > 0) { returnTables = tmpTables.toArray(new String[tmpTables.size()]); } else { - String msg = "No HTable found, tablePattern:" - + Arrays.toString(monitorTargets); + String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets); LOG.error(msg); this.errorCode = INIT_ERROR_EXIT_CODE; throw new TableNotFoundException(msg); @@ -461,12 +650,15 @@ public final class Canary implements Tool { /* * canary entry point to monitor all the tables. */ - private void sniff() throws Exception { + private List> sniff() throws Exception { + List> taskFutures = new LinkedList>(); for (HTableDescriptor table : admin.listTables()) { - Canary.sniff(admin, sink, table); + if (admin.isTableEnabled(table.getTableName())) { + taskFutures.addAll(Canary.sniff(admin, sink, table, executor)); + } } + return taskFutures; } - } /** @@ -474,108 +666,56 @@ public final class Canary implements Tool { * @throws Exception */ public static void sniff(final Admin admin, TableName tableName) throws Exception { - sniff(admin, new StdOutSink(), tableName.getNameAsString()); + List> taskFutures = + Canary.sniff(admin, new StdOutSink(), tableName.getNameAsString(), + new ScheduledThreadPoolExecutor(1)); + for (Future future : taskFutures) { + future.get(); + } } /** * Canary entry point for specified table. * @throws Exception */ - private static void sniff(final Admin admin, final Sink sink, String tableName) - throws Exception { - if (admin.isTableAvailable(TableName.valueOf(tableName))) { - sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName))); + private static List> sniff(final Admin admin, final Sink sink, String tableName, + ExecutorService executor) throws Exception { + if (admin.isTableEnabled(TableName.valueOf(tableName))) { + return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)), + executor); } else { - LOG.warn(String.format("Table %s is not available", tableName)); + LOG.warn(String.format("Table %s is not enabled", tableName)); } + return new LinkedList>(); } /* * Loops over regions that owns this table, and output some information abouts the state. */ - private static void sniff(final Admin admin, final Sink sink, HTableDescriptor tableDesc) - throws Exception { + private static List> sniff(final Admin admin, final Sink sink, + HTableDescriptor tableDesc, ExecutorService executor) throws Exception { Table table = null; - try { table = admin.getConnection().getTable(tableDesc.getTableName()); } catch (TableNotFoundException e) { - return; + return new ArrayList>(); } - + List tasks = new ArrayList(); try { for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) { - try { - sniffRegion(admin, sink, region, table); - } catch (Exception e) { - sink.publishReadFailure(region, e); - LOG.debug("sniffRegion failed", e); - } + tasks.add(new RegionTask(admin.getConnection(), region, sink)); } } finally { table.close(); } + return executor.invokeAll(tasks); } - - /* - * For each column family of the region tries to get one row and outputs the latency, or the - * failure. - */ - private static void sniffRegion( - final Admin admin, - final Sink sink, - HRegionInfo region, - Table table) throws Exception { - HTableDescriptor tableDesc = table.getTableDescriptor(); - byte[] startKey = null; - Get get = null; - Scan scan = null; - ResultScanner rs = null; - StopWatch stopWatch = new StopWatch(); - for (HColumnDescriptor column : tableDesc.getColumnFamilies()) { - stopWatch.reset(); - startKey = region.getStartKey(); - // Can't do a get on empty start row so do a Scan of first element if any instead. - if (startKey.length > 0) { - get = new Get(startKey); - get.addFamily(column.getName()); - } else { - scan = new Scan(); - scan.setCaching(1); - scan.addFamily(column.getName()); - scan.setMaxResultSize(1L); - } - - try { - if (startKey.length > 0) { - stopWatch.start(); - table.get(get); - stopWatch.stop(); - sink.publishReadTiming(region, column, stopWatch.getTime()); - } else { - stopWatch.start(); - rs = table.getScanner(scan); - stopWatch.stop(); - sink.publishReadTiming(region, column, stopWatch.getTime()); - } - } catch (Exception e) { - sink.publishReadFailure(region, column, e); - } finally { - if (rs != null) { - rs.close(); - } - scan = null; - get = null; - startKey = null; - } - } - } - //a monitor for regionserver mode + // a monitor for regionserver mode private static class RegionServerMonitor extends Monitor { - public RegionServerMonitor(Connection connection, String[] monitorTargets, - boolean useRegExp, ExtendedSink sink) { - super(connection, monitorTargets, useRegExp, sink); + public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, + ExtendedSink sink, ExecutorService executor) { + super(connection, monitorTargets, useRegExp, sink, executor); } private ExtendedSink getSink() { @@ -623,64 +763,27 @@ public final class Canary implements Tool { } private void monitorRegionServers(Map> rsAndRMap) { - String serverName = null; - TableName tableName = null; - HRegionInfo region = null; - Table table = null; - Get get = null; - byte[] startKey = null; - Scan scan = null; - StopWatch stopWatch = new StopWatch(); + List tasks = new ArrayList(); + Random rand =new Random(); // monitor one region on every region server for (Map.Entry> entry : rsAndRMap.entrySet()) { - stopWatch.reset(); - serverName = entry.getKey(); - // always get the first region - region = entry.getValue().get(0); - try { - tableName = region.getTable(); - table = admin.getConnection().getTable(tableName); - startKey = region.getStartKey(); - // Can't do a get on empty start row so do a Scan of first element if any instead. - if(startKey.length > 0) { - get = new Get(startKey); - stopWatch.start(); - table.get(get); - stopWatch.stop(); - } else { - scan = new Scan(); - scan.setCaching(1); - scan.setMaxResultSize(1L); - stopWatch.start(); - ResultScanner s = table.getScanner(scan); - s.close(); - stopWatch.stop(); + String serverName = entry.getKey(); + // random select a region + HRegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size())); + tasks.add(new RegionServerTask(this.connection, serverName, region, getSink())); + } + try { + for (Future future : this.executor.invokeAll(tasks)) { + try { + future.get(); + } catch (ExecutionException e) { + LOG.error("Sniff regionserver failed!", e); + this.errorCode = ERROR_EXIT_CODE; } - this.getSink().publishReadTiming(tableName.getNameAsString(), - serverName, stopWatch.getTime()); - } catch (TableNotFoundException tnfe) { - // This is ignored because it doesn't imply that the regionserver is dead - } catch (TableNotEnabledException tnee) { - // This is considered a success since we got a response. - LOG.debug("The targeted table was disabled. Assuming success."); - } catch (DoNotRetryIOException dnrioe) { - this.getSink().publishReadFailure(tableName.getNameAsString(), serverName); - LOG.error(dnrioe); - } catch (IOException e) { - this.getSink().publishReadFailure(tableName.getNameAsString(), serverName); - LOG.error(e); - this.errorCode = ERROR_EXIT_CODE; - } finally { - if (table != null) { - try { - table.close(); - } catch (IOException e) {/* DO NOTHING */ - } - } - scan = null; - get = null; - startKey = null; } + } catch (InterruptedException e) { + this.errorCode = ERROR_EXIT_CODE; + LOG.error("Sniff regionserver failed!", e); } } @@ -701,7 +804,7 @@ public final class Canary implements Tool { table = this.admin.getConnection().getTable(tableDesc.getTableName()); regionLocator = this.admin.getConnection().getRegionLocator(tableDesc.getTableName()); - for (HRegionLocation location: regionLocator.getAllRegionLocations()) { + for (HRegionLocation location : regionLocator.getAllRegionLocations()) { ServerName rs = location.getServerName(); String rsName = rs.getHostname(); HRegionInfo r = location.getRegionInfo(); @@ -748,7 +851,7 @@ public final class Canary implements Tool { if (this.useRegExp) { regExpFound = false; pattern = Pattern.compile(rsName); - for (Map.Entry> entry : fullRsAndRMap.entrySet()) { + for (Map.Entry> entry : fullRsAndRMap.entrySet()) { matcher = pattern.matcher(entry.getKey()); if (matcher.matches()) { filteredRsAndRMap.put(entry.getKey(), entry.getValue()); @@ -780,10 +883,16 @@ public final class Canary implements Tool { if (authChore != null) { choreService.scheduleChore(authChore); } + int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM); + ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads); - int exitCode = ToolRunner.run(conf, new Canary(), args); + Class sinkClass = + conf.getClass("hbase.canary.sink.class", StdOutSink.class, Sink.class); + Sink sink = ReflectionUtils.newInstance(sinkClass); + int exitCode = ToolRunner.run(conf, new Canary(executor, sink), args); choreService.shutdown(); + executor.shutdown(); System.exit(exitCode); } }