From 5aa8d3a20b6e540f4ac361e70690d57e579ea061 Mon Sep 17 00:00:00 2001 From: Rushabh Date: Tue, 20 Aug 2019 12:40:49 -0700 Subject: [PATCH] HBASE-22874 Define a public API for Canary checking and a non-public tool implementation Closes #580 * Canary is now an IA.Public interface * CanaryTool is now the implementation Signed-off-by: Sean Busbey --- bin/hbase | 2 +- .../hbase/tmpl/master/MasterStatusTmpl.jamon | 4 +- .../org/apache/hadoop/hbase/tool/Canary.java | 1764 +--------------- .../apache/hadoop/hbase/tool/CanaryTool.java | 1866 +++++++++++++++++ .../hadoop/hbase/tool/TestCanaryTool.java | 39 +- 5 files changed, 1911 insertions(+), 1764 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/tool/CanaryTool.java diff --git a/bin/hbase b/bin/hbase index aeba22a5775..a69ad6d1adf 100755 --- a/bin/hbase +++ b/bin/hbase @@ -643,7 +643,7 @@ elif [ "$COMMAND" = "ltt" ] ; then CLASS='org.apache.hadoop.hbase.util.LoadTestTool' HBASE_OPTS="$HBASE_OPTS $HBASE_LTT_OPTS" elif [ "$COMMAND" = "canary" ] ; then - CLASS='org.apache.hadoop.hbase.tool.Canary' + CLASS='org.apache.hadoop.hbase.tool.CanaryTool' HBASE_OPTS="$HBASE_OPTS $HBASE_CANARY_OPTS" elif [ "$COMMAND" = "version" ] ; then CLASS='org.apache.hadoop.hbase.util.VersionInfo' diff --git a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon index e5541a7944b..aac6bb73dd3 100644 --- a/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon +++ b/hbase-server/src/main/jamon/org/apache/hadoop/hbase/tmpl/master/MasterStatusTmpl.jamon @@ -55,7 +55,7 @@ org.apache.hadoop.hbase.quotas.QuotaUtil; org.apache.hadoop.hbase.security.access.PermissionStorage; org.apache.hadoop.hbase.security.visibility.VisibilityConstants; org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; -org.apache.hadoop.hbase.tool.Canary; +org.apache.hadoop.hbase.tool.CanaryTool; org.apache.hadoop.hbase.util.Bytes; org.apache.hadoop.hbase.util.FSUtils; org.apache.hadoop.hbase.util.JvmVersion; @@ -513,7 +513,7 @@ AssignmentManager assignmentManager = master.getAssignmentManager(); <%java>String description = null; if (tableName.equals(TableName.META_TABLE_NAME)){ description = "The hbase:meta table holds references to all User Table regions."; - } else if (tableName.equals(Canary.DEFAULT_WRITE_TABLE_NAME)){ + } else if (tableName.equals(CanaryTool.DEFAULT_WRITE_TABLE_NAME)){ description = "The hbase:canary table is used to sniff the write availbility of" + " each regionserver."; } else if (tableName.equals(PermissionStorage.ACL_TABLE_NAME)){ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java index 3cff2b7ef4f..b49d2cebede 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/Canary.java @@ -19,1765 +19,49 @@ package org.apache.hadoop.hbase.tool; -import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT; -import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT; -import java.io.Closeable; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; import java.util.Map; -import java.util.Random; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.LongAdder; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.commons.lang3.time.StopWatch; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.AuthUtil; -import org.apache.hadoop.hbase.ChoreService; -import org.apache.hadoop.hbase.ClusterMetrics; -import org.apache.hadoop.hbase.ClusterMetrics.Option; -import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.MetaTableAccessor; -import org.apache.hadoop.hbase.NamespaceDescriptor; -import org.apache.hadoop.hbase.ScheduledChore; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.TableNotEnabledException; -import org.apache.hadoop.hbase.TableNotFoundException; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.Get; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.client.RegionLocator; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; -import org.apache.hadoop.hbase.tool.Canary.RegionTask.TaskType; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.ReflectionUtils; -import org.apache.hadoop.hbase.util.RegionSplitter; -import org.apache.hadoop.hbase.zookeeper.EmptyWatcher; -import org.apache.hadoop.hbase.zookeeper.ZKConfig; -import org.apache.hadoop.util.GenericOptionsParser; -import org.apache.hadoop.util.Tool; -import org.apache.hadoop.util.ToolRunner; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.client.ConnectStringParser; -import org.apache.zookeeper.data.Stat; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.yetus.audience.InterfaceAudience; -/** - * HBase Canary Tool for "canary monitoring" of a running HBase cluster. - * - * There are three modes: - *
    - *
  1. region mode (Default): For each region, try to get one row per column family outputting - * information on failure (ERROR) or else the latency. - *
  2. - * - *
  3. regionserver mode: For each regionserver try to get one row from one table selected - * randomly outputting information on failure (ERROR) or else the latency. - *
  4. - * - *
  5. zookeeper mode: for each zookeeper instance, selects a znode outputting information on - * failure (ERROR) or else the latency. - *
  6. - *
- */ -@InterfaceAudience.Private -public final class Canary implements Tool { - /** - * Sink interface used by the canary to output information - */ - public interface Sink { - long getReadFailureCount(); - long incReadFailureCount(); - Map getReadFailures(); - void updateReadFailures(String regionName, String serverName); - long getWriteFailureCount(); - long incWriteFailureCount(); - Map getWriteFailures(); - void updateWriteFailures(String regionName, String serverName); - long getReadSuccessCount(); - long incReadSuccessCount(); - long getWriteSuccessCount(); - long incWriteSuccessCount(); - } +@InterfaceAudience.Public +public interface Canary { - /** - * Simple implementation of canary sink that allows plotting to a file or standard output. - */ - public static class StdOutSink implements Sink { - private AtomicLong readFailureCount = new AtomicLong(0), - writeFailureCount = new AtomicLong(0), - readSuccessCount = new AtomicLong(0), - writeSuccessCount = new AtomicLong(0); - private Map readFailures = new ConcurrentHashMap<>(); - private Map writeFailures = new ConcurrentHashMap<>(); - - @Override - public long getReadFailureCount() { - return readFailureCount.get(); - } - - @Override - public long incReadFailureCount() { - return readFailureCount.incrementAndGet(); - } - - @Override - public Map getReadFailures() { - return readFailures; - } - - @Override - public void updateReadFailures(String regionName, String serverName) { - readFailures.put(regionName, serverName); - } - - @Override - public long getWriteFailureCount() { - return writeFailureCount.get(); - } - - @Override - public long incWriteFailureCount() { - return writeFailureCount.incrementAndGet(); - } - - @Override - public Map getWriteFailures() { - return writeFailures; - } - - @Override - public void updateWriteFailures(String regionName, String serverName) { - writeFailures.put(regionName, serverName); - } - - @Override - public long getReadSuccessCount() { - return readSuccessCount.get(); - } - - @Override - public long incReadSuccessCount() { - return readSuccessCount.incrementAndGet(); - } - - @Override - public long getWriteSuccessCount() { - return writeSuccessCount.get(); - } - - @Override - public long incWriteSuccessCount() { - return writeSuccessCount.incrementAndGet(); - } - } - - /** - * By RegionServer, for 'regionserver' mode. - */ - public static class RegionServerStdOutSink extends StdOutSink { - public void publishReadFailure(String table, String server) { - incReadFailureCount(); - LOG.error("Read from {} on {}", table, server); - } - - public void publishReadTiming(String table, String server, long msTime) { - LOG.info("Read from {} on {} in {}ms", table, server, msTime); - } - } - - /** - * Output for 'zookeeper' mode. - */ - public static class ZookeeperStdOutSink extends StdOutSink { - public void publishReadFailure(String znode, String server) { - incReadFailureCount(); - LOG.error("Read from {} on {}", znode, server); - } - - public void publishReadTiming(String znode, String server, long msTime) { - LOG.info("Read from {} on {} in {}ms", znode, server, msTime); - } - } - - /** - * By Region, for 'region' mode. - */ - public static class RegionStdOutSink extends StdOutSink { - private Map perTableReadLatency = new HashMap<>(); - private LongAdder writeLatency = new LongAdder(); - private final Map regionMap = new ConcurrentHashMap<>(); - - public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) { - incReadFailureCount(); - LOG.error("Read from {} on {} failed", region.getRegionNameAsString(), serverName, e); - } - - public void publishReadFailure(ServerName serverName, RegionInfo region, - ColumnFamilyDescriptor column, Exception e) { - incReadFailureCount(); - LOG.error("Read from {} on {} {} failed", region.getRegionNameAsString(), serverName, - column.getNameAsString(), e); - } - - public void publishReadTiming(ServerName serverName, RegionInfo region, - ColumnFamilyDescriptor column, long msTime) { - incReadSuccessCount(); - RegionTaskResult res = this.regionMap.get(region.getRegionNameAsString()); - res.setReadSuccess(); - res.setReadLatency(msTime); - LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName, - column.getNameAsString(), msTime); - } - - public void publishWriteFailure(ServerName serverName, RegionInfo region, Exception e) { - incWriteFailureCount(); - LOG.error("Write to {} on {} failed", region.getRegionNameAsString(), serverName, e); - } - - public void publishWriteFailure(ServerName serverName, RegionInfo region, - ColumnFamilyDescriptor column, Exception e) { - incWriteFailureCount(); - LOG.error("Write to {} on {} {} failed", region.getRegionNameAsString(), serverName, - column.getNameAsString(), e); - } - - public void publishWriteTiming(ServerName serverName, RegionInfo region, - ColumnFamilyDescriptor column, long msTime) { - incWriteSuccessCount(); - RegionTaskResult res = this.regionMap.get(region.getRegionNameAsString()); - res.setWriteSuccess(); - res.setWriteLatency(msTime); - LOG.info("Write to {} on {} {} in {}ms", - region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime); - } - - public Map getReadLatencyMap() { - return this.perTableReadLatency; - } - - public LongAdder initializeAndGetReadLatencyForTable(String tableName) { - LongAdder initLatency = new LongAdder(); - this.perTableReadLatency.put(tableName, initLatency); - return initLatency; - } - - public void initializeWriteLatency() { - this.writeLatency.reset(); - } - - public LongAdder getWriteLatency() { - return this.writeLatency; - } - - public Map getRegionMap() { - return this.regionMap; - } - - public int getTotalExpectedRegions() { - return this.regionMap.size(); - } - } - - /** - * Run a single zookeeper Task and then exit. - */ - static class ZookeeperTask implements Callable { - private final Connection connection; - private final String host; - private String znode; - private final int timeout; - private ZookeeperStdOutSink sink; - - public ZookeeperTask(Connection connection, String host, String znode, int timeout, - ZookeeperStdOutSink sink) { - this.connection = connection; - this.host = host; - this.znode = znode; - this.timeout = timeout; - this.sink = sink; - } - - @Override public Void call() throws Exception { - ZooKeeper zooKeeper = null; - try { - zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance); - Stat exists = zooKeeper.exists(znode, false); - StopWatch stopwatch = new StopWatch(); - stopwatch.start(); - zooKeeper.getData(znode, false, exists); - stopwatch.stop(); - sink.publishReadTiming(znode, host, stopwatch.getTime()); - } catch (KeeperException | InterruptedException e) { - sink.publishReadFailure(znode, host); - } finally { - if (zooKeeper != null) { - zooKeeper.close(); - } - } - return null; - } - } - - /** - * Run a single Region Task and then exit. For each column family of the Region, get one row and - * output latency or failure. - */ - static class RegionTask implements Callable { - public enum TaskType{ - READ, WRITE - } - private Connection connection; - private RegionInfo region; - private RegionStdOutSink sink; - private TaskType taskType; - private boolean rawScanEnabled; - private ServerName serverName; - private LongAdder readWriteLatency; - - RegionTask(Connection connection, RegionInfo region, ServerName serverName, - RegionStdOutSink sink, TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency) { - this.connection = connection; - this.region = region; - this.serverName = serverName; - this.sink = sink; - this.taskType = taskType; - this.rawScanEnabled = rawScanEnabled; - this.readWriteLatency = rwLatency; - } - - @Override - public Void call() { - switch (taskType) { - case READ: - return read(); - case WRITE: - return write(); - default: - return read(); - } - } - - public Void read() { - Table table = null; - TableDescriptor tableDesc = null; - try { - LOG.debug("Reading table descriptor for table {}", region.getTable()); - table = connection.getTable(region.getTable()); - tableDesc = table.getDescriptor(); - } catch (IOException e) { - LOG.debug("sniffRegion {} of {} failed", region.getEncodedName(), e); - sink.publishReadFailure(serverName, region, e); - if (table != null) { - try { - table.close(); - } catch (IOException ioe) { - LOG.error("Close table failed", e); - } - } - return null; - } - - byte[] startKey = null; - Get get = null; - Scan scan = null; - ResultScanner rs = null; - StopWatch stopWatch = new StopWatch(); - for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) { - stopWatch.reset(); - startKey = region.getStartKey(); - // Can't do a get on empty start row so do a Scan of first element if any instead. - if (startKey.length > 0) { - get = new Get(startKey); - get.setCacheBlocks(false); - get.setFilter(new FirstKeyOnlyFilter()); - get.addFamily(column.getName()); - } else { - scan = new Scan(); - LOG.debug("rawScan {} for {}", rawScanEnabled, tableDesc.getTableName()); - scan.setRaw(rawScanEnabled); - scan.setCaching(1); - scan.setCacheBlocks(false); - scan.setFilter(new FirstKeyOnlyFilter()); - scan.addFamily(column.getName()); - scan.setMaxResultSize(1L); - scan.setOneRowLimit(); - } - LOG.debug("Reading from {} {} {} {}", tableDesc.getTableName(), - region.getRegionNameAsString(), column.getNameAsString(), - Bytes.toStringBinary(startKey)); - try { - stopWatch.start(); - if (startKey.length > 0) { - table.get(get); - } else { - rs = table.getScanner(scan); - rs.next(); - } - stopWatch.stop(); - this.readWriteLatency.add(stopWatch.getTime()); - sink.publishReadTiming(serverName, region, column, stopWatch.getTime()); - } catch (Exception e) { - sink.publishReadFailure(serverName, region, column, e); - sink.updateReadFailures(region.getRegionNameAsString(), serverName.getHostname()); - } finally { - if (rs != null) { - rs.close(); - } - scan = null; - get = null; - } - } - try { - table.close(); - } catch (IOException e) { - LOG.error("Close table failed", e); - } - return null; - } - - /** - * Check writes for the canary table - */ - private Void write() { - Table table = null; - TableDescriptor tableDesc = null; - try { - table = connection.getTable(region.getTable()); - tableDesc = table.getDescriptor(); - byte[] rowToCheck = region.getStartKey(); - if (rowToCheck.length == 0) { - rowToCheck = new byte[]{0x0}; - } - int writeValueSize = - connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10); - for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) { - Put put = new Put(rowToCheck); - byte[] value = new byte[writeValueSize]; - Bytes.random(value); - put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value); - - LOG.debug("Writing to {} {} {} {}", - tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(), - Bytes.toStringBinary(rowToCheck)); - try { - long startTime = System.currentTimeMillis(); - table.put(put); - long time = System.currentTimeMillis() - startTime; - this.readWriteLatency.add(time); - sink.publishWriteTiming(serverName, region, column, time); - } catch (Exception e) { - sink.publishWriteFailure(serverName, region, column, e); - } - } - table.close(); - } catch (IOException e) { - sink.publishWriteFailure(serverName, region, e); - sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname() ); - } - return null; - } - } - - /** - * Run a single RegionServer Task and then exit. - * Get one row from a region on the regionserver and output latency or the failure. - */ - static class RegionServerTask implements Callable { - private Connection connection; - private String serverName; - private RegionInfo region; - private RegionServerStdOutSink sink; - private AtomicLong successes; - - RegionServerTask(Connection connection, String serverName, RegionInfo region, - RegionServerStdOutSink sink, AtomicLong successes) { - this.connection = connection; - this.serverName = serverName; - this.region = region; - this.sink = sink; - this.successes = successes; - } - - @Override - public Void call() { - TableName tableName = null; - Table table = null; - Get get = null; - byte[] startKey = null; - Scan scan = null; - StopWatch stopWatch = new StopWatch(); - // monitor one region on every region server - stopWatch.reset(); - try { - tableName = region.getTable(); - table = connection.getTable(tableName); - startKey = region.getStartKey(); - // Can't do a get on empty start row so do a Scan of first element if any instead. - LOG.debug("Reading from {} {} {} {}", - serverName, region.getTable(), region.getRegionNameAsString(), - Bytes.toStringBinary(startKey)); - if (startKey.length > 0) { - get = new Get(startKey); - get.setCacheBlocks(false); - get.setFilter(new FirstKeyOnlyFilter()); - stopWatch.start(); - table.get(get); - stopWatch.stop(); - } else { - scan = new Scan(); - scan.setCacheBlocks(false); - scan.setFilter(new FirstKeyOnlyFilter()); - scan.setCaching(1); - scan.setMaxResultSize(1L); - scan.setOneRowLimit(); - stopWatch.start(); - ResultScanner s = table.getScanner(scan); - s.next(); - s.close(); - stopWatch.stop(); - } - successes.incrementAndGet(); - sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime()); - } catch (TableNotFoundException tnfe) { - LOG.error("Table may be deleted", tnfe); - // This is ignored because it doesn't imply that the regionserver is dead - } catch (TableNotEnabledException tnee) { - // This is considered a success since we got a response. - successes.incrementAndGet(); - LOG.debug("The targeted table was disabled. Assuming success."); - } catch (DoNotRetryIOException dnrioe) { - sink.publishReadFailure(tableName.getNameAsString(), serverName); - LOG.error(dnrioe.toString(), dnrioe); - } catch (IOException e) { - sink.publishReadFailure(tableName.getNameAsString(), serverName); - LOG.error(e.toString(), e); - } finally { - if (table != null) { - try { - table.close(); - } catch (IOException e) {/* DO NOTHING */ - LOG.error("Close table failed", e); - } - } - scan = null; - get = null; - startKey = null; - } - return null; - } - } - - private static final int USAGE_EXIT_CODE = 1; - private static final int INIT_ERROR_EXIT_CODE = 2; - private static final int TIMEOUT_ERROR_EXIT_CODE = 3; - private static final int ERROR_EXIT_CODE = 4; - private static final int FAILURE_EXIT_CODE = 5; - - private static final long DEFAULT_INTERVAL = 60000; - - private static final long DEFAULT_TIMEOUT = 600000; // 10 mins - private static final int MAX_THREADS_NUM = 16; // #threads to contact regions - - private static final Logger LOG = LoggerFactory.getLogger(Canary.class); - - public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf( - NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary"); - - private static final String CANARY_TABLE_FAMILY_NAME = "Test"; - - private Configuration conf = null; - private long interval = 0; - private Sink sink = null; - - private boolean useRegExp; - private long timeout = DEFAULT_TIMEOUT; - private boolean failOnError = true; - - /** - * True if we are to run in 'regionServer' mode. - */ - private boolean regionServerMode = false; - - /** - * True if we are to run in zookeeper 'mode'. - */ - private boolean zookeeperMode = false; - - private long permittedFailures = 0; - private boolean regionServerAllRegions = false; - private boolean writeSniffing = false; - private long configuredWriteTableTimeout = DEFAULT_TIMEOUT; - private boolean treatFailureAsError = false; - private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME; - - /** - * This is a Map of table to timeout. The timeout is for reading all regions in the table; i.e. - * we aggregate time to fetch each region and it needs to be less than this value else we - * log an ERROR. - */ - private HashMap configuredReadTableTimeouts = new HashMap<>(); - - private ExecutorService executor; // threads to retrieve data from regionservers - - public Canary() { - this(new ScheduledThreadPoolExecutor(1)); - } - - public Canary(ExecutorService executor) { - this(executor, null); + static Canary create(Configuration conf, ExecutorService executor) { + return new CanaryTool(conf, executor); } @VisibleForTesting - Canary(ExecutorService executor, Sink sink) { - this.executor = executor; - this.sink = sink; - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public void setConf(Configuration conf) { - this.conf = conf; - } - - private int parseArgs(String[] args) { - int index = -1; - // Process command line args - for (int i = 0; i < args.length; i++) { - String cmd = args[i]; - - if (cmd.startsWith("-")) { - if (index >= 0) { - // command line args must be in the form: [opts] [table 1 [table 2 ...]] - System.err.println("Invalid command line options"); - printUsageAndExit(); - } - - if (cmd.equals("-help") || cmd.equals("-h")) { - // user asked for help, print the help and quit. - printUsageAndExit(); - } else if (cmd.equals("-daemon") && interval == 0) { - // user asked for daemon mode, set a default interval between checks - interval = DEFAULT_INTERVAL; - } else if (cmd.equals("-interval")) { - // user has specified an interval for canary breaths (-interval N) - i++; - - if (i == args.length) { - System.err.println("-interval takes a numeric seconds value argument."); - printUsageAndExit(); - } - - try { - interval = Long.parseLong(args[i]) * 1000; - } catch (NumberFormatException e) { - System.err.println("-interval needs a numeric value argument."); - printUsageAndExit(); - } - } else if (cmd.equals("-zookeeper")) { - this.zookeeperMode = true; - } else if(cmd.equals("-regionserver")) { - this.regionServerMode = true; - } else if(cmd.equals("-allRegions")) { - this.regionServerAllRegions = true; - } else if(cmd.equals("-writeSniffing")) { - this.writeSniffing = true; - } else if(cmd.equals("-treatFailureAsError") || cmd.equals("-failureAsError")) { - this.treatFailureAsError = true; - } else if (cmd.equals("-e")) { - this.useRegExp = true; - } else if (cmd.equals("-t")) { - i++; - - if (i == args.length) { - System.err.println("-t takes a numeric milliseconds value argument."); - printUsageAndExit(); - } - - try { - this.timeout = Long.parseLong(args[i]); - } catch (NumberFormatException e) { - System.err.println("-t takes a numeric milliseconds value argument."); - printUsageAndExit(); - } - } else if(cmd.equals("-writeTableTimeout")) { - i++; - - if (i == args.length) { - System.err.println("-writeTableTimeout takes a numeric milliseconds value argument."); - printUsageAndExit(); - } - - try { - this.configuredWriteTableTimeout = Long.parseLong(args[i]); - } catch (NumberFormatException e) { - System.err.println("-writeTableTimeout takes a numeric milliseconds value argument."); - printUsageAndExit(); - } - } else if (cmd.equals("-writeTable")) { - i++; - - if (i == args.length) { - System.err.println("-writeTable takes a string tablename value argument."); - printUsageAndExit(); - } - this.writeTableName = TableName.valueOf(args[i]); - } else if (cmd.equals("-f")) { - i++; - - if (i == args.length) { - System.err - .println("-f needs a boolean value argument (true|false)."); - printUsageAndExit(); - } - - this.failOnError = Boolean.parseBoolean(args[i]); - } else if (cmd.equals("-readTableTimeouts")) { - i++; - - if (i == args.length) { - System.err.println("-readTableTimeouts needs a comma-separated list of read " + - "millisecond timeouts per table (without spaces)."); - printUsageAndExit(); - } - String [] tableTimeouts = args[i].split(","); - for (String tT: tableTimeouts) { - String [] nameTimeout = tT.split("="); - if (nameTimeout.length < 2) { - System.err.println("Each -readTableTimeouts argument must be of the form " + - "= (without spaces)."); - printUsageAndExit(); - } - long timeoutVal = 0L; - try { - timeoutVal = Long.parseLong(nameTimeout[1]); - } catch (NumberFormatException e) { - System.err.println("-readTableTimeouts read timeout for each table must be a numeric value argument."); - printUsageAndExit(); - } - this.configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal); - } - } else if (cmd.equals("-permittedZookeeperFailures")) { - i++; - - if (i == args.length) { - System.err.println("-permittedZookeeperFailures needs a numeric value argument."); - printUsageAndExit(); - } - try { - this.permittedFailures = Long.parseLong(args[i]); - } catch (NumberFormatException e) { - System.err.println("-permittedZookeeperFailures needs a numeric value argument."); - printUsageAndExit(); - } - } else { - // no options match - System.err.println(cmd + " options is invalid."); - printUsageAndExit(); - } - } else if (index < 0) { - // keep track of first table name specified by the user - index = i; - } - } - if (this.regionServerAllRegions && !this.regionServerMode) { - System.err.println("-allRegions can only be specified in regionserver mode."); - printUsageAndExit(); - } - if (this.zookeeperMode) { - if (this.regionServerMode || this.regionServerAllRegions || this.writeSniffing) { - System.err.println("-zookeeper is exclusive and cannot be combined with " - + "other modes."); - printUsageAndExit(); - } - } - if (this.permittedFailures != 0 && !this.zookeeperMode) { - System.err.println("-permittedZookeeperFailures requires -zookeeper mode."); - printUsageAndExit(); - } - if (!this.configuredReadTableTimeouts.isEmpty() && (this.regionServerMode || this.zookeeperMode)) { - System.err.println("-readTableTimeouts can only be configured in region mode."); - printUsageAndExit(); - } - return index; - } - - @Override - public int run(String[] args) throws Exception { - int index = parseArgs(args); - ChoreService choreService = null; - - // Launches chore for refreshing kerberos credentials if security is enabled. - // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster - // for more details. - final ScheduledChore authChore = AuthUtil.getAuthChore(conf); - if (authChore != null) { - choreService = new ChoreService("CANARY_TOOL"); - choreService.scheduleChore(authChore); - } - - // Start to prepare the stuffs - Monitor monitor = null; - Thread monitorThread = null; - long startTime = 0; - long currentTimeLength = 0; - // Get a connection to use in below. - try (Connection connection = ConnectionFactory.createConnection(this.conf)) { - do { - // Do monitor !! - try { - monitor = this.newMonitor(connection, index, args); - monitorThread = new Thread(monitor, "CanaryMonitor-" + System.currentTimeMillis()); - startTime = System.currentTimeMillis(); - monitorThread.start(); - while (!monitor.isDone()) { - // wait for 1 sec - Thread.sleep(1000); - // exit if any error occurs - if (this.failOnError && monitor.hasError()) { - monitorThread.interrupt(); - if (monitor.initialized) { - return monitor.errorCode; - } else { - return INIT_ERROR_EXIT_CODE; - } - } - currentTimeLength = System.currentTimeMillis() - startTime; - if (currentTimeLength > this.timeout) { - LOG.error("The monitor is running too long (" + currentTimeLength - + ") after timeout limit:" + this.timeout - + " will be killed itself !!"); - if (monitor.initialized) { - return TIMEOUT_ERROR_EXIT_CODE; - } else { - return INIT_ERROR_EXIT_CODE; - } - } - } - - if (this.failOnError && monitor.finalCheckForErrors()) { - monitorThread.interrupt(); - return monitor.errorCode; - } - } finally { - if (monitor != null) monitor.close(); - } - - Thread.sleep(interval); - } while (interval > 0); - } // try-with-resources close - - if (choreService != null) { - choreService.shutdown(); - } - return monitor.errorCode; - } - - public Map getReadFailures() { - return sink.getReadFailures(); - } - - public Map getWriteFailures() { - return sink.getWriteFailures(); - } - - private void printUsageAndExit() { - System.err.println( - "Usage: canary [OPTIONS] [ [ [ interval between checks in seconds"); - System.err.println(" -e consider table/regionserver argument as regular " + - "expression"); - System.err.println(" -f exit on first error; default=true"); - System.err.println(" -failureAsError treat read/write failure as error"); - System.err.println(" -t timeout for canary-test run; default=600000ms"); - System.err.println(" -writeSniffing enable write sniffing"); - System.err.println(" -writeTable the table used for write sniffing; default=hbase:canary"); - System.err.println(" -writeTableTimeout timeout for writeTable; default=600000ms"); - System.err.println(" -readTableTimeouts =," + - "=,..."); - System.err.println(" comma-separated list of table read timeouts " + - "(no spaces);"); - System.err.println(" logs 'ERROR' if takes longer. default=600000ms"); - System.err.println(" -permittedZookeeperFailures Ignore first N failures attempting to "); - System.err.println(" connect to individual zookeeper nodes in ensemble"); - System.err.println(""); - System.err.println(" -D= to assign or override configuration params"); - System.err.println(" -Dhbase.canary.read.raw.enabled= Set to enable/disable " + - "raw scan; default=false"); - System.err.println(""); - System.err.println("Canary runs in one of three modes: region (default), regionserver, or " + - "zookeeper."); - System.err.println("To sniff/probe all regions, pass no arguments."); - System.err.println("To sniff/probe all regions of a table, pass tablename."); - System.err.println("To sniff/probe regionservers, pass -regionserver, etc."); - System.err.println("See http://hbase.apache.org/book.html#_canary for Canary documentation."); - System.exit(USAGE_EXIT_CODE); - } - - Sink getSink(Configuration configuration, Class clazz) { - // In test context, this.sink might be set. Use it if non-null. For testing. - return this.sink != null? this.sink: - (Sink)ReflectionUtils.newInstance(configuration.getClass("hbase.canary.sink.class", - clazz, Sink.class)); + static Canary create(Configuration conf, ExecutorService executor, CanaryTool.Sink sink) { + return new CanaryTool(conf, executor, sink); } /** - * Canary region mode-specific data structure which stores information about each region - * to be scanned + * Run Canary in Region mode. + * + * @param targets -- list of monitor tables. + * @return the exit code of the Canary tool. */ - public static class RegionTaskResult { - private RegionInfo region; - private TableName tableName; - private ServerName serverName; - private AtomicLong readLatency = null; - private AtomicLong writeLatency = null; - private boolean readSuccess = false; - private boolean writeSuccess = false; - - public RegionTaskResult(RegionInfo region, TableName tableName, ServerName serverName) { - this.region = region; - this.tableName = tableName; - this.serverName = serverName; - } - - public RegionInfo getRegionInfo() { - return this.region; - } - - public String getRegionNameAsString() { - return this.region.getRegionNameAsString(); - } - - public TableName getTableName() { - return this.tableName; - } - - public String getTableNameAsString() { - return this.tableName.getNameAsString(); - } - - public ServerName getServerName() { - return this.serverName; - } - - public String getServerNameAsString() { - return this.serverName.getServerName(); - } - - public long getReadLatency() { - if (this.readLatency == null) { - return -1; - } - return this.readLatency.get(); - } - - public void setReadLatency(long readLatency) { - if (this.readLatency != null) { - this.readLatency.set(readLatency); - } else { - this.readLatency = new AtomicLong(readLatency); - } - } - - public long getWriteLatency() { - if (this.writeLatency == null) { - return -1; - } - return this.writeLatency.get(); - } - - public void setWriteLatency(long writeLatency) { - if (this.writeLatency != null) { - this.writeLatency.set(writeLatency); - } else { - this.writeLatency = new AtomicLong(writeLatency); - } - } - - public boolean isReadSuccess() { - return this.readSuccess; - } - - public void setReadSuccess() { - this.readSuccess = true; - } - - public boolean isWriteSuccess() { - return this.writeSuccess; - } - - public void setWriteSuccess() { - this.writeSuccess = true; - } - } + public int checkRegions(String[] targets) throws Exception; /** - * A Factory method for {@link Monitor}. - * Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a RegionMonitor. - * @param index a start index for monitor target - * @param args args passed from user - * @return a Monitor instance + * Runs Canary in Region server mode. + * + * @param targets -- list of monitor tables. + * @return the exit code of the Canary tool. */ - public Monitor newMonitor(final Connection connection, int index, String[] args) { - Monitor monitor = null; - String[] monitorTargets = null; - - if (index >= 0) { - int length = args.length - index; - monitorTargets = new String[length]; - System.arraycopy(args, index, monitorTargets, 0, length); - } - - if (this.regionServerMode) { - monitor = - new RegionServerMonitor(connection, monitorTargets, this.useRegExp, - getSink(connection.getConfiguration(), RegionServerStdOutSink.class), - this.executor, this.regionServerAllRegions, - this.treatFailureAsError, this.permittedFailures); - } else if (this.zookeeperMode) { - monitor = - new ZookeeperMonitor(connection, monitorTargets, this.useRegExp, - getSink(connection.getConfiguration(), ZookeeperStdOutSink.class), - this.executor, this.treatFailureAsError, - this.permittedFailures); - } else { - monitor = - new RegionMonitor(connection, monitorTargets, this.useRegExp, - getSink(connection.getConfiguration(), RegionStdOutSink.class), - this.executor, this.writeSniffing, - this.writeTableName, this.treatFailureAsError, this.configuredReadTableTimeouts, - this.configuredWriteTableTimeout, this.permittedFailures); - } - return monitor; - } + public int checkRegionServers(String[] targets) throws Exception; /** - * A Monitor super-class can be extended by users + * Runs Canary in Zookeeper mode. + * + * @return the exit code of the Canary tool. */ - public static abstract class Monitor implements Runnable, Closeable { - protected Connection connection; - protected Admin admin; - /** - * 'Target' dependent on 'mode'. Could be Tables or RegionServers or ZNodes. - * Passed on the command-line as arguments. - */ - protected String[] targets; - protected boolean useRegExp; - protected boolean treatFailureAsError; - protected boolean initialized = false; + public int checkZooKeeper() throws Exception; - protected boolean done = false; - protected int errorCode = 0; - protected long allowedFailures = 0; - protected Sink sink; - protected ExecutorService executor; + public Map getReadFailures(); - public boolean isDone() { - return done; - } - - public boolean hasError() { - return errorCode != 0; - } - - public boolean finalCheckForErrors() { - if (errorCode != 0) { - return true; - } - if (treatFailureAsError && - (sink.getReadFailureCount() > allowedFailures || sink.getWriteFailureCount() > allowedFailures)) { - LOG.error("Too many failures detected, treating failure as error, failing the Canary."); - errorCode = FAILURE_EXIT_CODE; - return true; - } - return false; - } - - @Override - public void close() throws IOException { - if (this.admin != null) this.admin.close(); - } - - protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink, - ExecutorService executor, boolean treatFailureAsError, long allowedFailures) { - if (null == connection) throw new IllegalArgumentException("connection shall not be null"); - - this.connection = connection; - this.targets = monitorTargets; - this.useRegExp = useRegExp; - this.treatFailureAsError = treatFailureAsError; - this.sink = sink; - this.executor = executor; - this.allowedFailures = allowedFailures; - } - - @Override - public abstract void run(); - - protected boolean initAdmin() { - if (null == this.admin) { - try { - this.admin = this.connection.getAdmin(); - } catch (Exception e) { - LOG.error("Initial HBaseAdmin failed...", e); - this.errorCode = INIT_ERROR_EXIT_CODE; - } - } else if (admin.isAborted()) { - LOG.error("HBaseAdmin aborted"); - this.errorCode = INIT_ERROR_EXIT_CODE; - } - return !this.hasError(); - } - } - - /** - * A monitor for region mode. - */ - private static class RegionMonitor extends Monitor { - // 10 minutes - private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000; - // 1 days - private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60; - - private long lastCheckTime = -1; - private boolean writeSniffing; - private TableName writeTableName; - private int writeDataTTL; - private float regionsLowerLimit; - private float regionsUpperLimit; - private int checkPeriod; - private boolean rawScanEnabled; - - /** - * This is a timeout per table. If read of each region in the table aggregated takes longer - * than what is configured here, we log an ERROR rather than just an INFO. - */ - private HashMap configuredReadTableTimeouts; - - private long configuredWriteTableTimeout; - - public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, - Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName, - boolean treatFailureAsError, HashMap configuredReadTableTimeouts, - long configuredWriteTableTimeout, - long allowedFailures) { - super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError, - allowedFailures); - Configuration conf = connection.getConfiguration(); - this.writeSniffing = writeSniffing; - this.writeTableName = writeTableName; - this.writeDataTTL = - conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL); - this.regionsLowerLimit = - conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f); - this.regionsUpperLimit = - conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f); - this.checkPeriod = - conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY, - DEFAULT_WRITE_TABLE_CHECK_PERIOD); - this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false); - this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts); - this.configuredWriteTableTimeout = configuredWriteTableTimeout; - } - - private RegionStdOutSink getSink() { - if (!(sink instanceof RegionStdOutSink)) { - throw new RuntimeException("Can only write to Region sink"); - } - return ((RegionStdOutSink) sink); - } - - @Override - public void run() { - if (this.initAdmin()) { - try { - List> taskFutures = new LinkedList<>(); - RegionStdOutSink regionSink = this.getSink(); - if (this.targets != null && this.targets.length > 0) { - String[] tables = generateMonitorTables(this.targets); - // Check to see that each table name passed in the -readTableTimeouts argument is also - // passed as a monitor target. - if (!new HashSet<>(Arrays.asList(tables)). - containsAll(this.configuredReadTableTimeouts.keySet())) { - LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets " + - "passed via command line."); - this.errorCode = USAGE_EXIT_CODE; - return; - } - this.initialized = true; - for (String table : tables) { - LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table); - taskFutures.addAll(Canary.sniff(admin, regionSink, table, executor, TaskType.READ, - this.rawScanEnabled, readLatency)); - } - } else { - taskFutures.addAll(sniff(TaskType.READ, regionSink)); - } - - if (writeSniffing) { - if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) { - try { - checkWriteTableDistribution(); - } catch (IOException e) { - LOG.error("Check canary table distribution failed!", e); - } - lastCheckTime = EnvironmentEdgeManager.currentTime(); - } - // sniff canary table with write operation - regionSink.initializeWriteLatency(); - LongAdder writeTableLatency = regionSink.getWriteLatency(); - taskFutures.addAll(Canary.sniff(admin, regionSink, admin.getDescriptor(writeTableName), - executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency)); - } - - for (Future future : taskFutures) { - try { - future.get(); - } catch (ExecutionException e) { - LOG.error("Sniff region failed!", e); - } - } - Map actualReadTableLatency = regionSink.getReadLatencyMap(); - for (Map.Entry entry : configuredReadTableTimeouts.entrySet()) { - String tableName = entry.getKey(); - if (actualReadTableLatency.containsKey(tableName)) { - Long actual = actualReadTableLatency.get(tableName).longValue(); - Long configured = entry.getValue(); - if (actual > configured) { - LOG.error("Read operation for {} took {}ms (Configured read timeout {}ms.", - tableName, actual, configured); - } else { - LOG.info("Read operation for {} took {}ms (Configured read timeout {}ms.", - tableName, actual, configured); - } - } else { - LOG.error("Read operation for {} failed!", tableName); - } - } - if (this.writeSniffing) { - String writeTableStringName = this.writeTableName.getNameAsString(); - long actualWriteLatency = regionSink.getWriteLatency().longValue(); - LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.", - writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout); - // Check that the writeTable write operation latency does not exceed the configured timeout. - if (actualWriteLatency > this.configuredWriteTableTimeout) { - LOG.error("Write operation for {} exceeded the configured write timeout.", - writeTableStringName); - } - } - } catch (Exception e) { - LOG.error("Run regionMonitor failed", e); - this.errorCode = ERROR_EXIT_CODE; - } finally { - this.done = true; - } - } - this.done = true; - } - - /** - * @return List of tables to use in test. - */ - private String[] generateMonitorTables(String[] monitorTargets) throws IOException { - String[] returnTables = null; - - if (this.useRegExp) { - Pattern pattern = null; - List tds = null; - Set tmpTables = new TreeSet<>(); - try { - LOG.debug(String.format("reading list of tables")); - tds = this.admin.listTableDescriptors(pattern); - if (tds == null) { - tds = Collections.emptyList(); - } - for (String monitorTarget : monitorTargets) { - pattern = Pattern.compile(monitorTarget); - for (TableDescriptor td : tds) { - if (pattern.matcher(td.getTableName().getNameAsString()).matches()) { - tmpTables.add(td.getTableName().getNameAsString()); - } - } - } - } catch (IOException e) { - LOG.error("Communicate with admin failed", e); - throw e; - } - - if (tmpTables.size() > 0) { - returnTables = tmpTables.toArray(new String[tmpTables.size()]); - } else { - String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets); - LOG.error(msg); - this.errorCode = INIT_ERROR_EXIT_CODE; - throw new TableNotFoundException(msg); - } - } else { - returnTables = monitorTargets; - } - - return returnTables; - } - - /* - * Canary entry point to monitor all the tables. - */ - private List> sniff(TaskType taskType, RegionStdOutSink regionSink) - throws Exception { - LOG.debug("Reading list of tables"); - List> taskFutures = new LinkedList<>(); - for (TableDescriptor td: admin.listTableDescriptors()) { - if (admin.tableExists(td.getTableName()) && admin.isTableEnabled(td.getTableName()) && - (!td.getTableName().equals(writeTableName))) { - LongAdder readLatency = - regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString()); - taskFutures.addAll(Canary.sniff(admin, sink, td, executor, taskType, this.rawScanEnabled, - readLatency)); - } - } - return taskFutures; - } - - private void checkWriteTableDistribution() throws IOException { - if (!admin.tableExists(writeTableName)) { - int numberOfServers = - admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().size(); - if (numberOfServers == 0) { - throw new IllegalStateException("No live regionservers"); - } - createWriteTable(numberOfServers); - } - - if (!admin.isTableEnabled(writeTableName)) { - admin.enableTable(writeTableName); - } - - ClusterMetrics status = - admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER)); - int numberOfServers = status.getLiveServerMetrics().size(); - if (status.getLiveServerMetrics().containsKey(status.getMasterName())) { - numberOfServers -= 1; - } - - List> pairs = - MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName); - int numberOfRegions = pairs.size(); - if (numberOfRegions < numberOfServers * regionsLowerLimit - || numberOfRegions > numberOfServers * regionsUpperLimit) { - admin.disableTable(writeTableName); - admin.deleteTable(writeTableName); - createWriteTable(numberOfServers); - } - HashSet serverSet = new HashSet<>(); - for (Pair pair : pairs) { - serverSet.add(pair.getSecond()); - } - int numberOfCoveredServers = serverSet.size(); - if (numberOfCoveredServers < numberOfServers) { - admin.balance(); - } - } - - private void createWriteTable(int numberOfServers) throws IOException { - int numberOfRegions = (int)(numberOfServers * regionsLowerLimit); - LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions " + - "(current lower limit of regions per server is {} and you can change it with config {}).", - numberOfServers, numberOfRegions, regionsLowerLimit, - HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY); - HTableDescriptor desc = new HTableDescriptor(writeTableName); - HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME); - family.setMaxVersions(1); - family.setTimeToLive(writeDataTTL); - - desc.addFamily(family); - byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions); - admin.createTable(desc, splits); - } - } - - /** - * Canary entry point for specified table. - * @throws Exception - */ - private static List> sniff(final Admin admin, final Sink sink, String tableName, - ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency) - throws Exception { - LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName); - if (admin.isTableEnabled(TableName.valueOf(tableName))) { - return Canary.sniff(admin, sink, admin.getDescriptor(TableName.valueOf(tableName)), - executor, taskType, rawScanEnabled, readLatency); - } else { - LOG.warn("Table {} is not enabled", tableName); - } - return new LinkedList<>(); - } - - /* - * Loops over regions of this table, and outputs information about the state. - */ - private static List> sniff(final Admin admin, final Sink sink, - TableDescriptor tableDesc, ExecutorService executor, TaskType taskType, - boolean rawScanEnabled, LongAdder rwLatency) throws Exception { - LOG.debug("Reading list of regions for table {}", tableDesc.getTableName()); - try (Table table = admin.getConnection().getTable(tableDesc.getTableName())) { - List tasks = new ArrayList<>(); - try (RegionLocator regionLocator = - admin.getConnection().getRegionLocator(tableDesc.getTableName())) { - for (HRegionLocation location: regionLocator.getAllRegionLocations()) { - ServerName rs = location.getServerName(); - RegionInfo region = location.getRegion(); - tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink)sink, - taskType, rawScanEnabled, rwLatency)); - Map regionMap = ((RegionStdOutSink) sink).getRegionMap(); - regionMap.put(region.getRegionNameAsString(), new RegionTaskResult(region, - region.getTable(), rs)); - } - return executor.invokeAll(tasks); - } - } catch (TableNotFoundException e) { - return Collections.EMPTY_LIST; - } - } - - // monitor for zookeeper mode - private static class ZookeeperMonitor extends Monitor { - private List hosts; - private final String znode; - private final int timeout; - - protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, - Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures) { - super(connection, monitorTargets, useRegExp, - sink, executor, treatFailureAsError, allowedFailures); - Configuration configuration = connection.getConfiguration(); - znode = - configuration.get(ZOOKEEPER_ZNODE_PARENT, - DEFAULT_ZOOKEEPER_ZNODE_PARENT); - timeout = configuration - .getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); - ConnectStringParser parser = - new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration)); - hosts = Lists.newArrayList(); - for (InetSocketAddress server : parser.getServerAddresses()) { - hosts.add(server.toString()); - } - if (allowedFailures > (hosts.size() - 1) / 2) { - LOG.warn("Confirm allowable number of failed ZooKeeper nodes, as quorum will " + - "already be lost. Setting of {} failures is unexpected for {} ensemble size.", - allowedFailures, hosts.size()); - } - } - - @Override public void run() { - List tasks = Lists.newArrayList(); - ZookeeperStdOutSink zkSink = null; - try { - zkSink = this.getSink(); - } catch (RuntimeException e) { - LOG.error("Run ZooKeeperMonitor failed!", e); - this.errorCode = ERROR_EXIT_CODE; - } - this.initialized = true; - for (final String host : hosts) { - tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink)); - } - try { - for (Future future : this.executor.invokeAll(tasks)) { - try { - future.get(); - } catch (ExecutionException e) { - LOG.error("Sniff zookeeper failed!", e); - this.errorCode = ERROR_EXIT_CODE; - } - } - } catch (InterruptedException e) { - this.errorCode = ERROR_EXIT_CODE; - Thread.currentThread().interrupt(); - LOG.error("Sniff zookeeper interrupted!", e); - } - this.done = true; - } - - private ZookeeperStdOutSink getSink() { - if (!(sink instanceof ZookeeperStdOutSink)) { - throw new RuntimeException("Can only write to zookeeper sink"); - } - return ((ZookeeperStdOutSink) sink); - } - } - - - /** - * A monitor for regionserver mode - */ - private static class RegionServerMonitor extends Monitor { - private boolean allRegions; - - public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, - Sink sink, ExecutorService executor, boolean allRegions, - boolean treatFailureAsError, long allowedFailures) { - super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError, - allowedFailures); - this.allRegions = allRegions; - } - - private RegionServerStdOutSink getSink() { - if (!(sink instanceof RegionServerStdOutSink)) { - throw new RuntimeException("Can only write to regionserver sink"); - } - return ((RegionServerStdOutSink) sink); - } - - @Override - public void run() { - if (this.initAdmin() && this.checkNoTableNames()) { - RegionServerStdOutSink regionServerSink = null; - try { - regionServerSink = this.getSink(); - } catch (RuntimeException e) { - LOG.error("Run RegionServerMonitor failed!", e); - this.errorCode = ERROR_EXIT_CODE; - } - Map> rsAndRMap = this.filterRegionServerByName(); - this.initialized = true; - this.monitorRegionServers(rsAndRMap, regionServerSink); - } - this.done = true; - } - - private boolean checkNoTableNames() { - List foundTableNames = new ArrayList<>(); - TableName[] tableNames = null; - LOG.debug("Reading list of tables"); - try { - tableNames = this.admin.listTableNames(); - } catch (IOException e) { - LOG.error("Get listTableNames failed", e); - this.errorCode = INIT_ERROR_EXIT_CODE; - return false; - } - - if (this.targets == null || this.targets.length == 0) return true; - - for (String target : this.targets) { - for (TableName tableName : tableNames) { - if (target.equals(tableName.getNameAsString())) { - foundTableNames.add(target); - } - } - } - - if (foundTableNames.size() > 0) { - System.err.println("Cannot pass a tablename when using the -regionserver " + - "option, tablenames:" + foundTableNames.toString()); - this.errorCode = USAGE_EXIT_CODE; - } - return foundTableNames.isEmpty(); - } - - private void monitorRegionServers(Map> rsAndRMap, RegionServerStdOutSink regionServerSink) { - List tasks = new ArrayList<>(); - Map successMap = new HashMap<>(); - Random rand = new Random(); - for (Map.Entry> entry : rsAndRMap.entrySet()) { - String serverName = entry.getKey(); - AtomicLong successes = new AtomicLong(0); - successMap.put(serverName, successes); - if (entry.getValue().isEmpty()) { - LOG.error("Regionserver not serving any regions - {}", serverName); - } else if (this.allRegions) { - for (RegionInfo region : entry.getValue()) { - tasks.add(new RegionServerTask(this.connection, - serverName, - region, - regionServerSink, - successes)); - } - } else { - // random select a region if flag not set - RegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size())); - tasks.add(new RegionServerTask(this.connection, - serverName, - region, - regionServerSink, - successes)); - } - } - try { - for (Future future : this.executor.invokeAll(tasks)) { - try { - future.get(); - } catch (ExecutionException e) { - LOG.error("Sniff regionserver failed!", e); - this.errorCode = ERROR_EXIT_CODE; - } - } - if (this.allRegions) { - for (Map.Entry> entry : rsAndRMap.entrySet()) { - String serverName = entry.getKey(); - LOG.info("Successfully read {} regions out of {} on regionserver {}", - successMap.get(serverName), entry.getValue().size(), serverName); - } - } - } catch (InterruptedException e) { - this.errorCode = ERROR_EXIT_CODE; - LOG.error("Sniff regionserver interrupted!", e); - } - } - - private Map> filterRegionServerByName() { - Map> regionServerAndRegionsMap = this.getAllRegionServerByName(); - regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap); - return regionServerAndRegionsMap; - } - - private Map> getAllRegionServerByName() { - Map> rsAndRMap = new HashMap<>(); - try { - LOG.debug("Reading list of tables and locations"); - List tableDescs = this.admin.listTableDescriptors(); - List regions = null; - for (TableDescriptor tableDesc: tableDescs) { - try (RegionLocator regionLocator = - this.admin.getConnection().getRegionLocator(tableDesc.getTableName())) { - for (HRegionLocation location : regionLocator.getAllRegionLocations()) { - ServerName rs = location.getServerName(); - String rsName = rs.getHostname(); - RegionInfo r = location.getRegion(); - if (rsAndRMap.containsKey(rsName)) { - regions = rsAndRMap.get(rsName); - } else { - regions = new ArrayList<>(); - rsAndRMap.put(rsName, regions); - } - regions.add(r); - } - } - } - - // get any live regionservers not serving any regions - for (ServerName rs: this.admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)) - .getLiveServerMetrics().keySet()) { - String rsName = rs.getHostname(); - if (!rsAndRMap.containsKey(rsName)) { - rsAndRMap.put(rsName, Collections. emptyList()); - } - } - } catch (IOException e) { - LOG.error("Get HTables info failed", e); - this.errorCode = INIT_ERROR_EXIT_CODE; - } - return rsAndRMap; - } - - private Map> doFilterRegionServerByName( - Map> fullRsAndRMap) { - - Map> filteredRsAndRMap = null; - - if (this.targets != null && this.targets.length > 0) { - filteredRsAndRMap = new HashMap<>(); - Pattern pattern = null; - Matcher matcher = null; - boolean regExpFound = false; - for (String rsName : this.targets) { - if (this.useRegExp) { - regExpFound = false; - pattern = Pattern.compile(rsName); - for (Map.Entry> entry : fullRsAndRMap.entrySet()) { - matcher = pattern.matcher(entry.getKey()); - if (matcher.matches()) { - filteredRsAndRMap.put(entry.getKey(), entry.getValue()); - regExpFound = true; - } - } - if (!regExpFound) { - LOG.info("No RegionServerInfo found, regionServerPattern {}", rsName); - } - } else { - if (fullRsAndRMap.containsKey(rsName)) { - filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName)); - } else { - LOG.info("No RegionServerInfo found, regionServerName {}", rsName); - } - } - } - } else { - filteredRsAndRMap = fullRsAndRMap; - } - return filteredRsAndRMap; - } - } - - public static void main(String[] args) throws Exception { - final Configuration conf = HBaseConfiguration.create(); - - // Loading the generic options to conf - new GenericOptionsParser(conf, args); - - int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM); - LOG.info("Execution thread count={}", numThreads); - - int exitCode = 0; - ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads); - try { - exitCode = ToolRunner.run(conf, new Canary(executor), args); - } finally { - executor.shutdown(); - } - System.exit(exitCode); - } -} + public Map getWriteFailures(); +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/CanaryTool.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/CanaryTool.java new file mode 100644 index 00000000000..0be7da2f81f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/CanaryTool.java @@ -0,0 +1,1866 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.tool; + +import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZOOKEEPER_ZNODE_PARENT; +import static org.apache.hadoop.hbase.HConstants.ZOOKEEPER_ZNODE_PARENT; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.time.StopWatch; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.AuthUtil; +import org.apache.hadoop.hbase.ChoreService; +import org.apache.hadoop.hbase.ClusterMetrics; +import org.apache.hadoop.hbase.ClusterMetrics.Option; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.ScheduledChore; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotEnabledException; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; +import org.apache.hadoop.hbase.tool.CanaryTool.RegionTask.TaskType; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.ReflectionUtils; +import org.apache.hadoop.hbase.util.RegionSplitter; +import org.apache.hadoop.hbase.zookeeper.EmptyWatcher; +import org.apache.hadoop.hbase.zookeeper.ZKConfig; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.client.ConnectStringParser; +import org.apache.zookeeper.data.Stat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + +/** + * HBase Canary Tool for "canary monitoring" of a running HBase cluster. + * + * There are three modes: + *
    + *
  1. region mode (Default): For each region, try to get one row per column family outputting + * information on failure (ERROR) or else the latency. + *
  2. + * + *
  3. regionserver mode: For each regionserver try to get one row from one table selected + * randomly outputting information on failure (ERROR) or else the latency. + *
  4. + * + *
  5. zookeeper mode: for each zookeeper instance, selects a znode outputting information on + * failure (ERROR) or else the latency. + *
  6. + *
+ */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) +public class CanaryTool implements Tool, Canary { + + @Override + public int checkRegions(String[] targets) throws Exception { + String configuredReadTableTimeoutsStr = conf.get(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT); + try { + if (configuredReadTableTimeoutsStr != null) { + populateReadTableTimeoutsMap(configuredReadTableTimeoutsStr); + } + } catch (IllegalArgumentException e) { + LOG.error("Constructing read table timeouts map failed ", e); + return USAGE_EXIT_CODE; + } + return runMonitor(targets); + } + + @Override + public int checkRegionServers(String[] targets) throws Exception { + regionServerMode = true; + return runMonitor(targets); + } + + @Override + public int checkZooKeeper() throws Exception { + zookeeperMode = true; + return runMonitor(null); + } + + /** + * Sink interface used by the canary to output information + */ + public interface Sink { + long getReadFailureCount(); + long incReadFailureCount(); + Map getReadFailures(); + void updateReadFailures(String regionName, String serverName); + long getWriteFailureCount(); + long incWriteFailureCount(); + Map getWriteFailures(); + void updateWriteFailures(String regionName, String serverName); + long getReadSuccessCount(); + long incReadSuccessCount(); + long getWriteSuccessCount(); + long incWriteSuccessCount(); + } + + /** + * Simple implementation of canary sink that allows plotting to a file or standard output. + */ + public static class StdOutSink implements Sink { + private AtomicLong readFailureCount = new AtomicLong(0), + writeFailureCount = new AtomicLong(0), + readSuccessCount = new AtomicLong(0), + writeSuccessCount = new AtomicLong(0); + private Map readFailures = new ConcurrentHashMap<>(); + private Map writeFailures = new ConcurrentHashMap<>(); + + @Override + public long getReadFailureCount() { + return readFailureCount.get(); + } + + @Override + public long incReadFailureCount() { + return readFailureCount.incrementAndGet(); + } + + @Override + public Map getReadFailures() { + return readFailures; + } + + @Override + public void updateReadFailures(String regionName, String serverName) { + readFailures.put(regionName, serverName); + } + + @Override + public long getWriteFailureCount() { + return writeFailureCount.get(); + } + + @Override + public long incWriteFailureCount() { + return writeFailureCount.incrementAndGet(); + } + + @Override + public Map getWriteFailures() { + return writeFailures; + } + + @Override + public void updateWriteFailures(String regionName, String serverName) { + writeFailures.put(regionName, serverName); + } + + @Override + public long getReadSuccessCount() { + return readSuccessCount.get(); + } + + @Override + public long incReadSuccessCount() { + return readSuccessCount.incrementAndGet(); + } + + @Override + public long getWriteSuccessCount() { + return writeSuccessCount.get(); + } + + @Override + public long incWriteSuccessCount() { + return writeSuccessCount.incrementAndGet(); + } + } + + /** + * By RegionServer, for 'regionserver' mode. + */ + public static class RegionServerStdOutSink extends StdOutSink { + public void publishReadFailure(String table, String server) { + incReadFailureCount(); + LOG.error("Read from {} on {}", table, server); + } + + public void publishReadTiming(String table, String server, long msTime) { + LOG.info("Read from {} on {} in {}ms", table, server, msTime); + } + } + + /** + * Output for 'zookeeper' mode. + */ + public static class ZookeeperStdOutSink extends StdOutSink { + public void publishReadFailure(String znode, String server) { + incReadFailureCount(); + LOG.error("Read from {} on {}", znode, server); + } + + public void publishReadTiming(String znode, String server, long msTime) { + LOG.info("Read from {} on {} in {}ms", znode, server, msTime); + } + } + + /** + * By Region, for 'region' mode. + */ + public static class RegionStdOutSink extends StdOutSink { + private Map perTableReadLatency = new HashMap<>(); + private LongAdder writeLatency = new LongAdder(); + private final Map regionMap = new ConcurrentHashMap<>(); + + public void publishReadFailure(ServerName serverName, RegionInfo region, Exception e) { + incReadFailureCount(); + LOG.error("Read from {} on {} failed", region.getRegionNameAsString(), serverName, e); + } + + public void publishReadFailure(ServerName serverName, RegionInfo region, + ColumnFamilyDescriptor column, Exception e) { + incReadFailureCount(); + LOG.error("Read from {} on {} {} failed", region.getRegionNameAsString(), serverName, + column.getNameAsString(), e); + } + + public void publishReadTiming(ServerName serverName, RegionInfo region, + ColumnFamilyDescriptor column, long msTime) { + incReadSuccessCount(); + RegionTaskResult res = this.regionMap.get(region.getRegionNameAsString()); + res.setReadSuccess(); + res.setReadLatency(msTime); + LOG.info("Read from {} on {} {} in {}ms", region.getRegionNameAsString(), serverName, + column.getNameAsString(), msTime); + } + + public void publishWriteFailure(ServerName serverName, RegionInfo region, Exception e) { + incWriteFailureCount(); + LOG.error("Write to {} on {} failed", region.getRegionNameAsString(), serverName, e); + } + + public void publishWriteFailure(ServerName serverName, RegionInfo region, + ColumnFamilyDescriptor column, Exception e) { + incWriteFailureCount(); + LOG.error("Write to {} on {} {} failed", region.getRegionNameAsString(), serverName, + column.getNameAsString(), e); + } + + public void publishWriteTiming(ServerName serverName, RegionInfo region, + ColumnFamilyDescriptor column, long msTime) { + incWriteSuccessCount(); + RegionTaskResult res = this.regionMap.get(region.getRegionNameAsString()); + res.setWriteSuccess(); + res.setWriteLatency(msTime); + LOG.info("Write to {} on {} {} in {}ms", + region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime); + } + + public Map getReadLatencyMap() { + return this.perTableReadLatency; + } + + public LongAdder initializeAndGetReadLatencyForTable(String tableName) { + LongAdder initLatency = new LongAdder(); + this.perTableReadLatency.put(tableName, initLatency); + return initLatency; + } + + public void initializeWriteLatency() { + this.writeLatency.reset(); + } + + public LongAdder getWriteLatency() { + return this.writeLatency; + } + + public Map getRegionMap() { + return this.regionMap; + } + + public int getTotalExpectedRegions() { + return this.regionMap.size(); + } + } + + /** + * Run a single zookeeper Task and then exit. + */ + static class ZookeeperTask implements Callable { + private final Connection connection; + private final String host; + private String znode; + private final int timeout; + private ZookeeperStdOutSink sink; + + public ZookeeperTask(Connection connection, String host, String znode, int timeout, + ZookeeperStdOutSink sink) { + this.connection = connection; + this.host = host; + this.znode = znode; + this.timeout = timeout; + this.sink = sink; + } + + @Override public Void call() throws Exception { + ZooKeeper zooKeeper = null; + try { + zooKeeper = new ZooKeeper(host, timeout, EmptyWatcher.instance); + Stat exists = zooKeeper.exists(znode, false); + StopWatch stopwatch = new StopWatch(); + stopwatch.start(); + zooKeeper.getData(znode, false, exists); + stopwatch.stop(); + sink.publishReadTiming(znode, host, stopwatch.getTime()); + } catch (KeeperException | InterruptedException e) { + sink.publishReadFailure(znode, host); + } finally { + if (zooKeeper != null) { + zooKeeper.close(); + } + } + return null; + } + } + + /** + * Run a single Region Task and then exit. For each column family of the Region, get one row and + * output latency or failure. + */ + static class RegionTask implements Callable { + public enum TaskType{ + READ, WRITE + } + private Connection connection; + private RegionInfo region; + private RegionStdOutSink sink; + private TaskType taskType; + private boolean rawScanEnabled; + private ServerName serverName; + private LongAdder readWriteLatency; + + RegionTask(Connection connection, RegionInfo region, ServerName serverName, + RegionStdOutSink sink, TaskType taskType, boolean rawScanEnabled, LongAdder rwLatency) { + this.connection = connection; + this.region = region; + this.serverName = serverName; + this.sink = sink; + this.taskType = taskType; + this.rawScanEnabled = rawScanEnabled; + this.readWriteLatency = rwLatency; + } + + @Override + public Void call() { + switch (taskType) { + case READ: + return read(); + case WRITE: + return write(); + default: + return read(); + } + } + + public Void read() { + Table table = null; + TableDescriptor tableDesc = null; + try { + LOG.debug("Reading table descriptor for table {}", region.getTable()); + table = connection.getTable(region.getTable()); + tableDesc = table.getDescriptor(); + } catch (IOException e) { + LOG.debug("sniffRegion {} of {} failed", region.getEncodedName(), e); + sink.publishReadFailure(serverName, region, e); + if (table != null) { + try { + table.close(); + } catch (IOException ioe) { + LOG.error("Close table failed", e); + } + } + return null; + } + + byte[] startKey = null; + Get get = null; + Scan scan = null; + ResultScanner rs = null; + StopWatch stopWatch = new StopWatch(); + for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) { + stopWatch.reset(); + startKey = region.getStartKey(); + // Can't do a get on empty start row so do a Scan of first element if any instead. + if (startKey.length > 0) { + get = new Get(startKey); + get.setCacheBlocks(false); + get.setFilter(new FirstKeyOnlyFilter()); + get.addFamily(column.getName()); + } else { + scan = new Scan(); + LOG.debug("rawScan {} for {}", rawScanEnabled, tableDesc.getTableName()); + scan.setRaw(rawScanEnabled); + scan.setCaching(1); + scan.setCacheBlocks(false); + scan.setFilter(new FirstKeyOnlyFilter()); + scan.addFamily(column.getName()); + scan.setMaxResultSize(1L); + scan.setOneRowLimit(); + } + LOG.debug("Reading from {} {} {} {}", tableDesc.getTableName(), + region.getRegionNameAsString(), column.getNameAsString(), + Bytes.toStringBinary(startKey)); + try { + stopWatch.start(); + if (startKey.length > 0) { + table.get(get); + } else { + rs = table.getScanner(scan); + rs.next(); + } + stopWatch.stop(); + this.readWriteLatency.add(stopWatch.getTime()); + sink.publishReadTiming(serverName, region, column, stopWatch.getTime()); + } catch (Exception e) { + sink.publishReadFailure(serverName, region, column, e); + sink.updateReadFailures(region.getRegionNameAsString(), serverName.getHostname()); + } finally { + if (rs != null) { + rs.close(); + } + scan = null; + get = null; + } + } + try { + table.close(); + } catch (IOException e) { + LOG.error("Close table failed", e); + } + return null; + } + + /** + * Check writes for the canary table + */ + private Void write() { + Table table = null; + TableDescriptor tableDesc = null; + try { + table = connection.getTable(region.getTable()); + tableDesc = table.getDescriptor(); + byte[] rowToCheck = region.getStartKey(); + if (rowToCheck.length == 0) { + rowToCheck = new byte[]{0x0}; + } + int writeValueSize = + connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10); + for (ColumnFamilyDescriptor column : tableDesc.getColumnFamilies()) { + Put put = new Put(rowToCheck); + byte[] value = new byte[writeValueSize]; + Bytes.random(value); + put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value); + + LOG.debug("Writing to {} {} {} {}", + tableDesc.getTableName(), region.getRegionNameAsString(), column.getNameAsString(), + Bytes.toStringBinary(rowToCheck)); + try { + long startTime = System.currentTimeMillis(); + table.put(put); + long time = System.currentTimeMillis() - startTime; + this.readWriteLatency.add(time); + sink.publishWriteTiming(serverName, region, column, time); + } catch (Exception e) { + sink.publishWriteFailure(serverName, region, column, e); + } + } + table.close(); + } catch (IOException e) { + sink.publishWriteFailure(serverName, region, e); + sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname() ); + } + return null; + } + } + + /** + * Run a single RegionServer Task and then exit. + * Get one row from a region on the regionserver and output latency or the failure. + */ + static class RegionServerTask implements Callable { + private Connection connection; + private String serverName; + private RegionInfo region; + private RegionServerStdOutSink sink; + private AtomicLong successes; + + RegionServerTask(Connection connection, String serverName, RegionInfo region, + RegionServerStdOutSink sink, AtomicLong successes) { + this.connection = connection; + this.serverName = serverName; + this.region = region; + this.sink = sink; + this.successes = successes; + } + + @Override + public Void call() { + TableName tableName = null; + Table table = null; + Get get = null; + byte[] startKey = null; + Scan scan = null; + StopWatch stopWatch = new StopWatch(); + // monitor one region on every region server + stopWatch.reset(); + try { + tableName = region.getTable(); + table = connection.getTable(tableName); + startKey = region.getStartKey(); + // Can't do a get on empty start row so do a Scan of first element if any instead. + LOG.debug("Reading from {} {} {} {}", + serverName, region.getTable(), region.getRegionNameAsString(), + Bytes.toStringBinary(startKey)); + if (startKey.length > 0) { + get = new Get(startKey); + get.setCacheBlocks(false); + get.setFilter(new FirstKeyOnlyFilter()); + stopWatch.start(); + table.get(get); + stopWatch.stop(); + } else { + scan = new Scan(); + scan.setCacheBlocks(false); + scan.setFilter(new FirstKeyOnlyFilter()); + scan.setCaching(1); + scan.setMaxResultSize(1L); + scan.setOneRowLimit(); + stopWatch.start(); + ResultScanner s = table.getScanner(scan); + s.next(); + s.close(); + stopWatch.stop(); + } + successes.incrementAndGet(); + sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime()); + } catch (TableNotFoundException tnfe) { + LOG.error("Table may be deleted", tnfe); + // This is ignored because it doesn't imply that the regionserver is dead + } catch (TableNotEnabledException tnee) { + // This is considered a success since we got a response. + successes.incrementAndGet(); + LOG.debug("The targeted table was disabled. Assuming success."); + } catch (DoNotRetryIOException dnrioe) { + sink.publishReadFailure(tableName.getNameAsString(), serverName); + LOG.error(dnrioe.toString(), dnrioe); + } catch (IOException e) { + sink.publishReadFailure(tableName.getNameAsString(), serverName); + LOG.error(e.toString(), e); + } finally { + if (table != null) { + try { + table.close(); + } catch (IOException e) {/* DO NOTHING */ + LOG.error("Close table failed", e); + } + } + scan = null; + get = null; + startKey = null; + } + return null; + } + } + + private static final int USAGE_EXIT_CODE = 1; + private static final int INIT_ERROR_EXIT_CODE = 2; + private static final int TIMEOUT_ERROR_EXIT_CODE = 3; + private static final int ERROR_EXIT_CODE = 4; + private static final int FAILURE_EXIT_CODE = 5; + + private static final long DEFAULT_INTERVAL = 60000; + + private static final long DEFAULT_TIMEOUT = 600000; // 10 mins + private static final int MAX_THREADS_NUM = 16; // #threads to contact regions + + private static final Logger LOG = LoggerFactory.getLogger(Canary.class); + + public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf( + NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary"); + + private static final String CANARY_TABLE_FAMILY_NAME = "Test"; + + private Configuration conf = null; + private long interval = 0; + private Sink sink = null; + + /** + * True if we are to run in 'regionServer' mode. + */ + private boolean regionServerMode = false; + + /** + * True if we are to run in zookeeper 'mode'. + */ + private boolean zookeeperMode = false; + + /** + * This is a Map of table to timeout. The timeout is for reading all regions in the table; i.e. + * we aggregate time to fetch each region and it needs to be less than this value else we + * log an ERROR. + */ + private HashMap configuredReadTableTimeouts = new HashMap<>(); + + public static final String HBASE_CANARY_REGIONSERVER_ALL_REGIONS + = "hbase.canary.regionserver_all_regions"; + + public static final String HBASE_CANARY_REGION_WRITE_SNIFFING + = "hbase.canary.region.write.sniffing"; + public static final String HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT + = "hbase.canary.region.write.table.timeout"; + public static final String HBASE_CANARY_REGION_WRITE_TABLE_NAME + = "hbase.canary.region.write.table.name"; + public static final String HBASE_CANARY_REGION_READ_TABLE_TIMEOUT + = "hbase.canary.region.read.table.timeout"; + + public static final String HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES + = "hbase.canary.zookeeper.permitted.failures"; + + public static final String HBASE_CANARY_USE_REGEX = "hbase.canary.use.regex"; + public static final String HBASE_CANARY_TIMEOUT = "hbase.canary.timeout"; + public static final String HBASE_CANARY_FAIL_ON_ERROR = "hbase.canary.fail.on.error"; + + + private ExecutorService executor; // threads to retrieve data from regionservers + + public CanaryTool() { + this(new ScheduledThreadPoolExecutor(1)); + } + + public CanaryTool(ExecutorService executor) { + this(executor, null); + } + + @VisibleForTesting + CanaryTool(ExecutorService executor, Sink sink) { + this.executor = executor; + this.sink = sink; + } + + CanaryTool(Configuration conf, ExecutorService executor) { + this(conf, executor, null); + } + + CanaryTool(Configuration conf, ExecutorService executor, Sink sink) { + this(executor, sink); + setConf(conf); + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + if (conf == null) { + conf = HBaseConfiguration.create(); + } + this.conf = conf; + } + + private int parseArgs(String[] args) { + int index = -1; + long permittedFailures = 0; + boolean regionServerAllRegions = false, writeSniffing = false; + String readTableTimeoutsStr = null; + + // Process command line args + for (int i = 0; i < args.length; i++) { + String cmd = args[i]; + + if (cmd.startsWith("-")) { + if (index >= 0) { + // command line args must be in the form: [opts] [table 1 [table 2 ...]] + System.err.println("Invalid command line options"); + printUsageAndExit(); + } + + if (cmd.equals("-help") || cmd.equals("-h")) { + // user asked for help, print the help and quit. + printUsageAndExit(); + } else if (cmd.equals("-daemon") && interval == 0) { + // user asked for daemon mode, set a default interval between checks + interval = DEFAULT_INTERVAL; + } else if (cmd.equals("-interval")) { + // user has specified an interval for canary breaths (-interval N) + i++; + + if (i == args.length) { + System.err.println("-interval takes a numeric seconds value argument."); + printUsageAndExit(); + } + + try { + interval = Long.parseLong(args[i]) * 1000; + } catch (NumberFormatException e) { + System.err.println("-interval needs a numeric value argument."); + printUsageAndExit(); + } + } else if (cmd.equals("-zookeeper")) { + this.zookeeperMode = true; + } else if(cmd.equals("-regionserver")) { + this.regionServerMode = true; + } else if(cmd.equals("-allRegions")) { + conf.setBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, true); + regionServerAllRegions = true; + } else if(cmd.equals("-writeSniffing")) { + writeSniffing = true; + conf.setBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, true); + } else if(cmd.equals("-treatFailureAsError") || cmd.equals("-failureAsError")) { + conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, true); + } else if (cmd.equals("-e")) { + conf.setBoolean(HBASE_CANARY_USE_REGEX, true); + } else if (cmd.equals("-t")) { + i++; + + if (i == args.length) { + System.err.println("-t takes a numeric milliseconds value argument."); + printUsageAndExit(); + } + long timeout = 0; + try { + timeout = Long.parseLong(args[i]); + } catch (NumberFormatException e) { + System.err.println("-t takes a numeric milliseconds value argument."); + printUsageAndExit(); + } + conf.setLong(HBASE_CANARY_TIMEOUT, timeout); + } else if(cmd.equals("-writeTableTimeout")) { + i++; + + if (i == args.length) { + System.err.println("-writeTableTimeout takes a numeric milliseconds value argument."); + printUsageAndExit(); + } + long configuredWriteTableTimeout = 0; + try { + configuredWriteTableTimeout = Long.parseLong(args[i]); + } catch (NumberFormatException e) { + System.err.println("-writeTableTimeout takes a numeric milliseconds value argument."); + printUsageAndExit(); + } + conf.setLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, configuredWriteTableTimeout); + } else if (cmd.equals("-writeTable")) { + i++; + + if (i == args.length) { + System.err.println("-writeTable takes a string tablename value argument."); + printUsageAndExit(); + } + conf.set(HBASE_CANARY_REGION_WRITE_TABLE_NAME, args[i]); + } else if (cmd.equals("-f")) { + i++; + + if (i == args.length) { + System.err + .println("-f needs a boolean value argument (true|false)."); + printUsageAndExit(); + } + + conf.setBoolean(HBASE_CANARY_FAIL_ON_ERROR, Boolean.parseBoolean(args[i])); + } else if (cmd.equals("-readTableTimeouts")) { + i++; + + if (i == args.length) { + System.err.println("-readTableTimeouts needs a comma-separated list of read " + + "millisecond timeouts per table (without spaces)."); + printUsageAndExit(); + } + readTableTimeoutsStr = args[i]; + conf.set(HBASE_CANARY_REGION_READ_TABLE_TIMEOUT, readTableTimeoutsStr); + } else if (cmd.equals("-permittedZookeeperFailures")) { + i++; + + if (i == args.length) { + System.err.println("-permittedZookeeperFailures needs a numeric value argument."); + printUsageAndExit(); + } + try { + permittedFailures = Long.parseLong(args[i]); + } catch (NumberFormatException e) { + System.err.println("-permittedZookeeperFailures needs a numeric value argument."); + printUsageAndExit(); + } + conf.setLong(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, permittedFailures); + } else { + // no options match + System.err.println(cmd + " options is invalid."); + printUsageAndExit(); + } + } else if (index < 0) { + // keep track of first table name specified by the user + index = i; + } + } + if (regionServerAllRegions && !this.regionServerMode) { + System.err.println("-allRegions can only be specified in regionserver mode."); + printUsageAndExit(); + } + if (this.zookeeperMode) { + if (this.regionServerMode || regionServerAllRegions || writeSniffing) { + System.err.println("-zookeeper is exclusive and cannot be combined with " + + "other modes."); + printUsageAndExit(); + } + } + if (permittedFailures != 0 && !this.zookeeperMode) { + System.err.println("-permittedZookeeperFailures requires -zookeeper mode."); + printUsageAndExit(); + } + if (readTableTimeoutsStr != null && (this.regionServerMode || this.zookeeperMode)) { + System.err.println("-readTableTimeouts can only be configured in region mode."); + printUsageAndExit(); + } + return index; + } + + @Override + public int run(String[] args) throws Exception { + int index = parseArgs(args); + String[] monitorTargets = null; + + if (index >= 0) { + int length = args.length - index; + monitorTargets = new String[length]; + System.arraycopy(args, index, monitorTargets, 0, length); + } + + if (zookeeperMode) { + return checkZooKeeper(); + } else if (regionServerMode) { + return checkRegionServers(monitorTargets); + } else { + return checkRegions(monitorTargets); + } + } + + private int runMonitor(String[] monitorTargets) throws Exception { + ChoreService choreService = null; + + // Launches chore for refreshing kerberos credentials if security is enabled. + // Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster + // for more details. + final ScheduledChore authChore = AuthUtil.getAuthChore(conf); + if (authChore != null) { + choreService = new ChoreService("CANARY_TOOL"); + choreService.scheduleChore(authChore); + } + + // Start to prepare the stuffs + Monitor monitor = null; + Thread monitorThread; + long startTime = 0; + long currentTimeLength = 0; + boolean failOnError = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true); + long timeout = conf.getLong(HBASE_CANARY_TIMEOUT, DEFAULT_TIMEOUT); + // Get a connection to use in below. + try (Connection connection = ConnectionFactory.createConnection(this.conf)) { + do { + // Do monitor !! + try { + monitor = this.newMonitor(connection, monitorTargets); + monitorThread = new Thread(monitor, "CanaryMonitor-" + System.currentTimeMillis()); + startTime = System.currentTimeMillis(); + monitorThread.start(); + while (!monitor.isDone()) { + // wait for 1 sec + Thread.sleep(1000); + // exit if any error occurs + if (failOnError && monitor.hasError()) { + monitorThread.interrupt(); + if (monitor.initialized) { + return monitor.errorCode; + } else { + return INIT_ERROR_EXIT_CODE; + } + } + currentTimeLength = System.currentTimeMillis() - startTime; + if (currentTimeLength > timeout) { + LOG.error("The monitor is running too long (" + currentTimeLength + + ") after timeout limit:" + timeout + + " will be killed itself !!"); + if (monitor.initialized) { + return TIMEOUT_ERROR_EXIT_CODE; + } else { + return INIT_ERROR_EXIT_CODE; + } + } + } + + if (failOnError && monitor.finalCheckForErrors()) { + monitorThread.interrupt(); + return monitor.errorCode; + } + } finally { + if (monitor != null) monitor.close(); + } + + Thread.sleep(interval); + } while (interval > 0); + } // try-with-resources close + + if (choreService != null) { + choreService.shutdown(); + } + return monitor.errorCode; + } + + @Override + public Map getReadFailures() { + return sink.getReadFailures(); + } + + @Override + public Map getWriteFailures() { + return sink.getWriteFailures(); + } + + private void printUsageAndExit() { + System.err.println( + "Usage: canary [OPTIONS] [ [ [ interval between checks in seconds"); + System.err.println(" -e consider table/regionserver argument as regular " + + "expression"); + System.err.println(" -f exit on first error; default=true"); + System.err.println(" -failureAsError treat read/write failure as error"); + System.err.println(" -t timeout for canary-test run; default=600000ms"); + System.err.println(" -writeSniffing enable write sniffing"); + System.err.println(" -writeTable the table used for write sniffing; default=hbase:canary"); + System.err.println(" -writeTableTimeout timeout for writeTable; default=600000ms"); + System.err.println(" -readTableTimeouts =," + + "=,..."); + System.err.println(" comma-separated list of table read timeouts " + + "(no spaces);"); + System.err.println(" logs 'ERROR' if takes longer. default=600000ms"); + System.err.println(" -permittedZookeeperFailures Ignore first N failures attempting to "); + System.err.println(" connect to individual zookeeper nodes in ensemble"); + System.err.println(""); + System.err.println(" -D= to assign or override configuration params"); + System.err.println(" -Dhbase.canary.read.raw.enabled= Set to enable/disable " + + "raw scan; default=false"); + System.err.println(""); + System.err.println("Canary runs in one of three modes: region (default), regionserver, or " + + "zookeeper."); + System.err.println("To sniff/probe all regions, pass no arguments."); + System.err.println("To sniff/probe all regions of a table, pass tablename."); + System.err.println("To sniff/probe regionservers, pass -regionserver, etc."); + System.err.println("See http://hbase.apache.org/book.html#_canary for Canary documentation."); + System.exit(USAGE_EXIT_CODE); + } + + Sink getSink(Configuration configuration, Class clazz) { + // In test context, this.sink might be set. Use it if non-null. For testing. + return this.sink != null? this.sink: + (Sink)ReflectionUtils.newInstance(configuration.getClass("hbase.canary.sink.class", + clazz, Sink.class)); + } + + /** + * Canary region mode-specific data structure which stores information about each region + * to be scanned + */ + public static class RegionTaskResult { + private RegionInfo region; + private TableName tableName; + private ServerName serverName; + private AtomicLong readLatency = null; + private AtomicLong writeLatency = null; + private boolean readSuccess = false; + private boolean writeSuccess = false; + + public RegionTaskResult(RegionInfo region, TableName tableName, ServerName serverName) { + this.region = region; + this.tableName = tableName; + this.serverName = serverName; + } + + public RegionInfo getRegionInfo() { + return this.region; + } + + public String getRegionNameAsString() { + return this.region.getRegionNameAsString(); + } + + public TableName getTableName() { + return this.tableName; + } + + public String getTableNameAsString() { + return this.tableName.getNameAsString(); + } + + public ServerName getServerName() { + return this.serverName; + } + + public String getServerNameAsString() { + return this.serverName.getServerName(); + } + + public long getReadLatency() { + if (this.readLatency == null) { + return -1; + } + return this.readLatency.get(); + } + + public void setReadLatency(long readLatency) { + if (this.readLatency != null) { + this.readLatency.set(readLatency); + } else { + this.readLatency = new AtomicLong(readLatency); + } + } + + public long getWriteLatency() { + if (this.writeLatency == null) { + return -1; + } + return this.writeLatency.get(); + } + + public void setWriteLatency(long writeLatency) { + if (this.writeLatency != null) { + this.writeLatency.set(writeLatency); + } else { + this.writeLatency = new AtomicLong(writeLatency); + } + } + + public boolean isReadSuccess() { + return this.readSuccess; + } + + public void setReadSuccess() { + this.readSuccess = true; + } + + public boolean isWriteSuccess() { + return this.writeSuccess; + } + + public void setWriteSuccess() { + this.writeSuccess = true; + } + } + + /** + * A Factory method for {@link Monitor}. + * Makes a RegionServerMonitor, or a ZooKeeperMonitor, or a RegionMonitor. + * @return a Monitor instance + */ + private Monitor newMonitor(final Connection connection, String[] monitorTargets) { + Monitor monitor; + boolean useRegExp = conf.getBoolean(HBASE_CANARY_USE_REGEX, false); + boolean regionServerAllRegions + = conf.getBoolean(HBASE_CANARY_REGIONSERVER_ALL_REGIONS, false); + boolean failOnError + = conf.getBoolean(HBASE_CANARY_FAIL_ON_ERROR, true); + int permittedFailures + = conf.getInt(HBASE_CANARY_ZOOKEEPER_PERMITTED_FAILURES, 0); + boolean writeSniffing + = conf.getBoolean(HBASE_CANARY_REGION_WRITE_SNIFFING, false); + String writeTableName = conf.get(HBASE_CANARY_REGION_WRITE_TABLE_NAME, + DEFAULT_WRITE_TABLE_NAME.getNameAsString()); + long configuredWriteTableTimeout + = conf.getLong(HBASE_CANARY_REGION_WRITE_TABLE_TIMEOUT, DEFAULT_TIMEOUT); + + if (this.regionServerMode) { + monitor = + new RegionServerMonitor(connection, monitorTargets, useRegExp, + getSink(connection.getConfiguration(), RegionServerStdOutSink.class), + this.executor, regionServerAllRegions, + failOnError, permittedFailures); + + } else if (this.zookeeperMode) { + monitor = + new ZookeeperMonitor(connection, monitorTargets, useRegExp, + getSink(connection.getConfiguration(), ZookeeperStdOutSink.class), + this.executor, failOnError, permittedFailures); + } else { + monitor = + new RegionMonitor(connection, monitorTargets, useRegExp, + getSink(connection.getConfiguration(), RegionStdOutSink.class), + this.executor, writeSniffing, + TableName.valueOf(writeTableName), failOnError, configuredReadTableTimeouts, + configuredWriteTableTimeout, permittedFailures); + } + return monitor; + } + + private void populateReadTableTimeoutsMap(String configuredReadTableTimeoutsStr) { + String[] tableTimeouts = configuredReadTableTimeoutsStr.split(","); + for (String tT : tableTimeouts) { + String[] nameTimeout = tT.split("="); + if (nameTimeout.length < 2) { + throw new IllegalArgumentException("Each -readTableTimeouts argument must be of the form " + + "= (without spaces)."); + } + long timeoutVal; + try { + timeoutVal = Long.parseLong(nameTimeout[1]); + } catch (NumberFormatException e) { + throw new IllegalArgumentException("-readTableTimeouts read timeout for each table" + + " must be a numeric value argument."); + } + configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal); + } + } + /** + * A Monitor super-class can be extended by users + */ + public static abstract class Monitor implements Runnable, Closeable { + protected Connection connection; + protected Admin admin; + /** + * 'Target' dependent on 'mode'. Could be Tables or RegionServers or ZNodes. + * Passed on the command-line as arguments. + */ + protected String[] targets; + protected boolean useRegExp; + protected boolean treatFailureAsError; + protected boolean initialized = false; + + protected boolean done = false; + protected int errorCode = 0; + protected long allowedFailures = 0; + protected Sink sink; + protected ExecutorService executor; + + public boolean isDone() { + return done; + } + + public boolean hasError() { + return errorCode != 0; + } + + public boolean finalCheckForErrors() { + if (errorCode != 0) { + return true; + } + if (treatFailureAsError && + (sink.getReadFailureCount() > allowedFailures || sink.getWriteFailureCount() > allowedFailures)) { + LOG.error("Too many failures detected, treating failure as error, failing the Canary."); + errorCode = FAILURE_EXIT_CODE; + return true; + } + return false; + } + + @Override + public void close() throws IOException { + if (this.admin != null) this.admin.close(); + } + + protected Monitor(Connection connection, String[] monitorTargets, boolean useRegExp, Sink sink, + ExecutorService executor, boolean treatFailureAsError, long allowedFailures) { + if (null == connection) throw new IllegalArgumentException("connection shall not be null"); + + this.connection = connection; + this.targets = monitorTargets; + this.useRegExp = useRegExp; + this.treatFailureAsError = treatFailureAsError; + this.sink = sink; + this.executor = executor; + this.allowedFailures = allowedFailures; + } + + @Override + public abstract void run(); + + protected boolean initAdmin() { + if (null == this.admin) { + try { + this.admin = this.connection.getAdmin(); + } catch (Exception e) { + LOG.error("Initial HBaseAdmin failed...", e); + this.errorCode = INIT_ERROR_EXIT_CODE; + } + } else if (admin.isAborted()) { + LOG.error("HBaseAdmin aborted"); + this.errorCode = INIT_ERROR_EXIT_CODE; + } + return !this.hasError(); + } + } + + /** + * A monitor for region mode. + */ + private static class RegionMonitor extends Monitor { + // 10 minutes + private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000; + // 1 days + private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60; + + private long lastCheckTime = -1; + private boolean writeSniffing; + private TableName writeTableName; + private int writeDataTTL; + private float regionsLowerLimit; + private float regionsUpperLimit; + private int checkPeriod; + private boolean rawScanEnabled; + + /** + * This is a timeout per table. If read of each region in the table aggregated takes longer + * than what is configured here, we log an ERROR rather than just an INFO. + */ + private HashMap configuredReadTableTimeouts; + + private long configuredWriteTableTimeout; + + public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, + Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName, + boolean treatFailureAsError, HashMap configuredReadTableTimeouts, + long configuredWriteTableTimeout, + long allowedFailures) { + super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError, + allowedFailures); + Configuration conf = connection.getConfiguration(); + this.writeSniffing = writeSniffing; + this.writeTableName = writeTableName; + this.writeDataTTL = + conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL); + this.regionsLowerLimit = + conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f); + this.regionsUpperLimit = + conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f); + this.checkPeriod = + conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY, + DEFAULT_WRITE_TABLE_CHECK_PERIOD); + this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false); + this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts); + this.configuredWriteTableTimeout = configuredWriteTableTimeout; + } + + private RegionStdOutSink getSink() { + if (!(sink instanceof RegionStdOutSink)) { + throw new RuntimeException("Can only write to Region sink"); + } + return ((RegionStdOutSink) sink); + } + + @Override + public void run() { + if (this.initAdmin()) { + try { + List> taskFutures = new LinkedList<>(); + RegionStdOutSink regionSink = this.getSink(); + if (this.targets != null && this.targets.length > 0) { + String[] tables = generateMonitorTables(this.targets); + // Check to see that each table name passed in the -readTableTimeouts argument is also + // passed as a monitor target. + if (!new HashSet<>(Arrays.asList(tables)). + containsAll(this.configuredReadTableTimeouts.keySet())) { + LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets " + + "passed via command line."); + this.errorCode = USAGE_EXIT_CODE; + return; + } + this.initialized = true; + for (String table : tables) { + LongAdder readLatency = regionSink.initializeAndGetReadLatencyForTable(table); + taskFutures.addAll(CanaryTool.sniff(admin, regionSink, table, executor, TaskType.READ, + this.rawScanEnabled, readLatency)); + } + } else { + taskFutures.addAll(sniff(TaskType.READ, regionSink)); + } + + if (writeSniffing) { + if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) { + try { + checkWriteTableDistribution(); + } catch (IOException e) { + LOG.error("Check canary table distribution failed!", e); + } + lastCheckTime = EnvironmentEdgeManager.currentTime(); + } + // sniff canary table with write operation + regionSink.initializeWriteLatency(); + LongAdder writeTableLatency = regionSink.getWriteLatency(); + taskFutures.addAll(CanaryTool.sniff(admin, regionSink, admin.getDescriptor(writeTableName), + executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency)); + } + + for (Future future : taskFutures) { + try { + future.get(); + } catch (ExecutionException e) { + LOG.error("Sniff region failed!", e); + } + } + Map actualReadTableLatency = regionSink.getReadLatencyMap(); + for (Map.Entry entry : configuredReadTableTimeouts.entrySet()) { + String tableName = entry.getKey(); + if (actualReadTableLatency.containsKey(tableName)) { + Long actual = actualReadTableLatency.get(tableName).longValue(); + Long configured = entry.getValue(); + if (actual > configured) { + LOG.error("Read operation for {} took {}ms exceeded the configured read timeout." + + "(Configured read timeout {}ms.", tableName, actual, configured); + } else { + LOG.info("Read operation for {} took {}ms (Configured read timeout {}ms.", + tableName, actual, configured); + } + } else { + LOG.error("Read operation for {} failed!", tableName); + } + } + if (this.writeSniffing) { + String writeTableStringName = this.writeTableName.getNameAsString(); + long actualWriteLatency = regionSink.getWriteLatency().longValue(); + LOG.info("Write operation for {} took {}ms. Configured write timeout {}ms.", + writeTableStringName, actualWriteLatency, this.configuredWriteTableTimeout); + // Check that the writeTable write operation latency does not exceed the configured timeout. + if (actualWriteLatency > this.configuredWriteTableTimeout) { + LOG.error("Write operation for {} exceeded the configured write timeout.", + writeTableStringName); + } + } + } catch (Exception e) { + LOG.error("Run regionMonitor failed", e); + this.errorCode = ERROR_EXIT_CODE; + } finally { + this.done = true; + } + } + this.done = true; + } + + /** + * @return List of tables to use in test. + */ + private String[] generateMonitorTables(String[] monitorTargets) throws IOException { + String[] returnTables = null; + + if (this.useRegExp) { + Pattern pattern = null; + List tds = null; + Set tmpTables = new TreeSet<>(); + try { + LOG.debug(String.format("reading list of tables")); + tds = this.admin.listTableDescriptors(pattern); + if (tds == null) { + tds = Collections.emptyList(); + } + for (String monitorTarget : monitorTargets) { + pattern = Pattern.compile(monitorTarget); + for (TableDescriptor td : tds) { + if (pattern.matcher(td.getTableName().getNameAsString()).matches()) { + tmpTables.add(td.getTableName().getNameAsString()); + } + } + } + } catch (IOException e) { + LOG.error("Communicate with admin failed", e); + throw e; + } + + if (tmpTables.size() > 0) { + returnTables = tmpTables.toArray(new String[tmpTables.size()]); + } else { + String msg = "No HTable found, tablePattern:" + Arrays.toString(monitorTargets); + LOG.error(msg); + this.errorCode = INIT_ERROR_EXIT_CODE; + throw new TableNotFoundException(msg); + } + } else { + returnTables = monitorTargets; + } + + return returnTables; + } + + /* + * Canary entry point to monitor all the tables. + */ + private List> sniff(TaskType taskType, RegionStdOutSink regionSink) + throws Exception { + LOG.debug("Reading list of tables"); + List> taskFutures = new LinkedList<>(); + for (TableDescriptor td: admin.listTableDescriptors()) { + if (admin.tableExists(td.getTableName()) && admin.isTableEnabled(td.getTableName()) && + (!td.getTableName().equals(writeTableName))) { + LongAdder readLatency = + regionSink.initializeAndGetReadLatencyForTable(td.getTableName().getNameAsString()); + taskFutures.addAll(CanaryTool.sniff(admin, sink, td, executor, taskType, this.rawScanEnabled, + readLatency)); + } + } + return taskFutures; + } + + private void checkWriteTableDistribution() throws IOException { + if (!admin.tableExists(writeTableName)) { + int numberOfServers = + admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().size(); + if (numberOfServers == 0) { + throw new IllegalStateException("No live regionservers"); + } + createWriteTable(numberOfServers); + } + + if (!admin.isTableEnabled(writeTableName)) { + admin.enableTable(writeTableName); + } + + ClusterMetrics status = + admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS, Option.MASTER)); + int numberOfServers = status.getLiveServerMetrics().size(); + if (status.getLiveServerMetrics().containsKey(status.getMasterName())) { + numberOfServers -= 1; + } + + List> pairs = + MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName); + int numberOfRegions = pairs.size(); + if (numberOfRegions < numberOfServers * regionsLowerLimit + || numberOfRegions > numberOfServers * regionsUpperLimit) { + admin.disableTable(writeTableName); + admin.deleteTable(writeTableName); + createWriteTable(numberOfServers); + } + HashSet serverSet = new HashSet<>(); + for (Pair pair : pairs) { + serverSet.add(pair.getSecond()); + } + int numberOfCoveredServers = serverSet.size(); + if (numberOfCoveredServers < numberOfServers) { + admin.balance(); + } + } + + private void createWriteTable(int numberOfServers) throws IOException { + int numberOfRegions = (int)(numberOfServers * regionsLowerLimit); + LOG.info("Number of live regionservers {}, pre-splitting the canary table into {} regions " + + "(current lower limit of regions per server is {} and you can change it with config {}).", + numberOfServers, numberOfRegions, regionsLowerLimit, + HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY); + HTableDescriptor desc = new HTableDescriptor(writeTableName); + HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME); + family.setMaxVersions(1); + family.setTimeToLive(writeDataTTL); + + desc.addFamily(family); + byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions); + admin.createTable(desc, splits); + } + } + + /** + * Canary entry point for specified table. + * @throws Exception + */ + private static List> sniff(final Admin admin, final Sink sink, String tableName, + ExecutorService executor, TaskType taskType, boolean rawScanEnabled, LongAdder readLatency) + throws Exception { + LOG.debug("Checking table is enabled and getting table descriptor for table {}", tableName); + if (admin.isTableEnabled(TableName.valueOf(tableName))) { + return CanaryTool.sniff(admin, sink, admin.getDescriptor(TableName.valueOf(tableName)), + executor, taskType, rawScanEnabled, readLatency); + } else { + LOG.warn("Table {} is not enabled", tableName); + } + return new LinkedList<>(); + } + + /* + * Loops over regions of this table, and outputs information about the state. + */ + private static List> sniff(final Admin admin, final Sink sink, + TableDescriptor tableDesc, ExecutorService executor, TaskType taskType, + boolean rawScanEnabled, LongAdder rwLatency) throws Exception { + LOG.debug("Reading list of regions for table {}", tableDesc.getTableName()); + try (Table table = admin.getConnection().getTable(tableDesc.getTableName())) { + List tasks = new ArrayList<>(); + try (RegionLocator regionLocator = + admin.getConnection().getRegionLocator(tableDesc.getTableName())) { + for (HRegionLocation location: regionLocator.getAllRegionLocations()) { + ServerName rs = location.getServerName(); + RegionInfo region = location.getRegion(); + tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink)sink, + taskType, rawScanEnabled, rwLatency)); + Map regionMap = ((RegionStdOutSink) sink).getRegionMap(); + regionMap.put(region.getRegionNameAsString(), new RegionTaskResult(region, + region.getTable(), rs)); + } + return executor.invokeAll(tasks); + } + } catch (TableNotFoundException e) { + return Collections.EMPTY_LIST; + } + } + + // monitor for zookeeper mode + private static class ZookeeperMonitor extends Monitor { + private List hosts; + private final String znode; + private final int timeout; + + protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, + Sink sink, ExecutorService executor, boolean treatFailureAsError, long allowedFailures) { + super(connection, monitorTargets, useRegExp, + sink, executor, treatFailureAsError, allowedFailures); + Configuration configuration = connection.getConfiguration(); + znode = + configuration.get(ZOOKEEPER_ZNODE_PARENT, + DEFAULT_ZOOKEEPER_ZNODE_PARENT); + timeout = configuration + .getInt(HConstants.ZK_SESSION_TIMEOUT, HConstants.DEFAULT_ZK_SESSION_TIMEOUT); + ConnectStringParser parser = + new ConnectStringParser(ZKConfig.getZKQuorumServersString(configuration)); + hosts = Lists.newArrayList(); + for (InetSocketAddress server : parser.getServerAddresses()) { + hosts.add(server.toString()); + } + if (allowedFailures > (hosts.size() - 1) / 2) { + LOG.warn("Confirm allowable number of failed ZooKeeper nodes, as quorum will " + + "already be lost. Setting of {} failures is unexpected for {} ensemble size.", + allowedFailures, hosts.size()); + } + } + + @Override public void run() { + List tasks = Lists.newArrayList(); + ZookeeperStdOutSink zkSink = null; + try { + zkSink = this.getSink(); + } catch (RuntimeException e) { + LOG.error("Run ZooKeeperMonitor failed!", e); + this.errorCode = ERROR_EXIT_CODE; + } + this.initialized = true; + for (final String host : hosts) { + tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink)); + } + try { + for (Future future : this.executor.invokeAll(tasks)) { + try { + future.get(); + } catch (ExecutionException e) { + LOG.error("Sniff zookeeper failed!", e); + this.errorCode = ERROR_EXIT_CODE; + } + } + } catch (InterruptedException e) { + this.errorCode = ERROR_EXIT_CODE; + Thread.currentThread().interrupt(); + LOG.error("Sniff zookeeper interrupted!", e); + } + this.done = true; + } + + private ZookeeperStdOutSink getSink() { + if (!(sink instanceof ZookeeperStdOutSink)) { + throw new RuntimeException("Can only write to zookeeper sink"); + } + return ((ZookeeperStdOutSink) sink); + } + } + + + /** + * A monitor for regionserver mode + */ + private static class RegionServerMonitor extends Monitor { + private boolean allRegions; + + public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp, + Sink sink, ExecutorService executor, boolean allRegions, + boolean treatFailureAsError, long allowedFailures) { + super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError, + allowedFailures); + this.allRegions = allRegions; + } + + private RegionServerStdOutSink getSink() { + if (!(sink instanceof RegionServerStdOutSink)) { + throw new RuntimeException("Can only write to regionserver sink"); + } + return ((RegionServerStdOutSink) sink); + } + + @Override + public void run() { + if (this.initAdmin() && this.checkNoTableNames()) { + RegionServerStdOutSink regionServerSink = null; + try { + regionServerSink = this.getSink(); + } catch (RuntimeException e) { + LOG.error("Run RegionServerMonitor failed!", e); + this.errorCode = ERROR_EXIT_CODE; + } + Map> rsAndRMap = this.filterRegionServerByName(); + this.initialized = true; + this.monitorRegionServers(rsAndRMap, regionServerSink); + } + this.done = true; + } + + private boolean checkNoTableNames() { + List foundTableNames = new ArrayList<>(); + TableName[] tableNames = null; + LOG.debug("Reading list of tables"); + try { + tableNames = this.admin.listTableNames(); + } catch (IOException e) { + LOG.error("Get listTableNames failed", e); + this.errorCode = INIT_ERROR_EXIT_CODE; + return false; + } + + if (this.targets == null || this.targets.length == 0) return true; + + for (String target : this.targets) { + for (TableName tableName : tableNames) { + if (target.equals(tableName.getNameAsString())) { + foundTableNames.add(target); + } + } + } + + if (foundTableNames.size() > 0) { + System.err.println("Cannot pass a tablename when using the -regionserver " + + "option, tablenames:" + foundTableNames.toString()); + this.errorCode = USAGE_EXIT_CODE; + } + return foundTableNames.isEmpty(); + } + + private void monitorRegionServers(Map> rsAndRMap, RegionServerStdOutSink regionServerSink) { + List tasks = new ArrayList<>(); + Map successMap = new HashMap<>(); + Random rand = new Random(); + for (Map.Entry> entry : rsAndRMap.entrySet()) { + String serverName = entry.getKey(); + AtomicLong successes = new AtomicLong(0); + successMap.put(serverName, successes); + if (entry.getValue().isEmpty()) { + LOG.error("Regionserver not serving any regions - {}", serverName); + } else if (this.allRegions) { + for (RegionInfo region : entry.getValue()) { + tasks.add(new RegionServerTask(this.connection, + serverName, + region, + regionServerSink, + successes)); + } + } else { + // random select a region if flag not set + RegionInfo region = entry.getValue().get(rand.nextInt(entry.getValue().size())); + tasks.add(new RegionServerTask(this.connection, + serverName, + region, + regionServerSink, + successes)); + } + } + try { + for (Future future : this.executor.invokeAll(tasks)) { + try { + future.get(); + } catch (ExecutionException e) { + LOG.error("Sniff regionserver failed!", e); + this.errorCode = ERROR_EXIT_CODE; + } + } + if (this.allRegions) { + for (Map.Entry> entry : rsAndRMap.entrySet()) { + String serverName = entry.getKey(); + LOG.info("Successfully read {} regions out of {} on regionserver {}", + successMap.get(serverName), entry.getValue().size(), serverName); + } + } + } catch (InterruptedException e) { + this.errorCode = ERROR_EXIT_CODE; + LOG.error("Sniff regionserver interrupted!", e); + } + } + + private Map> filterRegionServerByName() { + Map> regionServerAndRegionsMap = this.getAllRegionServerByName(); + regionServerAndRegionsMap = this.doFilterRegionServerByName(regionServerAndRegionsMap); + return regionServerAndRegionsMap; + } + + private Map> getAllRegionServerByName() { + Map> rsAndRMap = new HashMap<>(); + try { + LOG.debug("Reading list of tables and locations"); + List tableDescs = this.admin.listTableDescriptors(); + List regions = null; + for (TableDescriptor tableDesc: tableDescs) { + try (RegionLocator regionLocator = + this.admin.getConnection().getRegionLocator(tableDesc.getTableName())) { + for (HRegionLocation location : regionLocator.getAllRegionLocations()) { + ServerName rs = location.getServerName(); + String rsName = rs.getHostname(); + RegionInfo r = location.getRegion(); + if (rsAndRMap.containsKey(rsName)) { + regions = rsAndRMap.get(rsName); + } else { + regions = new ArrayList<>(); + rsAndRMap.put(rsName, regions); + } + regions.add(r); + } + } + } + + // get any live regionservers not serving any regions + for (ServerName rs: this.admin.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)) + .getLiveServerMetrics().keySet()) { + String rsName = rs.getHostname(); + if (!rsAndRMap.containsKey(rsName)) { + rsAndRMap.put(rsName, Collections. emptyList()); + } + } + } catch (IOException e) { + LOG.error("Get HTables info failed", e); + this.errorCode = INIT_ERROR_EXIT_CODE; + } + return rsAndRMap; + } + + private Map> doFilterRegionServerByName( + Map> fullRsAndRMap) { + + Map> filteredRsAndRMap = null; + + if (this.targets != null && this.targets.length > 0) { + filteredRsAndRMap = new HashMap<>(); + Pattern pattern = null; + Matcher matcher = null; + boolean regExpFound = false; + for (String rsName : this.targets) { + if (this.useRegExp) { + regExpFound = false; + pattern = Pattern.compile(rsName); + for (Map.Entry> entry : fullRsAndRMap.entrySet()) { + matcher = pattern.matcher(entry.getKey()); + if (matcher.matches()) { + filteredRsAndRMap.put(entry.getKey(), entry.getValue()); + regExpFound = true; + } + } + if (!regExpFound) { + LOG.info("No RegionServerInfo found, regionServerPattern {}", rsName); + } + } else { + if (fullRsAndRMap.containsKey(rsName)) { + filteredRsAndRMap.put(rsName, fullRsAndRMap.get(rsName)); + } else { + LOG.info("No RegionServerInfo found, regionServerName {}", rsName); + } + } + } + } else { + filteredRsAndRMap = fullRsAndRMap; + } + return filteredRsAndRMap; + } + } + + public static void main(String[] args) throws Exception { + final Configuration conf = HBaseConfiguration.create(); + + int numThreads = conf.getInt("hbase.canary.threads.num", MAX_THREADS_NUM); + LOG.info("Execution thread count={}", numThreads); + + int exitCode; + ExecutorService executor = new ScheduledThreadPoolExecutor(numThreads); + try { + exitCode = ToolRunner.run(conf, new CanaryTool(executor), args); + } finally { + executor.shutdown(); + } + System.exit(exitCode); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java index 79d52619203..dbbdf616edd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/tool/TestCanaryTool.java @@ -53,7 +53,6 @@ import org.apache.log4j.spi.LoggingEvent; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -120,8 +119,8 @@ public class TestCanaryTool { table.put(p); } ExecutorService executor = new ScheduledThreadPoolExecutor(1); - Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink()); - Canary canary = new Canary(executor, sink); + CanaryTool.RegionStdOutSink sink = spy(new CanaryTool.RegionStdOutSink()); + CanaryTool canary = new CanaryTool(executor, sink); String[] args = { "-writeSniffing", "-t", "10000", tableName.getNameAsString() }; assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, args)); assertEquals("verify no read error count", 0, canary.getReadFailures().size()); @@ -142,8 +141,8 @@ public class TestCanaryTool { table.put(p); } ExecutorService executor = new ScheduledThreadPoolExecutor(1); - Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink()); - Canary canary = new Canary(executor, sink); + CanaryTool.RegionStdOutSink sink = spy(new CanaryTool.RegionStdOutSink()); + CanaryTool canary = new CanaryTool(executor, sink); String[] args = { "-writeSniffing", "-t", "10000", "testCanaryRegionTaskResult" }; assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, args)); @@ -156,11 +155,11 @@ public class TestCanaryTool { assertTrue("canary should expect to scan at least 1 region", sink.getTotalExpectedRegions() > 0); - Map regionMap = sink.getRegionMap(); + Map regionMap = sink.getRegionMap(); assertFalse("verify region map has size > 0", regionMap.isEmpty()); for (String regionName : regionMap.keySet()) { - Canary.RegionTaskResult res = regionMap.get(regionName); + CanaryTool.RegionTaskResult res = regionMap.get(regionName); assertNotNull("verify each expected region has a RegionTaskResult object in the map", res); assertNotNull("verify getRegionNameAsString()", regionName); assertNotNull("verify getRegionInfo()", res.getRegionInfo()); @@ -169,7 +168,7 @@ public class TestCanaryTool { assertNotNull("verify getServerName()", res.getServerName()); assertNotNull("verify getServerNameAsString()", res.getServerNameAsString()); - if (regionName.contains(Canary.DEFAULT_WRITE_TABLE_NAME.getNameAsString())) { + if (regionName.contains(CanaryTool.DEFAULT_WRITE_TABLE_NAME.getNameAsString())) { assertTrue("write to region " + regionName + " succeeded", res.isWriteSuccess()); assertTrue("write took some time", res.getWriteLatency() > -1); } else { @@ -180,7 +179,6 @@ public class TestCanaryTool { } @Test - @Ignore("Intermittent argument matching failures, see HBASE-18813") public void testReadTableTimeouts() throws Exception { final TableName [] tableNames = new TableName[2]; tableNames[0] = TableName.valueOf(name.getMethodName() + "1"); @@ -197,8 +195,8 @@ public class TestCanaryTool { } } ExecutorService executor = new ScheduledThreadPoolExecutor(1); - Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink()); - Canary canary = new Canary(executor, sink); + CanaryTool.RegionStdOutSink sink = spy(new CanaryTool.RegionStdOutSink()); + CanaryTool canary = new CanaryTool(executor, sink); String configuredTimeoutStr = tableNames[0].getNameAsString() + "=" + Long.MAX_VALUE + "," + tableNames[1].getNameAsString() + "=0"; String[] args = {"-readTableTimeouts", configuredTimeoutStr, name.getMethodName() + "1", @@ -219,17 +217,16 @@ public class TestCanaryTool { verify(mockAppender, times(2)).doAppend(argThat(new ArgumentMatcher() { @Override public boolean matches(LoggingEvent argument) { - return argument.getRenderedMessage().contains("The configured read timeout was"); + return argument.getRenderedMessage().contains("Configured read timeout"); } })); } @Test - @Ignore("Intermittent argument matching failures, see HBASE-18813") public void testWriteTableTimeout() throws Exception { ExecutorService executor = new ScheduledThreadPoolExecutor(1); - Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink()); - Canary canary = new Canary(executor, sink); + CanaryTool.RegionStdOutSink sink = spy(new CanaryTool.RegionStdOutSink()); + CanaryTool canary = new CanaryTool(executor, sink); String[] args = { "-writeSniffing", "-writeTableTimeout", String.valueOf(Long.MAX_VALUE)}; assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, args)); assertNotEquals("verify non-null write latency", null, sink.getWriteLatency()); @@ -238,7 +235,7 @@ public class TestCanaryTool { new ArgumentMatcher() { @Override public boolean matches(LoggingEvent argument) { - return argument.getRenderedMessage().contains("The configured write timeout was"); + return argument.getRenderedMessage().contains("Configured write timeout"); } })); } @@ -281,8 +278,8 @@ public class TestCanaryTool { table.put(p); } ExecutorService executor = new ScheduledThreadPoolExecutor(1); - Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink()); - Canary canary = new Canary(executor, sink); + CanaryTool.RegionStdOutSink sink = spy(new CanaryTool.RegionStdOutSink()); + CanaryTool canary = new CanaryTool(executor, sink); String[] args = { "-t", "10000", name.getMethodName() }; org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(testingUtility.getConfiguration()); @@ -296,7 +293,7 @@ public class TestCanaryTool { private void runRegionserverCanary() throws Exception { ExecutorService executor = new ScheduledThreadPoolExecutor(1); - Canary canary = new Canary(executor, new Canary.RegionServerStdOutSink()); + CanaryTool canary = new CanaryTool(executor, new CanaryTool.RegionServerStdOutSink()); String[] args = { "-t", "10000", "-regionserver"}; assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, args)); assertEquals("verify no read error count", 0, canary.getReadFailures().size()); @@ -307,8 +304,8 @@ public class TestCanaryTool { Iterables.getOnlyElement(testingUtility.getZkCluster().getClientPortList(), null); testingUtility.getConfiguration().set(HConstants.ZOOKEEPER_QUORUM, "localhost:" + port); ExecutorService executor = new ScheduledThreadPoolExecutor(2); - Canary.ZookeeperStdOutSink sink = spy(new Canary.ZookeeperStdOutSink()); - Canary canary = new Canary(executor, sink); + CanaryTool.ZookeeperStdOutSink sink = spy(new CanaryTool.ZookeeperStdOutSink()); + CanaryTool canary = new CanaryTool(executor, sink); assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, args)); String baseZnode = testingUtility.getConfiguration()