HBASE-18762 Canary sink type cast error
Changed the type hierarchy of Canary sinks to reduce confusion and avoid cast errors. Testing Done: Ran the TestCanaryTool.java test suite and confirmed that the working is correct. Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
fa0b0934db
commit
b565a21480
|
@ -121,23 +121,12 @@ public final class Canary implements Tool {
|
|||
public interface Sink {
|
||||
public long getReadFailureCount();
|
||||
public long incReadFailureCount();
|
||||
public void publishReadFailure(ServerName serverName, HRegionInfo region, Exception e);
|
||||
public void publishReadFailure(ServerName serverName, HRegionInfo region, ColumnFamilyDescriptor column, Exception e);
|
||||
public void updateReadFailedHostList(HRegionInfo region, String serverName);
|
||||
public Map<String,String> getReadFailures();
|
||||
public void publishReadTiming(ServerName serverName, HRegionInfo region, ColumnFamilyDescriptor column, long msTime);
|
||||
public void updateReadFailures(String regionName, String serverName);
|
||||
public long getWriteFailureCount();
|
||||
public void publishWriteFailure(ServerName serverName, HRegionInfo region, Exception e);
|
||||
public void publishWriteFailure(ServerName serverName, HRegionInfo region, ColumnFamilyDescriptor column, Exception e);
|
||||
public void publishWriteTiming(ServerName serverName, HRegionInfo region, ColumnFamilyDescriptor column, long msTime);
|
||||
public void updateWriteFailedHostList(HRegionInfo region, String serverName);
|
||||
public long incWriteFailureCount();
|
||||
public Map<String,String> getWriteFailures();
|
||||
}
|
||||
// new extended sink for output regionserver mode info
|
||||
// do not change the Sink interface directly due to maintaining the API
|
||||
public interface ExtendedSink extends Sink {
|
||||
public void publishReadFailure(String table, String server);
|
||||
public void publishReadTiming(String table, String server, long msTime);
|
||||
public void updateWriteFailures(String regionName, String serverName);
|
||||
}
|
||||
|
||||
// Simple implementation of canary sink that allows to plot on
|
||||
|
@ -159,38 +148,14 @@ public final class Canary implements Tool {
|
|||
return readFailureCount.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishReadFailure(ServerName serverName, HRegionInfo region, Exception e) {
|
||||
readFailureCount.incrementAndGet();
|
||||
LOG.error(String.format("read from region %s on regionserver %s failed", region.getRegionNameAsString(), serverName), e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishReadFailure(ServerName serverName, HRegionInfo region, ColumnFamilyDescriptor column, Exception e) {
|
||||
readFailureCount.incrementAndGet();
|
||||
LOG.error(String.format("read from region %s on regionserver %s column family %s failed",
|
||||
region.getRegionNameAsString(), serverName, column.getNameAsString()), e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateReadFailedHostList(HRegionInfo region, String serverName) {
|
||||
readFailures.put(region.getRegionNameAsString(), serverName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishReadTiming(ServerName serverName, HRegionInfo region, ColumnFamilyDescriptor column, long msTime) {
|
||||
LOG.info(String.format("read from region %s on regionserver %s column family %s in %dms",
|
||||
region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getReadFailures() {
|
||||
return readFailures;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getWriteFailures() {
|
||||
return writeFailures;
|
||||
public void updateReadFailures(String regionName, String serverName) {
|
||||
readFailures.put(regionName, serverName);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -199,53 +164,42 @@ public final class Canary implements Tool {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void publishWriteFailure(ServerName serverName, HRegionInfo region, Exception e) {
|
||||
writeFailureCount.incrementAndGet();
|
||||
LOG.error(String.format("write to region %s on regionserver %s failed", region.getRegionNameAsString(), serverName), e);
|
||||
public long incWriteFailureCount() {
|
||||
return writeFailureCount.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishWriteFailure(ServerName serverName, HRegionInfo region, ColumnFamilyDescriptor column, Exception e) {
|
||||
writeFailureCount.incrementAndGet();
|
||||
LOG.error(String.format("write to region %s on regionserver %s column family %s failed",
|
||||
region.getRegionNameAsString(), serverName, column.getNameAsString()), e);
|
||||
public Map<String, String> getWriteFailures() {
|
||||
return writeFailures;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishWriteTiming(ServerName serverName, HRegionInfo region, ColumnFamilyDescriptor column, long msTime) {
|
||||
LOG.info(String.format("write to region %s on regionserver %s column family %s in %dms",
|
||||
region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime));
|
||||
public void updateWriteFailures(String regionName, String serverName) {
|
||||
writeFailures.put(regionName, serverName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateWriteFailedHostList(HRegionInfo region, String serverName) {
|
||||
writeFailures.put(region.getRegionNameAsString(), serverName);
|
||||
}
|
||||
|
||||
}
|
||||
// a ExtendedSink implementation
|
||||
public static class RegionServerStdOutSink extends StdOutSink implements ExtendedSink {
|
||||
|
||||
@Override
|
||||
public static class RegionServerStdOutSink extends StdOutSink {
|
||||
|
||||
public void publishReadFailure(String table, String server) {
|
||||
incReadFailureCount();
|
||||
LOG.error(String.format("Read from table:%s on region server:%s", table, server));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishReadTiming(String table, String server, long msTime) {
|
||||
LOG.info(String.format("Read from table:%s on region server:%s in %dms",
|
||||
table, server, msTime));
|
||||
}
|
||||
}
|
||||
|
||||
public static class ZookeeperStdOutSink extends StdOutSink implements ExtendedSink {
|
||||
@Override public void publishReadFailure(String zNode, String server) {
|
||||
public static class ZookeeperStdOutSink extends StdOutSink {
|
||||
|
||||
public void publishReadFailure(String zNode, String server) {
|
||||
incReadFailureCount();
|
||||
LOG.error(String.format("Read from zNode:%s on zookeeper instance:%s", zNode, server));
|
||||
}
|
||||
|
||||
@Override public void publishReadTiming(String znode, String server, long msTime) {
|
||||
public void publishReadTiming(String znode, String server, long msTime) {
|
||||
LOG.info(String.format("Read from zNode:%s on zookeeper instance:%s in %dms",
|
||||
znode, server, msTime));
|
||||
}
|
||||
|
@ -256,6 +210,38 @@ public final class Canary implements Tool {
|
|||
private Map<String, LongAdder> perTableReadLatency = new HashMap<>();
|
||||
private LongAdder writeLatency = new LongAdder();
|
||||
|
||||
public void publishReadFailure(ServerName serverName, HRegionInfo region, Exception e) {
|
||||
incReadFailureCount();
|
||||
LOG.error(String.format("read from region %s on regionserver %s failed", region.getRegionNameAsString(), serverName), e);
|
||||
}
|
||||
|
||||
public void publishReadFailure(ServerName serverName, HRegionInfo region, ColumnFamilyDescriptor column, Exception e) {
|
||||
incReadFailureCount();
|
||||
LOG.error(String.format("read from region %s on regionserver %s column family %s failed",
|
||||
region.getRegionNameAsString(), serverName, column.getNameAsString()), e);
|
||||
}
|
||||
|
||||
public void publishReadTiming(ServerName serverName, HRegionInfo region, ColumnFamilyDescriptor column, long msTime) {
|
||||
LOG.info(String.format("read from region %s on regionserver %s column family %s in %dms",
|
||||
region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime));
|
||||
}
|
||||
|
||||
public void publishWriteFailure(ServerName serverName, HRegionInfo region, Exception e) {
|
||||
incWriteFailureCount();
|
||||
LOG.error(String.format("write to region %s on regionserver %s failed", region.getRegionNameAsString(), serverName), e);
|
||||
}
|
||||
|
||||
public void publishWriteFailure(ServerName serverName, HRegionInfo region, ColumnFamilyDescriptor column, Exception e) {
|
||||
incWriteFailureCount();
|
||||
LOG.error(String.format("write to region %s on regionserver %s column family %s failed",
|
||||
region.getRegionNameAsString(), serverName, column.getNameAsString()), e);
|
||||
}
|
||||
|
||||
public void publishWriteTiming(ServerName serverName, HRegionInfo region, ColumnFamilyDescriptor column, long msTime) {
|
||||
LOG.info(String.format("write to region %s on regionserver %s column family %s in %dms",
|
||||
region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime));
|
||||
}
|
||||
|
||||
public Map<String, LongAdder> getReadLatencyMap() {
|
||||
return this.perTableReadLatency;
|
||||
}
|
||||
|
@ -421,7 +407,7 @@ public final class Canary implements Tool {
|
|||
sink.publishReadTiming(serverName, region, column, stopWatch.getTime());
|
||||
} catch (Exception e) {
|
||||
sink.publishReadFailure(serverName, region, column, e);
|
||||
sink.updateReadFailedHostList(region, serverName.getHostname());
|
||||
sink.updateReadFailures(region.getRegionNameAsString(), serverName.getHostname());
|
||||
} finally {
|
||||
if (rs != null) {
|
||||
rs.close();
|
||||
|
@ -478,7 +464,7 @@ public final class Canary implements Tool {
|
|||
table.close();
|
||||
} catch (IOException e) {
|
||||
sink.publishWriteFailure(serverName, region, e);
|
||||
sink.updateWriteFailedHostList(region, serverName.getHostname());
|
||||
sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname() );
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -491,11 +477,11 @@ public final class Canary implements Tool {
|
|||
private Connection connection;
|
||||
private String serverName;
|
||||
private HRegionInfo region;
|
||||
private ExtendedSink sink;
|
||||
private RegionServerStdOutSink sink;
|
||||
private AtomicLong successes;
|
||||
|
||||
RegionServerTask(Connection connection, String serverName, HRegionInfo region,
|
||||
ExtendedSink sink, AtomicLong successes) {
|
||||
RegionServerStdOutSink sink, AtomicLong successes) {
|
||||
this.connection = connection;
|
||||
this.serverName = serverName;
|
||||
this.region = region;
|
||||
|
@ -901,19 +887,19 @@ public final class Canary implements Tool {
|
|||
System.arraycopy(args, index, monitorTargets, 0, length);
|
||||
}
|
||||
|
||||
if (this.regionServerMode) {
|
||||
if (this.sink instanceof RegionServerStdOutSink || this.regionServerMode) {
|
||||
monitor =
|
||||
new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
|
||||
(ExtendedSink) this.sink, this.executor, this.regionServerAllRegions,
|
||||
(StdOutSink) this.sink, this.executor, this.regionServerAllRegions,
|
||||
this.treatFailureAsError);
|
||||
} else if (this.zookeeperMode) {
|
||||
} else if (this.sink instanceof ZookeeperStdOutSink || this.zookeeperMode) {
|
||||
monitor =
|
||||
new ZookeeperMonitor(connection, monitorTargets, this.useRegExp,
|
||||
(ZookeeperStdOutSink) this.sink, this.executor, this.treatFailureAsError);
|
||||
(StdOutSink) this.sink, this.executor, this.treatFailureAsError);
|
||||
} else {
|
||||
monitor =
|
||||
new RegionMonitor(connection, monitorTargets, this.useRegExp,
|
||||
(RegionStdOutSink) this.sink, this.executor, this.writeSniffing,
|
||||
(StdOutSink) this.sink, this.executor, this.writeSniffing,
|
||||
this.writeTableName, this.treatFailureAsError, this.configuredReadTableTimeouts,
|
||||
this.configuredWriteTableTimeout);
|
||||
}
|
||||
|
@ -1010,7 +996,7 @@ public final class Canary implements Tool {
|
|||
private long configuredWriteTableTimeout;
|
||||
|
||||
public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
|
||||
RegionStdOutSink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
|
||||
StdOutSink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
|
||||
boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts, long configuredWriteTableTimeout) {
|
||||
super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
|
||||
Configuration conf = connection.getConfiguration();
|
||||
|
@ -1042,7 +1028,13 @@ public final class Canary implements Tool {
|
|||
if (this.initAdmin()) {
|
||||
try {
|
||||
List<Future<Void>> taskFutures = new LinkedList<>();
|
||||
RegionStdOutSink regionSink = this.getSink();
|
||||
RegionStdOutSink regionSink = null;
|
||||
try {
|
||||
regionSink = this.getSink();
|
||||
} catch (RuntimeException e) {
|
||||
LOG.error("Run RegionMonitor failed!", e);
|
||||
this.errorCode = ERROR_EXIT_CODE;
|
||||
}
|
||||
if (this.targets != null && this.targets.length > 0) {
|
||||
String[] tables = generateMonitorTables(this.targets);
|
||||
// Check to see that each table name passed in the -readTableTimeouts argument is also passed as a monitor target.
|
||||
|
@ -1303,7 +1295,7 @@ public final class Canary implements Tool {
|
|||
private final int timeout;
|
||||
|
||||
protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
|
||||
ExtendedSink sink, ExecutorService executor, boolean treatFailureAsError) {
|
||||
StdOutSink sink, ExecutorService executor, boolean treatFailureAsError) {
|
||||
super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
|
||||
Configuration configuration = connection.getConfiguration();
|
||||
znode =
|
||||
|
@ -1321,8 +1313,16 @@ public final class Canary implements Tool {
|
|||
|
||||
@Override public void run() {
|
||||
List<ZookeeperTask> tasks = Lists.newArrayList();
|
||||
ZookeeperStdOutSink zkSink = null;
|
||||
try {
|
||||
zkSink = this.getSink();
|
||||
} catch (RuntimeException e) {
|
||||
LOG.error("Run ZooKeeperMonitor failed!", e);
|
||||
this.errorCode = ERROR_EXIT_CODE;
|
||||
}
|
||||
this.initialized = true;
|
||||
for (final String host : hosts) {
|
||||
tasks.add(new ZookeeperTask(connection, host, znode, timeout, getSink()));
|
||||
tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink));
|
||||
}
|
||||
try {
|
||||
for (Future<Void> future : this.executor.invokeAll(tasks)) {
|
||||
|
@ -1341,7 +1341,6 @@ public final class Canary implements Tool {
|
|||
this.done = true;
|
||||
}
|
||||
|
||||
|
||||
private ZookeeperStdOutSink getSink() {
|
||||
if (!(sink instanceof ZookeeperStdOutSink)) {
|
||||
throw new RuntimeException("Can only write to zookeeper sink");
|
||||
|
@ -1357,22 +1356,32 @@ public final class Canary implements Tool {
|
|||
private boolean allRegions;
|
||||
|
||||
public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
|
||||
ExtendedSink sink, ExecutorService executor, boolean allRegions,
|
||||
StdOutSink sink, ExecutorService executor, boolean allRegions,
|
||||
boolean treatFailureAsError) {
|
||||
super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
|
||||
this.allRegions = allRegions;
|
||||
}
|
||||
|
||||
private ExtendedSink getSink() {
|
||||
return (ExtendedSink) this.sink;
|
||||
private RegionServerStdOutSink getSink() {
|
||||
if (!(sink instanceof RegionServerStdOutSink)) {
|
||||
throw new RuntimeException("Can only write to regionserver sink");
|
||||
}
|
||||
return ((RegionServerStdOutSink) sink);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (this.initAdmin() && this.checkNoTableNames()) {
|
||||
RegionServerStdOutSink regionServerSink = null;
|
||||
try {
|
||||
regionServerSink = this.getSink();
|
||||
} catch (RuntimeException e) {
|
||||
LOG.error("Run RegionServerMonitor failed!", e);
|
||||
this.errorCode = ERROR_EXIT_CODE;
|
||||
}
|
||||
Map<String, List<HRegionInfo>> rsAndRMap = this.filterRegionServerByName();
|
||||
this.initialized = true;
|
||||
this.monitorRegionServers(rsAndRMap);
|
||||
this.monitorRegionServers(rsAndRMap, regionServerSink);
|
||||
}
|
||||
this.done = true;
|
||||
}
|
||||
|
@ -1410,7 +1419,7 @@ public final class Canary implements Tool {
|
|||
return foundTableNames.isEmpty();
|
||||
}
|
||||
|
||||
private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap) {
|
||||
private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap, RegionServerStdOutSink regionServerSink) {
|
||||
List<RegionServerTask> tasks = new ArrayList<>();
|
||||
Map<String, AtomicLong> successMap = new HashMap<>();
|
||||
Random rand = new Random();
|
||||
|
@ -1425,7 +1434,7 @@ public final class Canary implements Tool {
|
|||
tasks.add(new RegionServerTask(this.connection,
|
||||
serverName,
|
||||
region,
|
||||
getSink(),
|
||||
regionServerSink,
|
||||
successes));
|
||||
}
|
||||
} else {
|
||||
|
@ -1434,7 +1443,7 @@ public final class Canary implements Tool {
|
|||
tasks.add(new RegionServerTask(this.connection,
|
||||
serverName,
|
||||
region,
|
||||
getSink(),
|
||||
regionServerSink,
|
||||
successes));
|
||||
}
|
||||
}
|
||||
|
@ -1579,4 +1588,4 @@ public final class Canary implements Tool {
|
|||
executor.shutdown();
|
||||
System.exit(exitCode);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue