HBASE-26869 RSRpcServices.scan should deep clone cells when RpcCallContext is null (#4249)

This commit is contained in:
chenglei 2022-03-23 17:14:07 +08:00 committed by GitHub
parent b67c16a763
commit ab4351a155
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 252 additions and 7 deletions

View File

@ -24,6 +24,7 @@ import static org.apache.hadoop.hbase.KeyValue.getDelimiter;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -849,4 +850,18 @@ public final class CellUtil {
if (diff != 0) return diff; if (diff != 0) return diff;
return compareQualifiers(left, right, rqoffset, rqlength); return compareQualifiers(left, right, rqoffset, rqlength);
} }
/**
 * Replaces, in place, every cell in the list that is backed by a (potentially off-heap)
 * {@link ByteBufferExtendedCell} with an on-heap copy. A {@code null} or empty list is a no-op.
 * @param cells the cells to examine; mutated in place
 */
public static void cloneIfNecessary(ArrayList<Cell> cells) {
  if (cells == null || cells.isEmpty()) {
    return;
  }
  // Delegate the per-cell decision to the single-cell overload.
  cells.replaceAll(CellUtil::cloneIfNecessary);
}
/**
 * Returns an on-heap {@link KeyValue} copy of {@code cell} when it is a
 * {@link ByteBufferExtendedCell} (and may therefore reference an off-heap buffer that can be
 * released out from under the caller); otherwise returns the cell itself unchanged.
 * @param cell the cell to (possibly) copy
 * @return an on-heap copy, or {@code cell} itself if no copy is needed
 */
public static Cell cloneIfNecessary(Cell cell) {
  if (cell instanceof ByteBufferExtendedCell) {
    return KeyValueUtil.copyToNewKeyValue(cell);
  }
  return cell;
}
} }

View File

@ -78,7 +78,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellComparator;
@ -94,7 +93,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MetaCellComparator; import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.NotServingRegionException;
@ -7871,8 +7869,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap buffers. // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap buffers.
// See more details in HBASE-26036. // See more details in HBASE-26036.
for (Cell cell : tmp) { for (Cell cell : tmp) {
results.add(cell instanceof ByteBufferExtendedCell ? results.add(
KeyValueUtil.copyToNewKeyValue(cell) : cell); CellUtil.cloneIfNecessary(cell));
} }
} }

View File

@ -3307,7 +3307,7 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
// This is cells inside a row. Default size is 10 so if many versions or many cfs, // This is cells inside a row. Default size is 10 so if many versions or many cfs,
// then we'll resize. Resizings show in profiler. Set it higher than 10. For now // then we'll resize. Resizings show in profiler. Set it higher than 10. For now
// arbitrary 32. TODO: keep record of general size of results being returned. // arbitrary 32. TODO: keep record of general size of results being returned.
List<Cell> values = new ArrayList<>(32); ArrayList<Cell> values = new ArrayList<>(32);
region.startRegionOperation(Operation.SCAN); region.startRegionOperation(Operation.SCAN);
long before = EnvironmentEdgeManager.currentTime(); long before = EnvironmentEdgeManager.currentTime();
// Used to check if we've matched the row limit set on the Scan // Used to check if we've matched the row limit set on the Scan
@ -3368,9 +3368,16 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
// reset the batch progress between nextRaw invocations since we don't want the // reset the batch progress between nextRaw invocations since we don't want the
// batch progress from previous calls to affect future calls // batch progress from previous calls to affect future calls
scannerContext.setBatchProgress(0); scannerContext.setBatchProgress(0);
assert values.isEmpty();
// Collect values to be returned here // Collect values to be returned here
moreRows = scanner.nextRaw(values, scannerContext); moreRows = scanner.nextRaw(values, scannerContext);
if (context == null) {
// When there is no RpcCallContext, copy the cells to on-heap before the scanner is
// closed. This can be an EXPENSIVE call. It may make an extra copy from offheap to
// onheap buffers. See more details in HBASE-26036.
CellUtil.cloneIfNecessary(values);
}
numOfNextRawCalls++; numOfNextRawCalls++;
if (!values.isEmpty()) { if (!values.isEmpty()) {
@ -3727,14 +3734,25 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
if (context != null) { if (context != null) {
context.setCallBack(rsh.shippedCallback); context.setCallBack(rsh.shippedCallback);
} else { } else {
// When context != null, adding back the lease will be done in callback set above. // If context is null,here we call rsh.shippedCallback directly to reuse the logic in
addScannerLeaseBack(lease); // rsh.shippedCallback to release the internal resources in rsh,and lease is also added
// back to regionserver's LeaseManager in rsh.shippedCallback.
runShippedCallback(rsh);
} }
} }
quota.close(); quota.close();
} }
} }
/**
 * Runs the holder's shipped callback directly (used when there is no RpcCallContext to run it
 * for us), translating any {@link IOException} into the {@link ServiceException} that RPC
 * handlers are expected to throw.
 * @param rsh the scanner holder whose shipped callback must be invoked
 * @throws ServiceException wrapping any IOException raised by the callback
 */
