diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
index b951221bc21..3e7b22d8025 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/UnknownScannerException.java
@@ -45,4 +45,8 @@ public class UnknownScannerException extends DoNotRetryIOException {
   public UnknownScannerException(String s) {
     super(s);
   }
+
+  public UnknownScannerException(String s, Exception e) {
+    super(s, e);
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 3e676c719ff..de8bfcca75b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
@@ -428,7 +429,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
           if ((cause != null && cause instanceof NotServingRegionException) ||
               (cause != null && cause instanceof RegionServerStoppedException) ||
               e instanceof OutOfOrderScannerNextException ||
-              e instanceof UnknownScannerException ) {
+              e instanceof UnknownScannerException ||
+              e instanceof ScannerResetException) {
             // Pass. It is easier writing the if loop test as list of what is allowed rather than
             // as a list of what is not allowed... so if in here, it means we do not throw.
           } else {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index adf11537ef2..8345aa1261f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -102,7 +102,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
    * @param scan the scan to execute
    * @param scanMetrics the ScanMetrics to used, if it is null, ScannerCallable won't collect
    *          metrics
-   * @param rpcControllerFactory factory to use when creating 
+   * @param rpcControllerFactory factory to use when creating
    *          {@link com.google.protobuf.RpcController}
    */
   public ScannerCallable(ClusterConnection connection, TableName tableName, Scan scan,
@@ -174,6 +174,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
     }
   }
 
+  @Override
   protected Result [] rpcCall() throws Exception {
     if (Thread.interrupted()) {
       throw new InterruptedIOException();
@@ -245,14 +246,19 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
       if (e instanceof RemoteException) {
         ioe = ((RemoteException) e).unwrapRemoteException();
       }
-      if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
-        try {
-          HRegionLocation location =
-            getConnection().relocateRegion(getTableName(), scan.getStartRow());
-          LOG.info("Scanner=" + scannerId + " expired, current region location is " +
-            location.toString());
-        } catch (Throwable t) {
-          LOG.info("Failed to relocate region", t);
+      if (logScannerActivity) {
+        if (ioe instanceof UnknownScannerException) {
+          try {
+            HRegionLocation location =
+                getConnection().relocateRegion(getTableName(), scan.getStartRow());
+            LOG.info("Scanner=" + scannerId + " expired, current region location is " +
+                location.toString());
+          } catch (Throwable t) {
+            LOG.info("Failed to relocate region", t);
+          }
+        } else if (ioe instanceof ScannerResetException) {
+          LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
+              + "asked us to reset the scanner state.", ioe);
         }
       }
       // The below convertion of exceptions into DoNotRetryExceptions is a little strange.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java
new file mode 100644
index 00000000000..7689eb104f5
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/exceptions/ScannerResetException.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.exceptions;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Thrown when the server side has received an Exception, and asks the client to reset the
+ * scanner state by closing the current region scanner and reopening from the start of the
+ * last seen row.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class ScannerResetException extends DoNotRetryIOException {
+  private static final long serialVersionUID = -5649728171144849619L;
+
+  /** constructor */
+  public ScannerResetException() {
+    super();
+  }
+
+  /**
+   * Constructor
+   * @param s message
+   */
+  public ScannerResetException(String s) {
+    super(s);
+  }
+
+  public ScannerResetException(String s, Exception e) {
+    super(s, e);
+  }
+}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
index ffbe6fe9baf..cf9c6c7d57c 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java
@@ -79,6 +79,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
   String EXCEPTIONS_OOO_NAME="exceptions.OutOfOrderScannerNextException";
   String EXCEPTIONS_BUSY_NAME="exceptions.RegionTooBusyException";
   String EXCEPTIONS_UNKNOWN_NAME="exceptions.UnknownScannerException";
+  String EXCEPTIONS_SCANNER_RESET_NAME="exceptions.ScannerResetException";
   String EXCEPTIONS_SANITY_NAME="exceptions.FailedSanityCheckException";
   String EXCEPTIONS_MOVED_NAME="exceptions.RegionMovedException";
   String EXCEPTIONS_NSRE_NAME="exceptions.NotServingRegionException";
@@ -108,6 +109,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
   void movedRegionException();
   void notServingRegionException();
   void unknownScannerException();
+  void scannerResetException();
   void tooBusyException();
   void multiActionTooLargeException();
   void callQueueTooBigException();
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
index fafa9d00735..d372b1b6ad6 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java
@@ -45,6 +45,7 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
   private final MutableFastCounter exceptionsOOO;
   private final MutableFastCounter exceptionsBusy;
   private final MutableFastCounter exceptionsUnknown;
+  private final MutableFastCounter exceptionsScannerReset;
   private final MutableFastCounter exceptionsSanity;
   private final MutableFastCounter exceptionsNSRE;
   private final MutableFastCounter exceptionsMoved;
@@ -78,6 +79,8 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
         .newCounter(EXCEPTIONS_BUSY_NAME, EXCEPTIONS_TYPE_DESC, 0L);
     this.exceptionsUnknown = this.getMetricsRegistry()
         .newCounter(EXCEPTIONS_UNKNOWN_NAME, EXCEPTIONS_TYPE_DESC, 0L);
+    this.exceptionsScannerReset = this.getMetricsRegistry()
+        .newCounter(EXCEPTIONS_SCANNER_RESET_NAME, EXCEPTIONS_TYPE_DESC, 0L);
     this.exceptionsSanity = this.getMetricsRegistry()
         .newCounter(EXCEPTIONS_SANITY_NAME, EXCEPTIONS_TYPE_DESC, 0L);
     this.exceptionsMoved = this.getMetricsRegistry()
@@ -161,6 +164,11 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
     exceptionsUnknown.incr();
   }
 
+  @Override
+  public void scannerResetException() {
+    exceptionsScannerReset.incr();
+  }
+
   @Override
   public void tooBusyException() {
     exceptionsBusy.incr();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
index 838bdf6cd78..fe03d4f4211 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServer.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
 import org.apache.hadoop.hbase.exceptions.RegionMovedException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 
 @InterfaceAudience.Private
 public class MetricsHBaseServer {
@@ -103,6 +104,8 @@ public class MetricsHBaseServer {
       source.tooBusyException();
     } else if (throwable instanceof UnknownScannerException) {
       source.unknownScannerException();
+    } else if (throwable instanceof ScannerResetException) {
+      source.scannerResetException();
     } else if (throwable instanceof RegionMovedException) {
       source.movedRegionException();
     } else if (throwable instanceof NotServingRegionException) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 6f92f9d0a8b..5ba8afdccb2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
 import org.apache.hadoop.hbase.exceptions.MergeRegionException;
 import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@@ -2901,13 +2902,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           isClientCellBlockSupport(context));
       }
     } catch (IOException e) {
-      // if we have an exception on scanner next and we are using the callSeq
-      // we should rollback because the client will retry with the same callSeq
-      // and get an OutOfOrderScannerNextException if we don't do so.
-      if (rsh != null && request.hasNextCallSeq()) {
-        rsh.rollbackNextCallSeq();
+      // The scanner might be left in a dirty state, so we tell the client to fail this RPC,
+      // close the scanner, and open another one from the start of the row that the client
+      // has last seen.
+      closeScanner(region, scanner, scannerName, context);
+
+      // We have already closed the scanner. Instead of throwing the IOException and having
+      // the client retry with the same scannerId, only to get a USE on the next RPC, we
+      // directly throw a special exception to save that extra RPC.
+      if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
+        // 1.4.0+ clients know how to handle
+        throw new ScannerResetException("Scanner is closed on the server-side", e);
+      } else {
+        // Older clients do not know about SRE. Just throw USE, which they will handle.
+        throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
+            + " scanner state for clients older than 1.4.", e);
       }
-      throw e;
     } finally {
       if (context != null) {
         context.setCallBack(rsh.shippedCallback);
       }
@@ -2926,29 +2936,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       if (!moreResults || closeScanner) {
         ttl = 0;
         moreResults = false;
-        if (region != null && region.getCoprocessorHost() != null) {
-          if (region.getCoprocessorHost().preScannerClose(scanner)) {
-            return builder.build(); // bypass
-          }
-        }
-        rsh = scanners.remove(scannerName);
-        if (rsh != null) {
-          if (context != null) {
-            context.setCallBack(rsh.closeCallBack);
-          } else {
-            rsh.s.close();
-          }
-          try {
-            regionServer.leases.cancelLease(scannerName);
-          } catch (LeaseException le) {
-            // No problem, ignore
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
-            }
-          }
-          if (region != null && region.getCoprocessorHost() != null) {
-            region.getCoprocessorHost().postScannerClose(scanner);
-          }
+        if (closeScanner(region, scanner, scannerName, context)) {
+          return builder.build(); // bypass
         }
       }
@@ -2980,6 +2969,35 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   }
 
+  private boolean closeScanner(Region region, RegionScanner scanner, String scannerName,
+      RpcCallContext context) throws IOException {
+    if (region != null && region.getCoprocessorHost() != null) {
+      if (region.getCoprocessorHost().preScannerClose(scanner)) {
+        return true; // bypass
+      }
+    }
+    RegionScannerHolder rsh = scanners.remove(scannerName);
+    if (rsh != null) {
+      if (context != null) {
+        context.setCallBack(rsh.closeCallBack);
+      } else {
+        rsh.s.close();
+      }
+      try {
+        regionServer.leases.cancelLease(scannerName);
+      } catch (LeaseException le) {
+        // No problem, ignore
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Unable to cancel lease of scanner. It could already be closed.");
+        }
+      }
+      if (region != null && region.getCoprocessorHost() != null) {
+        region.getCoprocessorHost().postScannerClose(scanner);
+      }
+    }
+    return false;
+  }
+
   @Override
   public CoprocessorServiceResponse execRegionServerService(RpcController controller,
       CoprocessorServiceRequest request) throws ServiceException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 5a4da4565f6..829661cbb54 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -1413,12 +1413,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
   public Table createTable(TableName tableName, byte[][] families,
       int numVersions, byte[] startKey, byte[] endKey, int numRegions)
   throws IOException{
-    HTableDescriptor desc = new HTableDescriptor(tableName);
-    for (byte[] family : families) {
-      HColumnDescriptor hcd = new HColumnDescriptor(family)
-          .setMaxVersions(numVersions);
-      desc.addFamily(hcd);
-    }
+    HTableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
+
     getAdmin().createTable(desc, startKey, endKey, numRegions);
     // HBaseAdmin only waits for regions to appear in hbase:meta we
     // should wait until they are assigned
@@ -1781,6 +1777,22 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
         MAXVERSIONS, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
   }
 
