diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 00daacfa00e..35b9d7851f3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -238,14 +238,26 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
    * Holder class which holds the RegionScanner and nextCallSeq together.
    */
   private static class RegionScannerHolder {
+    private AtomicLong nextCallSeq = new AtomicLong(0);
     private RegionScanner s;
-    private long nextCallSeq = 0L;
     private Region r;
 
     public RegionScannerHolder(RegionScanner s, Region r) {
       this.s = s;
       this.r = r;
     }
+
+    private long getNextCallSeq() {
+      return nextCallSeq.get();
+    }
+
+    private void incNextCallSeq() {
+      nextCallSeq.incrementAndGet();
+    }
+
+    private void rollbackNextCallSeq() {
+      nextCallSeq.decrementAndGet();
+    }
   }
 
   /**
@@ -902,7 +914,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     String scannerIdString = Long.toString(scannerId);
     RegionScannerHolder scannerHolder = scanners.get(scannerIdString);
     if (scannerHolder != null) {
-      return scannerHolder.nextCallSeq;
+      return scannerHolder.getNextCallSeq();
     }
     return 0L;
   }
@@ -2225,13 +2237,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         rsh = scanners.get(scannerName);
       }
       if (rsh != null) {
-        if (request.getNextCallSeq() != rsh.nextCallSeq) {
-          throw new OutOfOrderScannerNextException("Expected nextCallSeq: " + rsh.nextCallSeq
+        if (request.getNextCallSeq() != rsh.getNextCallSeq()) {
+          throw new OutOfOrderScannerNextException(
+            "Expected nextCallSeq: " + rsh.getNextCallSeq()
             + " But the nextCallSeq got from client: " + request.getNextCallSeq() +
             "; request=" + TextFormat.shortDebugString(request));
         }
         // Increment the nextCallSeq value which is the next expected from client.
-        rsh.nextCallSeq++;
+        rsh.incNextCallSeq();
       }
     }
     try {
@@ -2411,6 +2424,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       } else {
         addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
       }
+    } catch (IOException e) {
+      // if we have an exception on scanner next and we are using the callSeq
+      // we should rollback because the client will retry with the same callSeq
+      // and get an OutOfOrderScannerNextException if we don't do so.
+      if (rsh != null && request.hasNextCallSeq()) {
+        rsh.rollbackNextCallSeq();
+      }
+      throw e;
     } finally {
       // We're done. On way out re-add the above removed lease.
       // Adding resets expiration time on lease.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerRetriableFailure.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerRetriableFailure.java
new file mode 100644
index 00000000000..afeaa1dc705
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerRetriableFailure.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.FSVisitor;
+import org.apache.hadoop.hbase.util.TestTableName;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+@Category({RegionServerTests.class, LargeTests.class})
+public class TestScannerRetriableFailure {
+  private static final Log LOG = LogFactory.getLog(TestScannerRetriableFailure.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final String FAMILY_NAME_STR = "f";
+  private static final byte[] FAMILY_NAME = Bytes.toBytes(FAMILY_NAME_STR);
+
+  @Rule public TestTableName TEST_TABLE = new TestTableName();
+
+  public static class FaultyScannerObserver extends BaseRegionObserver {
+    private int faults = 0;
+
+    @Override
+    public boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final InternalScanner s, final List<Result> results,
+        final int limit, final boolean hasMore) throws IOException {
+      final TableName tableName = e.getEnvironment().getRegionInfo().getTable();
+      if (!tableName.isSystemTable() && (faults++ % 2) == 0) {
+        LOG.debug(" Injecting fault in table=" + tableName + " scanner");
+        throw new IOException("injected fault");
+      }
+      return hasMore;
+    }
+  }
+
+  private static void setupConf(Configuration conf) {
+    conf.setLong("hbase.hstore.compaction.min", 20);
+    conf.setLong("hbase.hstore.compaction.max", 39);
+    conf.setLong("hbase.hstore.blockingStoreFiles", 40);
+
+    conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, FaultyScannerObserver.class.getName());
+  }
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    setupConf(UTIL.getConfiguration());
+    UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    try {
+      UTIL.shutdownMiniCluster();
+    } catch (Exception e) {
+      LOG.warn("failure shutting down cluster", e);
+    }
+  }
+
+  @Test(timeout=180000)
+  public void testFaultyScanner() throws Exception {
+    TableName tableName = TEST_TABLE.getTableName();
+    Table table = UTIL.createTable(tableName, FAMILY_NAME);
+    try {
+      final int NUM_ROWS = 100;
+      loadTable(table, NUM_ROWS);
+      checkTableRows(table, NUM_ROWS);
+    } finally {
+      table.close();
+    }
+  }
+
+  // ==========================================================================
+  //  Helpers
+  // ==========================================================================
+  private FileSystem getFileSystem() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getFileSystem();
+  }
+
+  private Path getRootDir() {
+    return UTIL.getHBaseCluster().getMaster().getMasterFileSystem().getRootDir();
+  }
+
+  public void loadTable(final Table table, int numRows) throws IOException {
+    List<Put> puts = new ArrayList<Put>(numRows);
+    for (int i = 0; i < numRows; ++i) {
+      byte[] row = Bytes.toBytes(String.format("%09d", i));
+      Put put = new Put(row);
+      put.setDurability(Durability.SKIP_WAL);
+      put.add(FAMILY_NAME, null, row);
+      table.put(put);
+    }
+  }
+
+  private void checkTableRows(final Table table, int numRows) throws Exception {
+    Scan scan = new Scan();
+    scan.setCaching(1);
+    scan.setCacheBlocks(false);
+    ResultScanner scanner = table.getScanner(scan);
+    try {
+      int count = 0;
+      for (int i = 0; i < numRows; ++i) {
+        byte[] row = Bytes.toBytes(String.format("%09d", i));
+        Result result = scanner.next();
+        assertTrue(result != null);
+        assertTrue(Bytes.equals(row, result.getRow()));
+        count++;
+      }
+
+      while (true) {
+        Result result = scanner.next();
+        if (result == null) break;
+        count++;
+      }
+      assertEquals(numRows, count);
+    } finally {
+      scanner.close();
+    }
+  }
+}
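
Note (not part of the patch): the sketch below is a minimal, self-contained illustration of the pattern the RegionScannerHolder change implements: the expected call sequence is advanced before the scan attempt, and rolled back when the attempt throws, so a client retry that reuses the same nextCallSeq is still accepted instead of hitting OutOfOrderScannerNextException. The class and method names (SequencedScanner, next, main) are hypothetical and exist only for this example; they are not HBase API.

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the server-side scanner holder; not HBase code.
public class SequencedScanner {
  private final AtomicLong nextCallSeq = new AtomicLong(0);

  // Simulates a scan whose every other attempt fails, like FaultyScannerObserver in the test.
  private int attempts = 0;

  /** Rejects out-of-order calls, then runs the (possibly failing) scan body. */
  public String next(long clientCallSeq) throws IOException {
    if (clientCallSeq != nextCallSeq.get()) {
      throw new IOException("Expected nextCallSeq: " + nextCallSeq.get()
          + " but got: " + clientCallSeq);
    }
    nextCallSeq.incrementAndGet();          // advance, mirroring incNextCallSeq()
    try {
      if (attempts++ % 2 == 0) {
        throw new IOException("injected fault");
      }
      return "row-" + clientCallSeq;
    } catch (IOException e) {
      nextCallSeq.decrementAndGet();        // roll back, mirroring rollbackNextCallSeq()
      throw e;
    }
  }

  public static void main(String[] args) throws IOException {
    SequencedScanner scanner = new SequencedScanner();
    long seq = 0;
    for (int i = 0; i < 4; i++) {
      try {
        System.out.println(scanner.next(seq));
        seq++;                              // the client advances its seq only on success
      } catch (IOException e) {
        // Retry with the same seq; without the rollback the retry would be rejected
        // as out of order even though the previous call did no work.
        System.out.println("retrying after: " + e.getMessage());
      }
    }
  }
}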