HBASE-16604 Scanner retries on IOException can cause the scans to miss data - RECOMMIT after revert

This commit is contained in:
Enis Soztutar 2016-09-23 11:27:13 -07:00
parent 39db0cac78
commit eb112783ae
16 changed files with 350 additions and 53 deletions

View File

@ -45,4 +45,8 @@ public class UnknownScannerException extends DoNotRetryIOException {
public UnknownScannerException(String s) { public UnknownScannerException(String s) {
super(s); super(s);
} }
public UnknownScannerException(String s, Exception e) {
super(s, e);
}
} }

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos; import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
@ -428,7 +429,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
if ((cause != null && cause instanceof NotServingRegionException) || if ((cause != null && cause instanceof NotServingRegionException) ||
(cause != null && cause instanceof RegionServerStoppedException) || (cause != null && cause instanceof RegionServerStoppedException) ||
e instanceof OutOfOrderScannerNextException || e instanceof OutOfOrderScannerNextException ||
e instanceof UnknownScannerException ) { e instanceof UnknownScannerException ||
e instanceof ScannerResetException) {
// Pass. It is easier writing the if loop test as list of what is allowed rather than // Pass. It is easier writing the if loop test as list of what is allowed rather than
// as a list of what is not allowed... so if in here, it means we do not throw. // as a list of what is not allowed... so if in here, it means we do not throw.
} else { } else {

View File

@ -40,7 +40,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException; import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter; import org.apache.hadoop.hbase.protobuf.RequestConverter;
@ -174,6 +174,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
} }
} }
@Override
protected Result [] rpcCall() throws Exception { protected Result [] rpcCall() throws Exception {
if (Thread.interrupted()) { if (Thread.interrupted()) {
throw new InterruptedIOException(); throw new InterruptedIOException();
@ -245,7 +246,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
if (e instanceof RemoteException) { if (e instanceof RemoteException) {
ioe = ((RemoteException) e).unwrapRemoteException(); ioe = ((RemoteException) e).unwrapRemoteException();
} }
if (logScannerActivity && (ioe instanceof UnknownScannerException)) { if (logScannerActivity) {
if (ioe instanceof UnknownScannerException) {
try { try {
HRegionLocation location = HRegionLocation location =
getConnection().relocateRegion(getTableName(), scan.getStartRow()); getConnection().relocateRegion(getTableName(), scan.getStartRow());
@ -254,6 +256,10 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
} catch (Throwable t) { } catch (Throwable t) {
LOG.info("Failed to relocate region", t); LOG.info("Failed to relocate region", t);
} }
} else if (ioe instanceof ScannerResetException) {
LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
+ "asked us to reset the scanner state.", ioe);
}
} }
// The below convertion of exceptions into DoNotRetryExceptions is a little strange. // The below convertion of exceptions into DoNotRetryExceptions is a little strange.
// Why not just have these exceptions implment DNRIOE you ask? Well, usually we want // Why not just have these exceptions implment DNRIOE you ask? Well, usually we want

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.exceptions;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Thrown when the server side has received an Exception, and asks the Client to reset the scanner
* state by closing the current region scanner, and reopening from the start of last seen row.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class ScannerResetException extends DoNotRetryIOException {
private static final long serialVersionUID = -5649728171144849619L;
/** constructor */
public ScannerResetException() {
super();
}
/**
* Constructor
* @param s message
*/
public ScannerResetException(String s) {
super(s);
}
public ScannerResetException(String s, Exception e) {
super(s, e);
}
}

View File

@ -79,6 +79,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
String EXCEPTIONS_OOO_NAME="exceptions.OutOfOrderScannerNextException"; String EXCEPTIONS_OOO_NAME="exceptions.OutOfOrderScannerNextException";
String EXCEPTIONS_BUSY_NAME="exceptions.RegionTooBusyException"; String EXCEPTIONS_BUSY_NAME="exceptions.RegionTooBusyException";
String EXCEPTIONS_UNKNOWN_NAME="exceptions.UnknownScannerException"; String EXCEPTIONS_UNKNOWN_NAME="exceptions.UnknownScannerException";
String EXCEPTIONS_SCANNER_RESET_NAME="exceptions.ScannerResetException";
String EXCEPTIONS_SANITY_NAME="exceptions.FailedSanityCheckException"; String EXCEPTIONS_SANITY_NAME="exceptions.FailedSanityCheckException";
String EXCEPTIONS_MOVED_NAME="exceptions.RegionMovedException"; String EXCEPTIONS_MOVED_NAME="exceptions.RegionMovedException";
String EXCEPTIONS_NSRE_NAME="exceptions.NotServingRegionException"; String EXCEPTIONS_NSRE_NAME="exceptions.NotServingRegionException";
@ -108,6 +109,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
void movedRegionException(); void movedRegionException();
void notServingRegionException(); void notServingRegionException();
void unknownScannerException(); void unknownScannerException();
void scannerResetException();
void tooBusyException(); void tooBusyException();
void multiActionTooLargeException(); void multiActionTooLargeException();
void callQueueTooBigException(); void callQueueTooBigException();

View File

@ -45,6 +45,7 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
private final MutableFastCounter exceptionsOOO; private final MutableFastCounter exceptionsOOO;
private final MutableFastCounter exceptionsBusy; private final MutableFastCounter exceptionsBusy;
private final MutableFastCounter exceptionsUnknown; private final MutableFastCounter exceptionsUnknown;
private final MutableFastCounter exceptionsScannerReset;
private final MutableFastCounter exceptionsSanity; private final MutableFastCounter exceptionsSanity;
private final MutableFastCounter exceptionsNSRE; private final MutableFastCounter exceptionsNSRE;
private final MutableFastCounter exceptionsMoved; private final MutableFastCounter exceptionsMoved;
@ -78,6 +79,8 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
.newCounter(EXCEPTIONS_BUSY_NAME, EXCEPTIONS_TYPE_DESC, 0L); .newCounter(EXCEPTIONS_BUSY_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsUnknown = this.getMetricsRegistry() this.exceptionsUnknown = this.getMetricsRegistry()
.newCounter(EXCEPTIONS_UNKNOWN_NAME, EXCEPTIONS_TYPE_DESC, 0L); .newCounter(EXCEPTIONS_UNKNOWN_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsScannerReset = this.getMetricsRegistry()
.newCounter(EXCEPTIONS_SCANNER_RESET_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsSanity = this.getMetricsRegistry() this.exceptionsSanity = this.getMetricsRegistry()
.newCounter(EXCEPTIONS_SANITY_NAME, EXCEPTIONS_TYPE_DESC, 0L); .newCounter(EXCEPTIONS_SANITY_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsMoved = this.getMetricsRegistry() this.exceptionsMoved = this.getMetricsRegistry()
@ -161,6 +164,11 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
exceptionsUnknown.incr(); exceptionsUnknown.incr();
} }
@Override
public void scannerResetException() {
exceptionsScannerReset.incr();
}
@Override @Override
public void tooBusyException() { public void tooBusyException() {
exceptionsBusy.incr(); exceptionsBusy.incr();

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException; import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
@InterfaceAudience.Private @InterfaceAudience.Private
public class MetricsHBaseServer { public class MetricsHBaseServer {
@ -103,6 +104,8 @@ public class MetricsHBaseServer {
source.tooBusyException(); source.tooBusyException();
} else if (throwable instanceof UnknownScannerException) { } else if (throwable instanceof UnknownScannerException) {
source.unknownScannerException(); source.unknownScannerException();
} else if (throwable instanceof ScannerResetException) {
source.scannerResetException();
} else if (throwable instanceof RegionMovedException) { } else if (throwable instanceof RegionMovedException) {
source.movedRegionException(); source.movedRegionException();
} else if (throwable instanceof NotServingRegionException) { } else if (throwable instanceof NotServingRegionException) {

View File

@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException; import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.MergeRegionException; import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@ -2901,13 +2902,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
isClientCellBlockSupport(context)); isClientCellBlockSupport(context));
} }
} catch (IOException e) { } catch (IOException e) {
// if we have an exception on scanner next and we are using the callSeq // The scanner state might be left in a dirty state, so we will tell the Client to
// we should rollback because the client will retry with the same callSeq // fail this RPC and close the scanner while opening up another one from the start of
// and get an OutOfOrderScannerNextException if we don't do so. // row that the client has last seen.
if (rsh != null && request.hasNextCallSeq()) { closeScanner(region, scanner, scannerName, context);
rsh.rollbackNextCallSeq();
// We closed the scanner already. Instead of throwing the IOException, and client
// retrying with the same scannerId only to get USE on the next RPC, we directly throw
// a special exception to save an RPC.
if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
// 1.4.0+ clients know how to handle
throw new ScannerResetException("Scanner is closed on the server-side", e);
} else {
// older clients do not know about SRE. Just throw USE, which they will handle
throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
+ " scanner state for clients older than 1.3.", e);
} }
throw e;
} finally { } finally {
if (context != null) { if (context != null) {
context.setCallBack(rsh.shippedCallback); context.setCallBack(rsh.shippedCallback);
@ -2926,31 +2936,10 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (!moreResults || closeScanner) { if (!moreResults || closeScanner) {
ttl = 0; ttl = 0;
moreResults = false; moreResults = false;
if (region != null && region.getCoprocessorHost() != null) { if (closeScanner(region, scanner, scannerName, context)) {
if (region.getCoprocessorHost().preScannerClose(scanner)) {
return builder.build(); // bypass return builder.build(); // bypass
} }
} }
rsh = scanners.remove(scannerName);
if (rsh != null) {
if (context != null) {
context.setCallBack(rsh.closeCallBack);
} else {
rsh.s.close();
}
try {
regionServer.leases.cancelLease(scannerName);
} catch (LeaseException le) {
// No problem, ignore
if (LOG.isTraceEnabled()) {
LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
}
}
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerClose(scanner);
}
}
}
if (ttl > 0) { if (ttl > 0) {
builder.setTtl(ttl); builder.setTtl(ttl);
@ -2980,6 +2969,35 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
} }
} }
private boolean closeScanner(Region region, RegionScanner scanner, String scannerName,
RpcCallContext context) throws IOException {
if (region != null && region.getCoprocessorHost() != null) {
if (region.getCoprocessorHost().preScannerClose(scanner)) {
return true; // bypass
}
}
RegionScannerHolder rsh = scanners.remove(scannerName);
if (rsh != null) {
if (context != null) {
context.setCallBack(rsh.closeCallBack);
} else {
rsh.s.close();
}
try {
regionServer.leases.cancelLease(scannerName);
} catch (LeaseException le) {
// No problem, ignore
if (LOG.isTraceEnabled()) {
LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
}
}
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerClose(scanner);
}
}
return false;
}
@Override @Override
public CoprocessorServiceResponse execRegionServerService(RpcController controller, public CoprocessorServiceResponse execRegionServerService(RpcController controller,
CoprocessorServiceRequest request) throws ServiceException { CoprocessorServiceRequest request) throws ServiceException {

View File

@ -1413,12 +1413,8 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
public Table createTable(TableName tableName, byte[][] families, public Table createTable(TableName tableName, byte[][] families,
int numVersions, byte[] startKey, byte[] endKey, int numRegions) int numVersions, byte[] startKey, byte[] endKey, int numRegions)
throws IOException{ throws IOException{
HTableDescriptor desc = new HTableDescriptor(tableName); HTableDescriptor desc = createTableDescriptor(tableName, families, numVersions);
for (byte[] family : families) {
HColumnDescriptor hcd = new HColumnDescriptor(family)
.setMaxVersions(numVersions);
desc.addFamily(hcd);
}
getAdmin().createTable(desc, startKey, endKey, numRegions); getAdmin().createTable(desc, startKey, endKey, numRegions);
// HBaseAdmin only waits for regions to appear in hbase:meta we // HBaseAdmin only waits for regions to appear in hbase:meta we
// should wait until they are assigned // should wait until they are assigned
@ -1781,6 +1777,22 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
MAXVERSIONS, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED); MAXVERSIONS, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
} }
public HTableDescriptor createTableDescriptor(final TableName tableName,
byte[] family) {
return createTableDescriptor(tableName, new byte[][] {family}, 1);
}
public HTableDescriptor createTableDescriptor(final TableName tableName,
byte[][] families, int maxVersions) {
HTableDescriptor desc = new HTableDescriptor(tableName);
for (byte[] family : families) {
HColumnDescriptor hcd = new HColumnDescriptor(family)
.setMaxVersions(maxVersions);
desc.addFamily(hcd);
}
return desc;
}
/** /**
* Create an HRegion that writes to the local tmp dirs * Create an HRegion that writes to the local tmp dirs
* @param desc * @param desc
@ -1998,7 +2010,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL); put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
for (int i = 0; i < f.length; i++) { for (int i = 0; i < f.length; i++) {
byte[] value1 = value != null ? value : row; byte[] value1 = value != null ? value : row;
put.addColumn(f[i], null, value1); put.addColumn(f[i], f[i], value1);
} }
puts.add(put); puts.add(put);
} }
@ -3540,6 +3552,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
public PortAllocator(Random random) { public PortAllocator(Random random) {
this.random = random; this.random = random;
this.portChecker = new AvailablePortChecker() { this.portChecker = new AvailablePortChecker() {
@Override
public boolean available(int port) { public boolean available(int port) {
try { try {
ServerSocket sock = new ServerSocket(port); ServerSocket sock = new ServerSocket(port);

View File

@ -26,7 +26,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
@ -37,10 +36,12 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.NavigableMap; import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang.ArrayUtils; import org.apache.commons.lang.ArrayUtils;
@ -63,8 +64,11 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics; import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint; import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter; import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@ -91,10 +95,14 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType; import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest; import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -536,6 +544,71 @@ public class TestFromClientSide {
assertEquals(rowCount - endKeyCount, countGreater); assertEquals(rowCount - endKeyCount, countGreater);
} }
/**
* This is a coprocessor to inject a test failure so that a store scanner.reseek() call will
* fail with an IOException() on the first call.
*/
public static class ExceptionInReseekRegionObserver extends BaseRegionObserver {
static AtomicLong reqCount = new AtomicLong(0);
class MyStoreScanner extends StoreScanner {
public MyStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
long readPt) throws IOException {
super(store, scanInfo, scan, columns, readPt);
}
@Override
protected List<KeyValueScanner> selectScannersFrom(
List<? extends KeyValueScanner> allScanners) {
List<KeyValueScanner> scanners = super.selectScannersFrom(allScanners);
List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size());
for (KeyValueScanner scanner : scanners) {
newScanners.add(new DelegatingKeyValueScanner(scanner) {
@Override
public boolean reseek(Cell key) throws IOException {
if (reqCount.incrementAndGet() == 1) {
throw new IOException("Injected exception");
}
return super.reseek(key);
}
});
}
return newScanners;
}
}
@Override
public KeyValueScanner preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s,
final long readPt) throws IOException {
return new MyStoreScanner(store, store.getScanInfo(), scan, targetCols, readPt);
}
}
/**
* Tests the case where a Scan can throw an IOException in the middle of the seek / reseek
* leaving the server side RegionScanner to be in dirty state. The client has to ensure that the
* ClientScanner does not get an exception and also sees all the data.
* @throws IOException
* @throws InterruptedException
*/
@Test
public void testClientScannerIsResetWhenScanThrowsIOException()
throws IOException, InterruptedException {
TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
TableName name = TableName.valueOf("testClientScannerIsResetWhenScanThrowsIOException");
HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name, FAMILY);
htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName());
TEST_UTIL.getAdmin().createTable(htd);
try (Table t = TEST_UTIL.getConnection().getTable(name)) {
int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
TEST_UTIL.getAdmin().flush(name);
int actualRowCount = countRows(t, new Scan().addColumn(FAMILY, FAMILY));
assertEquals(rowCount, actualRowCount);
}
assertTrue(ExceptionInReseekRegionObserver.reqCount.get() > 0);
}
/* /*
* @param key * @param key
* @return Scan with RowFilter that does LESS than passed key. * @return Scan with RowFilter that does LESS than passed key.

View File

@ -184,7 +184,7 @@ public class TestTableSnapshotScanner {
} }
for (int j = 0; j < FAMILIES.length; j++) { for (int j = 0; j < FAMILIES.length; j++) {
byte[] actual = result.getValue(FAMILIES[j], null); byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row) Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+ " ,actual:" + Bytes.toString(actual), row, actual); + " ,actual:" + Bytes.toString(actual), row, actual);
} }

View File

@ -181,7 +181,7 @@ public abstract class TableSnapshotInputFormatTestBase {
} }
for (int j = 0; j < FAMILIES.length; j++) { for (int j = 0; j < FAMILIES.length; j++) {
byte[] actual = result.getValue(FAMILIES[j], null); byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row) Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+ " ,actual:" + Bytes.toString(actual), row, actual); + " ,actual:" + Bytes.toString(actual), row, actual);
} }

View File

@ -97,6 +97,7 @@ public class TestMultithreadedTableMapper {
* @param context * @param context
* @throws IOException * @throws IOException
*/ */
@Override
public void map(ImmutableBytesWritable key, Result value, public void map(ImmutableBytesWritable key, Result value,
Context context) Context context)
throws IOException, InterruptedException { throws IOException, InterruptedException {
@ -110,7 +111,7 @@ public class TestMultithreadedTableMapper {
Bytes.toString(INPUT_FAMILY) + "'."); Bytes.toString(INPUT_FAMILY) + "'.");
} }
// Get the original value and reverse it // Get the original value and reverse it
String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null)); String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
StringBuilder newValue = new StringBuilder(originalValue); StringBuilder newValue = new StringBuilder(originalValue);
newValue.reverse(); newValue.reverse();
// Now set the value to be collected // Now set the value to be collected

