From 1a37f3be82f3d4e111ff846a79583472da86da4d Mon Sep 17 00:00:00 2001 From: zhangduo Date: Thu, 25 May 2017 11:02:09 +0800 Subject: [PATCH] HBASE-18042 Client Compatibility breaks between versions 1.2 and 1.3 --- .../hbase/regionserver/RSRpcServices.java | 48 ++++--- .../hbase/client/TestAlwaysSetScannerId.java | 5 +- .../hadoop/hbase/client/TestLeaseRenewal.java | 3 +- .../client/TestScanWithoutFetchingData.java | 133 ++++++++++++++++++ 4 files changed, 167 insertions(+), 22 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanWithoutFetchingData.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 3f23d2be958..69b8ac12ac0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.regionserver; import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; import com.google.protobuf.ByteString; import com.google.protobuf.Message; import com.google.protobuf.RpcController; @@ -44,6 +46,7 @@ import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang.mutable.MutableObject; @@ -197,8 +200,6 @@ import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.zookeeper.KeeperException; -import com.google.common.annotations.VisibleForTesting; - /** * Implements the regionserver RPC services. */ @@ -252,7 +253,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler, private final AtomicLong scannerIdGen = new AtomicLong(0L); private final ConcurrentMap scanners = new ConcurrentHashMap<>(); - + // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients + // which may send next or close request to a region scanner which has already been exhausted. The + // entries will be removed automatically after scannerLeaseTimeoutPeriod. + private final Cache closedScanners; /** * The lease timeout period for client scanners (milliseconds). */ @@ -1072,6 +1076,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort()); rpcServer.setErrorHandler(this); rs.setName(name); + + closedScanners = CacheBuilder.newBuilder() + .expireAfterAccess(scannerLeaseTimeoutPeriod, TimeUnit.MILLISECONDS).build(); } @Override @@ -2492,18 +2499,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler, String scannerName = Long.toString(request.getScannerId()); RegionScannerHolder rsh = scanners.get(scannerName); if (rsh == null) { - // just ignore the close request if scanner does not exists. - if (request.hasCloseScanner() && request.getCloseScanner()) { + // just ignore the next or close request if scanner does not exists. + if (closedScanners.getIfPresent(scannerName) != null) { throw SCANNER_ALREADY_CLOSED; } else { LOG.warn("Client tried to access missing scanner " + scannerName); throw new UnknownScannerException( - "Unknown scanner '" + scannerName + "'. This can happen due to any of the following " - + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " - + "long wait between consecutive client checkins, c) Server may be closing down, " - + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " - + "possible fix would be increasing the value of" - + "'hbase.client.scanner.timeout.period' configuration."); + "Unknown scanner '" + scannerName + "'. This can happen due to any of the following " + + "reasons: a) Scanner id given is wrong, b) Scanner lease expired because of " + + "long wait between consecutive client checkins, c) Server may be closing down, " + + "d) RegionServer restart during upgrade.\nIf the issue is due to reason (b), a " + + "possible fix would be increasing the value of" + + "'hbase.client.scanner.timeout.period' configuration."); } } HRegionInfo hri = rsh.s.getRegionInfo(); @@ -2761,13 +2768,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } values.clear(); } - if (limitReached || moreRows) { - // We stopped prematurely - builder.setMoreResultsInRegion(true); - } else { - // We didn't get a single batch - builder.setMoreResultsInRegion(false); - } + builder.setMoreResultsInRegion(moreRows); // Check to see if the client requested that we track metrics server side. If the // client requested metrics, retrieve the metrics from the scanner context. if (trackMetrics) { @@ -2938,7 +2939,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (!done) { scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows, results, builder, lastBlock, context); + } else { + builder.setMoreResultsInRegion(!results.isEmpty()); } + } else { + // This is a open scanner call with numberOfRow = 0, so set more results in region to true. + builder.setMoreResultsInRegion(true); } quota.addScanResult(results); @@ -2951,6 +2957,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, // is false. Can remove the isEmpty check after we get rid of the old implementation. builder.setMoreResults(false); } + // Later we may close the scanner depending on this flag so here we need to make sure that we + // have already set this flag. + assert builder.hasMoreResultsInRegion(); // we only set moreResults to false in the above code, so set it to true if we haven't set it // yet. if (!builder.hasMoreResults()) { @@ -2975,7 +2984,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, closeScanner(region, scanner, scannerName, context); } return builder.build(); - } catch (Exception e) { + } catch (IOException e) { try { // scanner is closed here scannerClosed = true; @@ -3042,6 +3051,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (region.getCoprocessorHost() != null) { region.getCoprocessorHost().postScannerClose(scanner); } + closedScanners.put(scannerName, scannerName); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAlwaysSetScannerId.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAlwaysSetScannerId.java index 8e1e37fd1d6..8396c28c03d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAlwaysSetScannerId.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAlwaysSetScannerId.java @@ -17,7 +17,8 @@ */ package org.apache.hadoop.hbase.client; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import java.io.IOException; @@ -86,7 +87,7 @@ public class TestAlwaysSetScannerId { long scannerId = resp.getScannerId(); int nextCallSeq = 0; // test next - for (int i = 0; i < 5; i++) { + for (int i = 0; i < COUNT / 2; i++) { req = RequestConverter.buildScanRequest(scannerId, 1, false, nextCallSeq++, false, false, -1); resp = STUB.scan(null, req); assertTrue(resp.hasScannerId()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java index 288872e7977..b0830ab7a04 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestLeaseRenewal.java @@ -119,7 +119,8 @@ public class TestLeaseRenewal { assertTrue(((AbstractClientScanner)rs).renewLease()); // make sure we haven't advanced the scanner assertTrue(Arrays.equals(rs.next().getRow(), ROW_BYTES)); - assertTrue(((AbstractClientScanner)rs).renewLease()); + // renewLease should return false now as we have read all the data already + assertFalse(((AbstractClientScanner) rs).renewLease()); // make sure scanner is exhausted now assertNull(rs.next()); // renewLease should return false now diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanWithoutFetchingData.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanWithoutFetchingData.java new file mode 100644 index 00000000000..53adcc07cfe --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScanWithoutFetchingData.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.protobuf.ServiceException; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.HBaseRpcControllerImpl; +import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.ResponseConverter; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Testcase to make sure that we do not close scanners if ScanRequest.numberOfRows is zero. See + * HBASE-18042 for more details. + */ +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestScanWithoutFetchingData { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final TableName TABLE_NAME = TableName.valueOf("test"); + + private static final byte[] CF = Bytes.toBytes("cf"); + + private static final byte[] CQ = Bytes.toBytes("cq"); + + private static final int COUNT = 10; + + private static HRegionInfo HRI; + + private static ClientProtos.ClientService.BlockingInterface STUB; + + @BeforeClass + public static void setUp() throws Exception { + UTIL.startMiniCluster(1); + try (Table table = UTIL.createTable(TABLE_NAME, CF)) { + for (int i = 0; i < COUNT; i++) { + table.put(new Put(Bytes.toBytes(i)).addColumn(CF, CQ, Bytes.toBytes(i))); + } + } + HRI = UTIL.getHBaseAdmin().getTableRegions(TABLE_NAME).get(0); + STUB = ((HConnectionImplementation) UTIL.getConnection()) + .getClient(UTIL.getHBaseCluster().getRegionServer(0).getServerName()); + } + + @AfterClass + public static void tearDown() throws Exception { + UTIL.shutdownMiniCluster(); + } + + private void assertResult(int row, Result result) { + assertEquals(row, Bytes.toInt(result.getRow())); + assertEquals(row, Bytes.toInt(result.getValue(CF, CQ))); + } + + @Test + public void test() throws ServiceException, IOException { + Scan scan = new Scan(); + ScanRequest req = RequestConverter.buildScanRequest(HRI.getRegionName(), scan, 0, false); + HBaseRpcController hrc = new HBaseRpcControllerImpl(); + ScanResponse resp = STUB.scan(hrc, req); + assertTrue(resp.getMoreResults()); + assertTrue(resp.getMoreResultsInRegion()); + assertEquals(0, ResponseConverter.getResults(hrc.cellScanner(), resp).length); + long scannerId = resp.getScannerId(); + int nextCallSeq = 0; + // test normal next + for (int i = 0; i < COUNT / 2; i++) { + req = RequestConverter.buildScanRequest(scannerId, 1, false, nextCallSeq++, false, false, -1); + hrc.reset(); + resp = STUB.scan(hrc, req); + assertTrue(resp.getMoreResults()); + assertTrue(resp.getMoreResultsInRegion()); + Result[] results = ResponseConverter.getResults(hrc.cellScanner(), resp); + assertEquals(1, results.length); + assertResult(i, results[0]); + } + // test zero next + req = RequestConverter.buildScanRequest(scannerId, 0, false, nextCallSeq++, false, false, -1); + hrc.reset(); + resp = STUB.scan(hrc, req); + assertTrue(resp.getMoreResults()); + assertTrue(resp.getMoreResultsInRegion()); + assertEquals(0, ResponseConverter.getResults(hrc.cellScanner(), resp).length); + for (int i = COUNT / 2; i < COUNT; i++) { + req = RequestConverter.buildScanRequest(scannerId, 1, false, nextCallSeq++, false, false, -1); + hrc.reset(); + resp = STUB.scan(hrc, req); + assertTrue(resp.getMoreResults()); + assertEquals(i != COUNT - 1, resp.getMoreResultsInRegion()); + Result[] results = ResponseConverter.getResults(hrc.cellScanner(), resp); + assertEquals(1, results.length); + assertResult(i, results[0]); + } + // close + req = RequestConverter.buildScanRequest(scannerId, 0, true, false); + resp = STUB.scan(null, req); + } +}