From 60b8344cf1ae5d481b38b85696cb6ed3c7f1260d Mon Sep 17 00:00:00 2001
From: zhangduo
Date: Thu, 26 Apr 2018 17:54:13 +0800
Subject: [PATCH] HBASE-20457 Return immediately for a scan rpc call when we
 want to switch from pread to stream

---
 .../RpcRetryingCallerWithReadReplicas.java    |   1 +
 .../hbase/regionserver/ScannerContext.java    |  27 ++--
 .../hbase/regionserver/StoreScanner.java      |  17 ++-
 .../regionserver/TestSwitchToStreamRead.java  | 141 +++++++++++++++++-
 4 files changed, 164 insertions(+), 22 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 4a31cff4a71..a0be0bfc1bc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -279,6 +279,7 @@ public class RpcRetryingCallerWithReadReplicas {
       throws RetriesExhaustedException, DoNotRetryIOException {
     Throwable t = e.getCause();
     assert t != null; // That's what ExecutionException is about: holding an exception
+    t.printStackTrace();
 
     if (t instanceof RetriesExhaustedException) {
       throw (RetriesExhaustedException) t;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index 9b19a4092d0..4c5923beecb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -99,6 +99,12 @@ public class ScannerContext {
 
   private Cell lastPeekedCell = null;
 
+  // Setting this to true will cause the same behavior as reaching the time limit.
+  // This is used when you want to make the current RSRpcServices.scan return immediately. For
+  // example, when we want to switch from pread to stream, we can only do it after the rpc call
+  // has returned.
+  private boolean returnImmediately;
+
   /**
    * Tracks the relevant server side metrics during scans. null when metrics should not be tracked
    */
@@ -278,7 +284,8 @@ public class ScannerContext {
    * @return true if the time limit can be enforced in the checker's scope
    */
   boolean hasTimeLimit(LimitScope checkerScope) {
-    return limits.canEnforceTimeLimitFromScope(checkerScope) && limits.getTime() > 0;
+    return limits.canEnforceTimeLimitFromScope(checkerScope) &&
+      (limits.getTime() > 0 || returnImmediately);
   }
 
   /**
@@ -338,7 +345,8 @@ public class ScannerContext {
    * @return true when the limit is enforceable from the checker's scope and it has been reached
    */
   boolean checkTimeLimit(LimitScope checkerScope) {
-    return hasTimeLimit(checkerScope) && (System.currentTimeMillis() >= limits.getTime());
+    return hasTimeLimit(checkerScope) &&
+      (returnImmediately || System.currentTimeMillis() >= limits.getTime());
   }
 
   /**
@@ -358,6 +366,10 @@ public class ScannerContext {
     this.lastPeekedCell = lastPeekedCell;
   }
 
+  void returnImmediately() {
+    this.returnImmediately = true;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
@@ -570,11 +582,6 @@ public class ScannerContext {
     LimitFields() {
     }
 
-    LimitFields(int batch, LimitScope sizeScope, long size, long heapSize, LimitScope timeScope,
-        long time) {
-      setFields(batch, sizeScope, size, heapSize, timeScope, time);
-    }
-
     void copy(LimitFields limitsToCopy) {
       if (limitsToCopy != null) {
         setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getDataSize(),
@@ -722,12 +729,6 @@ public class ScannerContext {
     // such AND data cells of Cells which are in on heap area.
     long heapSize = DEFAULT_SIZE;
 
-    /**
-     * Fields keep their default values.
-     */
-    ProgressFields() {
-    }
-
     ProgressFields(int batch, long size, long heapSize) {
      setFields(batch, size, heapSize);
    }
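To make the new flag's semantics concrete, here is a self-contained sketch of the pattern with simplified names (the real ScannerContext methods are package-private and take a LimitScope; ToyScannerContext is purely illustrative): once returnImmediately() is called, the time-limit check reports the limit as already reached, so the scan loop bails out at its next between-cells checkpoint even though no real time limit was configured.

    // Simplified, illustrative model of the returnImmediately flag; not the real ScannerContext.
    final class ToyScannerContext {
      private long timeLimitMillis = -1; // -1 means no time limit configured
      private boolean returnImmediately;

      void returnImmediately() {
        this.returnImmediately = true;
      }

      boolean hasTimeLimit() {
        // The flag alone is enough to "enable" the time limit...
        return timeLimitMillis > 0 || returnImmediately;
      }

      boolean checkTimeLimit() {
        // ...and enough to report it as already reached.
        return hasTimeLimit()
            && (returnImmediately || System.currentTimeMillis() >= timeLimitMillis);
      }
    }

A scan loop that consults checkTimeLimit() between cells will then surface TIME_LIMIT_REACHED to the RPC layer right away, which is exactly the hook the StoreScanner change below relies on.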
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index fa9a3de4329..f4cc24d7bd3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -553,7 +553,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
     LOOP: do {
       // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
-      if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
+      // Or if preadMaxBytes is reached and we may want to return early so we can switch to
+      // stream in the shipped method below.
+      if (kvsScanned % cellsPerHeartbeatCheck == 0 || (scanUsePread &&
+          scan.getReadType() == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)) {
         if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
           return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
         }
@@ -565,6 +568,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       checkScanOrder(prevCell, cell, comparator);
       int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
       bytesRead += cellSize;
+      if (scanUsePread && scan.getReadType() == Scan.ReadType.DEFAULT &&
+          bytesRead > preadMaxBytes) {
+        // Return immediately if we want to switch from pread to stream. We need this because
+        // we can only switch in the shipped method: if the user uses a filter which filters
+        // out everything and the rpc timeout is very large, the shipped method will never be
+        // called until the whole scan is finished, but by that time we have already scanned
+        // all the data... See HBASE-20457 for more details.
+        // And there is still a scenario that can not be handled: if we have a very large row
+        // which has millions of qualifiers, and filter.filterRow is used, then even if we set
+        // the flag here, we still need to scan all the qualifiers before returning...
+        scannerContext.returnImmediately();
+      }
       prevCell = cell;
       scannerContext.setLastPeekedCell(cell);
       topChanged = false;
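The trigger condition above appears twice (once to force the limit check, once to set the flag); boiled down, it is the following predicate. This is a hypothetical helper for illustration only, not code from the patch:

    import org.apache.hadoop.hbase.client.Scan;

    final class SwitchToStreamSketch {
      // True when StoreScanner should ask the RPC call to return, so that the
      // shipped() callback gets a chance to reopen store files as stream readers.
      static boolean wantsSwitchToStream(boolean scanUsePread, Scan.ReadType readType,
          long bytesRead, long preadMaxBytes) {
        // Only auto-switch while doing pread with ReadType.DEFAULT (an explicit
        // PREAD or STREAM choice from the user is never overridden), and only
        // after more than preadMaxBytes have been read.
        return scanUsePread && readType == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes;
      }
    }

Note the flag only makes the scan return early; the actual pread-to-stream switch still happens in shipped(), after the response has been sent.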
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
index 0af297066f2..815643d441e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
@@ -34,13 +34,17 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
+import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -49,7 +53,7 @@ public class TestSwitchToStreamRead {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestSwitchToStreamRead.class);
+    HBaseClassTestRule.forClass(TestSwitchToStreamRead.class);
 
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
 
@@ -73,18 +77,18 @@ public class TestSwitchToStreamRead {
     VALUE_PREFIX = sb.append("-").toString();
     REGION = UTIL.createLocalHRegion(
       TableDescriptorBuilder.newBuilder(TABLE_NAME)
-        .setColumnFamily(
-          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(1024).build())
-        .build(),
+          .setColumnFamily(
+            ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(1024).build())
+          .build(),
       null, null);
     for (int i = 0; i < 900; i++) {
       REGION
-          .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
+        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
     }
     REGION.flush(true);
     for (int i = 900; i < 1000; i++) {
       REGION
-          .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
+        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
     }
   }
 
@@ -97,8 +101,8 @@ public class TestSwitchToStreamRead {
   @Test
   public void test() throws IOException {
     try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {
-      StoreScanner storeScanner = (StoreScanner) (scanner)
-          .getStoreHeapForTesting().getCurrentForTesting();
+      StoreScanner storeScanner =
+        (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
       for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
         if (kvs instanceof StoreFileScanner) {
           StoreFileScanner sfScanner = (StoreFileScanner) kvs;
@@ -134,4 +138,125 @@ public class TestSwitchToStreamRead {
       assertFalse(sf.isReferencedInReads());
     }
   }
+
+  public static final class MatchLastRowKeyFilter extends FilterBase {
+
+    @Override
+    public boolean filterRowKey(Cell cell) throws IOException {
+      return Bytes.toInt(cell.getRowArray(), cell.getRowOffset()) != 999;
+    }
+  }
+
+  private void testFilter(Filter filter) throws IOException {
+    try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setFilter(filter))) {
+      StoreScanner storeScanner =
+        (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
+      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
+        if (kvs instanceof StoreFileScanner) {
+          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
+          // we start from pread, so the shared reader is used here.
+          assertTrue(sfScanner.getReader().shared);
+        }
+      }
+      List<Cell> cells = new ArrayList<>();
+      // should return before finishing the scan as we want to switch from pread to stream
+      assertTrue(scanner.next(cells,
+        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
+      assertTrue(cells.isEmpty());
+      scanner.shipped();
+
+      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
+        if (kvs instanceof StoreFileScanner) {
+          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
+          // we should have switched to stream read now.
+          assertFalse(sfScanner.getReader().shared);
+        }
+      }
+      assertFalse(scanner.next(cells,
+        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
+      Result result = Result.create(cells);
+      assertEquals(VALUE_PREFIX + 999, Bytes.toString(result.getValue(FAMILY, QUAL)));
+      cells.clear();
+      scanner.shipped();
+    }
+    // make sure all scanners are closed.
+    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
+      assertFalse(sf.isReferencedInReads());
+    }
+  }
+
+  // We use different logic to implement filterRowKey: we keep calling kvHeap.next until the row
+  // key changes, and in that path we can only use NoLimitScannerContext, so we can not make the
+  // upper layer return immediately. Simply not using NoLimitScannerContext will lead to an
+  // infinite loop. Need to dig more; the code is way too complicated...
+  @Ignore
+  @Test
+  public void testFilterRowKey() throws IOException {
+    testFilter(new MatchLastRowKeyFilter());
+  }
+
+  public static final class MatchLastRowCellNextColFilter extends FilterBase {
+
+    @Override
+    public ReturnCode filterCell(Cell c) throws IOException {
+      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
+        return ReturnCode.INCLUDE;
+      } else {
+        return ReturnCode.NEXT_COL;
+      }
+    }
+  }
+
+  @Test
+  public void testFilterCellNextCol() throws IOException {
+    testFilter(new MatchLastRowCellNextColFilter());
+  }
+
+  public static final class MatchLastRowCellNextRowFilter extends FilterBase {
+
+    @Override
+    public ReturnCode filterCell(Cell c) throws IOException {
+      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
+        return ReturnCode.INCLUDE;
+      } else {
+        return ReturnCode.NEXT_ROW;
+      }
+    }
+  }
+
+  @Test
+  public void testFilterCellNextRow() throws IOException {
+    testFilter(new MatchLastRowCellNextRowFilter());
+  }
+
+  public static final class MatchLastRowFilterRowFilter extends FilterBase {
+
+    private boolean exclude;
+
+    @Override
+    public void filterRowCells(List<Cell> kvs) throws IOException {
+      Cell c = kvs.get(0);
+      exclude = Bytes.toInt(c.getRowArray(), c.getRowOffset()) != 999;
+    }
+
+    @Override
+    public void reset() throws IOException {
+      exclude = false;
+    }
+
+    @Override
+    public boolean filterRow() throws IOException {
+      return exclude;
+    }
+
+    @Override
+    public boolean hasFilterRow() {
+      return true;
+    }
+  }
+
+  @Test
+  public void testFilterRow() throws IOException {
+    testFilter(new MatchLastRowFilterRowFilter());
+  }
 }
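For completeness: a client that wants to sidestep this heuristic entirely can pin the read type on the Scan itself. A minimal client-side sketch, where the configuration and the table name ("test_table") are assumptions:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;

    public class ReadTypeExample {
      public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
            Table table = conn.getTable(TableName.valueOf("test_table"))) {
          // An explicit read type disables the DEFAULT auto-switch: STREAM uses
          // stream read from the start, PREAD never switches.
          Scan scan = new Scan().setReadType(Scan.ReadType.STREAM);
          try (ResultScanner rs = table.getScanner(scan)) {
            for (Result result : rs) {
              System.out.println(result);
            }
          }
        }
      }
    }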