HBASE-27048 Server side scanner time limit should account for time in queue (#4562)
Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
parent
a8b253ebb4
commit
8fcb94ae8a
@ -307,7 +307,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
|
|||||||
/**
|
/**
|
||||||
* Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
|
* Default value of {@link RSRpcServices#REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA}
|
||||||
*/
|
*/
|
||||||
private static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
|
static final long DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA = 10;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Whether to reject rows with size > threshold defined by
|
* Whether to reject rows with size > threshold defined by
|
||||||
@ -3248,33 +3248,53 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private long getTimeLimit(HBaseRpcController controller, boolean allowHeartbeatMessages) {
|
// visible for testing only
|
||||||
|
long getTimeLimit(RpcCall rpcCall, HBaseRpcController controller,
|
||||||
|
boolean allowHeartbeatMessages) {
|
||||||
// Set the time limit to be half of the more restrictive timeout value (one of the
|
// Set the time limit to be half of the more restrictive timeout value (one of the
|
||||||
// timeout values must be positive). In the event that both values are positive, the
|
// timeout values must be positive). In the event that both values are positive, the
|
||||||
// more restrictive of the two is used to calculate the limit.
|
// more restrictive of the two is used to calculate the limit.
|
||||||
if (allowHeartbeatMessages && (scannerLeaseTimeoutPeriod > 0 || rpcTimeout > 0)) {
|
if (allowHeartbeatMessages) {
|
||||||
long timeLimitDelta;
|
long now = EnvironmentEdgeManager.currentTime();
|
||||||
if (scannerLeaseTimeoutPeriod > 0 && rpcTimeout > 0) {
|
long remainingTimeout = getRemainingRpcTimeout(rpcCall, controller, now);
|
||||||
timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, rpcTimeout);
|
if (scannerLeaseTimeoutPeriod > 0 || remainingTimeout > 0) {
|
||||||
} else {
|
long timeLimitDelta;
|
||||||
timeLimitDelta = scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : rpcTimeout;
|
if (scannerLeaseTimeoutPeriod > 0 && remainingTimeout > 0) {
|
||||||
|
timeLimitDelta = Math.min(scannerLeaseTimeoutPeriod, remainingTimeout);
|
||||||
|
} else {
|
||||||
|
timeLimitDelta =
|
||||||
|
scannerLeaseTimeoutPeriod > 0 ? scannerLeaseTimeoutPeriod : remainingTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Use half of whichever timeout value was more restrictive... But don't allow
|
||||||
|
// the time limit to be less than the allowable minimum (could cause an
|
||||||
|
// immediate timeout before scanning any data).
|
||||||
|
timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
|
||||||
|
return now + timeLimitDelta;
|
||||||
}
|
}
|
||||||
if (controller != null && controller.getCallTimeout() > 0) {
|
|
||||||
timeLimitDelta = Math.min(timeLimitDelta, controller.getCallTimeout());
|
|
||||||
}
|
|
||||||
// Use half of whichever timeout value was more restrictive... But don't allow
|
|
||||||
// the time limit to be less than the allowable minimum (could cause an
|
|
||||||
// immediatate timeout before scanning any data).
|
|
||||||
timeLimitDelta = Math.max(timeLimitDelta / 2, minimumScanTimeLimitDelta);
|
|
||||||
// XXX: Can not use EnvironmentEdge here because TestIncrementTimeRange use a
|
|
||||||
// ManualEnvironmentEdge. Consider using System.nanoTime instead.
|
|
||||||
return EnvironmentEdgeManager.currentTime() + timeLimitDelta;
|
|
||||||
}
|
}
|
||||||
// Default value of timeLimit is negative to indicate no timeLimit should be
|
// Default value of timeLimit is negative to indicate no timeLimit should be
|
||||||
// enforced.
|
// enforced.
|
||||||
return -1L;
|
return -1L;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private long getRemainingRpcTimeout(RpcCall call, HBaseRpcController controller, long now) {
|
||||||
|
long timeout;
|
||||||
|
if (controller != null && controller.getCallTimeout() > 0) {
|
||||||
|
timeout = controller.getCallTimeout();
|
||||||
|
} else if (rpcTimeout > 0) {
|
||||||
|
timeout = rpcTimeout;
|
||||||
|
} else {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (call != null) {
|
||||||
|
timeout -= (now - call.getReceiveTime());
|
||||||
|
}
|
||||||
|
// getTimeLimit ignores values <= 0, but timeout may now be negative if queue time was high.
|
||||||
|
// return minimum value here in that case so we count this in calculating the final delta.
|
||||||
|
return Math.max(minimumScanTimeLimitDelta, timeout);
|
||||||
|
}
|
||||||
|
|
||||||
private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
|
private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean moreRows,
|
||||||
ScannerContext scannerContext, ScanResponse.Builder builder) {
|
ScannerContext scannerContext, ScanResponse.Builder builder) {
|
||||||
if (numOfCompleteRows >= limitOfRows) {
|
if (numOfCompleteRows >= limitOfRows) {
|
||||||
@ -3289,7 +3309,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
|
|||||||
// return whether we have more results in region.
|
// return whether we have more results in region.
|
||||||
private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
|
private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
|
||||||
long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
|
long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
|
||||||
ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCallContext context)
|
ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCall rpcCall)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
HRegion region = rsh.r;
|
HRegion region = rsh.r;
|
||||||
RegionScanner scanner = rsh.s;
|
RegionScanner scanner = rsh.s;
|
||||||
@ -3336,7 +3356,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
|
|||||||
// heartbeats AND partials
|
// heartbeats AND partials
|
||||||
boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
|
boolean allowHeartbeatMessages = clientHandlesHeartbeats && allowPartialResults;
|
||||||
|
|
||||||
long timeLimit = getTimeLimit(controller, allowHeartbeatMessages);
|
long timeLimit = getTimeLimit(rpcCall, controller, allowHeartbeatMessages);
|
||||||
|
|
||||||
final LimitScope sizeScope =
|
final LimitScope sizeScope =
|
||||||
allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
|
allowPartialResults ? LimitScope.BETWEEN_CELLS : LimitScope.BETWEEN_ROWS;
|
||||||
@ -3367,7 +3387,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
|
|||||||
|
|
||||||
// Collect values to be returned here
|
// Collect values to be returned here
|
||||||
moreRows = scanner.nextRaw(values, scannerContext);
|
moreRows = scanner.nextRaw(values, scannerContext);
|
||||||
if (context == null) {
|
if (rpcCall == null) {
|
||||||
// When there is no RpcCallContext,copy EC to heap, then the scanner would close,
|
// When there is no RpcCallContext,copy EC to heap, then the scanner would close,
|
||||||
// This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
|
// This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap
|
||||||
// buffers.See more details in HBASE-26036.
|
// buffers.See more details in HBASE-26036.
|
||||||
@ -3405,7 +3425,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
|
|||||||
}
|
}
|
||||||
boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
|
boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
|
||||||
Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
|
Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
|
||||||
lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
|
lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
|
||||||
results.add(r);
|
results.add(r);
|
||||||
numOfResults++;
|
numOfResults++;
|
||||||
if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
|
if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
|
||||||
@ -3473,7 +3493,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
|
|||||||
region.closeRegionOperation();
|
region.closeRegionOperation();
|
||||||
// Update serverside metrics, even on error.
|
// Update serverside metrics, even on error.
|
||||||
long end = EnvironmentEdgeManager.currentTime();
|
long end = EnvironmentEdgeManager.currentTime();
|
||||||
long responseCellSize = context != null ? context.getResponseCellSize() : 0;
|
long responseCellSize = rpcCall != null ? rpcCall.getResponseCellSize() : 0;
|
||||||
region.getMetrics().updateScanTime(end - before);
|
region.getMetrics().updateScanTime(end - before);
|
||||||
final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
|
final MetricsRegionServer metricsRegionServer = regionServer.getMetrics();
|
||||||
if (metricsRegionServer != null) {
|
if (metricsRegionServer != null) {
|
||||||
@ -3601,7 +3621,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
|
|||||||
} else {
|
} else {
|
||||||
rows = closeScanner ? 0 : 1;
|
rows = closeScanner ? 0 : 1;
|
||||||
}
|
}
|
||||||
RpcCallContext context = RpcServer.getCurrentCall().orElse(null);
|
RpcCall rpcCall = RpcServer.getCurrentCall().orElse(null);
|
||||||
// now let's do the real scan.
|
// now let's do the real scan.
|
||||||
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
|
long maxQuotaResultSize = Math.min(maxScannerResultSize, quota.getReadAvailable());
|
||||||
RegionScanner scanner = rsh.s;
|
RegionScanner scanner = rsh.s;
|
||||||
@ -3624,7 +3644,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
|
|||||||
Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
|
Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
|
||||||
if (!results.isEmpty()) {
|
if (!results.isEmpty()) {
|
||||||
for (Result r : results) {
|
for (Result r : results) {
|
||||||
lastBlock.setValue(addSize(context, r, lastBlock.getValue()));
|
lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (bypass != null && bypass.booleanValue()) {
|
if (bypass != null && bypass.booleanValue()) {
|
||||||
@ -3633,7 +3653,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
|
|||||||
}
|
}
|
||||||
if (!done) {
|
if (!done) {
|
||||||
scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
|
scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
|
||||||
results, builder, lastBlock, context);
|
results, builder, lastBlock, rpcCall);
|
||||||
} else {
|
} else {
|
||||||
builder.setMoreResultsInRegion(!results.isEmpty());
|
builder.setMoreResultsInRegion(!results.isEmpty());
|
||||||
}
|
}
|
||||||
@ -3645,7 +3665,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
|
|||||||
quota.addScanResult(results);
|
quota.addScanResult(results);
|
||||||
addResults(builder, results, (HBaseRpcController) controller,
|
addResults(builder, results, (HBaseRpcController) controller,
|
||||||
RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
|
RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()),
|
||||||
isClientCellBlockSupport(context));
|
isClientCellBlockSupport(rpcCall));
|
||||||
if (scanner.isFilterDone() && results.isEmpty()) {
|
if (scanner.isFilterDone() && results.isEmpty()) {
|
||||||
// If the scanner's filter - if any - is done with the scan
|
// If the scanner's filter - if any - is done with the scan
|
||||||
// only set moreResults to false if the results is empty. This is used to keep compatible
|
// only set moreResults to false if the results is empty. This is used to keep compatible
|
||||||
@ -3677,7 +3697,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
|
|||||||
}
|
}
|
||||||
if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
|
if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
|
||||||
scannerClosed = true;
|
scannerClosed = true;
|
||||||
closeScanner(region, scanner, scannerName, context);
|
closeScanner(region, scanner, scannerName, rpcCall);
|
||||||
}
|
}
|
||||||
return builder.build();
|
return builder.build();
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
@ -3687,7 +3707,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
|
|||||||
// The scanner state might be left in a dirty state, so we will tell the Client to
|
// The scanner state might be left in a dirty state, so we will tell the Client to
|
||||||
// fail this RPC and close the scanner while opening up another one from the start of
|
// fail this RPC and close the scanner while opening up another one from the start of
|
||||||
// row that the client has last seen.
|
// row that the client has last seen.
|
||||||
closeScanner(region, scanner, scannerName, context);
|
closeScanner(region, scanner, scannerName, rpcCall);
|
||||||
|
|
||||||
// If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
|
// If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
|
||||||
// used in two different semantics.
|
// used in two different semantics.
|
||||||
@ -3713,7 +3733,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
|
|||||||
// We closed the scanner already. Instead of throwing the IOException, and client
|
// We closed the scanner already. Instead of throwing the IOException, and client
|
||||||
// retrying with the same scannerId only to get USE on the next RPC, we directly throw
|
// retrying with the same scannerId only to get USE on the next RPC, we directly throw
|
||||||
// a special exception to save an RPC.
|
// a special exception to save an RPC.
|
||||||
if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
|
if (VersionInfoUtil.hasMinimumVersion(rpcCall.getClientVersionInfo(), 1, 4)) {
|
||||||
// 1.4.0+ clients know how to handle
|
// 1.4.0+ clients know how to handle
|
||||||
throw new ScannerResetException("Scanner is closed on the server-side", e);
|
throw new ScannerResetException("Scanner is closed on the server-side", e);
|
||||||
} else {
|
} else {
|
||||||
@ -3728,8 +3748,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
|
|||||||
if (!scannerClosed) {
|
if (!scannerClosed) {
|
||||||
// Adding resets expiration time on lease.
|
// Adding resets expiration time on lease.
|
||||||
// the closeCallBack will be set in closeScanner so here we only care about shippedCallback
|
// the closeCallBack will be set in closeScanner so here we only care about shippedCallback
|
||||||
if (context != null) {
|
if (rpcCall != null) {
|
||||||
context.setCallBack(rsh.shippedCallback);
|
rpcCall.setCallBack(rsh.shippedCallback);
|
||||||
} else {
|
} else {
|
||||||
// If context is null,here we call rsh.shippedCallback directly to reuse the logic in
|
// If context is null,here we call rsh.shippedCallback directly to reuse the logic in
|
||||||
// rsh.shippedCallback to release the internal resources in rsh,and lease is also added
|
// rsh.shippedCallback to release the internal resources in rsh,and lease is also added
|
||||||
|
@ -17,9 +17,11 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.regionserver.RSRpcServices.DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@ -48,8 +50,11 @@ import org.apache.hadoop.hbase.client.Table;
|
|||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.filter.Filter;
|
import org.apache.hadoop.hbase.filter.Filter;
|
||||||
import org.apache.hadoop.hbase.filter.FilterBase;
|
import org.apache.hadoop.hbase.filter.FilterBase;
|
||||||
|
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcCall;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.hbase.wal.WAL;
|
import org.apache.hadoop.hbase.wal.WAL;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
@ -59,6 +64,7 @@ import org.junit.BeforeClass;
|
|||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
|
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
|
||||||
@ -138,8 +144,42 @@ public class TestScannerHeartbeatMessages {
|
|||||||
TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
|
TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
|
@Test
|
||||||
byte[][] qualifiers, byte[] cellValue) throws IOException {
|
public void testTimeLimitAccountsForQueueTime() throws IOException, InterruptedException {
|
||||||
|
HRegionServer rs = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
|
||||||
|
RSRpcServices services = new RSRpcServices(rs);
|
||||||
|
RpcCall mockRpcCall = Mockito.mock(RpcCall.class);
|
||||||
|
// first return 180 (minimal queuing), then 120 (more queueing), then 101 (heavy queueing)
|
||||||
|
// finally, 25 is fatal levels of queueing -- exceeding timeout
|
||||||
|
when(mockRpcCall.getReceiveTime()).thenReturn(180L, 120L, 101L, 25L);
|
||||||
|
|
||||||
|
// assume timeout of 100ms
|
||||||
|
HBaseRpcController mockController = Mockito.mock(HBaseRpcController.class);
|
||||||
|
when(mockController.getCallTimeout()).thenReturn(100);
|
||||||
|
|
||||||
|
// current time is 100, which we'll subtract from 90 and 50 to generate some time deltas
|
||||||
|
EnvironmentEdgeManager.injectEdge(() -> 200L);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// we queued for 20ms, leaving 80ms of timeout, which we divide by 2
|
||||||
|
assertEquals(200 + (100 - 20) / 2, services.getTimeLimit(mockRpcCall, mockController, true));
|
||||||
|
// we queued for 80ms, leaving 20ms of timeout, which we divide by 2
|
||||||
|
assertEquals(200 + (100 - 80) / 2, services.getTimeLimit(mockRpcCall, mockController, true));
|
||||||
|
// we queued for 99ms of 100ms timeout, leaving only 1ms. we fall back to default minimum
|
||||||
|
assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
|
||||||
|
services.getTimeLimit(mockRpcCall, mockController, true));
|
||||||
|
// lastly, we queue for 175ms of 100ms timeout. this should be very rare since we drop
|
||||||
|
// timed out calls in the queue. in this case we still fallback on default minimum for now.
|
||||||
|
assertEquals(200 + DEFAULT_REGION_SERVER_RPC_MINIMUM_SCAN_TIME_LIMIT_DELTA,
|
||||||
|
services.getTimeLimit(mockRpcCall, mockController, true));
|
||||||
|
} finally {
|
||||||
|
EnvironmentEdgeManager.reset();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
static Table createTestTable(TableName name, byte[][] rows, byte[][] families, byte[][] qualifiers,
|
||||||
|
byte[] cellValue) throws IOException {
|
||||||
Table ht = TEST_UTIL.createTable(name, families);
|
Table ht = TEST_UTIL.createTable(name, families);
|
||||||
List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
|
List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
|
||||||
ht.put(puts);
|
ht.put(puts);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user