+  public HTableDescriptor createTableDescriptor(final TableName tableName,
+      byte[] family) {
+    return createTableDescriptor(tableName, new byte[][] {family}, 1);
+  }
+
+  public HTableDescriptor createTableDescriptor(final TableName tableName,
+      byte[][] families, int maxVersions) {
+    HTableDescriptor desc = new HTableDescriptor(tableName);
+    for (byte[] family : families) {
+      HColumnDescriptor hcd = new HColumnDescriptor(family)
+          .setMaxVersions(maxVersions);
+      desc.addFamily(hcd);
+    }
+    return desc;
+  }
+
   /**
    * Create an HRegion that writes to the local tmp dirs
    * @param desc
@@ -1998,7 +2010,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
       put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
       for (int i = 0; i < f.length; i++) {
         byte[] value1 = value != null ? value : row;
-        put.addColumn(f[i], null, value1);
+        put.addColumn(f[i], f[i], value1);
       }
       puts.add(put);
     }
@@ -3540,6 +3552,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
     public PortAllocator(Random random) {
       this.random = random;
       this.portChecker = new AvailablePortChecker() {
+        @Override
        public boolean available(int port) {
          try {
            ServerSocket sock = new ServerSocket(port);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index f46562540e8..33a53156b3a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -26,7 +26,6 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -37,10 +36,12 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.NavigableSet;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang.ArrayUtils;
@@ -63,8 +64,11 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@@ -91,10 +95,14 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
 import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
+import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -536,6 +544,71 @@ public class TestFromClientSide {
     assertEquals(rowCount - endKeyCount, countGreater);
   }
 
+  /**
+   * A coprocessor that injects a test failure so that the first store scanner reseek() call
+   * fails with an IOException.
+   */
+  public static class ExceptionInReseekRegionObserver extends BaseRegionObserver {
+    static AtomicLong reqCount = new AtomicLong(0);
+
+    class MyStoreScanner extends StoreScanner {
+      public MyStoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+          NavigableSet<byte[]> columns, long readPt) throws IOException {
+        super(store, scanInfo, scan, columns, readPt);
+      }
+
+      @Override
+      protected List<KeyValueScanner> selectScannersFrom(
+          List<? extends KeyValueScanner> allScanners) {
+        List<KeyValueScanner> scanners = super.selectScannersFrom(allScanners);
+        List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size());
+        for (KeyValueScanner scanner : scanners) {
+          newScanners.add(new DelegatingKeyValueScanner(scanner) {
+            @Override
+            public boolean reseek(Cell key) throws IOException {
+              if (reqCount.incrementAndGet() == 1) {
+                throw new IOException("Injected exception");
+              }
+              return super.reseek(key);
+            }
+          });
+        }
+        return newScanners;
+      }
+    }
+
+    @Override
+    public KeyValueScanner preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
+        Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s,
+        final long readPt) throws IOException {
+      return new MyStoreScanner(store, store.getScanInfo(), scan, targetCols, readPt);
+    }
+  }
+
+  /**
+   * Tests the case where a Scan can throw an IOException in the middle of the seek / reseek,
+   * leaving the server-side RegionScanner in a dirty state. The client has to ensure that the
+   * ClientScanner does not get an exception and also sees all the data.
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  @Test
+  public void testClientScannerIsResetWhenScanThrowsIOException()
+      throws IOException, InterruptedException {
+    TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
+    TableName name = TableName.valueOf("testClientScannerIsResetWhenScanThrowsIOException");
+
+    HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name, FAMILY);
+    htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName());
+    TEST_UTIL.getAdmin().createTable(htd);
+    try (Table t = TEST_UTIL.getConnection().getTable(name)) {
+      int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
+      TEST_UTIL.getAdmin().flush(name);
+      int actualRowCount = countRows(t, new Scan().addColumn(FAMILY, FAMILY));
+      assertEquals(rowCount, actualRowCount);
+    }
+    assertTrue(ExceptionInReseekRegionObserver.reqCount.get() > 0);
+  }
+
   /*
    * @param key
    * @return Scan with RowFilter that does LESS than passed key.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
index 0f0baffc0eb..8b9428fe85a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestTableSnapshotScanner.java
@@ -184,7 +184,7 @@ public class TestTableSnapshotScanner {
     }
 
     for (int j = 0; j < FAMILIES.length; j++) {
-      byte[] actual = result.getValue(FAMILIES[j], null);
+      byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
       Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
         + " ,actual:" + Bytes.toString(actual), row, actual);
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
index 26e58975621..66d290a43d2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TableSnapshotInputFormatTestBase.java
@@ -181,7 +181,7 @@ public abstract class TableSnapshotInputFormatTestBase {
     }
 
     for (int j = 0; j < FAMILIES.length; j++) {
-      byte[] actual = result.getValue(FAMILIES[j], null);
+      byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
       Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
         + " ,actual:" + Bytes.toString(actual), row, actual);
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
index 5110ef79b3e..694a359a5a3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestMultithreadedTableMapper.java
@@ -97,6 +97,7 @@ public class TestMultithreadedTableMapper {
      * @param context
      * @throws IOException
      */
