HBASE-13996 Add write sniffing in canary (Liu Shaohui)
This commit is contained in:
parent
a7327a30c4
commit
902cd172f8
|
@ -823,7 +823,8 @@ public final class HConstants {
|
|||
/**
|
||||
* timeout for short operation RPC
|
||||
*/
|
||||
public static final String HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY = "hbase.rpc.shortoperation.timeout";
|
||||
public static final String HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY =
|
||||
"hbase.rpc.shortoperation.timeout";
|
||||
|
||||
/**
|
||||
* Default value of {@link #HBASE_RPC_SHORTOPERATION_TIMEOUT_KEY}
|
||||
|
@ -878,8 +879,8 @@ public final class HConstants {
|
|||
*/
|
||||
public static final float HBASE_CLUSTER_MINIMUM_MEMORY_THRESHOLD = 0.2f;
|
||||
|
||||
public static final Pattern CP_HTD_ATTR_KEY_PATTERN = Pattern.compile
|
||||
("^coprocessor\\$([0-9]+)$", Pattern.CASE_INSENSITIVE);
|
||||
public static final Pattern CP_HTD_ATTR_KEY_PATTERN =
|
||||
Pattern.compile("^coprocessor\\$([0-9]+)$", Pattern.CASE_INSENSITIVE);
|
||||
public static final Pattern CP_HTD_ATTR_VALUE_PATTERN =
|
||||
Pattern.compile("(^[^\\|]*)\\|([^\\|]+)\\|[\\s]*([\\d]*)[\\s]*(\\|.*)?$");
|
||||
|
||||
|
@ -929,7 +930,7 @@ public final class HConstants {
|
|||
* 1 => Abort only when all of the handlers have died
|
||||
*/
|
||||
public static final String REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT =
|
||||
"hbase.regionserver.handler.abort.on.error.percent";
|
||||
"hbase.regionserver.handler.abort.on.error.percent";
|
||||
public static final double DEFAULT_REGION_SERVER_HANDLER_ABORT_ON_ERROR_PERCENT = 0.5;
|
||||
|
||||
//High priority handlers to deal with admin requests and system table operation requests
|
||||
|
@ -989,7 +990,8 @@ public final class HConstants {
|
|||
public static final String DEFAULT_WAL_STORAGE_POLICY = "NONE";
|
||||
|
||||
/** Region in Transition metrics threshold time */
|
||||
public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD="hbase.metrics.rit.stuck.warning.threshold";
|
||||
public static final String METRICS_RIT_STUCK_WARNING_THRESHOLD =
|
||||
"hbase.metrics.rit.stuck.warning.threshold";
|
||||
|
||||
public static final String LOAD_BALANCER_SLOP_KEY = "hbase.regions.slop";
|
||||
|
||||
|
@ -1081,7 +1083,8 @@ public final class HConstants {
|
|||
* 0.0.0.0.
|
||||
* @see <a href="https://issues.apache.org/jira/browse/HBASE-9961">HBASE-9961</a>
|
||||
*/
|
||||
public static final String STATUS_MULTICAST_BIND_ADDRESS = "hbase.status.multicast.bind.address.ip";
|
||||
public static final String STATUS_MULTICAST_BIND_ADDRESS =
|
||||
"hbase.status.multicast.bind.address.ip";
|
||||
public static final String DEFAULT_STATUS_MULTICAST_BIND_ADDRESS = "0.0.0.0";
|
||||
|
||||
/**
|
||||
|
@ -1210,6 +1213,20 @@ public final class HConstants {
|
|||
public static final String REGION_SPLIT_THREADS_MAX =
|
||||
"hbase.regionserver.region.split.threads.max";
|
||||
|
||||
/** Canary config keys */
|
||||
/**
 * TTL applied to the column family of the canary write table when Canary creates it.
 * Canary falls back to 24 * 60 * 60 (one day) when the key is unset.
 */
public static final String HBASE_CANARY_WRITE_DATA_TTL_KEY = "hbase.canary.write.data.ttl";
|
||||
|
||||
/**
 * Lower bound on (regions of the canary write table) / (live region servers).
 * Canary recreates the table when the region count drops below
 * servers * this value; it also sizes a newly created table from it. Canary's default is 1.0.
 */
public static final String HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY =
|
||||
"hbase.canary.write.perserver.regions.lowerLimit";
|
||||
|
||||
/**
 * Upper bound on (regions of the canary write table) / (live region servers).
 * Canary recreates the table when the region count exceeds servers * this value.
 * Canary's default is 1.5.
 */
public static final String HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY =
|
||||
"hbase.canary.write.perserver.regions.upperLimit";
|
||||
|
||||
/**
 * Size, in bytes, of the random value written by each canary put. Canary's default is 10.
 */
public static final String HBASE_CANARY_WRITE_VALUE_SIZE_KEY = "hbase.canary.write.value.size";
|
||||
|
||||
/**
 * Minimum time between two checks of the canary write table's region distribution.
 * Canary's default is 10 * 60 * 1000 (ten minutes).
 */
public static final String HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY =
|
||||
"hbase.canary.write.table.check.period";
|
||||
|
||||
/**
 * Private no-arg constructor: this class only carries constants, so it is not meant to be
 * instantiated from outside.
 */
private HConstants() {
|
||||
// Can't be instantiated with this ctor.
|
||||
}
|
||||
|
|
|
@ -47,6 +47,7 @@ org.apache.hadoop.hbase.master.RegionState;
|
|||
org.apache.hadoop.hbase.HTableDescriptor;
|
||||
org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
org.apache.hadoop.hbase.TableName;
|
||||
org.apache.hadoop.hbase.tool.Canary;
|
||||
org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
|
||||
org.apache.hadoop.hbase.master.DeadServer;
|
||||
org.apache.hadoop.hbase.protobuf.ProtobufUtil;
|
||||
|
@ -365,11 +366,14 @@ AssignmentManager assignmentManager = master.getAssignmentManager();
|
|||
</%if>
|
||||
<%java>String description = null;
|
||||
if (tableName.equals(TableName.META_TABLE_NAME)){
|
||||
description = "The hbase:meta table holds references to all User Table regions";
|
||||
description = "The hbase:meta table holds references to all User Table regions.";
|
||||
} else if (tableName.equals(Canary.DEFAULT_WRITE_TABLE_NAME)){
|
||||
description = "The hbase:canary table is used to sniff the write availbility of"
|
||||
+ " each regionserver.";
|
||||
} else if (tableName.equals(AccessControlLists.ACL_TABLE_NAME)){
|
||||
description = "The hbase:acl table holds information about acl";
|
||||
} else if (tableName.equals(VisibilityConstants.LABELS_TABLE_NAME)){
|
||||
description = "The hbase:labels table holds information about visibility labels";
|
||||
} else if (tableName.equals(VisibilityConstants.LABELS_TABLE_NAME)){
|
||||
description = "The hbase:labels table holds information about visibility labels.";
|
||||
} else if (tableName.equals(TableName.NAMESPACE_TABLE_NAME)){
|
||||
description = "The hbase:namespace table holds information about namespaces.";
|
||||
} else if (tableName.equals(QuotaUtil.QUOTA_TABLE_NAME)){
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
@ -47,9 +48,12 @@ import org.apache.hadoop.hbase.ChoreService;
|
|||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HRegionLocation;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.ScheduledChore;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
|
@ -59,12 +63,18 @@ import org.apache.hadoop.hbase.client.Admin;
|
|||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
|
||||
import org.apache.hadoop.hbase.tool.Canary.RegionTask.TaskType;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.util.ReflectionUtils;
|
||||
import org.apache.hadoop.hbase.util.RegionSplitter;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
import org.apache.hadoop.util.ToolRunner;
|
||||
|
||||
|
@ -85,6 +95,9 @@ public final class Canary implements Tool {
|
|||
public void publishReadFailure(HRegionInfo region, Exception e);
|
||||
public void publishReadFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
|
||||
public void publishReadTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
|
||||
public void publishWriteFailure(HRegionInfo region, Exception e);
|
||||
public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e);
|
||||
public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime);
|
||||
}
|
||||
// new extended sink for output regionserver mode info
|
||||
// do not change the Sink interface directly due to maintaining the API
|
||||
|
@ -112,6 +125,23 @@ public final class Canary implements Tool {
|
|||
LOG.info(String.format("read from region %s column family %s in %dms",
|
||||
region.getRegionNameAsString(), column.getNameAsString(), msTime));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishWriteFailure(HRegionInfo region, Exception e) {
|
||||
LOG.error(String.format("write to region %s failed", region.getRegionNameAsString()), e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishWriteFailure(HRegionInfo region, HColumnDescriptor column, Exception e) {
|
||||
LOG.error(String.format("write to region %s column family %s failed",
|
||||
region.getRegionNameAsString(), column.getNameAsString()), e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishWriteTiming(HRegionInfo region, HColumnDescriptor column, long msTime) {
|
||||
LOG.info(String.format("write to region %s column family %s in %dms",
|
||||
region.getRegionNameAsString(), column.getNameAsString(), msTime));
|
||||
}
|
||||
}
|
||||
// an ExtendedSink implementation
|
||||
public static class RegionServerStdOutSink extends StdOutSink implements ExtendedSink {
|
||||
|
@ -133,18 +163,34 @@ public final class Canary implements Tool {
|
|||
* failure.
|
||||
*/
|
||||
static class RegionTask implements Callable<Void> {
|
||||
/** Kind of check a region task performs: a read probe or a write probe. */
public enum TaskType {
  READ,
  WRITE
}
|
||||
private Connection connection;
|
||||
private HRegionInfo region;
|
||||
private Sink sink;
|
||||
private TaskType taskType;
|
||||
|
||||
RegionTask(Connection connection, HRegionInfo region, Sink sink) {
|
||||
RegionTask(Connection connection, HRegionInfo region, Sink sink, TaskType taskType) {
|
||||
this.connection = connection;
|
||||
this.region = region;
|
||||
this.sink = sink;
|
||||
this.taskType = taskType;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Void call() {
|
||||
switch (taskType) {
|
||||
case READ:
|
||||
return read();
|
||||
case WRITE:
|
||||
return write();
|
||||
default:
|
||||
return read();
|
||||
}
|
||||
}
|
||||
|
||||
public Void read() {
|
||||
Table table = null;
|
||||
HTableDescriptor tableDesc = null;
|
||||
try {
|
||||
|
@ -157,6 +203,7 @@ public final class Canary implements Tool {
|
|||
try {
|
||||
table.close();
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Close table failed", e);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
|
@ -212,6 +259,44 @@ public final class Canary implements Tool {
|
|||
try {
|
||||
table.close();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Close table failed", e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check writes for the canary table
|
||||
* @return
|
||||
*/
|
||||
private Void write() {
|
||||
Table table = null;
|
||||
HTableDescriptor tableDesc = null;
|
||||
try {
|
||||
table = connection.getTable(region.getTable());
|
||||
tableDesc = table.getTableDescriptor();
|
||||
byte[] rowToCheck = region.getStartKey();
|
||||
if (rowToCheck.length == 0) {
|
||||
rowToCheck = new byte[]{0x0};
|
||||
}
|
||||
int writeValueSize =
|
||||
connection.getConfiguration().getInt(HConstants.HBASE_CANARY_WRITE_VALUE_SIZE_KEY, 10);
|
||||
for (HColumnDescriptor column : tableDesc.getColumnFamilies()) {
|
||||
Put put = new Put(rowToCheck);
|
||||
byte[] value = new byte[writeValueSize];
|
||||
Bytes.random(value);
|
||||
put.addColumn(column.getName(), HConstants.EMPTY_BYTE_ARRAY, value);
|
||||
try {
|
||||
long startTime = System.currentTimeMillis();
|
||||
table.put(put);
|
||||
long time = System.currentTimeMillis() - startTime;
|
||||
sink.publishWriteTiming(region, column, time);
|
||||
} catch (Exception e) {
|
||||
sink.publishWriteFailure(region, column, e);
|
||||
}
|
||||
}
|
||||
table.close();
|
||||
} catch (IOException e) {
|
||||
sink.publishWriteFailure(region, e);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -269,6 +354,7 @@ public final class Canary implements Tool {
|
|||
}
|
||||
sink.publishReadTiming(tableName.getNameAsString(), serverName, stopWatch.getTime());
|
||||
} catch (TableNotFoundException tnfe) {
|
||||
LOG.error("Table may be deleted", tnfe);
|
||||
// This is ignored because it doesn't imply that the regionserver is dead
|
||||
} catch (TableNotEnabledException tnee) {
|
||||
// This is considered a success since we got a response.
|
||||
|
@ -284,6 +370,7 @@ public final class Canary implements Tool {
|
|||
try {
|
||||
table.close();
|
||||
} catch (IOException e) {/* DO NOTHING */
|
||||
LOG.error("Close table failed", e);
|
||||
}
|
||||
}
|
||||
scan = null;
|
||||
|
@ -302,11 +389,15 @@ public final class Canary implements Tool {
|
|||
private static final long DEFAULT_INTERVAL = 6000;
|
||||
|
||||
private static final long DEFAULT_TIMEOUT = 600000; // 10 mins
|
||||
|
||||
private static final int MAX_THREADS_NUM = 16; // #threads to contact regions
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(Canary.class);
|
||||
|
||||
public static final TableName DEFAULT_WRITE_TABLE_NAME = TableName.valueOf(
|
||||
NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "canary");
|
||||
|
||||
private static final String CANARY_TABLE_FAMILY_NAME = "Test";
|
||||
|
||||
private Configuration conf = null;
|
||||
private long interval = 0;
|
||||
private Sink sink = null;
|
||||
|
@ -315,6 +406,9 @@ public final class Canary implements Tool {
|
|||
private long timeout = DEFAULT_TIMEOUT;
|
||||
private boolean failOnError = true;
|
||||
private boolean regionServerMode = false;
|
||||
private boolean writeSniffing = false;
|
||||
private TableName writeTableName = DEFAULT_WRITE_TABLE_NAME;
|
||||
|
||||
private ExecutorService executor; // threads to retrieve data from regionservers
|
||||
|
||||
public Canary() {
|
||||
|
@ -336,11 +430,8 @@ public final class Canary implements Tool {
|
|||
this.conf = conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int run(String[] args) throws Exception {
|
||||
private int parseArgs(String[] args) {
|
||||
int index = -1;
|
||||
ChoreService choreService = null;
|
||||
|
||||
// Process command line args
|
||||
for (int i = 0; i < args.length; i++) {
|
||||
String cmd = args[i];
|
||||
|
@ -375,6 +466,8 @@ public final class Canary implements Tool {
|
|||
}
|
||||
} else if(cmd.equals("-regionserver")) {
|
||||
this.regionServerMode = true;
|
||||
} else if(cmd.equals("-writeSniffing")) {
|
||||
this.writeSniffing = true;
|
||||
} else if (cmd.equals("-e")) {
|
||||
this.useRegExp = true;
|
||||
} else if (cmd.equals("-t")) {
|
||||
|
@ -391,7 +484,14 @@ public final class Canary implements Tool {
|
|||
System.err.println("-t needs a numeric value argument.");
|
||||
printUsageAndExit();
|
||||
}
|
||||
} else if (cmd.equals("-writeTable")) {
|
||||
i++;
|
||||
|
||||
if (i == args.length) {
|
||||
System.err.println("-writeTable needs a string value argument.");
|
||||
printUsageAndExit();
|
||||
}
|
||||
this.writeTableName = TableName.valueOf(args[i]);
|
||||
} else if (cmd.equals("-f")) {
|
||||
i++;
|
||||
|
||||
|
@ -412,6 +512,13 @@ public final class Canary implements Tool {
|
|||
index = i;
|
||||
}
|
||||
}
|
||||
return index;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int run(String[] args) throws Exception {
|
||||
int index = parseArgs(args);
|
||||
ChoreService choreService = null;
|
||||
|
||||
// Launches chore for refreshing kerberos credentials if security is enabled.
|
||||
// Please see http://hbase.apache.org/book.html#_running_canary_in_a_kerberos_enabled_cluster
|
||||
|
@ -497,6 +604,9 @@ public final class Canary implements Tool {
|
|||
System.err.println(" -f <B> stop whole program if first error occurs," +
|
||||
" default is true");
|
||||
System.err.println(" -t <N> timeout for a check, default is 600000 (milisecs)");
|
||||
System.err.println(" -writeSniffing enable the write sniffing in canary");
|
||||
System.err.println(" -writeTable The table used for write sniffing."
|
||||
+ " Default is hbase:canary");
|
||||
System.exit(USAGE_EXIT_CODE);
|
||||
}
|
||||
|
||||
|
@ -523,7 +633,8 @@ public final class Canary implements Tool {
|
|||
(ExtendedSink) this.sink, this.executor);
|
||||
} else {
|
||||
monitor =
|
||||
new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor);
|
||||
new RegionMonitor(connection, monitorTargets, this.useRegExp, this.sink, this.executor,
|
||||
this.writeSniffing, this.writeTableName);
|
||||
}
|
||||
return monitor;
|
||||
}
|
||||
|
@ -586,10 +697,34 @@ public final class Canary implements Tool {
|
|||
|
||||
// a monitor for region mode
|
||||
private static class RegionMonitor extends Monitor {
|
||||
// 10 minutes
|
||||
private static final int DEFAULT_WRITE_TABLE_CHECK_PERIOD = 10 * 60 * 1000;
|
||||
// 1 day (in seconds)
|
||||
private static final int DEFAULT_WRITE_DATA_TTL = 24 * 60 * 60;
|
||||
|
||||
private long lastCheckTime = -1;
|
||||
private boolean writeSniffing;
|
||||
private TableName writeTableName;
|
||||
private int writeDataTTL;
|
||||
private float regionsLowerLimit;
|
||||
private float regionsUpperLimit;
|
||||
private int checkPeriod;
|
||||
|
||||
public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
|
||||
Sink sink, ExecutorService executor) {
|
||||
Sink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName) {
|
||||
super(connection, monitorTargets, useRegExp, sink, executor);
|
||||
Configuration conf = connection.getConfiguration();
|
||||
this.writeSniffing = writeSniffing;
|
||||
this.writeTableName = writeTableName;
|
||||
this.writeDataTTL =
|
||||
conf.getInt(HConstants.HBASE_CANARY_WRITE_DATA_TTL_KEY, DEFAULT_WRITE_DATA_TTL);
|
||||
this.regionsLowerLimit =
|
||||
conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY, 1.0f);
|
||||
this.regionsUpperLimit =
|
||||
conf.getFloat(HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_UPPERLIMIT_KEY, 1.5f);
|
||||
this.checkPeriod =
|
||||
conf.getInt(HConstants.HBASE_CANARY_WRITE_TABLE_CHECK_PERIOD_KEY,
|
||||
DEFAULT_WRITE_TABLE_CHECK_PERIOD);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -601,11 +736,26 @@ public final class Canary implements Tool {
|
|||
String[] tables = generateMonitorTables(this.targets);
|
||||
this.initialized = true;
|
||||
for (String table : tables) {
|
||||
taskFutures.addAll(Canary.sniff(admin, sink, table, executor));
|
||||
taskFutures.addAll(Canary.sniff(admin, sink, table, executor, TaskType.READ));
|
||||
}
|
||||
} else {
|
||||
taskFutures.addAll(sniff());
|
||||
taskFutures.addAll(sniff(TaskType.READ));
|
||||
}
|
||||
|
||||
if (writeSniffing) {
|
||||
if (EnvironmentEdgeManager.currentTime() - lastCheckTime > checkPeriod) {
|
||||
try {
|
||||
checkWriteTableDistribution();
|
||||
} catch (IOException e) {
|
||||
LOG.error("Check canary table distribution failed!", e);
|
||||
}
|
||||
lastCheckTime = EnvironmentEdgeManager.currentTime();
|
||||
}
|
||||
// sniff canary table with write operation
|
||||
taskFutures.addAll(Canary.sniff(admin, sink,
|
||||
admin.getTableDescriptor(writeTableName), executor, TaskType.WRITE));
|
||||
}
|
||||
|
||||
for (Future<Void> future : taskFutures) {
|
||||
try {
|
||||
future.get();
|
||||
|
@ -661,25 +811,86 @@ public final class Canary implements Tool {
|
|||
/*
|
||||
* canary entry point to monitor all the tables.
|
||||
*/
|
||||
private List<Future<Void>> sniff() throws Exception {
|
||||
private List<Future<Void>> sniff(TaskType taskType) throws Exception {
|
||||
List<Future<Void>> taskFutures = new LinkedList<Future<Void>>();
|
||||
for (HTableDescriptor table : admin.listTables()) {
|
||||
if (admin.isTableEnabled(table.getTableName())) {
|
||||
taskFutures.addAll(Canary.sniff(admin, sink, table, executor));
|
||||
if (admin.isTableEnabled(table.getTableName())
|
||||
&& (!table.getTableName().equals(writeTableName))) {
|
||||
taskFutures.addAll(Canary.sniff(admin, sink, table, executor, taskType));
|
||||
}
|
||||
}
|
||||
return taskFutures;
|
||||
}
|
||||
|
||||
private void checkWriteTableDistribution() throws IOException {
|
||||
if (!admin.tableExists(writeTableName)) {
|
||||
int numberOfServers = admin.getClusterStatus().getServers().size();
|
||||
if (numberOfServers == 0) {
|
||||
throw new IllegalStateException("No live regionservers");
|
||||
}
|
||||
createWriteTable(numberOfServers);
|
||||
}
|
||||
|
||||
if (!admin.isTableEnabled(writeTableName)) {
|
||||
admin.enableTable(writeTableName);
|
||||
}
|
||||
|
||||
int numberOfServers = admin.getClusterStatus().getServers().size();
|
||||
List<Pair<HRegionInfo, ServerName>> pairs =
|
||||
MetaTableAccessor.getTableRegionsAndLocations(connection, writeTableName);
|
||||
int numberOfRegions = pairs.size();
|
||||
if (numberOfRegions < numberOfServers * regionsLowerLimit
|
||||
|| numberOfRegions > numberOfServers * regionsUpperLimit) {
|
||||
admin.disableTable(writeTableName);
|
||||
admin.deleteTable(writeTableName);
|
||||
createWriteTable(numberOfServers);
|
||||
}
|
||||
HashSet<ServerName> serverSet = new HashSet<ServerName>();
|
||||
for (Pair<HRegionInfo, ServerName> pair : pairs) {
|
||||
serverSet.add(pair.getSecond());
|
||||
}
|
||||
int numberOfCoveredServers = serverSet.size();
|
||||
if (numberOfCoveredServers < numberOfServers) {
|
||||
admin.balancer();
|
||||
}
|
||||
}
|
||||
|
||||
private void createWriteTable(int numberOfServers) throws IOException {
|
||||
int numberOfRegions = (int)(numberOfServers * regionsLowerLimit);
|
||||
LOG.info("Number of live regionservers: " + numberOfServers + ", "
|
||||
+ "pre-splitting the canary table into " + numberOfRegions + " regions "
|
||||
+ "(current lower limi of regions per server is " + regionsLowerLimit
|
||||
+ " and you can change it by config: "
|
||||
+ HConstants.HBASE_CANARY_WRITE_PERSERVER_REGIONS_LOWERLIMIT_KEY + " )");
|
||||
HTableDescriptor desc = new HTableDescriptor(writeTableName);
|
||||
HColumnDescriptor family = new HColumnDescriptor(CANARY_TABLE_FAMILY_NAME);
|
||||
family.setMaxVersions(1);
|
||||
family.setTimeToLive(writeDataTTL);
|
||||
|
||||
desc.addFamily(family);
|
||||
byte[][] splits = new RegionSplitter.HexStringSplit().split(numberOfRegions);
|
||||
admin.createTable(desc, splits);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Canary entry point for specified table.
|
||||
* @throws Exception
|
||||
*/
|
||||
public static void sniff(final Admin admin, TableName tableName) throws Exception {
|
||||
public static void sniff(final Admin admin, TableName tableName)
|
||||
throws Exception {
|
||||
sniff(admin, tableName, TaskType.READ);
|
||||
}
|
||||
|
||||
/**
|
||||
* Canary entry point for specified table with task type(read/write)
|
||||
* @throws Exception
|
||||
*/
|
||||
public static void sniff(final Admin admin, TableName tableName, TaskType taskType)
|
||||
throws Exception {
|
||||
List<Future<Void>> taskFutures =
|
||||
Canary.sniff(admin, new StdOutSink(), tableName.getNameAsString(),
|
||||
new ScheduledThreadPoolExecutor(1));
|
||||
new ScheduledThreadPoolExecutor(1), taskType);
|
||||
for (Future<Void> future : taskFutures) {
|
||||
future.get();
|
||||
}
|
||||
|
@ -690,10 +901,10 @@ public final class Canary implements Tool {
|
|||
* @throws Exception
|
||||
*/
|
||||
private static List<Future<Void>> sniff(final Admin admin, final Sink sink, String tableName,
|
||||
ExecutorService executor) throws Exception {
|
||||
ExecutorService executor, TaskType taskType) throws Exception {
|
||||
if (admin.isTableEnabled(TableName.valueOf(tableName))) {
|
||||
return Canary.sniff(admin, sink, admin.getTableDescriptor(TableName.valueOf(tableName)),
|
||||
executor);
|
||||
executor, taskType);
|
||||
} else {
|
||||
LOG.warn(String.format("Table %s is not enabled", tableName));
|
||||
}
|
||||
|
@ -704,7 +915,7 @@ public final class Canary implements Tool {
|
|||
* Loops over the regions that belong to this table and outputs some information about their state.
|
||||
*/
|
||||
private static List<Future<Void>> sniff(final Admin admin, final Sink sink,
|
||||
HTableDescriptor tableDesc, ExecutorService executor) throws Exception {
|
||||
HTableDescriptor tableDesc, ExecutorService executor, TaskType taskType) throws Exception {
|
||||
Table table = null;
|
||||
try {
|
||||
table = admin.getConnection().getTable(tableDesc.getTableName());
|
||||
|
@ -714,7 +925,7 @@ public final class Canary implements Tool {
|
|||
List<RegionTask> tasks = new ArrayList<RegionTask>();
|
||||
try {
|
||||
for (HRegionInfo region : admin.getTableRegions(tableDesc.getTableName())) {
|
||||
tasks.add(new RegionTask(admin.getConnection(), region, sink));
|
||||
tasks.add(new RegionTask(admin.getConnection(), region, sink, taskType));
|
||||
}
|
||||
} finally {
|
||||
table.close();
|
||||
|
|
|
@ -92,6 +92,8 @@ Usage: bin/hbase org.apache.hadoop.hbase.tool.Canary [opts] [table1 [table2]...]
|
|||
which means the region/regionserver is regular expression pattern
|
||||
-f <B> stop whole program if first error occurs, default is true
|
||||
-t <N> timeout for a check, default is 600000 (milliseconds)
|
||||
-writeSniffing enable the write sniffing in canary
|
||||
-writeTable The table used for write sniffing. Default is hbase:canary
|
||||
----
|
||||
|
||||
This tool will return non zero error codes to user for collaborating with other monitoring tools, such as Nagios.
|
||||
|
@ -193,6 +195,25 @@ This run sets the timeout value to 60 seconds, the default value is 600 seconds.
|
|||
$ ${HBASE_HOME}/bin/hbase org.apache.hadoop.hbase.tool.Canary -t 60000
|
||||
----
|
||||
|
||||
==== Enable write sniffing in canary
|
||||
|
||||
By default, the canary tool only checks read operations, which makes it hard to find problems in the
|
||||
write path. To enable the write sniffing, you can run canary with the `-writeSniffing` option.
|
||||
When write sniffing is enabled, the canary tool will create an HBase table and make sure the
|
||||
regions of the table are distributed across all region servers. In each sniffing period, the canary will
|
||||
try to put data to these regions to check the write availability of each region server.
|
||||
----
|
||||
$ ${HBASE_HOME}/bin/hbase org.apache.hadoop.hbase.tool.Canary -writeSniffing
|
||||
----
|
||||
|
||||
The default write table is `hbase:canary` and can be specified by the option `-writeTable`.
|
||||
----
|
||||
$ ${HBASE_HOME}/bin/hbase org.apache.hadoop.hbase.tool.Canary -writeSniffing -writeTable ns:canary
|
||||
----
|
||||
|
||||
The default value size of each put is 10 bytes and you can set it by the config key:
|
||||
`hbase.canary.write.value.size`.
|
||||
|
||||
==== Running Canary in a Kerberos-enabled Cluster
|
||||
|
||||
To run Canary in a Kerberos-enabled cluster, configure the following two properties in _hbase-site.xml_:
|
||||
|
|
Loading…
Reference in New Issue