HBASE-17187 DoNotRetryExceptions from coprocessors should bubble up to the application
This commit is contained in:
parent
7a6518f05d
commit
4d730244a9
|
@ -21,8 +21,6 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
|
||||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowAfter;
|
||||||
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
|
import static org.apache.hadoop.hbase.client.ConnectionUtils.createClosestRowBefore;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
@ -55,6 +53,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Implements the scanner interface for the HBase client. If there are multiple regions in a table,
|
* Implements the scanner interface for the HBase client. If there are multiple regions in a table,
|
||||||
* this scanner will iterate through them all.
|
* this scanner will iterate through them all.
|
||||||
|
@ -395,22 +395,33 @@ public abstract class ClientScanner extends AbstractClientScanner {
|
||||||
}
|
}
|
||||||
|
|
||||||
private Result[] nextScannerWithRetries(int nbRows) throws IOException {
|
private Result[] nextScannerWithRetries(int nbRows) throws IOException {
|
||||||
|
int retriesLeft = getRetries();
|
||||||
for (;;) {
|
for (;;) {
|
||||||
try {
|
try {
|
||||||
return nextScanner(nbRows);
|
return nextScanner(nbRows);
|
||||||
} catch (DoNotRetryIOException e) {
|
} catch (DoNotRetryIOException e) {
|
||||||
handleScanError(e, null);
|
handleScanError(e, null, retriesLeft--);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void handleScanError(DoNotRetryIOException e,
|
private void handleScanError(DoNotRetryIOException e,
|
||||||
MutableBoolean retryAfterOutOfOrderException) throws DoNotRetryIOException {
|
MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws DoNotRetryIOException {
|
||||||
// An exception was thrown which makes any partial results that we were collecting
|
// An exception was thrown which makes any partial results that we were collecting
|
||||||
// invalid. The scanner will need to be reset to the beginning of a row.
|
// invalid. The scanner will need to be reset to the beginning of a row.
|
||||||
clearPartialResults();
|
clearPartialResults();
|
||||||
// DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us
|
|
||||||
// to reset the scanner and come back in again.
|
// Unfortunately, DNRIOE is used in two different semantics.
|
||||||
|
// (1) The first is to close the client scanner and bubble up the exception all the way
|
||||||
|
// to the application. This is preferred when the exception is really un-recoverable
|
||||||
|
// (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
|
||||||
|
// bucket usually.
|
||||||
|
// (2) The second is to close the current region scanner only, but continue the
|
||||||
|
// client scanner by overriding the exception. This is usually UnknownScannerException,
|
||||||
|
// OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
|
||||||
|
// application-level ClientScanner has to continue without bubbling up the exception to
|
||||||
|
// the client. See RSRpcServices to see how it throws DNRIOE's.
|
||||||
|
// See also: HBASE-16604, HBASE-17187
|
||||||
|
|
||||||
// If exception is any but the list below throw it back to the client; else setup
|
// If exception is any but the list below throw it back to the client; else setup
|
||||||
// the scanner and retry.
|
// the scanner and retry.
|
||||||
|
@ -421,6 +432,9 @@ public abstract class ClientScanner extends AbstractClientScanner {
|
||||||
e instanceof ScannerResetException) {
|
e instanceof ScannerResetException) {
|
||||||
// Pass. It is easier writing the if loop test as list of what is allowed rather than
|
// Pass. It is easier writing the if loop test as list of what is allowed rather than
|
||||||
// as a list of what is not allowed... so if in here, it means we do not throw.
|
// as a list of what is not allowed... so if in here, it means we do not throw.
|
||||||
|
if (retriesLeft <= 0) {
|
||||||
|
throw e; // no more retries
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
@ -483,6 +497,9 @@ public abstract class ClientScanner extends AbstractClientScanner {
|
||||||
// This flag is set when we want to skip the result returned. We do
|
// This flag is set when we want to skip the result returned. We do
|
||||||
// this when we reset scanner because it split under us.
|
// this when we reset scanner because it split under us.
|
||||||
MutableBoolean retryAfterOutOfOrderException = new MutableBoolean(true);
|
MutableBoolean retryAfterOutOfOrderException = new MutableBoolean(true);
|
||||||
|
// Even if we are retrying due to UnknownScannerException, ScannerResetException, etc. we should
|
||||||
|
// make sure that we are not retrying indefinitely.
|
||||||
|
int retriesLeft = getRetries();
|
||||||
for (;;) {
|
for (;;) {
|
||||||
try {
|
try {
|
||||||
// Server returns a null value if scanning is to stop. Else,
|
// Server returns a null value if scanning is to stop. Else,
|
||||||
|
@ -511,7 +528,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
|
||||||
}
|
}
|
||||||
retryAfterOutOfOrderException.setValue(true);
|
retryAfterOutOfOrderException.setValue(true);
|
||||||
} catch (DoNotRetryIOException e) {
|
} catch (DoNotRetryIOException e) {
|
||||||
handleScanError(e, retryAfterOutOfOrderException);
|
handleScanError(e, retryAfterOutOfOrderException, retriesLeft--);
|
||||||
// reopen the scanner
|
// reopen the scanner
|
||||||
values = nextScannerWithRetries(countdown);
|
values = nextScannerWithRetries(countdown);
|
||||||
if (values == null) {
|
if (values == null) {
|
||||||
|
|
|
@ -2138,7 +2138,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
if (region.getCoprocessorHost() != null) {
|
if (region.getCoprocessorHost() != null) {
|
||||||
bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
|
bypass = region.getCoprocessorHost().preBulkLoadHFile(familyPaths);
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
if (!bypass) {
|
if (!bypass) {
|
||||||
map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
|
map = region.bulkLoadHFiles(familyPaths, request.getAssignSeqNum(), null,
|
||||||
request.getCopyFile());
|
request.getCopyFile());
|
||||||
|
@ -3090,7 +3090,17 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
|
||||||
// row that the client has last seen.
|
// row that the client has last seen.
|
||||||
closeScanner(region, scanner, scannerName, context);
|
closeScanner(region, scanner, scannerName, context);
|
||||||
|
|
||||||
// rethrow DoNotRetryIOException. This can avoid the retry in ClientScanner.
|
// If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
|
||||||
|
// used in two different semantics.
|
||||||
|
// (1) The first is to close the client scanner and bubble up the exception all the way
|
||||||
|
// to the application. This is preferred when the exception is really un-recoverable
|
||||||
|
// (like CorruptHFileException, etc). Plain DoNotRetryIOException also falls into this
|
||||||
|
// bucket usually.
|
||||||
|
// (2) The second is to close the current region scanner only, but continue the
|
||||||
|
// client scanner by overriding the exception. This is usually UnknownScannerException,
|
||||||
|
// OutOfOrderScannerNextException, etc where the region scanner has to be closed, but the
|
||||||
|
// application-level ClientScanner has to continue without bubbling up the exception to
|
||||||
|
// the client. See ClientScanner code to see how it deals with these special exceptions.
|
||||||
if (e instanceof DoNotRetryIOException) {
|
if (e instanceof DoNotRetryIOException) {
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,6 +43,7 @@ import java.util.UUID;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
@ -52,6 +53,7 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellUtil;
|
import org.apache.hadoop.hbase.CellUtil;
|
||||||
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
@ -71,6 +73,7 @@ import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||||
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
|
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
|
||||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
|
||||||
|
import org.apache.hadoop.hbase.exceptions.ScannerResetException;
|
||||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||||
import org.apache.hadoop.hbase.filter.CompareFilter;
|
import org.apache.hadoop.hbase.filter.CompareFilter;
|
||||||
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
|
||||||
|
@ -546,6 +549,15 @@ public class TestFromClientSide {
|
||||||
*/
|
*/
|
||||||
public static class ExceptionInReseekRegionObserver extends BaseRegionObserver {
|
public static class ExceptionInReseekRegionObserver extends BaseRegionObserver {
|
||||||
static AtomicLong reqCount = new AtomicLong(0);
|
static AtomicLong reqCount = new AtomicLong(0);
|
||||||
|
static AtomicBoolean isDoNotRetry = new AtomicBoolean(false); // whether to throw DNRIOE
|
||||||
|
static AtomicBoolean throwOnce = new AtomicBoolean(true); // whether to only throw once
|
||||||
|
|
||||||
|
static void reset() {
|
||||||
|
reqCount.set(0);
|
||||||
|
isDoNotRetry.set(false);
|
||||||
|
throwOnce.set(true);
|
||||||
|
}
|
||||||
|
|
||||||
class MyStoreScanner extends StoreScanner {
|
class MyStoreScanner extends StoreScanner {
|
||||||
public MyStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
|
public MyStoreScanner(Store store, ScanInfo scanInfo, Scan scan, NavigableSet<byte[]> columns,
|
||||||
long readPt) throws IOException {
|
long readPt) throws IOException {
|
||||||
|
@ -561,8 +573,13 @@ public class TestFromClientSide {
|
||||||
newScanners.add(new DelegatingKeyValueScanner(scanner) {
|
newScanners.add(new DelegatingKeyValueScanner(scanner) {
|
||||||
@Override
|
@Override
|
||||||
public boolean reseek(Cell key) throws IOException {
|
public boolean reseek(Cell key) throws IOException {
|
||||||
if (reqCount.incrementAndGet() == 1) {
|
reqCount.incrementAndGet();
|
||||||
throw new IOException("Injected exception");
|
if (!throwOnce.get()|| reqCount.get() == 1) {
|
||||||
|
if (isDoNotRetry.get()) {
|
||||||
|
throw new DoNotRetryIOException("Injected exception");
|
||||||
|
} else {
|
||||||
|
throw new IOException("Injected exception");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return super.reseek(key);
|
return super.reseek(key);
|
||||||
}
|
}
|
||||||
|
@ -596,6 +613,8 @@ public class TestFromClientSide {
|
||||||
HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name, FAMILY);
|
HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name, FAMILY);
|
||||||
htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName());
|
htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName());
|
||||||
TEST_UTIL.getAdmin().createTable(htd);
|
TEST_UTIL.getAdmin().createTable(htd);
|
||||||
|
ExceptionInReseekRegionObserver.reset();
|
||||||
|
ExceptionInReseekRegionObserver.throwOnce.set(true); // throw exceptions only once
|
||||||
try (Table t = TEST_UTIL.getConnection().getTable(name)) {
|
try (Table t = TEST_UTIL.getConnection().getTable(name)) {
|
||||||
int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
|
int rowCount = TEST_UTIL.loadTable(t, FAMILY, false);
|
||||||
TEST_UTIL.getAdmin().flush(name);
|
TEST_UTIL.getAdmin().flush(name);
|
||||||
|
@ -605,6 +624,60 @@ public class TestFromClientSide {
|
||||||
assertTrue(ExceptionInReseekRegionObserver.reqCount.get() > 0);
|
assertTrue(ExceptionInReseekRegionObserver.reqCount.get() > 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests the case where a coprocessor throws a DoNotRetryIOException in the scan. The expectation
|
||||||
|
* is that the exception will bubble up to the client scanner instead of being retried.
|
||||||
|
*/
|
||||||
|
@Test (timeout = 180000)
|
||||||
|
public void testScannerThrowsExceptionWhenCoprocessorThrowsDNRIOE()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
|
||||||
|
TableName name = TableName.valueOf("testClientScannerIsNotRetriedWhenCoprocessorThrowsDNRIOE");
|
||||||
|
|
||||||
|
HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name, FAMILY);
|
||||||
|
htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName());
|
||||||
|
TEST_UTIL.getAdmin().createTable(htd);
|
||||||
|
ExceptionInReseekRegionObserver.reset();
|
||||||
|
ExceptionInReseekRegionObserver.isDoNotRetry.set(true);
|
||||||
|
try (Table t = TEST_UTIL.getConnection().getTable(name)) {
|
||||||
|
TEST_UTIL.loadTable(t, FAMILY, false);
|
||||||
|
TEST_UTIL.getAdmin().flush(name);
|
||||||
|
TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
|
||||||
|
fail("Should have thrown an exception");
|
||||||
|
} catch (DoNotRetryIOException expected) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
assertTrue(ExceptionInReseekRegionObserver.reqCount.get() > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests the case where a coprocessor throws a regular IOException in the scan. The expectation
|
||||||
|
* is that we will keep on retrying, but fail after the retries are exhausted instead of
|
||||||
|
* retrying indefinitely.
|
||||||
|
*/
|
||||||
|
@Test (timeout = 180000)
|
||||||
|
public void testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE()
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
TEST_UTIL.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
|
||||||
|
TableName name = TableName.valueOf("testScannerFailsAfterRetriesWhenCoprocessorThrowsIOE");
|
||||||
|
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 3);
|
||||||
|
HTableDescriptor htd = TEST_UTIL.createTableDescriptor(name, FAMILY);
|
||||||
|
htd.addCoprocessor(ExceptionInReseekRegionObserver.class.getName());
|
||||||
|
TEST_UTIL.getAdmin().createTable(htd);
|
||||||
|
ExceptionInReseekRegionObserver.reset();
|
||||||
|
ExceptionInReseekRegionObserver.throwOnce.set(false); // throw exceptions in every retry
|
||||||
|
try (Table t = TEST_UTIL.getConnection().getTable(name)) {
|
||||||
|
TEST_UTIL.loadTable(t, FAMILY, false);
|
||||||
|
TEST_UTIL.getAdmin().flush(name);
|
||||||
|
TEST_UTIL.countRows(t, new Scan().addColumn(FAMILY, FAMILY));
|
||||||
|
fail("Should have thrown an exception");
|
||||||
|
} catch (DoNotRetryIOException expected) {
|
||||||
|
assertTrue(expected instanceof ScannerResetException);
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
assertTrue(ExceptionInReseekRegionObserver.reqCount.get() >= 3);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* @param key
|
* @param key
|
||||||
* @return Scan with RowFilter that does LESS than passed key.
|
* @return Scan with RowFilter that does LESS than passed key.
|
||||||
|
@ -5454,7 +5527,7 @@ public class TestFromClientSide {
|
||||||
put.addColumn(FAMILY, QUALIFIER, VALUE);
|
put.addColumn(FAMILY, QUALIFIER, VALUE);
|
||||||
table.put(put);
|
table.put(put);
|
||||||
|
|
||||||
List<Result> scanResults = new LinkedList<>();
|
List<Result> scanResults = new LinkedList<>();
|
||||||
Scan scan = new Scan();
|
Scan scan = new Scan();
|
||||||
scan.setFilter(new FilterList());
|
scan.setFilter(new FilterList());
|
||||||
try (ResultScanner scanner = table.getScanner(scan)) {
|
try (ResultScanner scanner = table.getScanner(scan)) {
|
||||||
|
|
Loading…
Reference in New Issue