HBASE-22036 Rewrite TestScannerHeartbeatMessages

Duo Zhang 2019-04-25 18:18:58 +08:00 committed by zhangduo
parent a185be8a13
commit 6855d58379
2 changed files with 187 additions and 31 deletions

ScanPerNextResultScanner.java (new file)

@@ -0,0 +1,147 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayDeque;
import java.util.Queue;

import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Throwables;

/**
 * A ResultScanner which only sends a request to the RS when there are no cached results when
 * calling next, just like the old ResultScanner implementation did. Mainly used for writing UTs,
 * so that we can control when to send requests to the RS. The default ResultScanner
 * implementation fetches in the background.
 */
@InterfaceAudience.Private
public class ScanPerNextResultScanner implements ResultScanner, AdvancedScanResultConsumer {
  private final AsyncTable<AdvancedScanResultConsumer> table;
  private final Scan scan;
  private final Queue<Result> queue = new ArrayDeque<>();
  private ScanMetrics scanMetrics;
  private boolean closed = false;
  private Throwable error;
  private ScanResumer resumer;

  public ScanPerNextResultScanner(AsyncTable<AdvancedScanResultConsumer> table, Scan scan) {
    this.table = table;
    this.scan = scan;
  }

  @Override
  public synchronized void onError(Throwable error) {
    this.error = error;
    notifyAll();
  }

  @Override
  public synchronized void onComplete() {
    closed = true;
    notifyAll();
  }

  @Override
  public void onScanMetricsCreated(ScanMetrics scanMetrics) {
    this.scanMetrics = scanMetrics;
  }

  @Override
  public synchronized void onNext(Result[] results, ScanController controller) {
    assert results.length > 0;
    if (closed) {
      controller.terminate();
      return;
    }
    for (Result result : results) {
      queue.add(result);
    }
    notifyAll();
    resumer = controller.suspend();
  }

  @Override
  public synchronized void onHeartbeat(ScanController controller) {
    if (closed) {
      controller.terminate();
      return;
    }
    if (scan.isNeedCursorResult()) {
      controller.cursor().ifPresent(c -> queue.add(Result.createCursorResult(c)));
    }
  }

  @Override
  public synchronized Result next() throws IOException {
    if (queue.isEmpty()) {
      if (resumer != null) {
        resumer.resume();
        resumer = null;
      } else {
        table.scan(scan, this);
      }
    }
    while (queue.isEmpty()) {
      if (closed) {
        return null;
      }
      if (error != null) {
        Throwables.propagateIfPossible(error, IOException.class);
        throw new IOException(error);
      }
      try {
        wait();
      } catch (InterruptedException e) {
        throw new InterruptedIOException();
      }
    }
    return queue.poll();
  }

  @Override
  public synchronized void close() {
    closed = true;
    queue.clear();
    if (resumer != null) {
      resumer.resume();
      resumer = null;
    }
    notifyAll();
  }

  @Override
  public boolean renewLease() {
    // The renew lease operation will be handled in background
    return false;
  }

  @Override
  public ScanMetrics getScanMetrics() {
    return scanMetrics;
  }
}
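
For orientation (not part of the commit), below is a minimal usage sketch of how a test might drive this scanner. The table name, connection setup, and printing are illustrative assumptions; the class itself is a test-only helper annotated @InterfaceAudience.Private, not a public API. The key point is that a request is sent to the region server only when the cached queue is empty, so the calling thread decides when each fetch happens.

// Hypothetical usage sketch, not from the commit: drive the scan one batch at a
// time so the caller controls exactly when requests go out to the region server.
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ScanPerNextResultScanner;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanPerNextExample {
  public static void main(String[] args) throws Exception {
    try (AsyncConnection conn =
      ConnectionFactory.createAsyncConnection(HBaseConfiguration.create()).get()) {
      Scan scan = new Scan().setCaching(1); // small batches, so most next() calls trigger an RPC
      try (ScanPerNextResultScanner scanner =
        new ScanPerNextResultScanner(conn.getTable(TableName.valueOf("testTable")), scan)) {
        for (Result r; (r = scanner.next()) != null;) {
          // No background prefetch happens between iterations; the scan stays
          // suspended until the next call to next() resumes it.
          System.out.println(Bytes.toStringBinary(r.getRow()));
        }
      }
    }
  }
}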

TestScannerHeartbeatMessages.java

@@ -39,11 +39,16 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTestConst;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScanPerNextResultScanner;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.filter.Filter;
@@ -58,10 +63,10 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 
@@ -75,11 +80,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRespon
  * the client when the server has exceeded the time limit during the processing of the scan. When
  * the time limit is reached, the server will return to the Client whatever Results it has
  * accumulated (potentially empty).
- * <p/>
- * TODO: with async client based sync client, we will fetch result in background which makes this
- * test broken. We need to find another way to implement the test.
  */
-@Ignore
 @Category(MediumTests.class)
 public class TestScannerHeartbeatMessages {
 
@@ -89,7 +90,7 @@ public class TestScannerHeartbeatMessages {
 
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
-  private static Table TABLE = null;
+  private static AsyncConnection CONN;
 
   /**
    * Table configuration
@@ -141,16 +142,19 @@
     conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
     TEST_UTIL.startMiniCluster(1);
 
-    TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
+    createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
+
+    Configuration newConf = new Configuration(conf);
+    newConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
+    newConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT);
+    CONN = ConnectionFactory.createAsyncConnection(newConf).get();
   }
 
-  static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
-      byte[][] qualifiers, byte[] cellValue) throws IOException {
+  static void createTestTable(TableName name, byte[][] rows, byte[][] families, byte[][] qualifiers,
+      byte[] cellValue) throws IOException {
     Table ht = TEST_UTIL.createTable(name, families);
     List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
     ht.put(puts);
-    ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
-    return ht;
   }
 
   /**
@@ -177,6 +181,7 @@
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
+    Closeables.close(CONN, true);
     TEST_UTIL.shutdownMiniCluster();
   }
 
@@ -311,26 +316,28 @@
     scan.setMaxResultSize(Long.MAX_VALUE);
     scan.setCaching(Integer.MAX_VALUE);
     scan.setFilter(new SparseCellFilter());
-    ResultScanner scanner = TABLE.getScanner(scan);
-    int num = 0;
-    while (scanner.next() != null) {
-      num++;
+    try (ScanPerNextResultScanner scanner =
+      new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
+      int num = 0;
+      while (scanner.next() != null) {
+        num++;
+      }
+      assertEquals(1, num);
     }
-    assertEquals(1, num);
-    scanner.close();
 
     scan = new Scan();
     scan.setMaxResultSize(Long.MAX_VALUE);
     scan.setCaching(Integer.MAX_VALUE);
     scan.setFilter(new SparseCellFilter());
     scan.setAllowPartialResults(true);
-    scanner = TABLE.getScanner(scan);
-    num = 0;
-    while (scanner.next() != null) {
-      num++;
+    try (ScanPerNextResultScanner scanner =
+      new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
+      int num = 0;
+      while (scanner.next() != null) {
+        num++;
+      }
+      assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
    }
-    assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
-    scanner.close();
 
     return null;
   }
@@ -349,13 +356,14 @@
     scan.setMaxResultSize(Long.MAX_VALUE);
     scan.setCaching(Integer.MAX_VALUE);
     scan.setFilter(new SparseRowFilter());
-    ResultScanner scanner = TABLE.getScanner(scan);
-    int num = 0;
-    while (scanner.next() != null) {
-      num++;
+    try (ScanPerNextResultScanner scanner =
+      new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
+      int num = 0;
+      while (scanner.next() != null) {
+        num++;
+      }
+      assertEquals(1, num);
     }
-    assertEquals(1, num);
-    scanner.close();
 
     return null;
   }
@@ -374,8 +382,9 @@
   private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
       int cfSleepTime, boolean sleepBeforeCf) throws Exception {
     disableSleeping();
-    final ResultScanner scanner = TABLE.getScanner(scan);
-    final ResultScanner scannerWithHeartbeats = TABLE.getScanner(scan);
+    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
+    final ResultScanner scanner = new ScanPerNextResultScanner(table, scan);
+    final ResultScanner scannerWithHeartbeats = new ScanPerNextResultScanner(table, scan);
 
     Result r1 = null;
     Result r2 = null;
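
A closing note on the heartbeat path itself: when the server exceeds its time limit, the heartbeat reaches the new scanner through onHeartbeat(), and if Scan#setNeedCursorResult(true) is set, the controller's cursor is queued as a cursor Result (see Result.createCursorResult in the file above). Below is a hedged sketch, reusing the imports and connection from the earlier example (plus java.io.IOException), of how a caller could tell those cursor results apart from real rows; the helper method name is an assumption for illustration.

// Sketch only, with an assumed helper method; conn and tableName come from the caller.
static void printRowsAndHeartbeatCursors(AsyncConnection conn, TableName tableName)
    throws IOException {
  Scan scan = new Scan().setNeedCursorResult(true);
  try (ScanPerNextResultScanner scanner =
    new ScanPerNextResultScanner(conn.getTable(tableName), scan)) {
    for (Result r; (r = scanner.next()) != null;) {
      if (r.isCursor()) {
        // Heartbeat: the server hit its time limit before it had a row to return;
        // the cursor reports how far the scan has progressed.
        System.out.println("cursor at " + Bytes.toStringBinary(r.getCursor().getRow()));
      } else {
        System.out.println("row " + Bytes.toStringBinary(r.getRow()));
      }
    }
  }
}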