From 8a797e81b83ded184f9ecaeecf26954a27348974 Mon Sep 17 00:00:00 2001 From: Enis Soztutar Date: Thu, 22 Sep 2016 17:41:01 -0700 Subject: [PATCH] HBASE-16604 Scanner retries on IOException can cause the scans to miss data --- .../hadoop/hbase/UnknownScannerException.java | 4 + .../hadoop/hbase/client/ClientScanner.java | 4 +- .../hadoop/hbase/client/ScannerCallable.java | 26 +++-- .../exceptions/ScannerResetException.java | 50 ++++++++ .../hbase/ipc/MetricsHBaseServerSource.java | 2 + .../ipc/MetricsHBaseServerSourceImpl.java | 8 ++ .../hadoop/hbase/ipc/MetricsHBaseServer.java | 3 + .../hbase/regionserver/RSRpcServices.java | 63 ++++++---- .../hadoop/hbase/HBaseTestingUtility.java | 18 ++- .../hbase/client/TestFromClientSide.java | 77 ++++++++++++- .../client/TestTableSnapshotScanner.java | 2 +- .../TableSnapshotInputFormatTestBase.java | 2 +- .../TestMultithreadedTableMapper.java | 3 +- .../hbase/mapreduce/TestTableMapReduce.java | 4 +- .../mapreduce/TestTableMapReduceBase.java | 2 +- .../DelegatingKeyValueScanner.java | 109 ++++++++++++++++++ 16 files changed, 337 insertions(+), 40 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java index b951221bc21..3e7b22d8025 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java @@ -45,4 +45,8 @@ public class UnknownScannerException extends DoNotRetryIOException { public UnknownScannerException(String s) { super(s); } + + public UnknownScannerException(String s, Exception e) { + super(s, e); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java index 1a9df97accd..944f44eaa02 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java @@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; +import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos; @@ -433,7 +434,8 @@ public class ClientScanner extends AbstractClientScanner { if ((cause != null && cause instanceof NotServingRegionException) || (cause != null && cause instanceof RegionServerStoppedException) || e instanceof OutOfOrderScannerNextException || - e instanceof UnknownScannerException ) { + e instanceof UnknownScannerException || + e instanceof ScannerResetException) { // Pass. It is easier writing the if loop test as list of what is allowed rather than // as a list of what is not allowed... so if in here, it means we do not throw. } else { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java index 8912e58536c..7d3b9e95258 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.protobuf.ProtobufUtil; @@ -107,9 +108,9 @@ public class ScannerCallable extends RegionServerCallable { * @param connection which connection * @param tableName table callable is on * @param scan the scan to execute - * @param scanMetrics the ScanMetrics to used, if it is null, + * @param scanMetrics the ScanMetrics to used, if it is null, * ScannerCallable won't collect metrics - * @param rpcControllerFactory factory to use when creating + * @param rpcControllerFactory factory to use when creating * {@link com.google.protobuf.RpcController} */ public ScannerCallable (ClusterConnection connection, TableName tableName, Scan scan, @@ -275,14 +276,19 @@ public class ScannerCallable extends RegionServerCallable { if (e instanceof RemoteException) { ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e); } - if (logScannerActivity && (ioe instanceof UnknownScannerException)) { - try { - HRegionLocation location = - getConnection().relocateRegion(getTableName(), scan.getStartRow()); - LOG.info("Scanner=" + scannerId - + " expired, current region location is " + location.toString()); - } catch (Throwable t) { - LOG.info("Failed to relocate region", t); + if (logScannerActivity) { + if (ioe instanceof UnknownScannerException) { + try { + HRegionLocation location = + getConnection().relocateRegion(getTableName(), scan.getStartRow()); + LOG.info("Scanner=" + scannerId + + " expired, current region location is " + location.toString()); + } catch (Throwable t) { + LOG.info("Failed to relocate region", t); + } + } else if (ioe instanceof ScannerResetException) { + LOG.info("Scanner=" + scannerId + " has received an exception, and the server " + + "asked us to reset the scanner state.", ioe); } } // The below convertion of exceptions into DoNotRetryExceptions is a little strange. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java new file mode 100644 index 00000000000..7689eb104f5 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.exceptions; + +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; + +/** + * Thrown when the server side has received an Exception, and asks the Client to reset the scanner + * state by closing the current region scanner, and reopening from the start of last seen row. + */ +@InterfaceAudience.Public +@InterfaceStability.Stable +public class ScannerResetException extends DoNotRetryIOException { + private static final long serialVersionUID = -5649728171144849619L; + + /** constructor */ + public ScannerResetException() { + super(); + } + + /** + * Constructor + * @param s message + */ + public ScannerResetException(String s) { + super(s); + } + + public ScannerResetException(String s, Exception e) { + super(s, e); + } +} diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java index 3305f65c8fe..ac14bd8b762 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java @@ -77,6 +77,7 @@ public interface MetricsHBaseServerSource extends BaseSource { String EXCEPTIONS_OOO_NAME="exceptions.OutOfOrderScannerNextException"; String EXCEPTIONS_BUSY_NAME="exceptions.RegionTooBusyException"; String EXCEPTIONS_UNKNOWN_NAME="exceptions.UnknownScannerException"; + String EXCEPTIONS_SCANNER_RESET_NAME="exceptions.ScannerResetException"; String EXCEPTIONS_SANITY_NAME="exceptions.FailedSanityCheckException"; String EXCEPTIONS_MOVED_NAME="exceptions.RegionMovedException"; String EXCEPTIONS_NSRE_NAME="exceptions.NotServingRegionException"; @@ -106,6 +107,7 @@ public interface MetricsHBaseServerSource extends BaseSource { void movedRegionException(); void notServingRegionException(); void unknownScannerException(); + void scannerResetException(); void tooBusyException(); void multiActionTooLargeException(); void callQueueTooBigException(); diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java index fafa9d00735..d372b1b6ad6 100644 --- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java +++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java @@ -45,6 +45,7 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl private final MutableFastCounter exceptionsOOO; private final MutableFastCounter exceptionsBusy; private final MutableFastCounter exceptionsUnknown; + private final MutableFastCounter exceptionsScannerReset; private final MutableFastCounter exceptionsSanity; private final MutableFastCounter exceptionsNSRE; private final MutableFastCounter exceptionsMoved; @@ -78,6 +79,8 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl .newCounter(EXCEPTIONS_BUSY_NAME, EXCEPTIONS_TYPE_DESC, 0L); this.exceptionsUnknown = this.getMetricsRegistry() .newCounter(EXCEPTIONS_UNKNOWN_NAME, EXCEPTIONS_TYPE_DESC, 0L); + this.exceptionsScannerReset = this.getMetricsRegistry() + .newCounter(EXCEPTIONS_SCANNER_RESET_NAME, EXCEPTIONS_TYPE_DESC, 0L); this.exceptionsSanity = this.getMetricsRegistry() .newCounter(EXCEPTIONS_SANITY_NAME, EXCEPTIONS_TYPE_DESC, 0L); this.exceptionsMoved = this.getMetricsRegistry() @@ -161,6 +164,11 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl exceptionsUnknown.incr(); } + @Override + public void scannerResetException() { + exceptionsScannerReset.incr(); + } + @Override public void tooBusyException() { exceptionsBusy.incr(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java index 838bdf6cd78..fe03d4f4211 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.RegionMovedException; +import org.apache.hadoop.hbase.exceptions.ScannerResetException; @InterfaceAudience.Private public class MetricsHBaseServer { @@ -103,6 +104,8 @@ public class MetricsHBaseServer { source.tooBusyException(); } else if (throwable instanceof UnknownScannerException) { source.unknownScannerException(); + } else if (throwable instanceof ScannerResetException) { + source.scannerResetException(); } else if (throwable instanceof RegionMovedException) { source.movedRegionException(); } else if (throwable instanceof NotServingRegionException) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index f19cded28f7..835f1b2c096 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.MergeRegionException; import org.apache.hadoop.hbase.exceptions.OperationConflictException; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; +import org.apache.hadoop.hbase.exceptions.ScannerResetException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; @@ -2733,13 +2734,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler, addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())); } } catch (IOException e) { - // if we have an exception on scanner next and we are using the callSeq - // we should rollback because the client will retry with the same callSeq - // and get an OutOfOrderScannerNextException if we don't do so. - if (rsh != null && request.hasNextCallSeq()) { - rsh.rollbackNextCallSeq(); + // The scanner state might be left in a dirty state, so we will tell the Client to + // fail this RPC and close the scanner while opening up another one from the start of + // row that the client has last seen. + closeScanner(region, scanner, scannerName); + + // We closed the scanner already. Instead of throwing the IOException, and client + // retrying with the same scannerId only to get USE on the next RPC, we directly throw + // a special exception to save an RPC. + if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) { + // 1.4.0+ clients know how to handle + throw new ScannerResetException("Scanner is closed on the server-side", e); + } else { + // older clients do not know about SRE. Just throw USE, which they will handle + throw new UnknownScannerException("Throwing UnknownScannerException to reset the client" + + " scanner state for clients older than 1.3.", e); } - throw e; } finally { // We're done. On way out re-add the above removed lease. // Adding resets expiration time on lease. @@ -2753,20 +2763,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, if (!moreResults || closeScanner) { ttl = 0; moreResults = false; - if (region != null && region.getCoprocessorHost() != null) { - if (region.getCoprocessorHost().preScannerClose(scanner)) { - return builder.build(); // bypass - } - } - rsh = scanners.remove(scannerName); - if (rsh != null) { - scanner = rsh.s; - scanner.close(); - regionServer.leases.cancelLease(scannerName); - if (region != null && region.getCoprocessorHost() != null) { - region.getCoprocessorHost().postScannerClose(scanner); - } - } + closeScanner(region, scanner, scannerName); } if (ttl > 0) { @@ -2797,6 +2794,32 @@ public class RSRpcServices implements HBaseRPCErrorHandler, } } + private boolean closeScanner(Region region, RegionScanner scanner, String scannerName) + throws IOException { + if (region != null && region.getCoprocessorHost() != null) { + if (region.getCoprocessorHost().preScannerClose(scanner)) { + return true; // bypass + } + } + RegionScannerHolder rsh = scanners.remove(scannerName); + if (rsh != null) { + scanner = rsh.s; + scanner.close(); + try { + regionServer.leases.cancelLease(scannerName); + } catch (LeaseException le) { + // No problem, ignore + if (LOG.isTraceEnabled()) { + LOG.trace("Un-able to cancel lease of scanner. It could already be closed."); + } + } + if (region != null && region.getCoprocessorHost() != null) { + region.getCoprocessorHost().postScannerClose(scanner); + } + } + return false; + } + @Override public CoprocessorServiceResponse execRegionServerService(RpcController controller, CoprocessorServiceRequest request) throws ServiceException { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java index 15d4777bc43..99e87417c68 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java @@ -2007,6 +2007,22 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { return HRegion.createHRegion(info, rootDir, conf, htd); } + public HTableDescriptor createTableDescriptor(final TableName tableName, + byte[] family) { + return createTableDescriptor(tableName, new byte[][] {family}, 1); + } + + public HTableDescriptor createTableDescriptor(final TableName tableName, + byte[][] families, int maxVersions) { + HTableDescriptor desc = new HTableDescriptor(tableName); + for (byte[] family : families) { + HColumnDescriptor hcd = new HColumnDescriptor(family) + .setMaxVersions(maxVersions); + desc.addFamily(hcd); + } + return desc; + } + /** * Create an HRegion that writes to the local tmp dirs * @param desc @@ -2221,7 +2237,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility { Put put = new Put(row); put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL); for (int i = 0; i < f.length; i++) { - put.add(f[i], null, value != null ? value : row); + put.add(f[i], f[i], value != null ? value : row); } puts.add(put); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java index fc47ae26eb3..039c78138f3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java @@ -26,7 +26,6 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; - import java.io.IOException; import java.lang.reflect.Method; import java.util.ArrayList; @@ -38,13 +37,14 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NavigableMap; +import java.util.NavigableSet; import java.util.UUID; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import org.apache.hadoop.mapreduce.Cluster; import org.apache.log4j.Level; import org.apache.commons.lang.ArrayUtils; import org.apache.commons.logging.Log; @@ -66,8 +66,11 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; @@ -93,11 +96,14 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest; -import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner; import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.ScanInfo; import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreScanner; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -632,6 +638,71 @@ public class TestFromClientSide { assertEquals(rowCount - endKeyCount, countGreater); } + /** + * This is a coprocessor to inject a test failure so that a store scanner.reseek() call will + * fail with an IOException() on the first call. + */ + public static class ExceptionInReseekRegionObserver extends BaseRegionObserver { + static AtomicLong reqCount = new AtomicLong(0); + class MyStoreScanner extends StoreScanner { + public MyStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet columns, + long readPt) throws IOException { + super(store, scanInfo, scan, columns, readPt); + } + + @Override + protected List selectScannersFrom( + List allScanners) { + List scanners = super.selectScannersFrom(allScanners); + List newScanners = new ArrayList<>(scanners.size()); + for (KeyValueScanner scanner : scanners) { + newScanners.add(new DelegatingKeyValueScanner(scanner) { + @Override + public boolean reseek(Cell key) throws IOException { + if (reqCount.incrementAndGet() == 1) { + throw new IOException("Injected exception"); + } + return super.reseek(key); + } + }); + } + return newScanners; + } + } + + @Override + public KeyValueScanner preStoreScannerOpen(ObserverContext c, + Store store, Scan scan, NavigableSet targetCols, KeyValueScanner s) + throws IOException { + return new MyStoreScanner(store, store.getScanInfo(), scan, targetCols, Long.MAX_VALUE); + } + } + + /** + * Tests the case where a Scan can throw an IOException in the middle of the seek / reseek + * leaving the server side RegionScanner to be in dirty state. The client has to ensure that the + * ClientScanner does not get an exception and also sees all the data. + * @throws IOException + * @throws InterruptedException + */ + @Test + public void testClientScannerIsResetWhenScanThrowsIOException() + throws IOException, InterruptedException { + TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true); + TableName name = TableName.valueOf("testClientScannerIsResetWhenScanThrowsIOException"); + + HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name, FAMILY); + htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName()); + TEST_UTIL.getHBaseAdmin().createTable(htd); + try (Table t = TEST_UTIL.getConnection().getTable(name)) { + int rowCount = TEST_UTIL.loadTable(t, FAMILY, false); + TEST_UTIL.getHBaseAdmin().flush(name); + int actualRowCount = countRows(t, new Scan().addColumn(FAMILY, FAMILY)); + assertEquals(rowCount, actualRowCount); + } + assertTrue(ExceptionInReseekRegionObserver.reqCount.get() > 0); + } + /* * @param key * @return Scan with RowFilter that does LESS than passed key. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java index 3e915e1b376..0e2b670bf28 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java @@ -183,7 +183,7 @@ public class TestTableSnapshotScanner { } for (int j = 0; j < FAMILIES.length; j++) { - byte[] actual = result.getValue(FAMILIES[j], null); + byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]); Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row) + " ,actual:" + Bytes.toString(actual), row, actual); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java index 7d1267a5bb3..3df4a8f052b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java @@ -182,7 +182,7 @@ public abstract class TableSnapshotInputFormatTestBase { } for (int j = 0; j < FAMILIES.length; j++) { - byte[] actual = result.getValue(FAMILIES[j], null); + byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]); Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row) + " ,actual:" + Bytes.toString(actual), row, actual); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java index 9a81990688d..1cd2432ff9c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java @@ -100,6 +100,7 @@ public class TestMultithreadedTableMapper { * @param context * @throws IOException */ + @Override public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { @@ -113,7 +114,7 @@ public class TestMultithreadedTableMapper { Bytes.toString(INPUT_FAMILY) + "'."); } // Get the original value and reverse it - String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null)); + String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY)); StringBuilder newValue = new StringBuilder(originalValue); newValue.reverse(); // Now set the value to be collected diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java index 6c19a99a619..299e2261245 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java @@ -56,6 +56,7 @@ import org.junit.experimental.categories.Category; public class TestTableMapReduce extends TestTableMapReduceBase { private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class); + @Override protected Log getLog() { return LOG; } /** @@ -71,6 +72,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase { * @param context * @throws IOException */ + @Override public void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { @@ -85,7 +87,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase { } // Get the original value and reverse it - String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null)); + String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY)); StringBuilder newValue = new StringBuilder(originalValue); newValue.reverse(); // Now set the value to be collected diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java index cbb4fb2752a..9b7f0624901 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java @@ -128,7 +128,7 @@ public abstract class TestTableMapReduceBase { // Get the original value and reverse it - String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null)); + String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY)); StringBuilder newValue = new StringBuilder(originalValue); newValue.reverse(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java new file mode 100644 index 00000000000..10432b9402e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.KeyValueScanner; +import org.apache.hadoop.hbase.regionserver.Store; + +public class DelegatingKeyValueScanner implements KeyValueScanner { + protected KeyValueScanner delegate; + + public DelegatingKeyValueScanner(KeyValueScanner delegate) { + this.delegate = delegate; + } + + @Override + public Cell peek() { + return delegate.peek(); + } + + @Override + public Cell next() throws IOException { + return delegate.next(); + } + + @Override + public boolean seek(Cell key) throws IOException { + return delegate.seek(key); + } + + @Override + public boolean reseek(Cell key) throws IOException { + return delegate.reseek(key); + } + + @Override + public long getScannerOrder() { + return delegate.getScannerOrder(); + } + + @Override + public void close() { + delegate.close(); + } + + @Override + public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { + return delegate.shouldUseScanner(scan, store, oldestUnexpiredTS); + } + + @Override + public boolean requestSeek(Cell kv, boolean forward, boolean useBloom) throws IOException { + return delegate.requestSeek(kv, forward, useBloom); + } + + @Override + public boolean realSeekDone() { + return delegate.realSeekDone(); + } + + @Override + public void enforceSeek() throws IOException { + delegate.enforceSeek(); + } + + @Override + public boolean isFileScanner() { + return delegate.isFileScanner(); + } + + @Override + public boolean backwardSeek(Cell key) throws IOException { + return delegate.backwardSeek(key); + } + + @Override + public boolean seekToPreviousRow(Cell key) throws IOException { + return delegate.seekToPreviousRow(key); + } + + @Override + public boolean seekToLastRow() throws IOException { + return delegate.seekToLastRow(); + } + + @Override + public Cell getNextIndexedKey() { + return delegate.getNextIndexedKey(); + } +} \ No newline at end of file