+    @Override
     public void map(ImmutableBytesWritable key, Result value,
       Context context)
     throws IOException, InterruptedException {
@@ -110,7 +111,7 @@ public class TestMultithreadedTableMapper {
         Bytes.toString(INPUT_FAMILY) + "'.");
       }
       // Get the original value and reverse it
-      String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
+      String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
       StringBuilder newValue = new StringBuilder(originalValue);
       newValue.reverse();
       // Now set the value to be collected
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
index fa5b9a48f40..690e776077a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduce.java
@@ -57,6 +57,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
   private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
 
+  @Override
   protected Log getLog() { return LOG; }
 
   /**
@@ -72,6 +73,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
    * @param context
    * @throws IOException
    */
+  @Override
   public void map(ImmutableBytesWritable key, Result value,
     Context context)
   throws IOException, InterruptedException {
@@ -86,7 +88,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
     }
 
     // Get the original value and reverse it
-    String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
+    String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
     StringBuilder newValue = new StringBuilder(originalValue);
     newValue.reverse();
     // Now set the value to be collected
@@ -96,6 +98,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
     }
   }
 
+  @Override
   protected void runTestOnTable(Table table) throws IOException {
     Job job = null;
     try {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
index e78bf4f9e6e..27bf0637ecb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestTableMapReduceBase.java
@@ -126,7 +126,7 @@ public abstract class TestTableMapReduceBase {
 
     // Get the original value and reverse it
-    String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
+    String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
     StringBuilder newValue = new StringBuilder(originalValue);
     newValue.reverse();
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
new file mode 100644
index 00000000000..51a2a978f80
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
+import org.apache.hadoop.hbase.regionserver.Store;
+
+public class DelegatingKeyValueScanner implements KeyValueScanner {
+  protected KeyValueScanner delegate;
+
+  public DelegatingKeyValueScanner(KeyValueScanner delegate) {
+    this.delegate = delegate;
+  }
+
+  @Override
+  public void shipped() throws IOException {
+    delegate.shipped();
+  }
+
+  @Override
+  public Cell peek() {
+    return delegate.peek();
+  }
+
+  @Override
+  public Cell next() throws IOException {
+    return delegate.next();
+  }
+
+  @Override
+  public boolean seek(Cell key) throws IOException {
+    return delegate.seek(key);
+  }
+
+  @Override
+  public boolean reseek(Cell key) throws IOException {
+    return delegate.reseek(key);
+  }
+
+  @Override
+  public long getScannerOrder() {
+    return delegate.getScannerOrder();
+  }
+
+  @Override
+  public void close() {
+    delegate.close();
+  }
+
+  @Override
+  public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
+    return delegate.shouldUseScanner(scan, store, oldestUnexpiredTS);
+  }
+
+  @Override
+  public boolean requestSeek(Cell kv, boolean forward, boolean useBloom) throws IOException {
+    return delegate.requestSeek(kv, forward, useBloom);
+  }
+
+  @Override
+  public boolean realSeekDone() {
+    return delegate.realSeekDone();
+  }
+
+  @Override
+  public void enforceSeek() throws IOException {
+    delegate.enforceSeek();
+  }
+
+  @Override
+  public boolean isFileScanner() {
+    return delegate.isFileScanner();
+  }
+
+  @Override
+  public boolean backwardSeek(Cell key) throws IOException {
+    return delegate.backwardSeek(key);
+  }
+
+  @Override
+  public boolean seekToPreviousRow(Cell key) throws IOException {
+    return delegate.seekToPreviousRow(key);
+  }
+
+  @Override
+  public boolean seekToLastRow() throws IOException {
+    return delegate.seekToLastRow();
+  }
+
+  @Override
+  public Cell getNextIndexedKey() {
+    return delegate.getNextIndexedKey();
+  }
+}
\ No newline at end of file