HBASE-16604 Scanner retries on IOException can cause the scans to miss data

This commit is contained in:
Enis Soztutar 2016-09-22 17:41:01 -07:00
parent 6973304026
commit 8a797e81b8
16 changed files with 337 additions and 40 deletions

View File

@ -45,4 +45,8 @@ public class UnknownScannerException extends DoNotRetryIOException {
public UnknownScannerException(String s) {
super(s);
}
public UnknownScannerException(String s, Exception e) {
super(s, e);
}
}

View File

@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
@ -433,7 +434,8 @@ public class ClientScanner extends AbstractClientScanner {
if ((cause != null && cause instanceof NotServingRegionException) ||
(cause != null && cause instanceof RegionServerStoppedException) ||
e instanceof OutOfOrderScannerNextException ||
e instanceof UnknownScannerException ) {
e instanceof UnknownScannerException ||
e instanceof ScannerResetException) {
// Pass. It is easier writing the if loop test as list of what is allowed rather than
// as a list of what is not allowed... so if in here, it means we do not throw.
} else {

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@ -275,7 +276,8 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
if (e instanceof RemoteException) {
ioe = RemoteExceptionHandler.decodeRemoteException((RemoteException)e);
}
if (logScannerActivity && (ioe instanceof UnknownScannerException)) {
if (logScannerActivity) {
if (ioe instanceof UnknownScannerException) {
try {
HRegionLocation location =
getConnection().relocateRegion(getTableName(), scan.getStartRow());
@ -284,6 +286,10 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
} catch (Throwable t) {
LOG.info("Failed to relocate region", t);
}
} else if (ioe instanceof ScannerResetException) {
LOG.info("Scanner=" + scannerId + " has received an exception, and the server "
+ "asked us to reset the scanner state.", ioe);
}
}
// The below convertion of exceptions into DoNotRetryExceptions is a little strange.
// Why not just have these exceptions implment DNRIOE you ask? Well, usually we want

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.exceptions;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* Thrown when the server side has received an Exception, and asks the Client to reset the scanner
* state by closing the current region scanner, and reopening from the start of last seen row.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class ScannerResetException extends DoNotRetryIOException {
private static final long serialVersionUID = -5649728171144849619L;
/** constructor */
public ScannerResetException() {
super();
}
/**
* Constructor
* @param s message
*/
public ScannerResetException(String s) {
super(s);
}
public ScannerResetException(String s, Exception e) {
super(s, e);
}
}

View File

@ -77,6 +77,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
String EXCEPTIONS_OOO_NAME="exceptions.OutOfOrderScannerNextException";
String EXCEPTIONS_BUSY_NAME="exceptions.RegionTooBusyException";
String EXCEPTIONS_UNKNOWN_NAME="exceptions.UnknownScannerException";
String EXCEPTIONS_SCANNER_RESET_NAME="exceptions.ScannerResetException";
String EXCEPTIONS_SANITY_NAME="exceptions.FailedSanityCheckException";
String EXCEPTIONS_MOVED_NAME="exceptions.RegionMovedException";
String EXCEPTIONS_NSRE_NAME="exceptions.NotServingRegionException";
@ -106,6 +107,7 @@ public interface MetricsHBaseServerSource extends BaseSource {
void movedRegionException();
void notServingRegionException();
void unknownScannerException();
void scannerResetException();
void tooBusyException();
void multiActionTooLargeException();
void callQueueTooBigException();

View File

@ -45,6 +45,7 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
private final MutableFastCounter exceptionsOOO;
private final MutableFastCounter exceptionsBusy;
private final MutableFastCounter exceptionsUnknown;
private final MutableFastCounter exceptionsScannerReset;
private final MutableFastCounter exceptionsSanity;
private final MutableFastCounter exceptionsNSRE;
private final MutableFastCounter exceptionsMoved;
@ -78,6 +79,8 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
.newCounter(EXCEPTIONS_BUSY_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsUnknown = this.getMetricsRegistry()
.newCounter(EXCEPTIONS_UNKNOWN_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsScannerReset = this.getMetricsRegistry()
.newCounter(EXCEPTIONS_SCANNER_RESET_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsSanity = this.getMetricsRegistry()
.newCounter(EXCEPTIONS_SANITY_NAME, EXCEPTIONS_TYPE_DESC, 0L);
this.exceptionsMoved = this.getMetricsRegistry()
@ -161,6 +164,11 @@ public class MetricsHBaseServerSourceImpl extends BaseSourceImpl
exceptionsUnknown.incr();
}
@Override
public void scannerResetException() {
exceptionsScannerReset.incr();
}
@Override
public void tooBusyException() {
exceptionsBusy.incr();

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
@InterfaceAudience.Private
public class MetricsHBaseServer {
@ -103,6 +104,8 @@ public class MetricsHBaseServer {
source.tooBusyException();
} else if (throwable instanceof UnknownScannerException) {
source.unknownScannerException();
} else if (throwable instanceof ScannerResetException) {
source.scannerResetException();
} else if (throwable instanceof RegionMovedException) {
source.movedRegionException();
} else if (throwable instanceof NotServingRegionException) {

View File

@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.exceptions.FailedSanityCheckException;
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.exceptions.OperationConflictException;
import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException;
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@ -2733,13 +2734,22 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
addResults(builder, results, controller, RegionReplicaUtil.isDefaultReplica(region.getRegionInfo()));
}
} catch (IOException e) {
// if we have an exception on scanner next and we are using the callSeq
// we should rollback because the client will retry with the same callSeq
// and get an OutOfOrderScannerNextException if we don't do so.
if (rsh != null && request.hasNextCallSeq()) {
rsh.rollbackNextCallSeq();
// The scanner state might be left in a dirty state, so we will tell the Client to
// fail this RPC and close the scanner while opening up another one from the start of
// row that the client has last seen.
closeScanner(region, scanner, scannerName);
// We closed the scanner already. Instead of throwing the IOException, and client
// retrying with the same scannerId only to get USE on the next RPC, we directly throw
// a special exception to save an RPC.
if (VersionInfoUtil.hasMinimumVersion(context.getClientVersionInfo(), 1, 4)) {
// 1.4.0+ clients know how to handle
throw new ScannerResetException("Scanner is closed on the server-side", e);
} else {
// older clients do not know about SRE. Just throw USE, which they will handle
throw new UnknownScannerException("Throwing UnknownScannerException to reset the client"
+ " scanner state for clients older than 1.3.", e);
}
throw e;
} finally {
// We're done. On way out re-add the above removed lease.
// Adding resets expiration time on lease.
@ -2753,20 +2763,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
if (!moreResults || closeScanner) {
ttl = 0;
moreResults = false;
if (region != null && region.getCoprocessorHost() != null) {
if (region.getCoprocessorHost().preScannerClose(scanner)) {
return builder.build(); // bypass
}
}
rsh = scanners.remove(scannerName);
if (rsh != null) {
scanner = rsh.s;
scanner.close();
regionServer.leases.cancelLease(scannerName);
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerClose(scanner);
}
}
closeScanner(region, scanner, scannerName);
}
if (ttl > 0) {
@ -2797,6 +2794,32 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
private boolean closeScanner(Region region, RegionScanner scanner, String scannerName)
throws IOException {
if (region != null && region.getCoprocessorHost() != null) {
if (region.getCoprocessorHost().preScannerClose(scanner)) {
return true; // bypass
}
}
RegionScannerHolder rsh = scanners.remove(scannerName);
if (rsh != null) {
scanner = rsh.s;
scanner.close();
try {
regionServer.leases.cancelLease(scannerName);
} catch (LeaseException le) {
// No problem, ignore
if (LOG.isTraceEnabled()) {
LOG.trace("Un-able to cancel lease of scanner. It could already be closed.");
}
}
if (region != null && region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postScannerClose(scanner);
}
}
return false;
}
@Override
public CoprocessorServiceResponse execRegionServerService(RpcController controller,
CoprocessorServiceRequest request) throws ServiceException {

View File

@ -2007,6 +2007,22 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
return HRegion.createHRegion(info, rootDir, conf, htd);
}
public HTableDescriptor createTableDescriptor(final TableName tableName,
byte[] family) {
return createTableDescriptor(tableName, new byte[][] {family}, 1);
}
public HTableDescriptor createTableDescriptor(final TableName tableName,
byte[][] families, int maxVersions) {
HTableDescriptor desc = new HTableDescriptor(tableName);
for (byte[] family : families) {
HColumnDescriptor hcd = new HColumnDescriptor(family)
.setMaxVersions(maxVersions);
desc.addFamily(hcd);
}
return desc;
}
/**
* Create an HRegion that writes to the local tmp dirs
* @param desc
@ -2221,7 +2237,7 @@ public class HBaseTestingUtility extends HBaseCommonTestingUtility {
Put put = new Put(row);
put.setDurability(writeToWAL ? Durability.USE_DEFAULT : Durability.SKIP_WAL);
for (int i = 0; i < f.length; i++) {
put.add(f[i], null, value != null ? value : row);
put.add(f[i], f[i], value != null ? value : row);
}
puts.add(put);
}

View File

@ -26,7 +26,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.ArrayList;
@ -38,13 +37,14 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.NavigableSet;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.log4j.Level;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
@ -66,8 +66,11 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
@ -93,11 +96,14 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsRequest;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.DelegatingKeyValueScanner;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -632,6 +638,71 @@ public class TestFromClientSide {
assertEquals(rowCount - endKeyCount, countGreater);
}
/**
* This is a coprocessor to inject a test failure so that a store scanner.reseek() call will
* fail with an IOException() on the first call.
*/
public static class ExceptionInReseekRegionObserver extends BaseRegionObserver {
static AtomicLong reqCount = new AtomicLong(0);
class MyStoreScanner extends StoreScanner {
public MyStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
long readPt) throws IOException {
super(store, scanInfo, scan, columns, readPt);
}
@Override
protected List<KeyValueScanner> selectScannersFrom(
List<? extends KeyValueScanner> allScanners) {
List<KeyValueScanner> scanners = super.selectScannersFrom(allScanners);
List<KeyValueScanner> newScanners = new ArrayList<>(scanners.size());
for (KeyValueScanner scanner : scanners) {
newScanners.add(new DelegatingKeyValueScanner(scanner) {
@Override
public boolean reseek(Cell key) throws IOException {
if (reqCount.incrementAndGet() == 1) {
throw new IOException("Injected exception");
}
return super.reseek(key);
}
});
}
return newScanners;
}
}
@Override
public KeyValueScanner preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, Scan scan, NavigableSet<byte[]> targetCols, KeyValueScanner s)
throws IOException {
return new MyStoreScanner(store, store.getScanInfo(), scan, targetCols, Long.MAX_VALUE);
}
}
/**
* Tests the case where a Scan can throw an IOException in the middle of the seek / reseek
* leaving the server side RegionScanner to be in dirty state. The client has to ensure that the
* ClientScanner does not get an exception and also sees all the data.
* @throws IOException
* @throws InterruptedException
*/
@Test
public void testClientScannerIsResetWhenScanThrowsIOException()
throws IOException, InterruptedException {
TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
TableName name = TableName.valueOf("testClientScannerIsResetWhenScanThrowsIOException");
HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name, FAMILY);
htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName());
TEST_UTIL.getHBaseAdmin().createTable(htd);
try (Table t = TEST_UTIL.getConnection().getTable(name)) {
int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
TEST_UTIL.getHBaseAdmin().flush(name);
int actualRowCount = countRows(t, new Scan().addColumn(FAMILY, FAMILY));
assertEquals(rowCount, actualRowCount);
}
assertTrue(ExceptionInReseekRegionObserver.reqCount.get() > 0);
}
/*
* @param key
* @return Scan with RowFilter that does LESS than passed key.

View File

@ -183,7 +183,7 @@ public class TestTableSnapshotScanner {
}
for (int j = 0; j < FAMILIES.length; j++) {
byte[] actual = result.getValue(FAMILIES[j], null);
byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+ " ,actual:" + Bytes.toString(actual), row, actual);
}

View File

@ -182,7 +182,7 @@ public abstract class TableSnapshotInputFormatTestBase {
}
for (int j = 0; j < FAMILIES.length; j++) {
byte[] actual = result.getValue(FAMILIES[j], null);
byte[] actual = result.getValue(FAMILIES[j], FAMILIES[j]);
Assert.assertArrayEquals("Row in snapshot does not match, expected:" + Bytes.toString(row)
+ " ,actual:" + Bytes.toString(actual), row, actual);
}

View File

@ -100,6 +100,7 @@ public class TestMultithreadedTableMapper {
* @param context
* @throws IOException
*/
@Override
public void map(ImmutableBytesWritable key, Result value,
Context context)
throws IOException, InterruptedException {
@ -113,7 +114,7 @@ public class TestMultithreadedTableMapper {
Bytes.toString(INPUT_FAMILY) + "'.");
}
// Get the original value and reverse it
String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
StringBuilder newValue = new StringBuilder(originalValue);
newValue.reverse();
// Now set the value to be collected

View File

@ -56,6 +56,7 @@ import org.junit.experimental.categories.Category;
public class TestTableMapReduce extends TestTableMapReduceBase {
private static final Log LOG = LogFactory.getLog(TestTableMapReduce.class);
@Override
protected Log getLog() { return LOG; }
/**
@ -71,6 +72,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
* @param context
* @throws IOException
*/
@Override
public void map(ImmutableBytesWritable key, Result value,
Context context)
throws IOException, InterruptedException {
@ -85,7 +87,7 @@ public class TestTableMapReduce extends TestTableMapReduceBase {
}
// Get the original value and reverse it
String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
StringBuilder newValue = new StringBuilder(originalValue);
newValue.reverse();
// Now set the value to be collected

View File

@ -128,7 +128,7 @@ public abstract class TestTableMapReduceBase {
// Get the original value and reverse it
String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, null));
String originalValue = Bytes.toString(value.getValue(INPUT_FAMILY, INPUT_FAMILY));
StringBuilder newValue = new StringBuilder(originalValue);
newValue.reverse();

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.Store;
public class DelegatingKeyValueScanner implements KeyValueScanner {
protected KeyValueScanner delegate;
public DelegatingKeyValueScanner(KeyValueScanner delegate) {
this.delegate = delegate;
}
@Override
public Cell peek() {
return delegate.peek();
}
@Override
public Cell next() throws IOException {
return delegate.next();
}
@Override
public boolean seek(Cell key) throws IOException {
return delegate.seek(key);
}
@Override
public boolean reseek(Cell key) throws IOException {
return delegate.reseek(key);
}
@Override
public long getScannerOrder() {
return delegate.getScannerOrder();
}
@Override
public void close() {
delegate.close();
}
@Override
public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
return delegate.shouldUseScanner(scan, store, oldestUnexpiredTS);
}
@Override
public boolean requestSeek(Cell kv, boolean forward, boolean useBloom) throws IOException {
return delegate.requestSeek(kv, forward, useBloom);
}
@Override
public boolean realSeekDone() {
return delegate.realSeekDone();
}
@Override
public void enforceSeek() throws IOException {
delegate.enforceSeek();
}
@Override
public boolean isFileScanner() {
return delegate.isFileScanner();
}
@Override
public boolean backwardSeek(Cell key) throws IOException {
return delegate.backwardSeek(key);
}
@Override
public boolean seekToPreviousRow(Cell key) throws IOException {
return delegate.seekToPreviousRow(key);
}
@Override
public boolean seekToLastRow() throws IOException {
return delegate.seekToLastRow();
}
@Override
public Cell getNextIndexedKey() {
return delegate.getNextIndexedKey();
}
}