View File

@ -57,6 +57,7 @@ import org.junit.experimental.categories.Category;
public class TestTableMapReduce extends TestTableMapReduceBase { public class TestTableMapReduce extends TestTableMapReduceBase {
private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class); private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
@Override
protected Log getLog() { return LOG; } protected Log getLog() { return LOG; }
/** /**
@ -72,6 +73,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
* @param context * @param context
* @throws IOException * @throws IOException
*/ */
@Override
public void map(ImmutableBytesWritable key, Result value, public void map(ImmutableBytesWritable key, Result value,
Context context) Context context)
throws IOException, InterruptedException { throws IOException, InterruptedException {
@ -86,7 +88,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
} }
// Get the original value and reverse it // Get the original value and reverse it
String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null)); String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
StringBuilder newValue = new StringBuilder(originalValue); StringBuilder newValue = new StringBuilder(originalValue);
newValue.reverse(); newValue.reverse();
// Now set the value to be collected // Now set the value to be collected
@ -96,6 +98,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
} }
} }
@Override
protected void runTestOnTable(Table table) throws IOException { protected void runTestOnTable(Table table) throws IOException {
Job job = null; Job job = null;
try { try {

View File

@ -126,7 +126,7 @@ public abstract class TestTableMapReduceBase {
// Get the original value and reverse it // Get the original value and reverse it
String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null)); String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
StringBuilder newValue = new StringBuilder(originalValue); StringBuilder newValue = new StringBuilder(originalValue);
newValue.reverse(); newValue.reverse();

View File

@ -0,0 +1,114 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.Store;
public class DelegatingKeyValueScanner implements KeyValueScanner {
protected KeyValueScanner delegate;
public DelegatingKeyValueScanner(KeyValueScanner delegate) {
this.delegate = delegate;
}
@Override
public void shipped() throws IOException {
delegate.shipped();
}
@Override
public Cell peek() {
return delegate.peek();
}
@Override
public Cell next() throws IOException {
return delegate.next();
}
@Override
public boolean seek(Cell key) throws IOException {
return delegate.seek(key);
}
@Override
public boolean reseek(Cell key) throws IOException {
return delegate.reseek(key);
}
@Override
public long getScannerOrder() {
return delegate.getScannerOrder();
}
@Override
public void close() {
delegate.close();
}
@Override
public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
return delegate.shouldUseScanner(scan, store, oldestUnexpiredTS);
}
@Override
public boolean requestSeek(Cell kv, boolean forward, boolean useBloom) throws IOException {
return delegate.requestSeek(kv, forward, useBloom);
}
@Override
public boolean realSeekDone() {
return delegate.realSeekDone();
}
@Override
public void enforceSeek() throws IOException {
delegate.enforceSeek();
}
@Override
public boolean isFileScanner() {
return delegate.isFileScanner();
}
@Override
public boolean backwardSeek(Cell key) throws IOException {
return delegate.backwardSeek(key);
}
@Override
public boolean seekToPreviousRow(Cell key) throws IOException {
return delegate.seekToPreviousRow(key);
}
@Override
public boolean seekToLastRow() throws IOException {
return delegate.seekToLastRow();
}
@Override
public Cell getNextIndexedKey() {
return delegate.getNextIndexedKey();
}
}