private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException {
  assert rsh.shippedCallback != null;
  try {
    rsh.shippedCallback.run();
  } catch (IOException e) {
    throw new ServiceException(e);
  }
}
private void closeScanner(HRegion region, RegionScanner scanner, String scannerName, private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
RpcCallContext context) throws IOException { RpcCallContext context) throws IOException {
if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost() != null) {

View File

@ -0,0 +1,214 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.io.DeallocateRewriteByteBuffAllocator;
import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
@Category({ RegionServerTests.class, LargeTests.class })
public class TestRegionServerScan {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
      HBaseClassTestRule.forClass(TestRegionServerScan.class);

  @Rule
  public TestName name = new TestName();

  // Column family/qualifier and a value large enough (1200 bytes) to exceed the 2KB-pooled
  // buffer threshold configured below, so cells come back as ByteBufferExtendedCells.
  private static final byte[] CF = Bytes.toBytes("CF");
  private static final byte[] CQ = Bytes.toBytes("CQ");
  private static final byte[] VALUE = new byte[1200];

  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
  private static final Configuration conf = TEST_UTIL.getConfiguration();
  private static Admin admin = null;

  static final TableName tableName = TableName.valueOf("TestRegionServerScan");
  static final byte[] r0 = Bytes.toBytes("row-0");
  static final byte[] r1 = Bytes.toBytes("row-1");
  static final byte[] r2 = Bytes.toBytes("row-2");

  @BeforeClass
  public static void setupBeforeClass() throws Exception {
    /**
     * Use {@link DeallocateRewriteByteBuffAllocator} to rewrite the bytebuffers right after
     * released.
     */
    conf.set(ByteBuffAllocator.BYTEBUFF_ALLOCATOR_CLASS,
      DeallocateRewriteByteBuffAllocator.class.getName());
    conf.setBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0);
    conf.setInt(BlockCacheFactory.BUCKET_CACHE_WRITER_THREADS_KEY, 20);
    conf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 2048);
    // Off-heap bucket cache so scanned cells can be backed by off-heap buffers.
    conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
    conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 64);
    conf.setStrings(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());
    // Generous timeouts so the mini-cluster test does not flake on slow machines.
    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 60 * 1000);
    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30 * 60 * 1000);
    conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60 * 60 * 1000);
    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10000);
    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 1024 * 1024 * 1024);
    TEST_UTIL.startMiniCluster(1);
    admin = TEST_UTIL.getAdmin();
  }

  @AfterClass
  public static void tearDownAfterClass() throws Exception {
    TEST_UTIL.shutdownMiniCluster();
  }

  /**
   * Scans a freshly-flushed table while {@link MyRSRpcServices} simulates a null
   * RpcCallContext on the server side, and verifies all rows come back intact and that no
   * server-side exception was recorded.
   */
  @Test
  public void testScannWhenRpcCallContextNull() throws Exception {
    ResultScanner resultScanner = null;
    Table table = null;
    try {
      table =
        TEST_UTIL.createTable(tableName, new byte[][] { CF }, 1, 1024, null);
      putToTable(table, r0);
      putToTable(table, r1);
      putToTable(table, r2);
      // Flush so the scan reads from HFiles (served via the off-heap bucket cache).
      admin.flush(table.getName());

      Scan scan = new Scan();
      scan.setCaching(2);
      scan.withStartRow(r0, true).withStopRow(r2, true);

      MyRSRpcServices.inTest = true;
      resultScanner = table.getScanner(scan);
      assertNextRowIs(resultScanner, r0);
      assertNextRowIs(resultScanner, r1);
      assertNextRowIs(resultScanner, r2);
      assertNull(resultScanner.next());
      // No exception may have been captured on the server-side scan path.
      assertNull(MyRSRpcServices.exceptionRef.get());
    } finally {
      MyRSRpcServices.inTest = false;
      if (resultScanner != null) {
        resultScanner.close();
      }
      if (table != null) {
        table.close();
      }
    }
  }

  /** Asserts that the scanner's next result has exactly the expected row key. */
  private static void assertNextRowIs(ResultScanner scanner, byte[] expectedRow)
      throws IOException {
    Result result = scanner.next();
    assertTrue(Bytes.equals(expectedRow, result.getRow()));
  }

  private static void putToTable(Table table, byte[] rowkey) throws IOException {
    Put put = new Put(rowkey);
    put.addColumn(CF, CQ, VALUE);
    table.put(put);
  }

  /** Region server whose RPC services can simulate a missing RpcCallContext. */
  private static class MyRegionServer extends MiniHBaseClusterRegionServer {
    public MyRegionServer(Configuration conf) throws IOException, InterruptedException {
      super(conf);
    }

    @Override
    protected RSRpcServices createRpcServices() throws IOException {
      return new MyRSRpcServices(this);
    }
  }

  /**
   * RSRpcServices that, when {@link #inTest} is set and the request targets the test table,
   * unsets the current RpcCall around {@code super.scan} so the server takes the
   * null-RpcCallContext code path; any throwable is captured in {@link #exceptionRef}.
   */
  private static class MyRSRpcServices extends RSRpcServices {
    private static AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(null);
    private static volatile boolean inTest = false;

    public MyRSRpcServices(HRegionServer rs) throws IOException {
      super(rs);
    }

    @Override
    public ScanResponse scan(RpcController controller, ScanRequest request)
        throws ServiceException {
      try {
        if (!inTest) {
          return super.scan(controller, request);
        }
        HRegion region = null;
        if (request.hasRegion()) {
          region = this.getRegion(request.getRegion());
        }
        if (region != null
            && !tableName.equals(region.getTableDescriptor().getTableName())) {
          return super.scan(controller, request);
        }
        ScanResponse result = null;
        // Simulate RpcCallContext is null for test.
        Optional<RpcCall> rpcCall = RpcServer.unsetCurrentCall();
        try {
          result = super.scan(controller, request);
        } finally {
          // Restore the call so the surrounding RPC machinery is unaffected.
          rpcCall.ifPresent(RpcServer::setCurrentCall);
        }
        return result;
      } catch (Throwable e) {
        exceptionRef.set(e);
        throw new ServiceException(e);
      }
    }
  }
}