HBASE-6295 Possible performance improvement in client batch operations: presplit and send in background

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1496156 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
nkeywal 2013-06-24 18:41:55 +00:00
parent 6847d07934
commit fe92821ec3
7 changed files with 284 additions and 584 deletions

View File

@ -32,7 +32,6 @@ public class Action<R> implements Comparable<R> {
// TODO: This class should not be visible outside of the client package.
private Row action;
private int originalIndex;
private R result;
public Action(Row action, int originalIndex) {
super();
@ -40,13 +39,6 @@ public class Action<R> implements Comparable<R> {
this.originalIndex = originalIndex;
}
public R getResult() {
return result;
}
public void setResult(R result) {
this.result = result;
}
public Row getAction() {
return action;

View File

@ -171,6 +171,17 @@ public interface HConnection extends Abortable, Closeable {
final byte [] row)
throws IOException;
/**
* Update the location cache. This is used internally by HBase, in most cases it should not be
* used by the client application.
* @param tableName the table name
* @param rowkey the row
* @param exception the exception if any. Can be null.
* @param source the previous location
*/
public void updateCachedLocations(byte[] tableName, byte[] rowkey,
Object exception, HRegionLocation source);
/**
* Gets the location of the region of <i>regionName</i>.
* @param regionName name of the region to locate
@ -354,7 +365,7 @@ public interface HConnection extends Abortable, Closeable {
/**
* Clear any caches that pertain to server name <code>sn</code>
* Clear any caches that pertain to server name <code>sn</code>.
* @param sn A server name
*/
public void clearCaches(final ServerName sn);

View File

@ -26,20 +26,16 @@ import java.net.SocketException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -141,16 +137,13 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.IsMasterRunningRe
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.SoftValueSortedMap;
import org.apache.hadoop.hbase.util.Triple;
import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@ -440,8 +433,6 @@ public class HConnectionManager {
private final int numTries;
final int rpcTimeout;
private final int prefetchRegionLimit;
private final boolean useServerTrackerForRetries;
private final long serverTrackerTimeout;
private volatile boolean closed;
private volatile boolean aborted;
@ -490,12 +481,12 @@ public class HConnectionManager {
private int refCount;
// indicates whether this connection's life cycle is managed (by us)
private final boolean managed;
private boolean managed;
/**
* Cluster registry of basic info such as clusterid and meta region location.
*/
final Registry registry;
Registry registry;
/**
* constructor
@ -509,34 +500,8 @@ public class HConnectionManager {
* users of an HConnectionImplementation instance.
*/
HConnectionImplementation(Configuration conf, boolean managed) throws IOException {
this.conf = conf;
this(conf);
this.managed = managed;
this.closed = false;
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.rpcTimeout = conf.getInt(
HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.prefetchRegionLimit = conf.getInt(
HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
this.useServerTrackerForRetries = conf.getBoolean(RETRIES_BY_SERVER_KEY, true);
long serverTrackerTimeout = 0;
if (this.useServerTrackerForRetries) {
// Server tracker allows us to do faster, and yet useful (hopefully), retries.
// However, if we are too useful, we might fail very quickly due to retry count limit.
// To avoid this, we are going to cheat for now (see HBASE-7659), and calculate maximum
// retry time if normal retries were used. Then we will retry until this time runs out.
// If we keep hitting one server, the net effect will be the incremental backoff, and
// essentially the same number of retries as planned. If we have to do faster retries,
// we will do more retries in aggregate, but the user will be none the wiser.
for (int i = 0; i < this.numTries; ++i) {
serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
}
}
this.serverTrackerTimeout = serverTrackerTimeout;
this.registry = setupRegistry();
retrieveClusterId();
@ -561,6 +526,24 @@ public class HConnectionManager {
}
}
/**
* For tests.
*/
protected HConnectionImplementation(Configuration conf) {
this.conf = conf;
this.closed = false;
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.rpcTimeout = conf.getInt(
HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.prefetchRegionLimit = conf.getInt(
HConstants.HBASE_CLIENT_PREFETCH_LIMIT,
HConstants.DEFAULT_HBASE_CLIENT_PREFETCH_LIMIT);
}
/**
* @return The cluster registry implementation to use.
* @throws IOException
@ -1982,23 +1965,6 @@ public class HConnectionManager {
return callable.withoutRetries();
}
@Deprecated
private <R> Callable<MultiResponse> createCallable(final HRegionLocation loc,
final MultiAction<R> multi, final byte[] tableName) {
// TODO: This does not belong in here!!! St.Ack HConnections should
// not be dealing in Callables; Callables have HConnections, not other
// way around.
final HConnection connection = this;
return new Callable<MultiResponse>() {
@Override
public MultiResponse call() throws Exception {
ServerCallable<MultiResponse> callable =
new MultiServerCallable<R>(connection, tableName, loc, multi);
return callable.withoutRetries();
}
};
}
void updateCachedLocation(HRegionInfo hri, HRegionLocation source,
ServerName serverName, long seqNum) {
HRegionLocation newHrl = new HRegionLocation(hri, serverName, seqNum);
@ -2058,16 +2024,17 @@ public class HConnectionManager {
* or wrapped or both RegionMovedException
* @param source server that is the source of the location update.
*/
private void updateCachedLocations(final byte[] tableName, Row row,
@Override
public void updateCachedLocations(final byte[] tableName, byte[] rowkey,
final Object exception, final HRegionLocation source) {
if (row == null || tableName == null) {
LOG.warn("Coding error, see method javadoc. row=" + (row == null ? "null" : row) +
if (rowkey == null || tableName == null) {
LOG.warn("Coding error, see method javadoc. row=" + (rowkey == null ? "null" : rowkey) +
", tableName=" + (tableName == null ? "null" : Bytes.toString(tableName)));
return;
}
// Is it something we have already updated?
final HRegionLocation oldLocation = getCachedLocation(tableName, row.getRow());
final HRegionLocation oldLocation = getCachedLocation(tableName, rowkey);
if (oldLocation == null) {
// There is no such location in the cache => it's been removed already => nothing to do
return;
@ -2125,365 +2092,62 @@ public class HConnectionManager {
Batch.Callback<R> callback)
throws IOException, InterruptedException {
Process<R> p = new Process<R>(this, list, tableName, pool, results, callback);
p.processBatchCallback();
// To fulfill the original contract, we have a special callback. This callback
// will set the results in the Object array.
ObjectResultFiller<R> cb = new ObjectResultFiller<R>(results, callback);
AsyncProcess<?> asyncProcess = createAsyncProcess(tableName, pool, cb, conf);
// We're doing a submit all. This way, the originalIndex will match the initial list.
asyncProcess.submitAll(list);
asyncProcess.waitUntilDone();
if (asyncProcess.hasError()) {
throw asyncProcess.getErrors();
}
}
// For tests.
protected <R> AsyncProcess createAsyncProcess(byte[] tableName, ExecutorService pool,
AsyncProcess.AsyncProcessCallback<R> callback, Configuration conf) {
return new AsyncProcess<R>(this, tableName, pool, callback, conf);
}
/**
* Methods and attributes to manage a batch process are grouped into this single class.
* This allows, by creating a Process<R> per batch process to ensure multithread safety.
*
* This code should be move to HTable once processBatchCallback is not supported anymore in
* the HConnection interface.
* Fill the result array for the interfaces using it.
*/
private static class Process<R> {
// Info on the queries and their context
private final HConnectionImplementation hci;
private final List<? extends Row> rows;
private final byte[] tableName;
private final ExecutorService pool;
private static class ObjectResultFiller<Res>
implements AsyncProcess.AsyncProcessCallback<Res> {
private final Object[] results;
private final Batch.Callback<R> callback;
private Batch.Callback<Res> callback;
// Used during the batch process
private final List<Action<R>> toReplay;
private final LinkedList<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>>
inProgress;
private ServerErrorTracker errorsByServer = null;
private int curNumRetries;
// Notified when a tasks is done
private final List<MultiAction<R>> finishedTasks = new ArrayList<MultiAction<R>>();
private Process(HConnectionImplementation hci, List<? extends Row> list,
byte[] tableName, ExecutorService pool, Object[] results,
Batch.Callback<R> callback){
this.hci = hci;
this.rows = list;
this.tableName = tableName;
this.pool = pool;
ObjectResultFiller(Object[] results, Batch.Callback<Res> callback) {
this.results = results;
this.callback = callback;
this.toReplay = new ArrayList<Action<R>>();
this.inProgress =
new LinkedList<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>>();
this.curNumRetries = 0;
}
/**
* Group a list of actions per region servers, and send them. The created MultiActions are
* added to the inProgress list.
* @param actionsList
* @param isRetry Whether we are retrying these actions. If yes, backoff
* time may be applied before new requests.
* @throws IOException - if we can't locate a region after multiple retries.
*/
private void submit(List<Action<R>> actionsList, final boolean isRetry) throws IOException {
// group per location => regions server
final Map<HRegionLocation, MultiAction<R>> actionsByServer =
new HashMap<HRegionLocation, MultiAction<R>>();
for (Action<R> aAction : actionsList) {
final Row row = aAction.getAction();
if (row != null) {
final HRegionLocation loc = hci.locateRegion(this.tableName, row.getRow());
if (loc == null) {
throw new IOException("No location found, aborting submit.");
}
final byte[] regionName = loc.getRegionInfo().getRegionName();
MultiAction<R> actions = actionsByServer.get(loc);
if (actions == null) {
actions = new MultiAction<R>();
actionsByServer.put(loc, actions);
}
actions.add(regionName, aAction);
}
}
// Send the queries and add them to the inProgress list
for (Entry<HRegionLocation, MultiAction<R>> e : actionsByServer.entrySet()) {
long backoffTime = 0;
if (isRetry) {
if (hci.isUsingServerTrackerForRetries()) {
assert this.errorsByServer != null;
backoffTime = this.errorsByServer.calculateBackoffTime(e.getKey(), hci.pause);
} else {
// curNumRetries starts with one, subtract to start from 0.
backoffTime = ConnectionUtils.getPauseTime(hci.pause, curNumRetries - 1);
}
}
Callable<MultiResponse> callable =
createDelayedCallable(backoffTime, e.getKey(), e.getValue());
if (LOG.isTraceEnabled() && isRetry) {
StringBuilder sb = new StringBuilder();
for (Action<R> action : e.getValue().allActions()) {
if (sb.length() > 0) sb.append(' ');
sb.append(Bytes.toStringBinary(action.getAction().getRow()));
}
LOG.trace("Attempt #" + this.curNumRetries + " against " + e.getKey().getHostnamePort()
+ " after=" + backoffTime + "ms, row(s)=" + sb.toString());
}
Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> p =
new Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>(
e.getValue(), e.getKey(), this.pool.submit(callable));
this.inProgress.addLast(p);
}
}
/**
* Resubmit the actions which have failed, after a sleep time.
* @throws IOException
*/
private void doRetry() throws IOException{
submit(this.toReplay, true);
this.toReplay.clear();
}
/**
* Parameterized batch processing, allowing varying return types for
* different {@link Row} implementations.
* Throws an exception on error. If there are no exceptions, it means that the 'results'
* array is clean.
*/
private void processBatchCallback() throws IOException, InterruptedException {
if (this.results.length != this.rows.size()) {
throw new IllegalArgumentException(
"argument results (size="+results.length+") must be the same size as " +
"argument list (size="+this.rows.size()+")");
}
if (this.rows.isEmpty()) {
return;
}
boolean isTraceEnabled = LOG.isTraceEnabled();
BatchErrors errors = new BatchErrors();
BatchErrors retriedErrors = null;
if (isTraceEnabled) {
retriedErrors = new BatchErrors();
}
// We keep the number of retry per action.
int[] nbRetries = new int[this.results.length];
// Build the action list. This list won't change after being created, hence the
// indexes will remain constant, allowing a direct lookup.
final List<Action<R>> listActions = new ArrayList<Action<R>>(this.rows.size());
for (int i = 0; i < this.rows.size(); i++) {
Action<R> action = new Action<R>(this.rows.get(i), i);
listActions.add(action);
}
// execute the actions. We will analyze and resubmit the actions in a 'while' loop.
submit(listActions, false);
// LastRetry is true if, either:
// we had an exception 'DoNotRetry'
// we had more than numRetries for any action
// In this case, we will finish the current retries but we won't start new ones.
boolean lastRetry = false;
// If hci.numTries is 1 or 0, we do not retry.
boolean noRetry = (hci.numTries < 2);
// Analyze and resubmit until all actions are done successfully or failed after numTries
while (!this.inProgress.isEmpty()) {
// We need the original multi action to find out what actions to replay if
// we have a 'total' failure of the Future<MultiResponse>
// We need the HRegionLocation as we give it back if we go out of retries
Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> currentTask =
removeFirstDone();
// Get the answer, keep the exception if any as we will use it for the analysis
MultiResponse responses = null;
ExecutionException exception = null;
try {
responses = currentTask.getThird().get();
} catch (ExecutionException e) {
exception = e;
}
HRegionLocation location = currentTask.getSecond();
// Error case: no result at all for this multi action. We need to redo all actions
if (responses == null) {
for (List<Action<R>> actions : currentTask.getFirst().actions.values()) {
for (Action<R> action : actions) {
Row row = action.getAction();
// Do not use the exception for updating cache because it might be coming from
// any of the regions in the MultiAction.
hci.updateCachedLocations(tableName, row, null, location);
if (noRetry) {
errors.add(exception, row, location);
} else {
if (isTraceEnabled) {
retriedErrors.add(exception, row, location);
}
lastRetry = addToReplay(nbRetries, action, location);
}
}
}
} else { // Success or partial success
// Analyze detailed results. We can still have individual failures to be redo.
// two specific exceptions are managed:
// - DoNotRetryIOException: we continue to retry for other actions
// - RegionMovedException: we update the cache with the new region location
for (Entry<byte[], List<Pair<Integer, Object>>> resultsForRS :
responses.getResults().entrySet()) {
for (Pair<Integer, Object> regionResult : resultsForRS.getValue()) {
Action<R> correspondingAction = listActions.get(regionResult.getFirst());
Object result = regionResult.getSecond();
this.results[correspondingAction.getOriginalIndex()] = result;
// Failure: retry if it's make sense else update the errors lists
if (result == null || result instanceof Throwable) {
Row row = correspondingAction.getAction();
hci.updateCachedLocations(this.tableName, row, result, location);
if (result instanceof DoNotRetryIOException || noRetry) {
errors.add((Exception)result, row, location);
} else {
if (isTraceEnabled) {
retriedErrors.add((Exception)result, row, location);
}
lastRetry = addToReplay(nbRetries, correspondingAction, location);
}
} else // success
if (callback != null) {
this.callback.update(resultsForRS.getKey(),
this.rows.get(regionResult.getFirst()).getRow(), (R) result);
}
}
}
}
// Retry all actions in toReplay then clear it.
if (!noRetry && !toReplay.isEmpty()) {
if (isTraceEnabled) {
LOG.trace("Retrying #" + this.curNumRetries +
(lastRetry ? " (one last time)": "") + " because " +
retriedErrors.getDescriptionAndClear());
}
doRetry();
if (lastRetry) {
noRetry = true;
}
}
}
errors.rethrowIfAny();
}
private class BatchErrors {
private List<Throwable> exceptions = new ArrayList<Throwable>();
private List<Row> actions = new ArrayList<Row>();
private List<String> addresses = new ArrayList<String>();
public void add(Exception ex, Row row, HRegionLocation location) {
exceptions.add(ex);
actions.add(row);
addresses.add(location.getHostnamePort());
}
public void rethrowIfAny() throws RetriesExhaustedWithDetailsException {
if (!exceptions.isEmpty()) {
throw makeException();
}
}
public String getDescriptionAndClear(){
if (exceptions.isEmpty()) {
return "";
}
String result = makeException().getExhaustiveDescription();
exceptions.clear();
actions.clear();
addresses.clear();
return result;
}
private RetriesExhaustedWithDetailsException makeException() {
return new RetriesExhaustedWithDetailsException(exceptions, actions, addresses);
}
}
/**
* Put the action that has to be retried in the Replay list.
* @return true if we're out of numRetries and it's the last retry.
*/
private boolean addToReplay(int[] nbRetries, Action<R> action, HRegionLocation source) {
this.toReplay.add(action);
nbRetries[action.getOriginalIndex()]++;
if (nbRetries[action.getOriginalIndex()] > this.curNumRetries) {
this.curNumRetries = nbRetries[action.getOriginalIndex()];
}
if (hci.isUsingServerTrackerForRetries()) {
if (this.errorsByServer == null) {
this.errorsByServer = hci.createServerErrorTracker();
}
this.errorsByServer.reportServerError(source);
return !this.errorsByServer.canRetryMore();
} else {
// We need to add 1 to make tries and retries comparable. And as we look for
// the last try we compare with '>=' and not '>'. And we need curNumRetries
// to means what it says as we don't want to initialize it to 1.
return ((this.curNumRetries + 1) >= hci.numTries);
}
}
/**
* Wait for one of tasks to be done, and remove it from the list.
* @return the tasks done.
*/
private Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>
removeFirstDone() throws InterruptedException {
while (true) {
synchronized (finishedTasks) {
if (!finishedTasks.isEmpty()) {
MultiAction<R> done = finishedTasks.remove(finishedTasks.size() - 1);
// We now need to remove it from the inProgress part.
Iterator<Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>>> it =
inProgress.iterator();
while (it.hasNext()) {
Triple<MultiAction<R>, HRegionLocation, Future<MultiResponse>> task = it.next();
if (task.getFirst() == done) { // We have the exact object. No java equals here.
it.remove();
return task;
}
}
LOG.error("Development error: We didn't see a task in the list. " +
done.getRegions());
}
finishedTasks.wait(10);
}
}
}
private Callable<MultiResponse> createDelayedCallable(
final long delay, final HRegionLocation loc, final MultiAction<R> multi) {
final Callable<MultiResponse> delegate = hci.createCallable(loc, multi, tableName);
return new Callable<MultiResponse>() {
private final long creationTime = System.currentTimeMillis();
@Override
public MultiResponse call() throws Exception {
try {
final long waitingTime = delay + creationTime - System.currentTimeMillis();
if (waitingTime > 0) {
Thread.sleep(waitingTime);
}
return delegate.call();
} finally {
synchronized (finishedTasks) {
finishedTasks.add(multi);
finishedTasks.notifyAll();
public void success(int pos, byte[] region, Row row, Res result) {
assert pos < results.length;
results[pos] = result;
if (callback != null) {
callback.update(region, row.getRow(), result);
}
}
@Override
public boolean failure(int pos, byte[] region, Row row, Throwable t) {
assert pos < results.length;
results[pos] = t;
//Batch.Callback<Res> was not called on failure in 0.94. We keep this.
return true; // we want to have this failure in the failures list.
}
};
@Override
public boolean retriableFailure(int originalIndex, Row row, byte[] region,
Throwable exception) {
return true; // we retry
}
}
@ -2701,14 +2365,15 @@ public class HConnectionManager {
}
throw new TableNotFoundException(Bytes.toString(tableName));
}
}
/**
* The record of errors for servers. Visible for testing.
* The record of errors for servers.
*/
@VisibleForTesting
static class ServerErrorTracker {
private final Map<HRegionLocation, ServerErrors> errorsByServer =
new HashMap<HRegionLocation, ServerErrors>();
// We need a concurrent map here, as we could have multiple threads updating it in parallel.
private final ConcurrentMap<HRegionLocation, ServerErrors> errorsByServer =
new ConcurrentHashMap<HRegionLocation, ServerErrors>();
private long canRetryUntil = 0;
public ServerErrorTracker(long timeout) {
@ -2722,7 +2387,7 @@ public class HConnectionManager {
/**
* Calculates the back-off time for a retrying request to a particular server.
* This is here, and package private, for testing (no good way to get at it).
*
* @param server The server in question.
* @param basePause The default hci pause.
* @return The time to wait before sending next request.
@ -2748,7 +2413,7 @@ public class HConnectionManager {
/**
* Reports that there was an error on the server to do whatever bean-counting necessary.
* This is here, and package private, for testing (no good way to get at it).
*
* @param server The server in question.
*/
void reportServerError(HRegionLocation server) {
@ -2783,20 +2448,6 @@ public class HConnectionManager {
}
}
public boolean isUsingServerTrackerForRetries() {
return this.useServerTrackerForRetries;
}
/**
* Creates the server error tracker to use inside process.
* Currently, to preserve the main assumption about current retries, and to work well with
* the retry-limit-based calculation, the calculation is local per Process object.
* We may benefit from connection-wide tracking of server errors.
* @return ServerErrorTracker to use.
*/
ServerErrorTracker createServerErrorTracker() {
return new ServerErrorTracker(this.serverTrackerTimeout);
}
}
/**
* Set the number of retries to use serverside when trying to communicate
* with another server over {@link HConnection}. Used updating catalog

View File

@ -59,6 +59,7 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@ -116,14 +117,14 @@ import java.util.concurrent.TimeUnit;
@InterfaceStability.Stable
public class HTable implements HTableInterface {
private static final Log LOG = LogFactory.getLog(HTable.class);
private HConnection connection;
protected HConnection connection;
private final byte [] tableName;
private volatile Configuration configuration;
private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
protected List<Row> writeAsyncBuffer = new LinkedList<Row>();
private long writeBufferSize;
private boolean clearBufferOnFail;
private boolean autoFlush;
private long currentWriteBufferSize;
protected long currentWriteBufferSize;
protected int scannerCaching;
private int maxKeyValueSize;
private ExecutorService pool; // For Multi
@ -132,6 +133,9 @@ public class HTable implements HTableInterface {
private final boolean cleanupPoolOnClose; // shutdown the pool in close()
private final boolean cleanupConnectionOnClose; // close the connection in close()
/** The Async process for puts with autoflush set to false or multiputs */
protected AsyncProcess<Object> ap;
/**
* Creates an object to access a HBase table.
* Shares zookeeper connection and other resources with other HTable instances
@ -238,6 +242,15 @@ public class HTable implements HTableInterface {
this.finishSetup();
}
/**
* For internal testing.
*/
protected HTable(){
tableName = new byte[]{};
cleanupPoolOnClose = false;
cleanupConnectionOnClose = false;
}
/**
* setup this HTable's parameter based on the passed configuration
*/
@ -257,11 +270,15 @@ public class HTable implements HTableInterface {
HConstants.HBASE_CLIENT_SCANNER_CACHING,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
ap = new AsyncProcess<Object>(connection, tableName, pool, null, configuration);
this.maxKeyValueSize = this.configuration.getInt(
"hbase.client.keyvalue.maxsize", -1);
this.closed = false;
}
/**
* {@inheritDoc}
*/
@ -396,6 +413,15 @@ public class HTable implements HTableInterface {
return scannerCaching;
}
/**
* Kept in 0.96 for backward compatibility
* @deprecated since 0.96. This is an internal buffer that should not be read nor write.
*/
@Deprecated
public List<Row> getWriteBuffer() {
return writeAsyncBuffer;
}
/**
* Sets the number of rows that a scanner will fetch at once.
* <p>
@ -647,15 +673,13 @@ public class HTable implements HTableInterface {
@Override
public void batch(final List<?extends Row> actions, final Object[] results)
throws InterruptedException, IOException {
connection.processBatchCallback(actions, tableName, pool, results, null);
batchCallback(actions, results, null);
}
@Override
public Object[] batch(final List<? extends Row> actions)
throws InterruptedException, IOException {
Object[] results = new Object[actions.size()];
connection.processBatchCallback(actions, tableName, pool, results, null);
return results;
return batchCallback(actions, null);
}
@Override
@ -670,7 +694,7 @@ public class HTable implements HTableInterface {
final List<? extends Row> actions, final Batch.Callback<R> callback) throws IOException,
InterruptedException {
Object[] results = new Object[actions.size()];
connection.processBatchCallback(actions, tableName, pool, results, callback);
batchCallback(actions, results, callback);
return results;
}
@ -702,7 +726,7 @@ public class HTable implements HTableInterface {
throws IOException {
Object[] results = new Object[deletes.size()];
try {
connection.processBatch((List) deletes, tableName, pool, results);
batch(deletes, results);
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
@ -722,7 +746,8 @@ public class HTable implements HTableInterface {
* {@inheritDoc}
*/
@Override
public void put(final Put put) throws IOException {
public void put(final Put put)
throws InterruptedIOException, RetriesExhaustedWithDetailsException {
doPut(put);
if (autoFlush) {
flushCommits();
@ -733,7 +758,8 @@ public class HTable implements HTableInterface {
* {@inheritDoc}
*/
@Override
public void put(final List<Put> puts) throws IOException {
public void put(final List<Put> puts)
throws InterruptedIOException, RetriesExhaustedWithDetailsException {
for (Put put : puts) {
doPut(put);
}
@ -742,12 +768,64 @@ public class HTable implements HTableInterface {
}
}
private void doPut(Put put) throws IOException{
/**
* Add the put to the buffer. If the buffer is already too large, sends the buffer to the
* cluster.
* @throws RetriesExhaustedWithDetailsException if there is an error on the cluster.
* @throws InterruptedIOException if we were interrupted.
*/
private void doPut(Put put) throws InterruptedIOException, RetriesExhaustedWithDetailsException {
if (ap.hasError()){
backgroundFlushCommits(true);
}
validatePut(put);
writeBuffer.add(put);
currentWriteBufferSize += put.heapSize();
if (currentWriteBufferSize > writeBufferSize) {
flushCommits();
writeAsyncBuffer.add(put);
while (currentWriteBufferSize > writeBufferSize) {
backgroundFlushCommits(false);
}
}
/**
* Send the operations in the buffer to the servers. Does not wait for the server's answer.
* If the is an error (max retried reach from a previous flush or bad operation), it tries to
* send all operations in the buffer and sends an exception.
*/
private void backgroundFlushCommits(boolean synchronous) throws
InterruptedIOException, RetriesExhaustedWithDetailsException {
try {
// If there is an error on the operations in progress, we don't add new operations.
if (writeAsyncBuffer.size() > 0 && !ap.hasError()) {
ap.submit(writeAsyncBuffer, true);
}
if (synchronous || ap.hasError()) {
ap.waitUntilDone();
}
if (ap.hasError()) {
if (!clearBufferOnFail) {
// if clearBufferOnFailed is not set, we're supposed to keep the failed operation in the
// write buffer. This is a questionable feature kept here for backward compatibility
writeAsyncBuffer.addAll(ap.getFailedOperations());
}
RetriesExhaustedWithDetailsException e = ap.getErrors();
ap.clearErrors();
throw e;
}
} finally {
currentWriteBufferSize = 0;
for (Row mut : writeAsyncBuffer) {
if (mut instanceof Mutation) {
currentWriteBufferSize += ((Mutation) mut).heapSize();
}
}
}
}
@ -1080,37 +1158,13 @@ public class HTable implements HTableInterface {
* {@inheritDoc}
*/
@Override
public void flushCommits() throws IOException {
if (writeBuffer.isEmpty()){
// Early exit: we can be called on empty buffers.
return;
}
Object[] results = new Object[writeBuffer.size()];
boolean success = false;
try {
this.connection.processBatch(writeBuffer, tableName, pool, results);
success = true;
} catch (InterruptedException e) {
throw new InterruptedIOException(e.getMessage());
} finally {
// mutate list so that it is empty for complete success, or contains
// only failed records. Results are returned in the same order as the
// requests in list. Walk the list backwards, so we can remove from list
// without impacting the indexes of earlier members
currentWriteBufferSize = 0;
if (success || clearBufferOnFail) {
writeBuffer.clear();
} else {
for (int i = results.length - 1; i >= 0; i--) {
if (results[i] instanceof Result) {
writeBuffer.remove(i);
} else {
currentWriteBufferSize += writeBuffer.get(i).heapSize();
}
}
}
}
public void flushCommits() throws InterruptedIOException, RetriesExhaustedWithDetailsException {
// We're looping, as if one region is overloaded we keep its operations in the buffer.
// As we can have an operation in progress even if the buffer is empty, we call
// backgroundFlushCommits at least one time.
do {
backgroundFlushCommits(true);
} while (!writeAsyncBuffer.isEmpty());
}
/**
@ -1127,7 +1181,7 @@ public class HTable implements HTableInterface {
public <R> void processBatchCallback(
final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
throws IOException, InterruptedException {
connection.processBatchCallback(list, tableName, pool, results, callback);
this.batchCallback(list, results, callback);
}
@ -1253,14 +1307,6 @@ public class HTable implements HTableInterface {
}
}
/**
* Returns the write buffer.
* @return The current write buffer.
*/
public ArrayList<Put> getWriteBuffer() {
return writeBuffer;
}
/**
* The pool is used for mutli requests for this HTable
* @return the pool used for mutli

View File

@ -48,11 +48,11 @@ import java.util.Set;
public class RetriesExhaustedWithDetailsException
extends RetriesExhaustedException {
List<Throwable> exceptions;
List<Row> actions;
List<? extends Row> actions;
List<String> hostnameAndPort;
public RetriesExhaustedWithDetailsException(List<Throwable> exceptions,
List<Row> actions,
List<? extends Row> actions,
List<String> hostnameAndPort) {
super("Failed " + exceptions.size() + " action" +
pluralize(exceptions) + ": " +
@ -105,7 +105,7 @@ extends RetriesExhaustedException {
}
public static String getDesc(List<Throwable> exceptions,
List<Row> actions,
List<? extends Row> actions,
List<String> hostnamePort) {
String s = getDesc(classifyExs(exceptions));
StringBuilder addrs = new StringBuilder(s);

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.exceptions.DoNotRetryIOException;
import org.apache.hadoop.hbase.exceptions.NotServingRegionException;
import org.apache.hadoop.hbase.exceptions.RegionMovedException;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.util.Bytes;
@ -187,6 +188,8 @@ public abstract class ServerCallable<T> implements Callable<T> {
// map to that slow/dead server; otherwise, let cache miss and ask
// .META. again to find the new location
getConnection().clearCaches(location.getServerName());
} else if (t instanceof RegionMovedException) {
getConnection().updateCachedLocations(tableName, row, t, location);
} else if (t instanceof NotServingRegionException && numRetries == 1) {
// Purge cache entries for this specific region from META cache
// since we don't call connect(true) when number of retries is 1.

View File

@ -24,7 +24,6 @@ import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -87,6 +86,7 @@ public class TestHCM {
ClusterStatusPublisher.MulticastPublisher.class, ClusterStatusPublisher.Publisher.class);
TEST_UTIL.getConfiguration().setClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
ClusterStatusListener.MultiCastListener.class, ClusterStatusListener.Listener.class);
TEST_UTIL.startMiniCluster(2);
}
@ -209,10 +209,15 @@ public class TestHCM {
* that we really delete it.
* @throws Exception
*/
@Test(timeout = 60000)
@Test
public void testRegionCaching() throws Exception{
HTable table = TEST_UTIL.createTable(TABLE_NAME, FAM_NAM);
TEST_UTIL.createTable(TABLE_NAME, FAM_NAM).close();
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
HTable table = new HTable(conf, TABLE_NAME);
TEST_UTIL.createMultiRegions(table, FAM_NAM);
TEST_UTIL.waitUntilAllRegionsAssigned(table.getTableName());
Put put = new Put(ROW);
put.add(FAM_NAM, ROW, ROW);
table.put(put);
@ -303,30 +308,18 @@ public class TestHCM {
Assert.assertFalse(
conn.getCachedLocation(TABLE_NAME, ROW).getPort() == destServerName.getPort());
// Hijack the number of retry to fail immediately instead of retrying: there will be no new
// connection to the master
Field numTries = conn.getClass().getDeclaredField("numTries");
numTries.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
modifiersField.setAccessible(true);
modifiersField.setInt(numTries, numTries.getModifiers() & ~Modifier.FINAL);
final int prevNumRetriesVal = (Integer)numTries.get(conn);
numTries.set(conn, 1);
// This part relies on a number of tries equals to 1.
// We do a put and expect the cache to be updated, even if we don't retry
LOG.info("Put starting");
Put put3 = new Put(ROW);
put3.add(FAM_NAM, ROW, ROW);
try {
table.put(put3);
Assert.assertFalse("Unreachable point", true);
}catch (Throwable e){
Assert.fail("Unreachable point");
}catch (RetriesExhaustedWithDetailsException e){
LOG.info("Put done, exception caught: " + e.getClass());
// Now check that we have the exception we wanted
Assert.assertTrue(e instanceof RetriesExhaustedWithDetailsException);
RetriesExhaustedWithDetailsException re = (RetriesExhaustedWithDetailsException)e;
Assert.assertTrue(re.getNumExceptions() == 1);
Assert.assertTrue(Arrays.equals(re.getRow(0).getRow(), ROW));
Assert.assertEquals(1, e.getNumExceptions());
Assert.assertArrayEquals(e.getRow(0).getRow(), ROW);
}
Assert.assertNotNull(conn.getCachedLocation(TABLE_NAME, ROW));
Assert.assertEquals(
@ -360,17 +353,20 @@ public class TestHCM {
Assert.assertFalse(conn.getCachedLocation(TABLE_NAME, ROW).getPort() ==
curServer.getServerName().getPort());
Scan sc = new Scan();
sc.setStopRow(ROW);
sc.setStopRow(ROW);
sc.setStartRow(ROW);
// The scanner takes the max retries from the connection configuration, not the table as
// the put.
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
try {
ResultScanner rs = table.getScanner(sc);
while (rs.next() != null) {
}
Assert.assertFalse("Unreachable point", true);
} catch (Throwable e) {
Assert.fail("Unreachable point");
} catch (RetriesExhaustedException e) {
LOG.info("Scan done, expected exception caught: " + e.getClass());
}
@ -380,7 +376,8 @@ public class TestHCM {
"Previous server was "+destServer.getServerName().getHostAndPort(),
curServer.getServerName().getPort(), conn.getCachedLocation(TABLE_NAME, ROW).getPort());
numTries.set(conn, prevNumRetriesVal);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
table.close();
}
@ -740,8 +737,8 @@ public class TestHCM {
try {
long timeBase = timeMachine.currentTimeMillis();
long largeAmountOfTime = ANY_PAUSE * 1000;
HConnectionImplementation.ServerErrorTracker tracker =
new HConnectionImplementation.ServerErrorTracker(largeAmountOfTime);
HConnectionManager.ServerErrorTracker tracker =
new HConnectionManager.ServerErrorTracker(largeAmountOfTime);
// The default backoff is 0.
assertEquals(0, tracker.calculateBackoffTime(location, ANY_PAUSE));