HBASE-22238 Fix TestRpcControllerFactory
This commit is contained in:
parent
d1487fcfad
commit
a50154dd0a
|
@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan.ReadType;
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||||
import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService;
|
import org.apache.hadoop.hbase.coprocessor.ProtobufCoprocessorService;
|
||||||
import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
|
import org.apache.hadoop.hbase.ipc.DelegatingHBaseRpcController;
|
||||||
|
@ -85,7 +86,6 @@ public class TestRpcControllerFactory {
|
||||||
|
|
||||||
private static Multiset<Integer> GROUPED_PRIORITY = ConcurrentHashMultiset.create();
|
private static Multiset<Integer> GROUPED_PRIORITY = ConcurrentHashMultiset.create();
|
||||||
private static AtomicInteger INT_PRIORITY = new AtomicInteger();
|
private static AtomicInteger INT_PRIORITY = new AtomicInteger();
|
||||||
private static AtomicInteger TABLE_PRIORITY = new AtomicInteger();
|
|
||||||
|
|
||||||
public CountingRpcController(HBaseRpcController delegate) {
|
public CountingRpcController(HBaseRpcController delegate) {
|
||||||
super(delegate);
|
super(delegate);
|
||||||
|
@ -93,34 +93,18 @@ public class TestRpcControllerFactory {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void setPriority(int priority) {
|
public void setPriority(int priority) {
|
||||||
int oldPriority = getPriority();
|
|
||||||
super.setPriority(priority);
|
|
||||||
int newPriority = getPriority();
|
|
||||||
if (newPriority != oldPriority) {
|
|
||||||
INT_PRIORITY.incrementAndGet();
|
INT_PRIORITY.incrementAndGet();
|
||||||
GROUPED_PRIORITY.add(priority);
|
GROUPED_PRIORITY.add(priority);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setPriority(TableName tn) {
|
|
||||||
super.setPriority(tn);
|
|
||||||
// ignore counts for system tables - it could change and we really only want to check on what
|
|
||||||
// the client should change
|
|
||||||
if (tn != null && !tn.isSystemTable()) {
|
|
||||||
TABLE_PRIORITY.incrementAndGet();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
@Rule
|
@Rule
|
||||||
public TestName name = new TestName();
|
public TestName name = new TestName();
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setup() throws Exception {
|
public static void setUp() throws Exception {
|
||||||
// load an endpoint so we have an endpoint to test - it doesn't matter which one, but
|
// load an endpoint so we have an endpoint to test - it doesn't matter which one, but
|
||||||
// this is already in tests, so we can just use it.
|
// this is already in tests, so we can just use it.
|
||||||
Configuration conf = UTIL.getConfiguration();
|
Configuration conf = UTIL.getConfiguration();
|
||||||
|
@ -131,7 +115,7 @@ public class TestRpcControllerFactory {
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
@AfterClass
|
||||||
public static void teardown() throws Exception {
|
public static void tearDown() throws Exception {
|
||||||
UTIL.shutdownMiniCluster();
|
UTIL.shutdownMiniCluster();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,8 +138,8 @@ public class TestRpcControllerFactory {
|
||||||
// change one of the connection properties so we get a new Connection with our configuration
|
// change one of the connection properties so we get a new Connection with our configuration
|
||||||
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT + 1);
|
conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT + 1);
|
||||||
|
|
||||||
Connection connection = ConnectionFactory.createConnection(conf);
|
try (Connection connection = ConnectionFactory.createConnection(conf);
|
||||||
Table table = connection.getTable(tableName);
|
Table table = connection.getTable(tableName)) {
|
||||||
byte[] row = Bytes.toBytes("row");
|
byte[] row = Bytes.toBytes("row");
|
||||||
Put p = new Put(row);
|
Put p = new Put(row);
|
||||||
p.addColumn(fam1, fam1, Bytes.toBytes("val0"));
|
p.addColumn(fam1, fam1, Bytes.toBytes("val0"));
|
||||||
|
@ -196,9 +180,9 @@ public class TestRpcControllerFactory {
|
||||||
counter = verifyCount(counter);
|
counter = verifyCount(counter);
|
||||||
|
|
||||||
// make sure all the scanner types are covered
|
// make sure all the scanner types are covered
|
||||||
Scan scanInfo = new Scan(row);
|
Scan scanInfo = new Scan().withStartRow(row);
|
||||||
// regular small
|
// regular small
|
||||||
scanInfo.setSmall(true);
|
scanInfo.setReadType(ReadType.PREAD);
|
||||||
counter = doScan(table, scanInfo, counter);
|
counter = doScan(table, scanInfo, counter);
|
||||||
|
|
||||||
// reversed, small
|
// reversed, small
|
||||||
|
@ -206,7 +190,7 @@ public class TestRpcControllerFactory {
|
||||||
counter = doScan(table, scanInfo, counter);
|
counter = doScan(table, scanInfo, counter);
|
||||||
|
|
||||||
// reversed, regular
|
// reversed, regular
|
||||||
scanInfo.setSmall(false);
|
scanInfo.setReadType(ReadType.STREAM);
|
||||||
counter = doScan(table, scanInfo, counter + 1);
|
counter = doScan(table, scanInfo, counter + 1);
|
||||||
|
|
||||||
// make sure we have no priority count
|
// make sure we have no priority count
|
||||||
|
@ -215,23 +199,22 @@ public class TestRpcControllerFactory {
|
||||||
Get get = new Get(row);
|
Get get = new Get(row);
|
||||||
get.setPriority(HConstants.ADMIN_QOS);
|
get.setPriority(HConstants.ADMIN_QOS);
|
||||||
table.get(get);
|
table.get(get);
|
||||||
verifyPriorityGroupCount(HConstants.ADMIN_QOS, 1);
|
// we will reset the controller for setting the call timeout so it will lead to an extra
|
||||||
|
// setPriority
|
||||||
table.close();
|
verifyPriorityGroupCount(HConstants.ADMIN_QOS, 2);
|
||||||
connection.close();
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int doScan(Table table, Scan scan, int expectedCount) throws IOException {
|
int doScan(Table table, Scan scan, int expectedCount) throws IOException {
|
||||||
ResultScanner results = table.getScanner(scan);
|
try (ResultScanner results = table.getScanner(scan)) {
|
||||||
results.next();
|
results.next();
|
||||||
results.close();
|
}
|
||||||
return verifyCount(expectedCount);
|
return verifyCount(expectedCount);
|
||||||
}
|
}
|
||||||
|
|
||||||
int verifyCount(Integer counter) {
|
int verifyCount(Integer counter) {
|
||||||
assertTrue(CountingRpcController.TABLE_PRIORITY.get() >= counter);
|
assertTrue(CountingRpcController.INT_PRIORITY.get() >= counter);
|
||||||
assertEquals(0, CountingRpcController.INT_PRIORITY.get());
|
return CountingRpcController.GROUPED_PRIORITY.count(HConstants.NORMAL_QOS) + 1;
|
||||||
return CountingRpcController.TABLE_PRIORITY.get() + 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void verifyPriorityGroupCount(int priorityLevel, int count) {
|
void verifyPriorityGroupCount(int priorityLevel, int count) {
|
||||||
|
|
Loading…
Reference in New Issue