HBASE 17959 Canary timeout should be configurable on a per-table basis

For branch-1: Added support for configuring read/write timeouts on a per-table basis
when in region mode.
Added unit test for per-table timeout checks.

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Chinmay Kulkarni 2017-05-31 14:38:41 -07:00 committed by Andrew Purtell
parent 50b201668b
commit 46ee6e0fcc
2 changed files with 210 additions and 32 deletions

View File

@ -244,6 +244,30 @@ public final class Canary implements Tool {
}
}
public static class RegionStdOutSink extends StdOutSink {
private Map<String, AtomicLong> perTableReadLatency = new HashMap<>();
private AtomicLong writeLatency = new AtomicLong();
public Map<String, AtomicLong> getReadLatencyMap() {
return this.perTableReadLatency;
}
public AtomicLong initializeAndGetReadLatencyForTable(String tableName) {
AtomicLong initLatency = new AtomicLong(0L);
this.perTableReadLatency.put(tableName, initLatency);
return initLatency;
}
public void initializeWriteLatency() {
this.writeLatency.set(0L);
}
public AtomicLong getWriteLatency() {
return this.writeLatency;
}
}
static class ZookeeperTask implements Callable<Void> {
private final Connection connection;
private final String host;
@ -291,19 +315,21 @@ public final class Canary implements Tool {
}
private Connection connection;
private HRegionInfo region;
private Sink sink;
private RegionStdOutSink sink;
private TaskType taskType;
private boolean rawScanEnabled;
private ServerName serverName;
private AtomicLong readWriteLatency;
RegionTask(Connection connection, HRegionInfo region, ServerName serverName, Sink sink,
TaskType taskType, boolean rawScanEnabled) {
RegionTask(Connection connection, HRegionInfo region, ServerName serverName, RegionStdOutSink sink,
TaskType taskType, boolean rawScanEnabled, AtomicLong rwLatency) {
this.connection = connection;
this.region = region;
this.serverName = serverName;
this.sink = sink;
this.taskType = taskType;
this.rawScanEnabled = rawScanEnabled;
this.readWriteLatency = rwLatency;
}
@Override
@ -384,6 +410,7 @@ public final class Canary implements Tool {
rs.next();
}
stopWatch.stop();
this.readWriteLatency.addAndGet(stopWatch.getTime());
sink.publishReadTiming(serverName, region, column, stopWatch.getTime());
} catch (Exception e) {
sink.publishReadFailure(serverName, region, column, e);
@ -394,7 +421,6 @@ public final class Canary implements Tool {
}
scan = null;
get = null;
startKey = null;
}
}
try {
@ -436,6 +462,7 @@ public final class Canary implements Tool {
long startTime = System.currentTimeMillis();
table.put(put);
long time = System.currentTimeMillis() - startTime;
this.readWriteLatency.addAndGet(time);
sink.publishWriteTiming(serverName, region, column, time);
} catch (Exception e) {
sink.publishWriteFailure(serverName, region, column, e);
@ -569,8 +596,10 @@ public final class Canary implements Tool {
private boolean zookeeperMode = false;
private boolean regionServerAllRegions = false;
private boolean writeSniffing = false;
private long configuredWriteTableTimeout = DEFAULT_TIMEOUT;
private boolean treatFailureAsError = false;
private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME;
private HashMap<String, Long> configuredReadTableTimeouts = new HashMap<>();
private ExecutorService executor; // threads to retrieve data from regionservers
@ -653,6 +682,20 @@ public final class Canary implements Tool {
System.err.println("-t needs a numeric value argument.");
printUsageAndExit();
}
} else if(cmd.equals("-writeTableTimeout")) {
i++;
if (i == args.length) {
System.err.println("-writeTableTimeout needs a numeric value argument.");
printUsageAndExit();
}
try {
this.configuredWriteTableTimeout = Long.parseLong(args[i]);
} catch (NumberFormatException e) {
System.err.println("-writeTableTimeout needs a numeric value argument.");
printUsageAndExit();
}
} else if (cmd.equals("-writeTable")) {
i++;
@ -671,6 +714,29 @@ public final class Canary implements Tool {
}
this.failOnError = Boolean.parseBoolean(args[i]);
} else if (cmd.equals("-readTableTimeouts")) {
i++;
if (i == args.length) {
System.err.println("-readTableTimeouts needs a comma-separated list of read timeouts per table (without spaces).");
printUsageAndExit();
}
String [] tableTimeouts = args[i].split(",");
for (String tT: tableTimeouts) {
String [] nameTimeout = tT.split("=");
if (nameTimeout.length < 2) {
System.err.println("Each -readTableTimeouts argument must be of the form <tableName>=<read timeout>.");
printUsageAndExit();
}
long timeoutVal = 0L;
try {
timeoutVal = Long.parseLong(nameTimeout[1]);
} catch (NumberFormatException e) {
System.err.println("-readTableTimeouts read timeout for each table must be a numeric value argument.");
printUsageAndExit();
}
this.configuredReadTableTimeouts.put(nameTimeout[0], timeoutVal);
}
} else {
// no options match
System.err.println(cmd + " options is invalid.");
@ -692,6 +758,10 @@ public final class Canary implements Tool {
printUsageAndExit();
}
}
if (!this.configuredReadTableTimeouts.isEmpty() && (this.regionServerMode || this.zookeeperMode)) {
System.err.println("-readTableTimeouts can only be configured in region mode.");
printUsageAndExit();
}
return index;
}
@ -792,7 +862,10 @@ public final class Canary implements Tool {
System.err.println(" which means the table/regionserver is regular expression pattern");
System.err.println(" -f <B> stop whole program if first error occurs," +
" default is true");
System.err.println(" -t <N> timeout for a check, default is 600000 (milisecs)");
System.err.println(" -t <N> timeout for a check, default is 600000 (millisecs)");
System.err.println(" -writeTableTimeout <N> write timeout for the writeTable, default is 600000 (millisecs)");
System.err.println(" -readTableTimeouts <tableName>=<read timeout>,<tableName>=<read timeout>, ... "
+ "comma-separated list of read timeouts per table (no spaces), default is 600000 (millisecs)");
System.err.println(" -writeSniffing enable the write sniffing in canary");
System.err.println(" -treatFailureAsError treats read / write failure as error");
System.err.println(" -writeTable The table used for write sniffing."
@ -832,8 +905,10 @@ public final class Canary implements Tool {
(ZookeeperStdOutSink) this.sink, this.executor, this.treatFailureAsError);
} else {
monitor =
new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor,
this.writeSniffing, this.writeTableName, this.treatFailureAsError);
new RegionMonitor(connection, monitorTargets, this.useRegExp,
(RegionStdOutSink) this.sink, this.executor, this.writeSniffing,
this.writeTableName, this.treatFailureAsError, this.configuredReadTableTimeouts,
this.configuredWriteTableTimeout);
}
return monitor;
}
@ -924,10 +999,12 @@ public final class Canary implements Tool {
private float regionsUpperLimit;
private int checkPeriod;
private boolean rawScanEnabled;
private HashMap<String, Long> configuredReadTableTimeouts;
private long configuredWriteTableTimeout;
public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
boolean treatFailureAsError) {
RegionStdOutSink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts, long configuredWriteTableTimeout) {
super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
Configuration conf = connection.getConfiguration();
this.writeSniffing = writeSniffing;
@ -942,22 +1019,38 @@ public final class Canary implements Tool {
conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
DEFAULT_WRITE_TABLE_CHECK_PERIOD);
this.rawScanEnabled = conf.getBoolean(HConstants.HBASE_CANARY_READ_RAW_SCAN_KEY, false);
this.configuredReadTableTimeouts = new HashMap<>(configuredReadTableTimeouts);
this.configuredWriteTableTimeout = configuredWriteTableTimeout;
}
private RegionStdOutSink getSink() {
if (!(sink instanceof RegionStdOutSink)) {
throw new RuntimeException("Can only write to Region sink");
}
return ((RegionStdOutSink) sink);
}
@Override
public void run() {
if (this.initAdmin()) {
try {
List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
List<Future<Void>> taskFutures = new LinkedList<>();
RegionStdOutSink regionSink = this.getSink();
if (this.targets != null && this.targets.length > 0) {
String[] tables = generateMonitorTables(this.targets);
// Check to see that each table name passed in the -readTableTimeouts argument is also passed as a monitor target.
if (! new HashSet<>(Arrays.asList(tables)).containsAll(this.configuredReadTableTimeouts.keySet())) {
LOG.error("-readTableTimeouts can only specify read timeouts for monitor targets passed via command line.");
this.errorCode = USAGE_EXIT_CODE;
}
this.initialized = true;
for (String table : tables) {
taskFutures.addAll(Canary.sniff(admin, sink, table, executor, TaskType.READ,
this.rawScanEnabled));
AtomicLong readLatency = regionSink.initializeAndGetReadLatencyForTable(table);
taskFutures.addAll(Canary.sniff(admin, regionSink, table, executor, TaskType.READ,
this.rawScanEnabled, readLatency));
}
} else {
taskFutures.addAll(sniff(TaskType.READ));
taskFutures.addAll(sniff(TaskType.READ, regionSink));
}
if (writeSniffing) {
@ -970,8 +1063,10 @@ public final class Canary implements Tool {
lastCheckTime = EnvironmentEdgeManager.currentTime();
}
// sniff canary table with write operation
taskFutures.addAll(Canary.sniff(admin, sink, admin.getTableDescriptor(writeTableName),
executor, TaskType.WRITE, this.rawScanEnabled));
regionSink.initializeWriteLatency();
AtomicLong writeTableLatency = regionSink.getWriteLatency();
taskFutures.addAll(Canary.sniff(admin, regionSink, admin.getTableDescriptor(writeTableName),
executor, TaskType.WRITE, this.rawScanEnabled, writeTableLatency));
}
for (Future<Void> future : taskFutures) {
@ -981,6 +1076,30 @@ public final class Canary implements Tool {
LOG.error("Sniff region failed!", e);
}
}
Map<String, AtomicLong> actualReadTableLatency = regionSink.getReadLatencyMap();
for (String tableName : this.configuredReadTableTimeouts.keySet()) {
if (actualReadTableLatency.containsKey(tableName)) {
Long actual = actualReadTableLatency.get(tableName).longValue();
Long configured = this.configuredReadTableTimeouts.get(tableName);
LOG.info("Read operation for " + tableName + " took " + actual +
" ms. The configured read timeout was " + configured + " ms.");
if (actual > configured) {
LOG.error("Read operation for " + tableName + " exceeded the configured read timeout.");
}
} else {
LOG.error("Read operation for " + tableName + " failed!");
}
}
if (this.writeSniffing) {
String writeTableStringName = this.writeTableName.getNameAsString();
long actualWriteLatency = regionSink.getWriteLatency().longValue();
LOG.info("Write operation for " + writeTableStringName + " took " + actualWriteLatency + " ms. The configured write timeout was " +
this.configuredWriteTableTimeout + " ms.");
// Check that the writeTable write operation latency does not exceed the configured timeout.
if (actualWriteLatency > this.configuredWriteTableTimeout) {
LOG.error("Write operation for " + writeTableStringName + " exceeded the configured write timeout.");
}
}
} catch (Exception e) {
LOG.error("Run regionMonitor failed", e);
this.errorCode = ERROR_EXIT_CODE;
@ -1035,15 +1154,16 @@ public final class Canary implements Tool {
/*
* canary entry point to monitor all the tables.
*/
private List<Future<Void>> sniff(TaskType taskType) throws Exception {
private List<Future<Void>> sniff(TaskType taskType, RegionStdOutSink regionSink) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("reading list of tables"));
}
List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
List<Future<Void>> taskFutures = new LinkedList<>();
for (HTableDescriptor table : admin.listTables()) {
if (admin.isTableEnabled(table.getTableName())
&& (!table.getTableName().equals(writeTableName))) {
taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType, this.rawScanEnabled));
AtomicLong readLatency = regionSink.initializeAndGetReadLatencyForTable(table.getNameAsString());
taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType, this.rawScanEnabled, readLatency));
}
}
return taskFutures;
@ -1115,14 +1235,14 @@ public final class Canary implements Tool {
* @throws Exception
*/
private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
ExecutorService executor, TaskType taskType, boolean rawScanEnabled) throws Exception {
ExecutorService executor, TaskType taskType, boolean rawScanEnabled, AtomicLong readLatency) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("checking table is enabled and getting table descriptor for table %s",
tableName));
}
if (admin.isTableEnabled(TableName.valueOf(tableName))) {
return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)),
executor, taskType, rawScanEnabled);
executor, taskType, rawScanEnabled, readLatency);
} else {
LOG.warn(String.format("Table %s is not enabled", tableName));
}
@ -1130,11 +1250,11 @@ public final class Canary implements Tool {
}
/*
* Loops over regions that owns this table, and output some information abouts the state.
* Loops over regions that owns this table, and output some information about the state.
*/
private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
HTableDescriptor tableDesc, ExecutorService executor, TaskType taskType,
boolean rawScanEnabled) throws Exception {
boolean rawScanEnabled, AtomicLong rwLatency) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(String.format("reading list of regions for table %s", tableDesc.getTableName()));
@ -1158,7 +1278,8 @@ public final class Canary implements Tool {
for (HRegionLocation location : regionLocator.getAllRegionLocations()) {
ServerName rs = location.getServerName();
HRegionInfo region = location.getRegionInfo();
tasks.add(new RegionTask(admin.getConnection(), region, rs, sink, taskType, rawScanEnabled));
tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink) sink, taskType, rawScanEnabled,
rwLatency));
}
} finally {
if (regionLocator != null) {

View File

@ -1,5 +1,5 @@
/**
q *
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -47,15 +47,13 @@ import org.mockito.runners.MockitoJUnitRunner;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import static org.junit.Assert.assertNotEquals;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Matchers.argThat;
import static org.mockito.Mockito.never;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;
@RunWith(MockitoJUnitRunner.class)
@Category({MediumTests.class})
@ -111,7 +109,7 @@ public class TestCanaryTool {
table.put(p);
}
ExecutorService executor = new ScheduledThreadPoolExecutor(1);
Canary.RegionServerStdOutSink sink = spy(new Canary.RegionServerStdOutSink());
Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink());
Canary canary = new Canary(executor, sink);
String[] args = { "-writeSniffing", "-t", "10000", "testTable" };
ToolRunner.run(testingUtility.getConfiguration(), canary, args);
@ -120,6 +118,66 @@ public class TestCanaryTool {
verify(sink, atLeastOnce()).publishReadTiming(isA(ServerName.class), isA(HRegionInfo.class), isA(HColumnDescriptor.class), anyLong());
}
@Test
public void testReadTableTimeouts() throws Exception {
final TableName [] tableNames = new TableName[2];
tableNames[0] = TableName.valueOf("testReadTableTimeouts1");
tableNames[1] = TableName.valueOf("testReadTableTimeouts2");
// Create 2 test tables.
for (int j = 0; j<2; j++) {
Table table = testingUtility.createTable(tableNames[j], new byte[][] { FAMILY });
// insert some test rows
for (int i=0; i<1000; i++) {
byte[] iBytes = Bytes.toBytes(i + j);
Put p = new Put(iBytes);
p.addColumn(FAMILY, COLUMN, iBytes);
table.put(p);
}
}
ExecutorService executor = new ScheduledThreadPoolExecutor(1);
Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink());
Canary canary = new Canary(executor, sink);
String configuredTimeoutStr = tableNames[0].getNameAsString() + "=" + Long.MAX_VALUE + "," +
tableNames[1].getNameAsString() + "=0";
String[] args = { "-readTableTimeouts", configuredTimeoutStr, tableNames[0].getNameAsString(), tableNames[1].getNameAsString()};
ToolRunner.run(testingUtility.getConfiguration(), canary, args);
verify(sink, times(tableNames.length)).initializeAndGetReadLatencyForTable(isA(String.class));
for (int i=0; i<2; i++) {
assertNotEquals("verify non-null read latency", null, sink.getReadLatencyMap().get(tableNames[i].getNameAsString()));
assertNotEquals("verify non-zero read latency", 0L, sink.getReadLatencyMap().get(tableNames[i].getNameAsString()));
}
// One table's timeout is set for 0 ms and thus, should lead to an error.
verify(mockAppender, times(1)).doAppend(argThat(new ArgumentMatcher<LoggingEvent>() {
@Override
public boolean matches(Object argument) {
return ((LoggingEvent) argument).getRenderedMessage().contains("exceeded the configured read timeout.");
}
}));
verify(mockAppender, times(2)).doAppend(argThat(new ArgumentMatcher<LoggingEvent>() {
@Override
public boolean matches(Object argument) {
return ((LoggingEvent) argument).getRenderedMessage().contains("The configured read timeout was");
}
}));
}
@Test
public void testWriteTableTimeout() throws Exception {
ExecutorService executor = new ScheduledThreadPoolExecutor(1);
Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink());
Canary canary = new Canary(executor, sink);
String[] args = { "-writeSniffing", "-writeTableTimeout", String.valueOf(Long.MAX_VALUE)};
ToolRunner.run(testingUtility.getConfiguration(), canary, args);
assertNotEquals("verify non-null write latency", null, sink.getWriteLatency());
assertNotEquals("verify non-zero write latency", 0L, sink.getWriteLatency());
verify(mockAppender, times(1)).doAppend(argThat(new ArgumentMatcher<LoggingEvent>() {
@Override
public boolean matches(Object argument) {
return ((LoggingEvent) argument).getRenderedMessage().contains("The configured write timeout was");
}
}));
}
//no table created, so there should be no regions
@Test
public void testRegionserverNoRegions() throws Exception {
@ -158,7 +216,7 @@ public class TestCanaryTool {
table.put(p);
}
ExecutorService executor = new ScheduledThreadPoolExecutor(1);
Canary.RegionServerStdOutSink sink = spy(new Canary.RegionServerStdOutSink());
Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink());
Canary canary = new Canary(executor, sink);
String[] args = { "-t", "10000", "testTableRawScan" };
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(testingUtility.getConfiguration());
@ -177,5 +235,4 @@ public class TestCanaryTool {
assertEquals("verify no read error count", 0, canary.getReadFailures().size());
}
}
}