HBASE-18762 Canary sink type cast error
Changed the type hierarchy of Canary sinks to reduce confusion and avoid cast errors. Testing Done: Ran the TestCanaryTool.java test suite and confirmed that the working is correct. Signed-off-by: Andrew Purtell <apurtell@apache.org> Amending-Author: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
d6d62a5467
commit
414e0ca292
|
@ -114,23 +114,12 @@ public final class Canary implements Tool {
|
|||
public interface Sink {
|
||||
public long getReadFailureCount();
|
||||
public long incReadFailureCount();
|
||||
public void publishReadFailure(ServerName serverName, HRegionInfo region, Exception e);
|
||||
public void publishReadFailure(ServerName serverName, HRegionInfo region, HColumnDescriptor column, Exception e);
|
||||
public void updateReadFailedHostList(HRegionInfo region, String serverName);
|
||||
public Map<String,String> getReadFailures();
|
||||
public void publishReadTiming(ServerName serverName, HRegionInfo region, HColumnDescriptor column, long msTime);
|
||||
public void updateReadFailures(String regionName, String serverName);
|
||||
public long getWriteFailureCount();
|
||||
public void publishWriteFailure(ServerName serverName, HRegionInfo region, Exception e);
|
||||
public void publishWriteFailure(ServerName serverName, HRegionInfo region, HColumnDescriptor column, Exception e);
|
||||
public void publishWriteTiming(ServerName serverName, HRegionInfo region, HColumnDescriptor column, long msTime);
|
||||
public void updateWriteFailedHostList(HRegionInfo region, String serverName);
|
||||
public long incWriteFailureCount();
|
||||
public Map<String,String> getWriteFailures();
|
||||
}
|
||||
// new extended sink for output regionserver mode info
|
||||
// do not change the Sink interface directly due to maintaining the API
|
||||
public interface ExtendedSink extends Sink {
|
||||
public void publishReadFailure(String table, String server);
|
||||
public void publishReadTiming(String table, String server, long msTime);
|
||||
public void updateWriteFailures(String regionName, String serverName);
|
||||
}
|
||||
|
||||
// Simple implementation of canary sink that allows to plot on
|
||||
|
@ -152,38 +141,14 @@ public final class Canary implements Tool {
|
|||
return readFailureCount.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishReadFailure(ServerName serverName, HRegionInfo region, Exception e) {
|
||||
readFailureCount.incrementAndGet();
|
||||
LOG.error(String.format("read from region %s on regionserver %s failed", region.getRegionNameAsString(), serverName), e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishReadFailure(ServerName serverName, HRegionInfo region, HColumnDescriptor column, Exception e) {
|
||||
readFailureCount.incrementAndGet();
|
||||
LOG.error(String.format("read from region %s on regionserver %s column family %s failed",
|
||||
region.getRegionNameAsString(), serverName, column.getNameAsString()), e);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateReadFailedHostList(HRegionInfo region, String serverName) {
|
||||
readFailures.put(region.getRegionNameAsString(), serverName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishReadTiming(ServerName serverName, HRegionInfo region, HColumnDescriptor column, long msTime) {
|
||||
LOG.info(String.format("read from region %s on regionserver %s column family %s in %dms",
|
||||
region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getReadFailures() {
|
||||
return readFailures;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> getWriteFailures() {
|
||||
return writeFailures;
|
||||
public void updateReadFailures(String regionName, String serverName) {
|
||||
readFailures.put(regionName, serverName);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -192,53 +157,42 @@ public final class Canary implements Tool {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void publishWriteFailure(ServerName serverName, HRegionInfo region, Exception e) {
|
||||
writeFailureCount.incrementAndGet();
|
||||
LOG.error(String.format("write to region %s on regionserver %s failed", region.getRegionNameAsString(), serverName), e);
|
||||
public long incWriteFailureCount() {
|
||||
return writeFailureCount.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishWriteFailure(ServerName serverName, HRegionInfo region, HColumnDescriptor column, Exception e) {
|
||||
writeFailureCount.incrementAndGet();
|
||||
LOG.error(String.format("write to region %s on regionserver %s column family %s failed",
|
||||
region.getRegionNameAsString(), serverName, column.getNameAsString()), e);
|
||||
public Map<String, String> getWriteFailures() {
|
||||
return writeFailures;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishWriteTiming(ServerName serverName, HRegionInfo region, HColumnDescriptor column, long msTime) {
|
||||
LOG.info(String.format("write to region %s on regionserver %s column family %s in %dms",
|
||||
region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime));
|
||||
public void updateWriteFailures(String regionName, String serverName) {
|
||||
writeFailures.put(regionName, serverName);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void updateWriteFailedHostList(HRegionInfo region, String serverName) {
|
||||
writeFailures.put(region.getRegionNameAsString(), serverName);
|
||||
}
|
||||
|
||||
}
|
||||
// a ExtendedSink implementation
|
||||
public static class RegionServerStdOutSink extends StdOutSink implements ExtendedSink {
|
||||
|
||||
@Override
|
||||
public static class RegionServerStdOutSink extends StdOutSink {
|
||||
|
||||
public void publishReadFailure(String table, String server) {
|
||||
incReadFailureCount();
|
||||
LOG.error(String.format("Read from table:%s on region server:%s", table, server));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishReadTiming(String table, String server, long msTime) {
|
||||
LOG.info(String.format("Read from table:%s on region server:%s in %dms",
|
||||
table, server, msTime));
|
||||
}
|
||||
}
|
||||
|
||||
public static class ZookeeperStdOutSink extends StdOutSink implements ExtendedSink {
|
||||
@Override public void publishReadFailure(String zNode, String server) {
|
||||
public static class ZookeeperStdOutSink extends StdOutSink {
|
||||
|
||||
public void publishReadFailure(String zNode, String server) {
|
||||
incReadFailureCount();
|
||||
LOG.error(String.format("Read from zNode:%s on zookeeper instance:%s", zNode, server));
|
||||
}
|
||||
|
||||
@Override public void publishReadTiming(String znode, String server, long msTime) {
|
||||
public void publishReadTiming(String znode, String server, long msTime) {
|
||||
LOG.info(String.format("Read from zNode:%s on zookeeper instance:%s in %dms",
|
||||
znode, server, msTime));
|
||||
}
|
||||
|
@ -249,6 +203,38 @@ public final class Canary implements Tool {
|
|||
private Map<String, AtomicLong> perTableReadLatency = new HashMap<>();
|
||||
private AtomicLong writeLatency = new AtomicLong();
|
||||
|
||||
public void publishReadFailure(ServerName serverName, HRegionInfo region, Exception e) {
|
||||
incReadFailureCount();
|
||||
LOG.error(String.format("read from region %s on regionserver %s failed", region.getRegionNameAsString(), serverName), e);
|
||||
}
|
||||
|
||||
public void publishReadFailure(ServerName serverName, HRegionInfo region, HColumnDescriptor column, Exception e) {
|
||||
incReadFailureCount();
|
||||
LOG.error(String.format("read from region %s on regionserver %s column family %s failed",
|
||||
region.getRegionNameAsString(), serverName, column.getNameAsString()), e);
|
||||
}
|
||||
|
||||
public void publishReadTiming(ServerName serverName, HRegionInfo region, HColumnDescriptor column, long msTime) {
|
||||
LOG.info(String.format("read from region %s on regionserver %s column family %s in %dms",
|
||||
region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime));
|
||||
}
|
||||
|
||||
public void publishWriteFailure(ServerName serverName, HRegionInfo region, Exception e) {
|
||||
incWriteFailureCount();
|
||||
LOG.error(String.format("write to region %s on regionserver %s failed", region.getRegionNameAsString(), serverName), e);
|
||||
}
|
||||
|
||||
public void publishWriteFailure(ServerName serverName, HRegionInfo region, HColumnDescriptor column, Exception e) {
|
||||
incWriteFailureCount();
|
||||
LOG.error(String.format("write to region %s on regionserver %s column family %s failed",
|
||||
region.getRegionNameAsString(), serverName, column.getNameAsString()), e);
|
||||
}
|
||||
|
||||
public void publishWriteTiming(ServerName serverName, HRegionInfo region, HColumnDescriptor column, long msTime) {
|
||||
LOG.info(String.format("write to region %s on regionserver %s column family %s in %dms",
|
||||
region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime));
|
||||
}
|
||||
|
||||
public Map<String, AtomicLong> getReadLatencyMap() {
|
||||
return this.perTableReadLatency;
|
||||
}
|
||||
|
@ -414,7 +400,7 @@ public final class Canary implements Tool {
|
|||
sink.publishReadTiming(serverName, region, column, stopWatch.getTime());
|
||||
} catch (Exception e) {
|
||||
sink.publishReadFailure(serverName, region, column, e);
|
||||
sink.updateReadFailedHostList(region, serverName.getHostname());
|
||||
sink.updateReadFailures(region.getRegionNameAsString(), serverName.getHostname());
|
||||
} finally {
|
||||
if (rs != null) {
|
||||
rs.close();
|
||||
|
@ -471,7 +457,7 @@ public final class Canary implements Tool {
|
|||
table.close();
|
||||
} catch (IOException e) {
|
||||
sink.publishWriteFailure(serverName, region, e);
|
||||
sink.updateWriteFailedHostList(region, serverName.getHostname());
|
||||
sink.updateWriteFailures(region.getRegionNameAsString(), serverName.getHostname() );
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -484,11 +470,11 @@ public final class Canary implements Tool {
|
|||
private Connection connection;
|
||||
private String serverName;
|
||||
private HRegionInfo region;
|
||||
private ExtendedSink sink;
|
||||
private RegionServerStdOutSink sink;
|
||||
private AtomicLong successes;
|
||||
|
||||
RegionServerTask(Connection connection, String serverName, HRegionInfo region,
|
||||
ExtendedSink sink, AtomicLong successes) {
|
||||
RegionServerStdOutSink sink, AtomicLong successes) {
|
||||
this.connection = connection;
|
||||
this.serverName = serverName;
|
||||
this.region = region;
|
||||
|
@ -894,21 +880,21 @@ public final class Canary implements Tool {
|
|||
System.arraycopy(args, index, monitorTargets, 0, length);
|
||||
}
|
||||
|
||||
if (this.regionServerMode) {
|
||||
if (this.sink instanceof RegionServerStdOutSink || this.regionServerMode) {
|
||||
monitor =
|
||||
new RegionServerMonitor(connection, monitorTargets, this.useRegExp,
|
||||
(ExtendedSink) this.sink, this.executor, this.regionServerAllRegions,
|
||||
(StdOutSink) this.sink, this.executor, this.regionServerAllRegions,
|
||||
this.treatFailureAsError);
|
||||
} else if (this.zookeeperMode) {
|
||||
} else if (this.sink instanceof ZookeeperStdOutSink || this.zookeeperMode) {
|
||||
monitor =
|
||||
new ZookeeperMonitor(connection, monitorTargets, this.useRegExp,
|
||||
(ZookeeperStdOutSink) this.sink, this.executor, this.treatFailureAsError);
|
||||
(StdOutSink) this.sink, this.executor, this.treatFailureAsError);
|
||||
} else {
|
||||
monitor =
|
||||
new RegionMonitor(connection, monitorTargets, this.useRegExp,
|
||||
(RegionStdOutSink) this.sink, this.executor, this.writeSniffing,
|
||||
this.writeTableName, this.treatFailureAsError, this.configuredReadTableTimeouts,
|
||||
this.configuredWriteTableTimeout);
|
||||
new RegionMonitor(connection, monitorTargets, this.useRegExp,
|
||||
(StdOutSink) this.sink, this.executor, this.writeSniffing,
|
||||
this.writeTableName, this.treatFailureAsError, this.configuredReadTableTimeouts,
|
||||
this.configuredWriteTableTimeout);
|
||||
}
|
||||
return monitor;
|
||||
}
|
||||
|
@ -1003,8 +989,9 @@ public final class Canary implements Tool {
|
|||
private long configuredWriteTableTimeout;
|
||||
|
||||
public RegionMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
|
||||
RegionStdOutSink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
|
||||
boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts, long configuredWriteTableTimeout) {
|
||||
StdOutSink sink, ExecutorService executor, boolean writeSniffing, TableName writeTableName,
|
||||
boolean treatFailureAsError, HashMap<String, Long> configuredReadTableTimeouts,
|
||||
long configuredWriteTableTimeout) {
|
||||
super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
|
||||
Configuration conf = connection.getConfiguration();
|
||||
this.writeSniffing = writeSniffing;
|
||||
|
@ -1035,7 +1022,13 @@ public final class Canary implements Tool {
|
|||
if (this.initAdmin()) {
|
||||
try {
|
||||
List<Future<Void>> taskFutures = new LinkedList<>();
|
||||
RegionStdOutSink regionSink = this.getSink();
|
||||
RegionStdOutSink regionSink = null;
|
||||
try {
|
||||
regionSink = this.getSink();
|
||||
} catch (RuntimeException e) {
|
||||
LOG.error("Run RegionMonitor failed!", e);
|
||||
this.errorCode = ERROR_EXIT_CODE;
|
||||
}
|
||||
if (this.targets != null && this.targets.length > 0) {
|
||||
String[] tables = generateMonitorTables(this.targets);
|
||||
// Check to see that each table name passed in the -readTableTimeouts argument is also passed as a monitor target.
|
||||
|
@ -1297,7 +1290,7 @@ public final class Canary implements Tool {
|
|||
private final int timeout;
|
||||
|
||||
protected ZookeeperMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
|
||||
ExtendedSink sink, ExecutorService executor, boolean treatFailureAsError) {
|
||||
StdOutSink sink, ExecutorService executor, boolean treatFailureAsError) {
|
||||
super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
|
||||
Configuration configuration = connection.getConfiguration();
|
||||
znode =
|
||||
|
@ -1315,8 +1308,16 @@ public final class Canary implements Tool {
|
|||
|
||||
@Override public void run() {
|
||||
List<ZookeeperTask> tasks = Lists.newArrayList();
|
||||
ZookeeperStdOutSink zkSink = null;
|
||||
try {
|
||||
zkSink = this.getSink();
|
||||
} catch (RuntimeException e) {
|
||||
LOG.error("Run ZooKeeperMonitor failed!", e);
|
||||
this.errorCode = ERROR_EXIT_CODE;
|
||||
}
|
||||
this.initialized = true;
|
||||
for (final String host : hosts) {
|
||||
tasks.add(new ZookeeperTask(connection, host, znode, timeout, getSink()));
|
||||
tasks.add(new ZookeeperTask(connection, host, znode, timeout, zkSink));
|
||||
}
|
||||
try {
|
||||
for (Future<Void> future : this.executor.invokeAll(tasks)) {
|
||||
|
@ -1335,7 +1336,6 @@ public final class Canary implements Tool {
|
|||
this.done = true;
|
||||
}
|
||||
|
||||
|
||||
private ZookeeperStdOutSink getSink() {
|
||||
if (!(sink instanceof ZookeeperStdOutSink)) {
|
||||
throw new RuntimeException("Can only write to zookeeper sink");
|
||||
|
@ -1351,22 +1351,32 @@ public final class Canary implements Tool {
|
|||
private boolean allRegions;
|
||||
|
||||
public RegionServerMonitor(Connection connection, String[] monitorTargets, boolean useRegExp,
|
||||
ExtendedSink sink, ExecutorService executor, boolean allRegions,
|
||||
StdOutSink sink, ExecutorService executor, boolean allRegions,
|
||||
boolean treatFailureAsError) {
|
||||
super(connection, monitorTargets, useRegExp, sink, executor, treatFailureAsError);
|
||||
this.allRegions = allRegions;
|
||||
}
|
||||
|
||||
private ExtendedSink getSink() {
|
||||
return (ExtendedSink) this.sink;
|
||||
private RegionServerStdOutSink getSink() {
|
||||
if (!(sink instanceof RegionServerStdOutSink)) {
|
||||
throw new RuntimeException("Can only write to regionserver sink");
|
||||
}
|
||||
return ((RegionServerStdOutSink) sink);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (this.initAdmin() && this.checkNoTableNames()) {
|
||||
RegionServerStdOutSink regionServerSink = null;
|
||||
try {
|
||||
regionServerSink = this.getSink();
|
||||
} catch (RuntimeException e) {
|
||||
LOG.error("Run RegionServerMonitor failed!", e);
|
||||
this.errorCode = ERROR_EXIT_CODE;
|
||||
}
|
||||
Map<String, List<HRegionInfo>> rsAndRMap = this.filterRegionServerByName();
|
||||
this.initialized = true;
|
||||
this.monitorRegionServers(rsAndRMap);
|
||||
this.monitorRegionServers(rsAndRMap, regionServerSink);
|
||||
}
|
||||
this.done = true;
|
||||
}
|
||||
|
@ -1404,9 +1414,10 @@ public final class Canary implements Tool {
|
|||
return foundTableNames.size() == 0;
|
||||
}
|
||||
|
||||
private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap) {
|
||||
List<RegionServerTask> tasks = new ArrayList<RegionServerTask>();
|
||||
Map<String, AtomicLong> successMap = new HashMap<String, AtomicLong>();
|
||||
private void monitorRegionServers(Map<String, List<HRegionInfo>> rsAndRMap,
|
||||
RegionServerStdOutSink regionServerSink) {
|
||||
List<RegionServerTask> tasks = new ArrayList<>();
|
||||
Map<String, AtomicLong> successMap = new HashMap<>();
|
||||
Random rand = new Random();
|
||||
for (Map.Entry<String, List<HRegionInfo>> entry : rsAndRMap.entrySet()) {
|
||||
String serverName = entry.getKey();
|
||||
|
@ -1419,7 +1430,7 @@ public final class Canary implements Tool {
|
|||
tasks.add(new RegionServerTask(this.connection,
|
||||
serverName,
|
||||
region,
|
||||
getSink(),
|
||||
regionServerSink,
|
||||
successes));
|
||||
}
|
||||
} else {
|
||||
|
@ -1428,7 +1439,7 @@ public final class Canary implements Tool {
|
|||
tasks.add(new RegionServerTask(this.connection,
|
||||
serverName,
|
||||
region,
|
||||
getSink(),
|
||||
regionServerSink,
|
||||
successes));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue