[HBASE-22804] Provide an API to get list of successful regions and total expected regions in Canary (#612)
Signed-off-by: Xu Cang <xucang@apache.org>
This commit is contained in:
parent
3f84591439
commit
de9b1d403c
|
@ -120,13 +120,19 @@ public final class Canary implements Tool {
|
||||||
public long incWriteFailureCount();
|
public long incWriteFailureCount();
|
||||||
public Map<String,String> getWriteFailures();
|
public Map<String,String> getWriteFailures();
|
||||||
public void updateWriteFailures(String regionName, String serverName);
|
public void updateWriteFailures(String regionName, String serverName);
|
||||||
|
public long getReadSuccessCount();
|
||||||
|
public long incReadSuccessCount();
|
||||||
|
public long getWriteSuccessCount();
|
||||||
|
public long incWriteSuccessCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Simple implementation of canary sink that allows to plot on
|
// Simple implementation of canary sink that allows to plot on
|
||||||
// file or standard output timings or failures.
|
// file or standard output timings or failures.
|
||||||
public static class StdOutSink implements Sink {
|
public static class StdOutSink implements Sink {
|
||||||
private AtomicLong readFailureCount = new AtomicLong(0),
|
private AtomicLong readFailureCount = new AtomicLong(0),
|
||||||
writeFailureCount = new AtomicLong(0);
|
writeFailureCount = new AtomicLong(0),
|
||||||
|
readSuccessCount = new AtomicLong(0),
|
||||||
|
writeSuccessCount = new AtomicLong(0);
|
||||||
|
|
||||||
private Map<String, String> readFailures = new ConcurrentHashMap<String, String>();
|
private Map<String, String> readFailures = new ConcurrentHashMap<String, String>();
|
||||||
private Map<String, String> writeFailures = new ConcurrentHashMap<String, String>();
|
private Map<String, String> writeFailures = new ConcurrentHashMap<String, String>();
|
||||||
|
@ -170,6 +176,26 @@ public final class Canary implements Tool {
|
||||||
public void updateWriteFailures(String regionName, String serverName) {
|
public void updateWriteFailures(String regionName, String serverName) {
|
||||||
writeFailures.put(regionName, serverName);
|
writeFailures.put(regionName, serverName);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getReadSuccessCount() {
|
||||||
|
return readSuccessCount.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long incReadSuccessCount() {
|
||||||
|
return readSuccessCount.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getWriteSuccessCount() {
|
||||||
|
return writeSuccessCount.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long incWriteSuccessCount() {
|
||||||
|
return writeSuccessCount.incrementAndGet();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class RegionServerStdOutSink extends StdOutSink {
|
public static class RegionServerStdOutSink extends StdOutSink {
|
||||||
|
@ -202,6 +228,7 @@ public final class Canary implements Tool {
|
||||||
|
|
||||||
private Map<String, AtomicLong> perTableReadLatency = new HashMap<>();
|
private Map<String, AtomicLong> perTableReadLatency = new HashMap<>();
|
||||||
private AtomicLong writeLatency = new AtomicLong();
|
private AtomicLong writeLatency = new AtomicLong();
|
||||||
|
private Map<String, RegionTaskResult> regionMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
public void publishReadFailure(ServerName serverName, HRegionInfo region, Exception e) {
|
public void publishReadFailure(ServerName serverName, HRegionInfo region, Exception e) {
|
||||||
incReadFailureCount();
|
incReadFailureCount();
|
||||||
|
@ -215,6 +242,10 @@ public final class Canary implements Tool {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void publishReadTiming(ServerName serverName, HRegionInfo region, HColumnDescriptor column, long msTime) {
|
public void publishReadTiming(ServerName serverName, HRegionInfo region, HColumnDescriptor column, long msTime) {
|
||||||
|
incReadSuccessCount();
|
||||||
|
RegionTaskResult res = this.regionMap.get(region.getRegionNameAsString());
|
||||||
|
res.setReadSuccess();
|
||||||
|
res.setReadLatency(msTime);
|
||||||
LOG.info(String.format("read from region %s on regionserver %s column family %s in %dms",
|
LOG.info(String.format("read from region %s on regionserver %s column family %s in %dms",
|
||||||
region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime));
|
region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime));
|
||||||
}
|
}
|
||||||
|
@ -231,6 +262,10 @@ public final class Canary implements Tool {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void publishWriteTiming(ServerName serverName, HRegionInfo region, HColumnDescriptor column, long msTime) {
|
public void publishWriteTiming(ServerName serverName, HRegionInfo region, HColumnDescriptor column, long msTime) {
|
||||||
|
incWriteSuccessCount();
|
||||||
|
RegionTaskResult res = this.regionMap.get(region.getRegionNameAsString());
|
||||||
|
res.setWriteSuccess();
|
||||||
|
res.setWriteLatency(msTime);
|
||||||
LOG.info(String.format("write to region %s on regionserver %s column family %s in %dms",
|
LOG.info(String.format("write to region %s on regionserver %s column family %s in %dms",
|
||||||
region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime));
|
region.getRegionNameAsString(), serverName, column.getNameAsString(), msTime));
|
||||||
}
|
}
|
||||||
|
@ -252,6 +287,14 @@ public final class Canary implements Tool {
|
||||||
public AtomicLong getWriteLatency() {
|
public AtomicLong getWriteLatency() {
|
||||||
return this.writeLatency;
|
return this.writeLatency;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Map<String, RegionTaskResult> getRegionMap() {
|
||||||
|
return this.regionMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getTotalExpectedRegions() {
|
||||||
|
return this.regionMap.size();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static class ZookeeperTask implements Callable<Void> {
|
static class ZookeeperTask implements Callable<Void> {
|
||||||
|
@ -883,6 +926,96 @@ public final class Canary implements Tool {
|
||||||
System.exit(USAGE_EXIT_CODE);
|
System.exit(USAGE_EXIT_CODE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Canary region mode-specific data structure which stores information about each region
|
||||||
|
* to be scanned
|
||||||
|
*/
|
||||||
|
public static class RegionTaskResult {
|
||||||
|
private HRegionInfo region;
|
||||||
|
private TableName tableName;
|
||||||
|
private ServerName serverName;
|
||||||
|
private AtomicLong readLatency = null;
|
||||||
|
private AtomicLong writeLatency = null;
|
||||||
|
private boolean readSuccess = false;
|
||||||
|
private boolean writeSuccess = false;
|
||||||
|
|
||||||
|
public RegionTaskResult(HRegionInfo region, TableName tableName, ServerName serverName) {
|
||||||
|
this.region = region;
|
||||||
|
this.tableName = tableName;
|
||||||
|
this.serverName = serverName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HRegionInfo getRegionInfo() {
|
||||||
|
return this.region;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getRegionNameAsString() {
|
||||||
|
return this.region.getRegionNameAsString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public TableName getTableName() {
|
||||||
|
return this.tableName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTableNameAsString() {
|
||||||
|
return this.tableName.getNameAsString();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ServerName getServerName() {
|
||||||
|
return this.serverName;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getServerNameAsString() {
|
||||||
|
return this.serverName.getServerName();
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getReadLatency() {
|
||||||
|
if (this.readLatency == null) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return this.readLatency.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setReadLatency(long readLatency) {
|
||||||
|
if (this.readLatency != null) {
|
||||||
|
this.readLatency.set(readLatency);
|
||||||
|
} else {
|
||||||
|
this.readLatency = new AtomicLong(readLatency);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getWriteLatency() {
|
||||||
|
if (this.writeLatency == null) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return this.writeLatency.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setWriteLatency(long writeLatency) {
|
||||||
|
if (this.writeLatency != null) {
|
||||||
|
this.writeLatency.set(writeLatency);
|
||||||
|
} else {
|
||||||
|
this.writeLatency = new AtomicLong(writeLatency);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isReadSuccess() {
|
||||||
|
return this.readSuccess;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setReadSuccess() {
|
||||||
|
this.readSuccess = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isWriteSuccess() {
|
||||||
|
return this.writeSuccess;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setWriteSuccess() {
|
||||||
|
this.writeSuccess = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A Factory method for {@link Monitor}.
|
* A Factory method for {@link Monitor}.
|
||||||
* Can be overridden by user.
|
* Can be overridden by user.
|
||||||
|
@ -1295,6 +1428,9 @@ public final class Canary implements Tool {
|
||||||
HRegionInfo region = location.getRegionInfo();
|
HRegionInfo region = location.getRegionInfo();
|
||||||
tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink) sink, taskType, rawScanEnabled,
|
tasks.add(new RegionTask(admin.getConnection(), region, rs, (RegionStdOutSink) sink, taskType, rawScanEnabled,
|
||||||
rwLatency));
|
rwLatency));
|
||||||
|
Map<String, RegionTaskResult> regionMap = ((RegionStdOutSink) sink).getRegionMap();
|
||||||
|
regionMap.put(region.getRegionNameAsString(), new RegionTaskResult(region,
|
||||||
|
region.getTable(), rs));
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (regionLocator != null) {
|
if (regionLocator != null) {
|
||||||
|
|
|
@ -19,6 +19,25 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.tool;
|
package org.apache.hadoop.hbase.tool;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Matchers.anyLong;
|
||||||
|
import static org.mockito.Matchers.argThat;
|
||||||
|
import static org.mockito.Matchers.eq;
|
||||||
|
import static org.mockito.Matchers.isA;
|
||||||
|
import static org.mockito.Mockito.atLeastOnce;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
@ -34,7 +53,6 @@ import org.apache.hadoop.util.ToolRunner;
|
||||||
import org.apache.log4j.Appender;
|
import org.apache.log4j.Appender;
|
||||||
import org.apache.log4j.LogManager;
|
import org.apache.log4j.LogManager;
|
||||||
import org.apache.log4j.spi.LoggingEvent;
|
import org.apache.log4j.spi.LoggingEvent;
|
||||||
import com.google.common.collect.Iterables;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Ignore;
|
import org.junit.Ignore;
|
||||||
|
@ -45,16 +63,7 @@ import org.mockito.ArgumentMatcher;
|
||||||
import org.mockito.Mock;
|
import org.mockito.Mock;
|
||||||
import org.mockito.runners.MockitoJUnitRunner;
|
import org.mockito.runners.MockitoJUnitRunner;
|
||||||
|
|
||||||
import java.util.concurrent.ExecutorService;
|
import com.google.common.collect.Iterables;
|
||||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertNotEquals;
|
|
||||||
import static org.mockito.Matchers.anyLong;
|
|
||||||
import static org.mockito.Matchers.eq;
|
|
||||||
import static org.mockito.Matchers.isA;
|
|
||||||
import static org.mockito.Matchers.argThat;
|
|
||||||
import static org.junit.Assert.assertEquals;
|
|
||||||
import static org.mockito.Mockito.*;
|
|
||||||
|
|
||||||
@RunWith(MockitoJUnitRunner.class)
|
@RunWith(MockitoJUnitRunner.class)
|
||||||
@Category({MediumTests.class})
|
@Category({MediumTests.class})
|
||||||
|
@ -113,6 +122,55 @@ public class TestCanaryTool {
|
||||||
verify(sink, atLeastOnce()).publishReadTiming(isA(ServerName.class), isA(HRegionInfo.class), isA(HColumnDescriptor.class), anyLong());
|
verify(sink, atLeastOnce()).publishReadTiming(isA(ServerName.class), isA(HRegionInfo.class), isA(HColumnDescriptor.class), anyLong());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCanaryRegionTaskResult() throws Exception {
|
||||||
|
TableName tableName = TableName.valueOf("testCanaryRegionTaskResult");
|
||||||
|
HTable table = testingUtility.createTable(tableName, new byte[][]{FAMILY});
|
||||||
|
// insert some test rows
|
||||||
|
for (int i = 0; i < 1000; i++) {
|
||||||
|
byte[] iBytes = Bytes.toBytes(i);
|
||||||
|
Put p = new Put(iBytes);
|
||||||
|
p.addColumn(FAMILY, COLUMN, iBytes);
|
||||||
|
table.put(p);
|
||||||
|
}
|
||||||
|
ExecutorService executor = new ScheduledThreadPoolExecutor(1);
|
||||||
|
Canary.RegionStdOutSink sink = spy(new Canary.RegionStdOutSink());
|
||||||
|
Canary canary = new Canary(executor, sink);
|
||||||
|
String[] args = {"-writeSniffing", "-t", "10000", "testCanaryRegionTaskResult"};
|
||||||
|
assertEquals(0, ToolRunner.run(testingUtility.getConfiguration(), canary, args));
|
||||||
|
|
||||||
|
assertTrue("verify read success count > 0", sink.getReadSuccessCount() > 0);
|
||||||
|
assertTrue("verify write success count > 0", sink.getWriteSuccessCount() > 0);
|
||||||
|
verify(sink, atLeastOnce()).publishReadTiming(isA(ServerName.class), isA(HRegionInfo.class),
|
||||||
|
isA(HColumnDescriptor.class), anyLong());
|
||||||
|
verify(sink, atLeastOnce()).publishWriteTiming(isA(ServerName.class), isA(HRegionInfo.class),
|
||||||
|
isA(HColumnDescriptor.class), anyLong());
|
||||||
|
|
||||||
|
assertTrue("canary should expect to scan at least 1 region",
|
||||||
|
sink.getTotalExpectedRegions() > 0);
|
||||||
|
Map<String, Canary.RegionTaskResult> regionMap = sink.getRegionMap();
|
||||||
|
assertFalse("verify region map has size > 0", regionMap.isEmpty());
|
||||||
|
|
||||||
|
for (String regionName : regionMap.keySet()) {
|
||||||
|
Canary.RegionTaskResult res = regionMap.get(regionName);
|
||||||
|
assertNotNull("verify each expected region has a RegionTaskResult object in the map", res);
|
||||||
|
assertNotNull("verify getRegionNameAsString()", regionName);
|
||||||
|
assertNotNull("verify getRegionInfo()", res.getRegionInfo());
|
||||||
|
assertNotNull("verify getTableName()", res.getTableName());
|
||||||
|
assertNotNull("verify getTableNameAsString()", res.getTableNameAsString());
|
||||||
|
assertNotNull("verify getServerName()", res.getServerName());
|
||||||
|
assertNotNull("verify getServerNameAsString()", res.getServerNameAsString());
|
||||||
|
|
||||||
|
if (regionName.contains(Canary.DEFAULT_WRITE_TABLE_NAME.getNameAsString())) {
|
||||||
|
assertTrue("write to region " + regionName + " succeeded", res.isWriteSuccess());
|
||||||
|
assertTrue("write took some time", res.getWriteLatency() > -1);
|
||||||
|
} else {
|
||||||
|
assertTrue("read from region " + regionName + " succeeded", res.isReadSuccess());
|
||||||
|
assertTrue("read took some time", res.getReadLatency() > -1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@Ignore("Intermittent argument matching failures, see HBASE-18813")
|
@Ignore("Intermittent argument matching failures, see HBASE-18813")
|
||||||
public void testReadTableTimeouts() throws Exception {
|
public void testReadTableTimeouts() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue