HBASE-5924 In the client code, don't wait for all the requests to be executed before resubmitting a request in error (N Keywal)

Submitted by:	N Keywal
Reviewed by:	Stack, Ted


git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1350105 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2012-06-14 06:54:18 +00:00
parent 12a1613508
commit 1c8d541b6d
10 changed files with 581 additions and 349 deletions

View File

@ -290,7 +290,9 @@ public interface HConnection extends Abortable, Closeable {
* processed successfully.
* @throws IOException if there are problems talking to META. Per-item
* exceptions are stored in the results array.
* @deprecated since 0.96 - Use {@link HTableInterface#batch} instead
*/
@Deprecated
public void processBatch(List<? extends Row> actions, final byte[] tableName,
ExecutorService pool, Object[] results)
throws IOException, InterruptedException;
@ -298,7 +300,9 @@ public interface HConnection extends Abortable, Closeable {
/**
* Parameterized batch processing, allowing varying return types for different
* {@link Row} implementations.
* @deprecated since 0.96 - Use {@link HTableInterface#batchCallback} instead
*/
@Deprecated
public <R> void processBatchCallback(List<? extends Row> list,
byte[] tableName,
ExecutorService pool,

View File

@ -27,16 +27,8 @@ import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
@ -67,8 +59,6 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.AdminProtocol;
import org.apache.hadoop.hbase.client.ClientProtocol;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitorBase;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
@ -82,13 +72,8 @@ import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsResponse;
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRequest;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.util.*;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
@ -1709,23 +1694,25 @@ public class HConnectionManager {
// way around.
final HConnection connection = this;
return new Callable<MultiResponse>() {
public MultiResponse call() throws IOException {
ServerCallable<MultiResponse> callable =
new ServerCallable<MultiResponse>(connection, tableName, null) {
public MultiResponse call() throws IOException {
return ProtobufUtil.multi(server, multi);
}
@Override
public void connect(boolean reload) throws IOException {
server = connection.getClient(
loc.getHostname(), loc.getPort());
}
};
return callable.withoutRetries();
}
};
public MultiResponse call() throws IOException {
ServerCallable<MultiResponse> callable =
new ServerCallable<MultiResponse>(connection, tableName, null) {
public MultiResponse call() throws IOException {
return ProtobufUtil.multi(server, multi);
}
@Override
public void connect(boolean reload) throws IOException {
server = connection.getClient(
loc.getHostname(), loc.getPort());
}
};
return callable.withoutRetries();
}
};
}
void updateCachedLocation(HRegionLocation hrl, String hostname, int port) {
HRegionLocation newHrl = new HRegionLocation(hrl.getRegionInfo(), hostname, port);
synchronized (this.cachedRegionLocations) {
@ -1741,30 +1728,20 @@ public class HConnectionManager {
}
}
private void updateCachedLocations(
UpdateHistory updateHistory, HRegionLocation hrl, Object t) {
updateCachedLocations(updateHistory, hrl, null, null, t);
}
private void updateCachedLocations(UpdateHistory updateHistory, byte[] tableName, Row row,
Object t) {
updateCachedLocations(updateHistory, null, tableName, row, t);
private void updateCachedLocations(byte[] tableName, Row row, Object t) {
updateCachedLocations(null, tableName, row, t);
}
/**
* Update the location with the new value (if the exception is a RegionMovedException) or delete
* it from the cache.
* We need to keep an history of the modifications, because we can have first an update then a
* delete. The delete would remove the update.
* @param updateHistory - The set used for the history
* @param hrl - can be null. If it's the case, tableName and row should not be null
* @param tableName - can be null if hrl is not null.
* @param row - can be null if hrl is not null.
* @param exception - An object (to simplify user code) on which we will try to find a nested
* or wrapped or both RegionMovedException
*/
private void updateCachedLocations(
UpdateHistory updateHistory, final HRegionLocation hrl, final byte[] tableName,
private void updateCachedLocations(final HRegionLocation hrl, final byte[] tableName,
Row row, final Object exception) {
if ((row == null || tableName == null) && hrl == null){
@ -1781,15 +1758,6 @@ public class HConnectionManager {
return;
}
final String regionName = myLoc.getRegionInfo().getRegionNameAsString();
if (updateHistory.contains(regionName)) {
// Already updated/deleted => nothing to do
return;
}
updateHistory.add(regionName);
final RegionMovedException rme = RegionMovedException.find(exception);
if (rme != null) {
LOG.info("Region " + myLoc.getRegionInfo().getRegionNameAsString() + " moved from " +
@ -1814,10 +1782,331 @@ public class HConnectionManager {
throw new IllegalArgumentException(
"argument results must be the same size as argument list");
}
processBatchCallback(list, tableName, pool, results, null);
}
/**
 * Runs the given actions in parallel against the region servers, retrying failed actions.
 * A normal return means every action succeeded and 'results' holds no exception. When
 * something failed for good an exception is thrown, and 'results' then holds a mix of
 * results and exceptions.
 * All the work is delegated to a fresh {@link Process} created for this call.
 * @deprecated since 0.96 - Use {@link HTable#processBatchCallback} instead
 */
@Override
@Deprecated
public <R> void processBatchCallback(
    List<? extends Row> list,
    byte[] tableName,
    ExecutorService pool,
    Object[] results,
    Batch.Callback<R> callback)
    throws IOException, InterruptedException {
  // One Process instance per call: it carries all the mutable state of this batch.
  new Process<R>(this, list, tableName, pool, results, callback).processBatchCallback();
}
/**
* Groups the methods and attributes needed to manage one batch process into a single class.
* Creating one Process<R> per batch call keeps this state private to that call, which is
* what ensures multithread safety.
*
* This code should be moved to HTable once processBatchCallback is no longer supported in
* the HConnection interface.
*/
private static class Process<R> {
// Info on the queries and their context
private final HConnectionImplementation hci;
private final List<? extends Row> rows;
private final byte[] tableName;
private final ExecutorService pool;
private final Object[] results;
private final Batch.Callback<R> callback;
// Error management: these lists are filled by the errors on the final try. Indexes
// are consistent, i.e. exceptions[i] matches failedActions[i] and failedAddresses[i]
private final List<Throwable> exceptions;
private final List<Row> failedActions;
private final List<String> failedAddresses;
// Used during the batch process
private final List<Action<R>> toReplay;
private final LinkedList<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>>
inProgress;
private int curNumRetries;
// Tasks that are done; waiters on this list are notified each time a task is added
private final List<MultiAction<R>> finishedTasks = new ArrayList<MultiAction<R>>();
/**
 * Stores the context of one batch call and creates the (initially empty) collections
 * used to track replays, in-flight tasks and definitive errors.
 */
private Process(HConnectionImplementation hci, List<? extends Row> list,
    byte[] tableName, ExecutorService pool, Object[] results,
    Batch.Callback<R> callback) {
  // Working state of the batch: nothing to replay, nothing in flight, no retry yet.
  this.curNumRetries = 0;
  this.inProgress =
      new LinkedList<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>>();
  this.toReplay = new ArrayList<Action<R>>();

  // Error reporting lists, kept index-aligned with each other.
  this.failedAddresses = new ArrayList<String>();
  this.failedActions = new ArrayList<Row>();
  this.exceptions = new ArrayList<Throwable>();

  // Caller supplied context.
  this.callback = callback;
  this.results = results;
  this.pool = pool;
  this.tableName = tableName;
  this.rows = list;
  this.hci = hci;
}
/**
 * Buckets the given actions by region location, then hands one delayed task per location
 * to the pool. Every task created here is appended to the inProgress list.
 * Actions whose row is null are ignored.
 * @param actionsList the actions to send
 * @param sleepTime - delay before the tasks actually execute the actions. Can be zero.
 * @throws IOException - if a region cannot be located.
 */
private void submit(List<Action<R>> actionsList, final long sleepTime) throws IOException {
  // location => all the actions going to that location
  final Map<HRegionLocation, MultiAction<R>> perLocation =
      new HashMap<HRegionLocation, MultiAction<R>>();

  for (Action<R> current : actionsList) {
    final Row target = current.getAction();
    if (target == null) {
      continue;
    }

    final HRegionLocation where = hci.locateRegion(this.tableName, target.getRow(), true);
    if (where == null) {
      throw new IOException("No location found, aborting submit.");
    }
    final byte[] region = where.getRegionInfo().getRegionName();

    MultiAction<R> bucket = perLocation.get(where);
    if (bucket == null) {
      bucket = new MultiAction<R>();
      perLocation.put(where, bucket);
    }
    bucket.add(region, current);
  }

  // Fire one task per location and remember it, with its context, as in progress.
  for (Entry<HRegionLocation, MultiAction<R>> entry : perLocation.entrySet()) {
    final HRegionLocation where = entry.getKey();
    final MultiAction<R> bucket = entry.getValue();
    final Callable<MultiResponse> task = createDelayedCallable(sleepTime, where, bucket);
    this.inProgress.addLast(
        new Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>(
            bucket, where, this.pool.submit(task)));
  }
}
/**
 * Records a definitive failure. The three error lists are appended together so that their
 * indexes stay aligned.
 * @param ex the failure. Accepts any Throwable, as the per-action results coming back from
 *           the servers are only known to be Throwable (the 'exceptions' list is a
 *           List of Throwable). May be null when no failure object is available.
 * @param row the row of the failed action
 * @param obj the task the action belonged to; its location provides the failed address
 */
private void addToErrorsLists(Throwable ex, Row row, Triple<MultiAction<R>,
    HRegionLocation, Future<MultiResponse>> obj) {
  this.exceptions.add(ex);
  this.failedActions.add(row);
  this.failedAddresses.add(obj.getSecond().getHostnamePort());
}

/**
 * Resubmit the actions which have failed, after a sleep time. The sleep time is derived
 * from the current number of retries; the replay list is emptied once resubmitted.
 * @throws IOException if the resubmission fails (see submit)
 */
private void doRetry() throws IOException {
  final long sleepTime = ConnectionUtils.getPauseTime(hci.pause, this.curNumRetries);
  submit(this.toReplay, sleepTime);
  this.toReplay.clear();
}

/**
 * Parameterized batch processing, allowing varying return types for
 * different {@link Row} implementations.
 * Submits all the actions, then handles the tasks one by one as they finish, resubmitting
 * the failed actions without waiting for the other tasks.
 * Throws an exception on error. If there are no exceptions, it means that the 'results'
 * array is clean.
 * @throws IllegalArgumentException if 'results' and the list of rows differ in size
 * @throws RetriesExhaustedWithDetailsException if some actions failed definitively
 * @throws IOException if a submission fails
 * @throws InterruptedException if interrupted while waiting for a task
 */
private void processBatchCallback() throws IOException, InterruptedException {
  if (this.results.length != this.rows.size()) {
    throw new IllegalArgumentException(
      "argument results (size="+results.length+") must be the same size as " +
        "argument list (size="+this.rows.size()+")");
  }
  if (this.rows.isEmpty()) {
    return;
  }

  // We keep the number of retries per action.
  int[] nbRetries = new int[this.results.length];

  // Build the action list. This list won't change after being created, hence the
  // indexes will remain constant, allowing a direct lookup.
  final List<Action<R>> listActions = new ArrayList<Action<R>>(this.rows.size());
  for (int i = 0; i < this.rows.size(); i++) {
    Action<R> action = new Action<R>(this.rows.get(i), i);
    listActions.add(action);
  }

  // Execute the actions. We will analyze and resubmit the actions in a 'while' loop.
  submit(listActions, 0);

  // lastRetry is true if we reached numRetries for any action.
  // In this case, we will finish the current retries but we won't start new ones.
  boolean lastRetry = false;
  // Despite its name numRetries means number of tries. So if numRetries == 1 it means we
  // won't retry. And we compare vs. 2 in case someone set it to zero.
  boolean noRetry = (hci.numRetries < 2);

  // Analyze and resubmit until all actions are done successfully or failed after numRetries
  while (!this.inProgress.isEmpty()) {

    // We need the original multi action to find out what actions to replay if
    // we have a 'total' failure of the Future<MultiResponse>.
    // We need the HRegionLocation as we give it back if we run out of retries.
    Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> currentTask =
        removeFirstDone();

    // Get the answer, keep the exception if any as we will use it for the analysis
    MultiResponse responses = null;
    ExecutionException exception = null;
    try {
      responses = currentTask.getThird().get();
    } catch (ExecutionException e) {
      exception = e;
    }

    // Error case: no result at all for this multi action. We need to redo all actions
    if (responses == null) {
      for (List<Action<R>> actions : currentTask.getFirst().actions.values()) {
        for (Action<R> action : actions) {
          Row row = action.getAction();
          hci.updateCachedLocations(this.tableName, row, exception);
          if (noRetry) {
            addToErrorsLists(exception, row, currentTask);
          } else {
            lastRetry = addToReplay(nbRetries, action);
          }
        }
      }
    } else { // Success or partial success
      // Analyze detailed results. We can still have individual failures to be redone.
      // Two specific exceptions are managed:
      //  - DoNotRetryIOException: we continue to retry for other actions
      //  - RegionMovedException: we update the cache with the new region location
      for (Entry<byte[], List<Pair<Integer, Object>>> resultsForRS :
          responses.getResults().entrySet()) {
        for (Pair<Integer, Object> regionResult : resultsForRS.getValue()) {
          Action<R> correspondingAction = listActions.get(regionResult.getFirst());
          Object result = regionResult.getSecond();
          this.results[correspondingAction.getOriginalIndex()] = result;

          // Failure: retry if it makes sense, else update the errors lists
          if (result == null || result instanceof Throwable) {
            Row row = correspondingAction.getAction();
            hci.updateCachedLocations(this.tableName, row, result);
            if (result instanceof DoNotRetryIOException || noRetry) {
              // result is null or a Throwable here: cast to Throwable, not Exception,
              // so that a non-Exception Throwable cannot trigger a ClassCastException.
              addToErrorsLists((Throwable) result, row, currentTask);
            } else {
              lastRetry = addToReplay(nbRetries, correspondingAction);
            }
          } else if (callback != null) { // success
            this.callback.update(resultsForRS.getKey(),
                this.rows.get(regionResult.getFirst()).getRow(),
                (R) result);
          }
        }
      }
    }

    // Retry all actions in toReplay then clear it.
    if (!noRetry && !toReplay.isEmpty()) {
      doRetry();
      if (lastRetry) {
        noRetry = true;
      }
    }
  }

  if (!exceptions.isEmpty()) {
    throw new RetriesExhaustedWithDetailsException(this.exceptions,
        this.failedActions,
        this.failedAddresses);
  }
}
/**
 * Queues an action for replay and counts one more retry for it.
 * curNumRetries is kept as the highest retry count seen for any action.
 * @return true if we're out of numRetries and it's the last retry.
 */
private boolean addToReplay(int[] nbRetries, Action<R> action) {
  this.toReplay.add(action);

  final int index = action.getOriginalIndex();
  final int retriesOfAction = ++nbRetries[index];
  if (retriesOfAction > this.curNumRetries) {
    this.curNumRetries = retriesOfAction;
  }

  // hci.numRetries counts tries whereas curNumRetries counts retries: add one to the
  // latter to compare them. We are looking for the last try, hence '>=' rather than '>'.
  // curNumRetries is not initialized to 1 as it must mean what its name says.
  final int triesSoFar = this.curNumRetries + 1;
  return triesSoFar >= hci.numRetries;
}
/**
* Wait for one of the tasks to be done, and remove it from the inProgress list.
* Loops until a finished task can be matched with an entry of inProgress; there is no
* overall timeout.
* @return the task that is done, with its location and its future.
* @throws InterruptedException if the thread is interrupted while waiting.
*/
private Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>
removeFirstDone() throws InterruptedException {
while (true) {
// finishedTasks is both the list of finished tasks and the monitor used for wait/notify.
synchronized (finishedTasks) {
if (!finishedTasks.isEmpty()) {
// Takes the last element of the list, i.e. the most recently added one.
MultiAction<R> done = finishedTasks.remove(finishedTasks.size() - 1);
// We now need to remove it from the inProgress part.
Iterator<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>> it =
inProgress.iterator();
while (it.hasNext()) {
Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> task = it.next();
if (task.getFirst() == done) { // We have the exact object. No java equals here.
it.remove();
return task;
}
}
// Not found: the entry has already been removed from finishedTasks, so it is only logged
// and the loop goes on with the other tasks.
LOG.error("Development error: We didn't see a task in the list. " +
done.getRegions());
}
// Releases the monitor until a notification on finishedTasks or for 10 ms at most, then
// checks again.
finishedTasks.wait(10);
}
}
}
/**
 * Wraps the callable built by hci.createCallable so that it does not start before 'delay'
 * milliseconds have elapsed since the wrapper was created, and so that the multi action is
 * always reported in finishedTasks (with a notification) when the call ends, whether it
 * succeeded or threw.
 * @param delay minimum time, in milliseconds, between creation and actual execution
 * @param loc the location to send the multi action to
 * @param multi the actions to execute
 */
private Callable<MultiResponse> createDelayedCallable(
    final long delay, final HRegionLocation loc, final MultiAction<R> multi) {
  final Callable<MultiResponse> target = hci.createCallable(loc, multi, tableName);
  // The clock starts now, not when a pool thread picks up the task.
  final long earliestStart = System.currentTimeMillis() + delay;

  return new Callable<MultiResponse>() {
    @Override
    public MultiResponse call() throws Exception {
      try {
        final long remaining = earliestStart - System.currentTimeMillis();
        if (remaining > 0) {
          Thread.sleep(remaining);
        }
        return target.call();
      } finally {
        // Always signal the end of the task, even on failure.
        synchronized (finishedTasks) {
          finishedTasks.add(multi);
          finishedTasks.notifyAll();
        }
      }
    }
  };
}
}
/**
* Executes the given
* {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call}
@ -1884,182 +2173,6 @@ public class HConnectionManager {
}
}
private static class UpdateHistory{
private final Set<String> updateHistory = new HashSet<String>(100); // size: if we're doing a
// rolling restart we may have 100 regions with a wrong location if we're really unlucky
public boolean contains(String regionName) {
return updateHistory.contains(regionName);
}
public void add(String regionName) {
updateHistory.add(regionName);
}
}
/**
* Parameterized batch processing, allowing varying return types for
* different {@link Row} implementations.
*/
@Override
public <R> void processBatchCallback(
List<? extends Row> list,
byte[] tableName,
ExecutorService pool,
Object[] results,
Batch.Callback<R> callback)
throws IOException, InterruptedException {
// This belongs in HTable!!! Not in here. St.Ack
// results must be the same size as list
if (results.length != list.size()) {
throw new IllegalArgumentException(
"argument results must be the same size as argument list");
}
if (list.isEmpty()) {
return;
}
// Keep track of the most recent servers for any given item for better
// exceptional reporting. We keep HRegionLocation to save on parsing.
// Later below when we use lastServers, we'll pull what we need from
// lastServers.
HRegionLocation [] lastServers = new HRegionLocation[results.length];
List<Row> workingList = new ArrayList<Row>(list);
boolean retry = true;
// count that helps presize actions array
int actionCount = 0;
for (int tries = 0; tries < numRetries && retry; ++tries) {
UpdateHistory updateHistory = new UpdateHistory();
// sleep first, if this is a retry
if (tries >= 1) {
long sleepTime = ConnectionUtils.getPauseTime(this.pause, tries);
LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
Thread.sleep(sleepTime);
}
// step 1: break up into regionserver-sized chunks and build the data structs
Map<HRegionLocation, MultiAction<R>> actionsByServer =
new HashMap<HRegionLocation, MultiAction<R>>();
for (int i = 0; i < workingList.size(); i++) {
Row row = workingList.get(i);
if (row != null) {
HRegionLocation loc = locateRegion(tableName, row.getRow(), true);
byte[] regionName = loc.getRegionInfo().getRegionName();
MultiAction<R> actions = actionsByServer.get(loc);
if (actions == null) {
actions = new MultiAction<R>();
actionsByServer.put(loc, actions);
}
Action<R> action = new Action<R>(row, i);
lastServers[i] = loc;
actions.add(regionName, action);
}
}
// step 2: make the requests
Map<HRegionLocation, Future<MultiResponse>> futures =
new HashMap<HRegionLocation, Future<MultiResponse>>(
actionsByServer.size());
for (Entry<HRegionLocation, MultiAction<R>> e: actionsByServer.entrySet()) {
futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
}
// step 3: collect the failures and successes and prepare for retry
for (Entry<HRegionLocation, Future<MultiResponse>> responsePerServer
: futures.entrySet()) {
HRegionLocation loc = responsePerServer.getKey();
try {
Future<MultiResponse> future = responsePerServer.getValue();
MultiResponse resp = future.get();
if (resp == null) {
// Entire server failed
LOG.debug("Failed all for server: " + loc.getHostnamePort() +
", removing from cache");
continue;
}
for (Entry<byte[], List<Pair<Integer,Object>>> e : resp.getResults().entrySet()) {
byte[] regionName = e.getKey();
List<Pair<Integer, Object>> regionResults = e.getValue();
for (Pair<Integer, Object> regionResult : regionResults) {
if (regionResult == null) {
// if the first/only record is 'null' the entire region failed.
LOG.debug("Failures for region: " +
Bytes.toStringBinary(regionName) +
", removing from cache");
} else {
// Result might be an Exception, including DNRIOE
results[regionResult.getFirst()] = regionResult.getSecond();
if (callback != null && !(regionResult.getSecond() instanceof Throwable)) {
callback.update(e.getKey(),
list.get(regionResult.getFirst()).getRow(),
(R)regionResult.getSecond());
}
}
}
}
} catch (ExecutionException e) {
LOG.debug("Failed all from " + loc, e);
updateCachedLocations(updateHistory, loc, e);
}
}
// step 4: identify failures and prep for a retry (if applicable).
// Find failures (i.e. null Result), and add them to the workingList (in
// order), so they can be retried.
retry = false;
workingList.clear();
actionCount = 0;
for (int i = 0; i < results.length; i++) {
// if null (fail) or instanceof Throwable && not instanceof DNRIOE
// then retry that row. else dont.
if (results[i] == null ||
(results[i] instanceof Throwable &&
!(results[i] instanceof DoNotRetryIOException))) {
retry = true;
actionCount++;
Row row = list.get(i);
workingList.add(row);
updateCachedLocations(updateHistory, tableName, row, results[i]);
} else {
if (results[i] != null && results[i] instanceof Throwable) {
actionCount++;
}
// add null to workingList, so the order remains consistent with the original list argument.
workingList.add(null);
}
}
}
List<Throwable> exceptions = new ArrayList<Throwable>(actionCount);
List<Row> actions = new ArrayList<Row>(actionCount);
List<String> addresses = new ArrayList<String>(actionCount);
for (int i = 0 ; i < results.length; i++) {
if (results[i] == null || results[i] instanceof Throwable) {
exceptions.add((Throwable)results[i]);
actions.add(list.get(i));
addresses.add(lastServers[i].getHostnamePort());
}
}
if (!exceptions.isEmpty()) {
throw new RetriesExhaustedWithDetailsException(exceptions,
actions,
addresses);
}
}
/*
* Return the number of cached region for a table. It will only be called
@ -2078,6 +2191,8 @@ public class HConnectionManager {
}
}
/**
* Check the region cache to see whether a region is cached yet or not.
* Called by unit tests.

View File

@ -23,6 +23,7 @@ import java.io.Closeable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
@ -707,22 +708,33 @@ public class HTable implements HTableInterface {
}
}
/**
* {@inheritDoc}
*/
@Override
public synchronized void batch(final List<?extends Row> actions, final Object[] results)
public void batch(final List<?extends Row> actions, final Object[] results)
throws InterruptedException, IOException {
connection.processBatch(actions, tableName, pool, results);
connection.processBatchCallback(actions, tableName, pool, results, null);
}
/**
* {@inheritDoc}
*/
@Override
public synchronized Object[] batch(final List<? extends Row> actions) throws InterruptedException, IOException {
public Object[] batch(final List<? extends Row> actions)
throws InterruptedException, IOException {
Object[] results = new Object[actions.size()];
connection.processBatch(actions, tableName, pool, results);
connection.processBatchCallback(actions, tableName, pool, results, null);
return results;
}
/**
* Executes the given actions as a batch, with a callback. Delegates to
* connection.processBatchCallback, passing tableName and pool along with the arguments.
* @param actions the actions to execute
* @param results array handed to the connection to receive the results
* @param callback handed as is to the connection
*/
@Override
public <R> void batchCallback(
final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
throws IOException, InterruptedException {
connection.processBatchCallback(actions, tableName, pool, results, callback);
}
/**
 * Executes the given actions as a batch, with a callback, and returns the results in a
 * newly allocated array holding one slot per action.
 * @param actions the actions to execute
 * @param callback handed as is to the connection
 * @return the array filled by connection.processBatchCallback
 */
@Override
public <R> Object[] batchCallback(
    final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
    InterruptedException {
  final Object[] outcomes = new Object[actions.size()];
  connection.processBatchCallback(actions, tableName, pool, outcomes, callback);
  return outcomes;
}
@ -985,41 +997,63 @@ public class HTable implements HTableInterface {
*/
@Override
public void flushCommits() throws IOException {
Object[] results = new Object[writeBuffer.size()];
boolean success = false;
try {
Object[] results = new Object[writeBuffer.size()];
try {
this.connection.processBatch(writeBuffer, tableName, pool, results);
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
// mutate list so that it is empty for complete success, or contains
// only failed records results are returned in the same order as the
// requests in list walk the list backwards, so we can remove from list
// without impacting the indexes of earlier members
for (int i = results.length - 1; i>=0; i--) {
if (results[i] instanceof Result) {
// successful Puts are removed from the list here.
writeBuffer.remove(i);
}
}
}
this.connection.processBatch(writeBuffer, tableName, pool, results);
success = true;
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
} finally {
if (clearBufferOnFail) {
// mutate list so that it is empty for complete success, or contains
// only failed records. Results are returned in the same order as the
// requests in list. Walk the list backwards, so we can remove from list
// without impacting the indexes of earlier members
currentWriteBufferSize = 0;
if (success || clearBufferOnFail) {
writeBuffer.clear();
currentWriteBufferSize = 0;
} else {
// the write buffer was adjusted by processBatchOfPuts
currentWriteBufferSize = 0;
for (Put aPut : writeBuffer) {
currentWriteBufferSize += aPut.heapSize();
for (int i = results.length - 1; i >= 0; i--) {
if (results[i] instanceof Result) {
writeBuffer.remove(i);
} else {
currentWriteBufferSize += writeBuffer.get(i).heapSize();
}
}
}
}
}
/**
* Process a mixed batch of Get, Put and Delete actions, with a callback. All actions for a
* RegionServer are forwarded in one RPC call. Queries are executed in parallel.
* Delegates to connection.processBatchCallback, passing tableName and pool.
*
* @param list The collection of actions.
* @param results An empty array, same size as list. If an exception is thrown,
* you can test here for partial results, and to determine which actions
* processed successfully.
* @param callback handed as is to the connection.
* @throws IOException if there are problems talking to META. Per-item
* exceptions are stored in the results array.
* @throws InterruptedException propagated from the connection.
*/
public <R> void processBatchCallback(
final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
throws IOException, InterruptedException {
connection.processBatchCallback(list, tableName, pool, results, callback);
}
/**
* Batch processing without a callback: calls
* {@link #processBatchCallback(List, Object[], Batch.Callback)} with a null callback.
* @param list the actions to execute
* @param results array receiving the results, handed as is to processBatchCallback
*/
public void processBatch(final List<? extends Row> list, final Object[] results)
throws IOException, InterruptedException {
this.processBatchCallback(list, results, null);
}
@Override
public void close() throws IOException {
if (this.closed) {

View File

@ -106,6 +106,23 @@ public interface HTableInterface extends Closeable {
*/
Object[] batch(final List<? extends Row> actions) throws IOException, InterruptedException;
/**
* Same as {@link #batch(List, Object[])}, but with a callback.
* @param actions the actions to execute
* @param results the results array, as in {@link #batch(List, Object[])}
* @param callback the callback used during the batch
* @since 0.96.0
*/
public <R> void batchCallback(
final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
throws IOException, InterruptedException;
/**
* Same as {@link #batch(List)}, but with a callback.
* @param actions the actions to execute
* @param callback the callback used during the batch
* @return the results, as in {@link #batch(List)}
* @since 0.96.0
*/
public <R> Object[] batchCallback(
List<? extends Row> actions, Batch.Callback<R> callback) throws IOException,
InterruptedException;
/**
* Extracts certain cells from a given row.
* @param get The object that specifies what data to fetch and from which row.

View File

@ -500,6 +500,18 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
return table.batch(actions);
}
// Pure delegation: forwards the call, unchanged, to the wrapped 'table'.
@Override
public <R> void batchCallback(List<? extends Row> actions, Object[] results,
Batch.Callback<R> callback) throws IOException, InterruptedException {
table.batchCallback(actions, results, callback);
}
// Pure delegation: forwards the call to the wrapped 'table' and returns its results array.
@Override
public <R> Object[] batchCallback(List<? extends Row> actions,
Batch.Callback<R> callback) throws IOException, InterruptedException {
return table.batchCallback(actions, callback);
}
@Override
public Result[] get(List<Get> gets) throws IOException {
return table.get(gets);

View File

@ -693,6 +693,18 @@ public class RemoteHTable implements HTableInterface {
throw new IOException("batch not supported");
}
// Not implemented by this class: always throws an IOException.
@Override
public <R> void batchCallback(List<? extends Row> actions, Object[] results,
Batch.Callback<R> callback) throws IOException, InterruptedException {
throw new IOException("batchCallback not supported");
}
// Not implemented by this class: always throws an IOException.
@Override
public <R> Object[] batchCallback(List<? extends Row> actions, Batch.Callback<R> callback)
throws IOException, InterruptedException {
throw new IOException("batchCallback not supported");
}
@Override
public Result[] get(List<Get> gets) throws IOException {
throw new IOException("get(List<Get>) not supported");

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
/**
 * Utility class to manage a triple: a mutable holder for three values, each of which may
 * be null. Two triples are equal when their three components are pairwise equal.
 */
public class Triple<A, B, C> {
  private A first;
  private B second;
  private C third;

  public Triple(A first, B second, C third) {
    this.first = first;
    this.second = second;
    this.third = third;
  }

  /**
   * Combines the hash codes of the three components; a null component counts as 0.
   */
  @Override
  public int hashCode() {
    int hashFirst = (first != null ? first.hashCode() : 0);
    int hashSecond = (second != null ? second.hashCode() : 0);
    int hashThird = (third != null ? third.hashCode() : 0);

    return (hashFirst >> 1) ^ hashSecond ^ (hashThird << 1);
  }

  /**
   * @return true if obj is a Triple whose components are pairwise equal to this one's.
   *         A null component is only equal to another null component.
   */
  @Override
  public boolean equals(Object obj) {
    if (!(obj instanceof Triple)) {
      return false;
    }

    Triple<?, ?, ?> otherTriple = (Triple<?, ?, ?>) obj;

    return sameValue(first, otherTriple.first)
        && sameValue(second, otherTriple.second)
        && sameValue(third, otherTriple.third);
  }

  /**
   * Null-safe equality. The comparison must fail when only one side is null, otherwise
   * equals() would be asymmetric: (null, b, c) would equal (a, b, c) but not the reverse.
   */
  private static boolean sameValue(Object left, Object right) {
    return left == right || (left != null && left.equals(right));
  }

  @Override
  public String toString() {
    return "(" + first + ", " + second + "," + third + " )";
  }

  public A getFirst() {
    return first;
  }

  public void setFirst(A first) {
    this.first = first;
  }

  public B getSecond() {
    return second;
  }

  public void setSecond(B second) {
    this.second = second;
  }

  public C getThird() {
    return third;
  }

  public void setThird(C third) {
    this.third = third;
  }
}

View File

@ -27,6 +27,7 @@ import static org.junit.Assert.assertFalse;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.SynchronousQueue;
@ -39,7 +40,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.HTable.DaemonThreadFactory;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.rest.protobuf.generated.ScannerMessage;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.Assert;
@ -222,7 +222,12 @@ public class TestHCM {
table.put(put3);
Assert.assertFalse("Unreachable point", true);
}catch (Throwable e){
LOG.info("Put done, expected exception caught: "+e.getClass());
LOG.info("Put done, exception caught: "+e.getClass());
// Now check that we have the exception we wanted
Assert.assertTrue(e instanceof RetriesExhaustedWithDetailsException);
RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException)e;
Assert.assertTrue(re.getNumExceptions() == 1);
Assert.assertTrue(Arrays.equals(re.getRow(0).getRow(), ROW));
}
Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
Assert.assertEquals(
@ -452,6 +457,7 @@ public class TestHCM {
conn.close();
}
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();

View File

@ -204,6 +204,8 @@ public class TestMasterCoprocessorExceptionWithRemove {
String loadedCoprocessors = master.getLoadedCoprocessors();
assertTrue(loadedCoprocessors.equals("[" + coprocessorName + "]"));
// Verify that BuggyMasterObserver has been removed due to its misbehavior
// by creating another table: should not have a problem this time.
HTableDescriptor htd2 = new HTableDescriptor(TEST_TABLE2);

View File

@ -21,7 +21,9 @@
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.io.InterruptedIOException;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -49,47 +51,16 @@ import static org.junit.Assert.*;
*/
@Category(MediumTests.class)
public class TestRegionServerCoprocessorExceptionWithAbort {
static final Log LOG = LogFactory.getLog(TestRegionObserverInterface.class);
private class zkwAbortable implements Abortable {
@Override
public void abort(String why, Throwable e) {
throw new RuntimeException("Fatal ZK rs tracker error, why=", e);
}
@Override
public boolean isAborted() {
return false;
}
};
private class RSTracker extends ZooKeeperNodeTracker {
public boolean regionZKNodeWasDeleted = false;
public String rsNode;
private Thread mainThread;
public RSTracker(ZooKeeperWatcher zkw, String rsNode, Thread mainThread) {
super(zkw, rsNode, new zkwAbortable());
this.rsNode = rsNode;
this.mainThread = mainThread;
}
@Override
public synchronized void nodeDeleted(String path) {
if (path.equals(rsNode)) {
regionZKNodeWasDeleted = true;
mainThread.interrupt();
}
}
}
private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
static final int timeout = 30000;
static final Log LOG = LogFactory.getLog(TestRegionServerCoprocessorExceptionWithAbort.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final String TABLE_NAME = "observed_table";
@BeforeClass
public static void setupBeforeClass() throws Exception {
// set configure to indicate which cp should be loaded
Configuration conf = TEST_UTIL.getConfiguration();
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
BuggyRegionObserver.class.getName());
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); // Let's fail fast.
conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, BuggyRegionObserver.class.getName());
conf.set("hbase.coprocessor.abortonerror", "true");
TEST_UTIL.startMiniCluster(2);
}
@ -101,59 +72,31 @@ public class TestRegionServerCoprocessorExceptionWithAbort {
@Test
public void testExceptionFromCoprocessorDuringPut()
throws IOException {
throws IOException {
// When we try to write to TEST_TABLE, the buggy coprocessor will
// cause a NullPointerException, which will cause the regionserver (which
// hosts the region we attempted to write to) to abort.
byte[] TEST_TABLE = Bytes.toBytes("observed_table");
byte[] TEST_TABLE = Bytes.toBytes(TABLE_NAME);
byte[] TEST_FAMILY = Bytes.toBytes("aaa");
HTable table = TEST_UTIL.createTable(TEST_TABLE, TEST_FAMILY);
TEST_UTIL.waitUntilAllRegionsAssigned(
TEST_UTIL.createMultiRegions(table, TEST_FAMILY));
TEST_UTIL.waitUntilAllRegionsAssigned(TEST_UTIL.createMultiRegions(table, TEST_FAMILY));
// Note which regionServer will abort (after put is attempted).
final HRegionServer regionServer =
TEST_UTIL.getRSForFirstRegionInTable(TEST_TABLE);
final HRegionServer regionServer = TEST_UTIL.getRSForFirstRegionInTable(TEST_TABLE);
// add watch so we can know when this regionserver aborted.
ZooKeeperWatcher zkw = new ZooKeeperWatcher(TEST_UTIL.getConfiguration(),
"unittest", new zkwAbortable());
final byte[] ROW = Bytes.toBytes("aaa");
Put put = new Put(ROW);
put.add(TEST_FAMILY, ROW, ROW);
RSTracker rsTracker = new RSTracker(zkw,
"/hbase/rs/"+regionServer.getServerName(), Thread.currentThread());
rsTracker.start();
zkw.registerListener(rsTracker);
boolean caughtInterruption = false;
Assert.assertFalse("The region server should be available", regionServer.isAborted());
try {
final byte[] ROW = Bytes.toBytes("aaa");
Put put = new Put(ROW);
put.add(TEST_FAMILY, ROW, ROW);
table.put(put);
} catch (IOException e) {
// Depending on exact timing of the threads involved, zkw's interruption
// might be caught here ...
if (e.getCause().getClass().equals(InterruptedException.class)) {
LOG.debug("caught interruption here (during put()).");
caughtInterruption = true;
} else {
fail("put() failed: " + e);
}
fail("The put should have failed, as the coprocessor is buggy");
} catch (IOException ignored) {
// Expected.
}
if (caughtInterruption == false) {
try {
Thread.sleep(timeout);
fail("RegionServer did not abort within 30 seconds.");
} catch (InterruptedException e) {
// .. or it might be caught here.
LOG.debug("caught interruption here (during sleep()).");
caughtInterruption = true;
}
}
assertTrue("Main thread caught interruption.",caughtInterruption);
assertTrue("RegionServer aborted on coprocessor exception, as expected.",
rsTracker.regionZKNodeWasDeleted);
Assert.assertTrue("The region server should have aborted", regionServer.isAborted());
table.close();
}
@ -162,11 +105,9 @@ public class TestRegionServerCoprocessorExceptionWithAbort {
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
final Put put, final WALEdit edit,
final boolean writeToWAL) {
String tableName =
c.getEnvironment().getRegion().getRegionInfo().getTableNameAsString();
if (tableName.equals("observed_table")) {
Integer i = null;
i = i + 1;
String tableName = c.getEnvironment().getRegion().getRegionInfo().getTableNameAsString();
if (TABLE_NAME.equals(tableName)) {
throw new NullPointerException("Buggy coprocessor");
}
}
}
@ -175,4 +116,3 @@ public class TestRegionServerCoprocessorExceptionWithAbort {
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}