HBASE-1845 MultiGet, MultiDelete, and MultiPut - batched to the appropriate region servers; commit again -- was removed by hbase-2692 commit
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@992530 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
61f9214268
commit
9f12e0d060
|
@ -60,7 +60,7 @@ import java.util.TreeSet;
|
||||||
* <p>
|
* <p>
|
||||||
* To add a filter, execute {@link #setFilter(Filter) setFilter}.
|
* To add a filter, execute {@link #setFilter(Filter) setFilter}.
|
||||||
*/
|
*/
|
||||||
public class Get implements Writable {
|
public class Get implements Writable, Row, Comparable<Row> {
|
||||||
private static final byte GET_VERSION = (byte)1;
|
private static final byte GET_VERSION = (byte)1;
|
||||||
|
|
||||||
private byte [] row = null;
|
private byte [] row = null;
|
||||||
|
@ -325,6 +325,11 @@ public class Get implements Writable {
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Row
|
||||||
|
public int compareTo(Row other) {
|
||||||
|
return Bytes.compareTo(this.getRow(), other.getRow());
|
||||||
|
}
|
||||||
|
|
||||||
//Writable
|
//Writable
|
||||||
public void readFields(final DataInput in)
|
public void readFields(final DataInput in)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
|
@ -211,6 +211,21 @@ public interface HConnection {
|
||||||
public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
|
public <T> T getRegionServerWithoutRetries(ServerCallable<T> callable)
|
||||||
throws IOException, RuntimeException;
|
throws IOException, RuntimeException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process a mixed batch of Get, Put and Delete actions. All actions for a
|
||||||
|
* RegionServer are forwarded in one RPC call.
|
||||||
|
*
|
||||||
|
* @param actions The collection of actions.
|
||||||
|
* @param tableName Name of the hbase table
|
||||||
|
* @param pool thread pool for parallel execution
|
||||||
|
* @param results An empty array, same size as list. If an exception is thrown,
|
||||||
|
* you can test here for partial results, and to determine which actions
|
||||||
|
* processed successfully.
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public void processBatch(List<Row> actions, final byte[] tableName,
|
||||||
|
ExecutorService pool, Result[] results)
|
||||||
|
throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process a batch of Puts. Does the retries.
|
* Process a batch of Puts. Does the retries.
|
||||||
|
@ -218,20 +233,32 @@ public interface HConnection {
|
||||||
* @param tableName The name of the table
|
* @param tableName The name of the table
|
||||||
* @return Count of committed Puts. On fault, < list.size().
|
* @return Count of committed Puts. On fault, < list.size().
|
||||||
* @throws IOException if a remote or network exception occurs
|
* @throws IOException if a remote or network exception occurs
|
||||||
|
* @deprecated Use HConnectionManager::processBatch instead.
|
||||||
*/
|
*/
|
||||||
public int processBatchOfRows(ArrayList<Put> list, byte[] tableName)
|
public int processBatchOfRows(ArrayList<Put> list, byte[] tableName, ExecutorService pool)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process a batch of Deletes. Does the retries.
|
* Process a batch of Deletes. Does the retries.
|
||||||
* @param list A batch of Deletes to process.
|
* @param list A batch of Deletes to process.
|
||||||
* @return Count of committed Deletes. On fault, < list.size().
|
|
||||||
* @param tableName The name of the table
|
* @param tableName The name of the table
|
||||||
|
* @return Count of committed Deletes. On fault, < list.size().
|
||||||
* @throws IOException if a remote or network exception occurs
|
* @throws IOException if a remote or network exception occurs
|
||||||
|
* @deprecated Use HConnectionManager::processBatch instead.
|
||||||
*/
|
*/
|
||||||
public int processBatchOfDeletes(List<Delete> list, byte[] tableName)
|
public int processBatchOfDeletes(List<Delete> list, byte[] tableName, ExecutorService pool)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Process a batch of Puts.
|
||||||
|
*
|
||||||
|
* @param list The collection of actions. The list is mutated: all successful Puts
|
||||||
|
* are removed from the list.
|
||||||
|
* @param tableName Name of the hbase table
|
||||||
|
* @param pool Thread pool for parallel execution
|
||||||
|
* @throws IOException
|
||||||
|
* @deprecated Use HConnectionManager::processBatch instead.
|
||||||
|
*/
|
||||||
public void processBatchOfPuts(List<Put> list,
|
public void processBatchOfPuts(List<Put> list,
|
||||||
final byte[] tableName, ExecutorService pool) throws IOException;
|
final byte[] tableName, ExecutorService pool) throws IOException;
|
||||||
|
|
||||||
|
@ -248,7 +275,7 @@ public interface HConnection {
|
||||||
/**
|
/**
|
||||||
* Check whether region cache prefetch is enabled or not.
|
* Check whether region cache prefetch is enabled or not.
|
||||||
* @param tableName name of table to check
|
* @param tableName name of table to check
|
||||||
* @return true if table's region cache prefecth is enabled. Otherwise
|
* @return true if table's region cache prefetch is enabled. Otherwise
|
||||||
* it is disabled.
|
* it is disabled.
|
||||||
*/
|
*/
|
||||||
public boolean getRegionCachePrefetch(final byte[] tableName);
|
public boolean getRegionCachePrefetch(final byte[] tableName);
|
||||||
|
|
|
@ -29,12 +29,15 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
import java.util.Map.Entry;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.CopyOnWriteArraySet;
|
import java.util.concurrent.CopyOnWriteArraySet;
|
||||||
import java.util.concurrent.ExecutionException;
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -65,6 +68,7 @@ import org.apache.hadoop.hbase.util.SoftValueSortedMap;
|
||||||
import org.apache.hadoop.hbase.util.Writables;
|
import org.apache.hadoop.hbase.util.Writables;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKTableDisable;
|
import org.apache.hadoop.hbase.zookeeper.ZKTableDisable;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
@ -863,7 +867,7 @@ public class HConnectionManager {
|
||||||
* Allows flushing the region cache.
|
* Allows flushing the region cache.
|
||||||
*/
|
*/
|
||||||
public void clearRegionCache() {
|
public void clearRegionCache() {
|
||||||
cachedRegionLocations.clear();
|
cachedRegionLocations.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -1105,176 +1109,38 @@ public class HConnectionManager {
|
||||||
return location;
|
return location;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/**
|
||||||
* Helper class for batch updates.
|
* @deprecated Use HConnectionManager::processBatch instead.
|
||||||
* Holds code shared doing batch puts and batch deletes.
|
|
||||||
*/
|
*/
|
||||||
private abstract class Batch {
|
public int processBatchOfRows(final ArrayList<Put> list, final byte[] tableName, ExecutorService pool)
|
||||||
final HConnection c;
|
|
||||||
|
|
||||||
private Batch(final HConnection c) {
|
|
||||||
this.c = c;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This is the method subclasses must implement.
|
|
||||||
* @param currentList current list of rows
|
|
||||||
* @param tableName table we are processing
|
|
||||||
* @param row row
|
|
||||||
* @return Count of items processed or -1 if all.
|
|
||||||
* @throws IOException if a remote or network exception occurs
|
|
||||||
* @throws RuntimeException other undefined exception
|
|
||||||
*/
|
|
||||||
abstract int doCall(final List<? extends Row> currentList,
|
|
||||||
final byte [] row, final byte [] tableName)
|
|
||||||
throws IOException, RuntimeException;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Process the passed <code>list</code>.
|
|
||||||
* @param list list of rows to process
|
|
||||||
* @param tableName table we are processing
|
|
||||||
* @return Count of how many added or -1 if all added.
|
|
||||||
* @throws IOException if a remote or network exception occurs
|
|
||||||
*/
|
|
||||||
int process(final List<? extends Row> list, final byte[] tableName)
|
|
||||||
throws IOException {
|
|
||||||
byte [] region = getRegionName(tableName, list.get(0).getRow(), false);
|
|
||||||
byte [] currentRegion = region;
|
|
||||||
boolean isLastRow;
|
|
||||||
boolean retryOnlyOne = false;
|
|
||||||
List<Row> currentList = new ArrayList<Row>();
|
|
||||||
int i, tries;
|
|
||||||
for (i = 0, tries = 0; i < list.size() && tries < numRetries; i++) {
|
|
||||||
Row row = list.get(i);
|
|
||||||
currentList.add(row);
|
|
||||||
// If the next record goes to a new region, then we are to clear
|
|
||||||
// currentList now during this cycle.
|
|
||||||
isLastRow = (i + 1) == list.size();
|
|
||||||
if (!isLastRow) {
|
|
||||||
region = getRegionName(tableName, list.get(i + 1).getRow(), false);
|
|
||||||
}
|
|
||||||
if (!Bytes.equals(currentRegion, region) || isLastRow || retryOnlyOne) {
|
|
||||||
int index = doCall(currentList, row.getRow(), tableName);
|
|
||||||
// index is == -1 if all processed successfully, else its index
|
|
||||||
// of last record successfully processed.
|
|
||||||
if (index != -1) {
|
|
||||||
if (tries == numRetries - 1) {
|
|
||||||
throw new RetriesExhaustedException("Some server, retryOnlyOne=" +
|
|
||||||
retryOnlyOne + ", index=" + index + ", islastrow=" + isLastRow +
|
|
||||||
", tries=" + tries + ", numtries=" + numRetries + ", i=" + i +
|
|
||||||
", listsize=" + list.size() + ", region=" +
|
|
||||||
Bytes.toStringBinary(region), currentRegion, row.getRow(),
|
|
||||||
tries, new ArrayList<Throwable>());
|
|
||||||
}
|
|
||||||
tries = doBatchPause(currentRegion, tries);
|
|
||||||
i = i - currentList.size() + index;
|
|
||||||
retryOnlyOne = true;
|
|
||||||
// Reload location.
|
|
||||||
region = getRegionName(tableName, list.get(i + 1).getRow(), true);
|
|
||||||
} else {
|
|
||||||
// Reset these flags/counters on successful batch Put
|
|
||||||
retryOnlyOne = false;
|
|
||||||
tries = 0;
|
|
||||||
}
|
|
||||||
currentRegion = region;
|
|
||||||
currentList.clear();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return i;
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* @param t
|
|
||||||
* @param r
|
|
||||||
* @param re
|
|
||||||
* @return Region name that holds passed row <code>r</code>
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private byte [] getRegionName(final byte [] t, final byte [] r,
|
|
||||||
final boolean re)
|
|
||||||
throws IOException {
|
|
||||||
HRegionLocation location = getRegionLocationForRowWithRetries(t, r, re);
|
|
||||||
return location.getRegionInfo().getRegionName();
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
|
||||||
* Do pause processing before retrying...
|
|
||||||
* @param currentRegion
|
|
||||||
* @param tries
|
|
||||||
* @return New value for tries.
|
|
||||||
*/
|
|
||||||
private int doBatchPause(final byte [] currentRegion, final int tries) {
|
|
||||||
int localTries = tries;
|
|
||||||
long sleepTime = getPauseTime(tries);
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Reloading region " + Bytes.toStringBinary(currentRegion) +
|
|
||||||
" location because regionserver didn't accept updates; tries=" +
|
|
||||||
tries + " of max=" + numRetries + ", waiting=" + sleepTime + "ms");
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
Thread.sleep(sleepTime);
|
|
||||||
localTries++;
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
// continue
|
|
||||||
}
|
|
||||||
return localTries;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public int processBatchOfRows(final ArrayList<Put> list,
|
|
||||||
final byte[] tableName)
|
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (list.isEmpty()) {
|
Result[] results = new Result[list.size()];
|
||||||
return 0;
|
processBatch((List) list, tableName, pool, results);
|
||||||
}
|
int count = 0;
|
||||||
if (list.size() > 1) {
|
for (Result r : results) {
|
||||||
Collections.sort(list);
|
if (r != null) {
|
||||||
}
|
count++;
|
||||||
Batch b = new Batch(this) {
|
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Override
|
|
||||||
int doCall(final List<? extends Row> currentList, final byte [] row,
|
|
||||||
final byte [] tableName)
|
|
||||||
throws IOException, RuntimeException {
|
|
||||||
final List<Put> puts = (List<Put>)currentList;
|
|
||||||
return getRegionServerWithRetries(new ServerCallable<Integer>(this.c,
|
|
||||||
tableName, row) {
|
|
||||||
public Integer call() throws IOException {
|
|
||||||
return server.put(location.getRegionInfo().getRegionName(), puts);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
return b.process(list, tableName);
|
return (count == list.size() ? -1 : count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @deprecated Use HConnectionManager::processBatch instead.
|
||||||
|
*/
|
||||||
public int processBatchOfDeletes(final List<Delete> list,
|
public int processBatchOfDeletes(final List<Delete> list,
|
||||||
final byte[] tableName)
|
final byte[] tableName, ExecutorService pool)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
if (list.isEmpty()) {
|
Result[] results = new Result[list.size()];
|
||||||
return 0;
|
processBatch((List) list, tableName, pool, results);
|
||||||
}
|
int count = 0;
|
||||||
if (list.size() > 1) {
|
for (Result r : results) {
|
||||||
Collections.sort(list);
|
if (r != null) {
|
||||||
}
|
count++;
|
||||||
Batch b = new Batch(this) {
|
}
|
||||||
@SuppressWarnings("unchecked")
|
|
||||||
@Override
|
|
||||||
int doCall(final List<? extends Row> currentList, final byte [] row,
|
|
||||||
final byte [] tableName)
|
|
||||||
throws IOException, RuntimeException {
|
|
||||||
final List<Delete> deletes = (List<Delete>)currentList;
|
|
||||||
return getRegionServerWithRetries(new ServerCallable<Integer>(this.c,
|
|
||||||
tableName, row) {
|
|
||||||
public Integer call() throws IOException {
|
|
||||||
return server.delete(location.getRegionInfo().getRegionName(),
|
|
||||||
deletes);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
};
|
|
||||||
return b.process(list, tableName);
|
|
||||||
}
|
}
|
||||||
|
return (count == list.size() ? -1 : count);
|
||||||
|
}
|
||||||
|
|
||||||
void close(boolean stopProxy) {
|
void close(boolean stopProxy) {
|
||||||
if (master != null) {
|
if (master != null) {
|
||||||
|
@ -1291,168 +1157,196 @@ public class HConnectionManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
private Callable<MultiResponse> createCallable(
|
||||||
* Process a batch of Puts on the given executor service.
|
final HServerAddress address,
|
||||||
*
|
final MultiAction multi,
|
||||||
* @param list the puts to make - successful puts will be removed.
|
final byte [] tableName) {
|
||||||
* @param pool thread pool to execute requests on
|
final HConnection connection = this;
|
||||||
*
|
return new Callable<MultiResponse>() {
|
||||||
* In the case of an exception, we take different actions depending on the
|
public MultiResponse call() throws IOException {
|
||||||
* situation:
|
return getRegionServerWithoutRetries(
|
||||||
* - If the exception is a DoNotRetryException, we rethrow it and leave the
|
new ServerCallable<MultiResponse>(connection, tableName, null) {
|
||||||
* 'list' parameter in an indeterminate state.
|
public MultiResponse call() throws IOException {
|
||||||
* - If the 'list' parameter is a singleton, we directly throw the specific
|
return server.multi(multi);
|
||||||
* exception for that put.
|
}
|
||||||
* - Otherwise, we throw a generic exception indicating that an error occurred.
|
@Override
|
||||||
* The 'list' parameter is mutated to contain those puts that did not succeed.
|
public void instantiateServer(boolean reload) throws IOException {
|
||||||
*/
|
server = connection.getHRegionConnection(address);
|
||||||
public void processBatchOfPuts(List<Put> list,
|
}
|
||||||
final byte[] tableName, ExecutorService pool) throws IOException {
|
}
|
||||||
boolean singletonList = list.size() == 1;
|
);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
public void processBatch(List<Row> list,
|
||||||
|
final byte[] tableName,
|
||||||
|
ExecutorService pool,
|
||||||
|
Result[] results) throws IOException {
|
||||||
|
|
||||||
|
// results must be the same size as list
|
||||||
|
if (results.length != list.size()) {
|
||||||
|
throw new IllegalArgumentException("argument results must be the same size as argument list");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (list.size() == 0) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Row> workingList = new ArrayList<Row>(list);
|
||||||
|
final boolean singletonList = (list.size() == 1);
|
||||||
|
boolean retry = true;
|
||||||
Throwable singleRowCause = null;
|
Throwable singleRowCause = null;
|
||||||
for ( int tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) {
|
|
||||||
Collections.sort(list);
|
|
||||||
Map<HServerAddress, MultiPut> regionPuts =
|
|
||||||
new HashMap<HServerAddress, MultiPut>();
|
|
||||||
// step 1:
|
|
||||||
// break up into regionserver-sized chunks and build the data structs
|
|
||||||
for ( Put put : list ) {
|
|
||||||
byte [] row = put.getRow();
|
|
||||||
|
|
||||||
HRegionLocation loc = locateRegion(tableName, row, true);
|
for (int tries = 0; tries < numRetries && retry; ++tries) {
|
||||||
HServerAddress address = loc.getServerAddress();
|
|
||||||
byte [] regionName = loc.getRegionInfo().getRegionName();
|
|
||||||
|
|
||||||
MultiPut mput = regionPuts.get(address);
|
// sleep first, if this is a retry
|
||||||
if (mput == null) {
|
if (tries >= 1) {
|
||||||
mput = new MultiPut(address);
|
long sleepTime = getPauseTime(tries);
|
||||||
regionPuts.put(address, mput);
|
LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
|
||||||
|
try {
|
||||||
|
Thread.sleep(sleepTime);
|
||||||
|
} catch (InterruptedException ignore) {
|
||||||
|
LOG.debug("Interupted");
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
mput.add(regionName, put);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// step 2:
|
// step 1: break up into regionserver-sized chunks and build the data structs
|
||||||
// make the requests
|
|
||||||
// Discard the map, just use a list now, makes error recovery easier.
|
|
||||||
List<MultiPut> multiPuts = new ArrayList<MultiPut>(regionPuts.values());
|
|
||||||
|
|
||||||
List<Future<MultiPutResponse>> futures =
|
Map<HServerAddress, MultiAction> actionsByServer = new HashMap<HServerAddress, MultiAction>();
|
||||||
new ArrayList<Future<MultiPutResponse>>(regionPuts.size());
|
for (int i=0; i<workingList.size(); i++) {
|
||||||
for ( MultiPut put : multiPuts ) {
|
Row row = workingList.get(i);
|
||||||
futures.add(pool.submit(createPutCallable(put.address,
|
if (row != null) {
|
||||||
put,
|
HRegionLocation loc = locateRegion(tableName, row.getRow(), true);
|
||||||
tableName)));
|
HServerAddress address = loc.getServerAddress();
|
||||||
|
byte[] regionName = loc.getRegionInfo().getRegionName();
|
||||||
|
|
||||||
|
MultiAction actions = actionsByServer.get(address);
|
||||||
|
if (actions == null) {
|
||||||
|
actions = new MultiAction();
|
||||||
|
actionsByServer.put(address, actions);
|
||||||
|
}
|
||||||
|
|
||||||
|
Action action = new Action(regionName, row, i);
|
||||||
|
actions.add(regionName, action);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
// RUN!
|
|
||||||
List<Put> failed = new ArrayList<Put>();
|
// step 2: make the requests
|
||||||
|
|
||||||
// step 3:
|
Map<HServerAddress,Future<MultiResponse>> futures =
|
||||||
// collect the failures and tries from step 1.
|
new HashMap<HServerAddress, Future<MultiResponse>>(actionsByServer.size());
|
||||||
for (int i = 0; i < futures.size(); i++ ) {
|
|
||||||
Future<MultiPutResponse> future = futures.get(i);
|
for (Entry<HServerAddress, MultiAction> e : actionsByServer.entrySet()) {
|
||||||
MultiPut request = multiPuts.get(i);
|
futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
|
||||||
|
}
|
||||||
|
|
||||||
|
// step 3: collect the failures and successes and prepare for retry
|
||||||
|
|
||||||
|
for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures.entrySet()) {
|
||||||
|
HServerAddress address = responsePerServer.getKey();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
MultiPutResponse resp = future.get();
|
// Gather the results for one server
|
||||||
|
Future<MultiResponse> future = responsePerServer.getValue();
|
||||||
|
|
||||||
// For each region
|
// Not really sure what a reasonable timeout value is. Here's a first try.
|
||||||
for (Map.Entry<byte[], List<Put>> e : request.puts.entrySet()) {
|
|
||||||
Integer result = resp.getAnswer(e.getKey());
|
MultiResponse resp = future.get(1000, TimeUnit.MILLISECONDS);
|
||||||
if (result == null) {
|
|
||||||
// failed
|
if (resp == null) {
|
||||||
LOG.debug("Failed all for region: " +
|
// Entire server failed
|
||||||
Bytes.toStringBinary(e.getKey()) + ", removing from cache");
|
LOG.debug("Failed all for server: " + address + ", removing from cache");
|
||||||
failed.addAll(e.getValue());
|
|
||||||
} else if (result >= 0) {
|
|
||||||
// some failures
|
|
||||||
List<Put> lst = e.getValue();
|
|
||||||
failed.addAll(lst.subList(result, lst.size()));
|
|
||||||
LOG.debug("Failed past " + result + " for region: " +
|
|
||||||
Bytes.toStringBinary(e.getKey()) + ", removing from cache");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
// go into the failed list.
|
|
||||||
LOG.debug("Failed all from " + request.address, e);
|
|
||||||
failed.addAll(request.allPuts());
|
|
||||||
} catch (ExecutionException e) {
|
|
||||||
Throwable cause = e.getCause();
|
|
||||||
// Don't print stack trace if NSRE; NSRE is 'normal' operation.
|
|
||||||
if (cause instanceof NotServingRegionException) {
|
|
||||||
String msg = cause.getMessage();
|
|
||||||
if (msg != null && msg.length() > 0) {
|
|
||||||
// msg is the exception as a String... we just want first line.
|
|
||||||
msg = msg.split("[\\n\\r]+\\s*at")[0];
|
|
||||||
}
|
|
||||||
LOG.debug("Failed execution of all on " + request.address +
|
|
||||||
" because: " + msg);
|
|
||||||
} else {
|
} else {
|
||||||
// all go into the failed list.
|
// For each region
|
||||||
LOG.debug("Failed execution of all on " + request.address,
|
for (Entry<byte[], List<Pair<Integer,Result>>> e : resp.getResults().entrySet()) {
|
||||||
e.getCause());
|
byte[] regionName = e.getKey();
|
||||||
|
List<Pair<Integer, Result>> regionResults = e.getValue();
|
||||||
|
for (int i = 0; i < regionResults.size(); i++) {
|
||||||
|
Pair<Integer, Result> regionResult = regionResults.get(i);
|
||||||
|
if (regionResult.getSecond() == null) {
|
||||||
|
// failed
|
||||||
|
LOG.debug("Failures for region: " + Bytes.toStringBinary(regionName) + ", removing from cache");
|
||||||
|
} else {
|
||||||
|
// success
|
||||||
|
results[regionResult.getFirst()] = regionResult.getSecond();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
failed.addAll(request.allPuts());
|
} catch (TimeoutException e) {
|
||||||
|
LOG.debug("Timeout for region server: " + address + ", removing from cache");
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.debug("Failed all from " + address, e);
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
break;
|
||||||
|
} catch (ExecutionException e) {
|
||||||
|
LOG.debug("Failed all from " + address, e);
|
||||||
|
|
||||||
// Just give up, leaving the batch put list in an untouched/semi-committed state
|
// Just give up, leaving the batch incomplete
|
||||||
if (e.getCause() instanceof DoNotRetryIOException) {
|
if (e.getCause() instanceof DoNotRetryIOException) {
|
||||||
throw (DoNotRetryIOException) e.getCause();
|
throw (DoNotRetryIOException) e.getCause();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (singletonList) {
|
if (singletonList) {
|
||||||
// be richer for reporting in a 1 row case.
|
// be richer for reporting in a 1 row case.
|
||||||
singleRowCause = e.getCause();
|
singleRowCause = e.getCause();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
list.clear();
|
|
||||||
if (!failed.isEmpty()) {
|
|
||||||
for (Put failedPut: failed) {
|
|
||||||
deleteCachedLocation(tableName, failedPut.getRow());
|
|
||||||
}
|
|
||||||
|
|
||||||
list.addAll(failed);
|
// Find failures (i.e. null Result), and add them to the workingList (in
|
||||||
|
// order), so they can be retried.
|
||||||
long sleepTime = getPauseTime(tries);
|
retry = false;
|
||||||
LOG.debug("processBatchOfPuts had some failures, sleeping for " + sleepTime +
|
workingList.clear();
|
||||||
" ms!");
|
for (int i = 0; i < results.length; i++) {
|
||||||
try {
|
if (results[i] == null) {
|
||||||
Thread.sleep(sleepTime);
|
retry = true;
|
||||||
} catch (InterruptedException ignored) {
|
Row row = list.get(i);
|
||||||
|
workingList.add(row);
|
||||||
|
deleteCachedLocation(tableName, row.getRow());
|
||||||
|
} else {
|
||||||
|
// add null to workingList, so the order remains consistent with the original list argument.
|
||||||
|
workingList.add(null);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!list.isEmpty()) {
|
|
||||||
if (singletonList && singleRowCause != null) {
|
|
||||||
throw new IOException(singleRowCause);
|
|
||||||
}
|
|
||||||
|
|
||||||
// ran out of retries and didnt succeed everything!
|
if (Thread.currentThread().isInterrupted()) {
|
||||||
throw new RetriesExhaustedException("Still had " + list.size() + " puts left after retrying " +
|
throw new IOException("Aborting attempt because of a thread interruption");
|
||||||
numRetries + " times.");
|
}
|
||||||
|
|
||||||
|
if (retry) {
|
||||||
|
// ran out of retries and didn't successfully finish everything!
|
||||||
|
if (singleRowCause != null) {
|
||||||
|
throw new IOException(singleRowCause);
|
||||||
|
} else {
|
||||||
|
throw new RetriesExhaustedException("Still had " + workingList.size()
|
||||||
|
+ " actions left after retrying " + numRetries + " times.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
private Callable<MultiPutResponse> createPutCallable(
|
* @deprecated Use HConnectionManager::processBatch instead.
|
||||||
final HServerAddress address, final MultiPut puts,
|
*/
|
||||||
final byte [] tableName) {
|
public void processBatchOfPuts(List<Put> list,
|
||||||
final HConnection connection = this;
|
final byte[] tableName,
|
||||||
return new Callable<MultiPutResponse>() {
|
ExecutorService pool) throws IOException {
|
||||||
public MultiPutResponse call() throws IOException {
|
Result[] results = new Result[list.size()];
|
||||||
return getRegionServerWithoutRetries(
|
processBatch((List) list, tableName, pool, results);
|
||||||
new ServerCallable<MultiPutResponse>(connection, tableName, null) {
|
|
||||||
public MultiPutResponse call() throws IOException {
|
// mutate list so that it is empty for complete success, or contains only failed records
|
||||||
MultiPutResponse resp = server.multiPut(puts);
|
// results are returned in the same order as the requests in list
|
||||||
resp.request = puts;
|
// walk the list backwards, so we can remove from list without impacting the indexes of earlier members
|
||||||
return resp;
|
for (int i = results.length - 1; i>=0; i--) {
|
||||||
}
|
// if result is not null, it succeeded
|
||||||
@Override
|
if (results[i] != null) {
|
||||||
public void instantiateServer(boolean reload) throws IOException {
|
list.remove(i);
|
||||||
server = connection.getHRegionConnection(address);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Throwable translateException(Throwable t) throws IOException {
|
private Throwable translateException(Throwable t) throws IOException {
|
||||||
|
|
|
@ -46,6 +46,7 @@ import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
|
@ -77,7 +78,7 @@ public class HTable implements HTableInterface {
|
||||||
private long currentWriteBufferSize;
|
private long currentWriteBufferSize;
|
||||||
protected int scannerCaching;
|
protected int scannerCaching;
|
||||||
private int maxKeyValueSize;
|
private int maxKeyValueSize;
|
||||||
|
private ExecutorService pool; // For Multi
|
||||||
private long maxScannerResultSize;
|
private long maxScannerResultSize;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -144,13 +145,11 @@ public class HTable implements HTableInterface {
|
||||||
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
|
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
|
||||||
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
|
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
|
||||||
this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
|
this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
|
||||||
|
|
||||||
int nrHRS = getCurrentNrHRS();
|
int nrThreads = conf.getInt("hbase.htable.threads.max", getCurrentNrHRS());
|
||||||
if (nrHRS == 0) {
|
if (nrThreads == 0) {
|
||||||
// No servers running -- set default of 10 threads.
|
nrThreads = 1; // is there a better default?
|
||||||
nrHRS = 10;
|
|
||||||
}
|
}
|
||||||
int nrThreads = conf.getInt("hbase.htable.threads.max", nrHRS);
|
|
||||||
|
|
||||||
// Unfortunately Executors.newCachedThreadPool does not allow us to
|
// Unfortunately Executors.newCachedThreadPool does not allow us to
|
||||||
// set the maximum size of the pool, so we have to do it ourselves.
|
// set the maximum size of the pool, so we have to do it ourselves.
|
||||||
|
@ -175,9 +174,6 @@ public class HTable implements HTableInterface {
|
||||||
return admin.getClusterStatus().getServers();
|
return admin.getClusterStatus().getServers();
|
||||||
}
|
}
|
||||||
|
|
||||||
// For multiput
|
|
||||||
private ExecutorService pool;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tells whether or not a table is enabled or not.
|
* Tells whether or not a table is enabled or not.
|
||||||
* @param tableName Name of table to check.
|
* @param tableName Name of table to check.
|
||||||
|
@ -508,6 +504,40 @@ public class HTable implements HTableInterface {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that does a batch call on Deletes, Gets and Puts.
|
||||||
|
*
|
||||||
|
* @param actions list of Get, Put, Delete objects
|
||||||
|
* @param results Empty Result[], same size as actions. Provides access to partial
|
||||||
|
* results, in case an exception is thrown. A null in the result array means that
|
||||||
|
* the call for that action failed, even after retries
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public synchronized void batch(final List<Row> actions, final Result[] results) throws IOException {
|
||||||
|
connection.processBatch(actions, tableName, pool, results);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method that does a batch call on Deletes, Gets and Puts.
|
||||||
|
*
|
||||||
|
* @param actions list of Get, Put, Delete objects
|
||||||
|
* @return the results from the actions. A null in the return array means that
|
||||||
|
* the call for that action failed, even after retries
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public synchronized Result[] batch(final List<Row> actions) throws IOException {
|
||||||
|
Result[] results = new Result[actions.size()];
|
||||||
|
connection.processBatch(actions, tableName, pool, results);
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletes the specified cells/row.
|
||||||
|
*
|
||||||
|
* @param delete The object that specifies what to delete.
|
||||||
|
* @throws IOException if a remote or network exception occurs.
|
||||||
|
* @since 0.20.0
|
||||||
|
*/
|
||||||
public void delete(final Delete delete)
|
public void delete(final Delete delete)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
connection.getRegionServerWithRetries(
|
connection.getRegionServerWithRetries(
|
||||||
|
@ -520,13 +550,28 @@ public class HTable implements HTableInterface {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Deletes the specified cells/rows in bulk.
|
||||||
|
* @param deletes List of things to delete. As a side effect, it will be modified:
|
||||||
|
* successful {@link Delete}s are removed. The ordering of the list will not change.
|
||||||
|
* @throws IOException if a remote or network exception occurs. In that case
|
||||||
|
* the {@code deletes} argument will contain the {@link Delete} instances
|
||||||
|
* that have not be successfully applied.
|
||||||
|
* @since 0.20.1
|
||||||
|
*/
|
||||||
public void delete(final List<Delete> deletes)
|
public void delete(final List<Delete> deletes)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
int last = 0;
|
Result[] results = new Result[deletes.size()];
|
||||||
try {
|
connection.processBatch((List) deletes, tableName, pool, results);
|
||||||
last = connection.processBatchOfDeletes(deletes, this.tableName);
|
|
||||||
} finally {
|
// mutate list so that it is empty for complete success, or contains only failed records
|
||||||
deletes.subList(0, last).clear();
|
// results are returned in the same order as the requests in list
|
||||||
|
// walk the list backwards, so we can remove from list without impacting the indexes of earlier members
|
||||||
|
for (int i = results.length - 1; i>=0; i--) {
|
||||||
|
// if result is not null, it succeeded
|
||||||
|
if (results[i] != null) {
|
||||||
|
deletes.remove(i);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -657,10 +702,17 @@ public class HTable implements HTableInterface {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Executes all the buffered {@link Put} operations.
|
||||||
|
* <p>
|
||||||
|
* This method gets called once automatically for every {@link Put} or batch
|
||||||
|
* of {@link Put}s (when {@link #batch(List)} is used) when
|
||||||
|
* {@link #isAutoFlush()} is {@code true}.
|
||||||
|
* @throws IOException if a remote or network exception occurs.
|
||||||
|
*/
|
||||||
public void flushCommits() throws IOException {
|
public void flushCommits() throws IOException {
|
||||||
try {
|
try {
|
||||||
connection.processBatchOfPuts(writeBuffer,
|
connection.processBatchOfPuts(writeBuffer, tableName, pool);
|
||||||
tableName, pool);
|
|
||||||
} finally {
|
} finally {
|
||||||
// the write buffer was adjusted by processBatchOfPuts
|
// the write buffer was adjusted by processBatchOfPuts
|
||||||
currentWriteBufferSize = 0;
|
currentWriteBufferSize = 0;
|
||||||
|
|
|
@ -34,6 +34,7 @@ import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @deprecated Use MultiAction instead
|
||||||
* Data type class for putting multiple regions worth of puts in one RPC.
|
* Data type class for putting multiple regions worth of puts in one RPC.
|
||||||
*/
|
*/
|
||||||
public class MultiPut implements Writable {
|
public class MultiPut implements Writable {
|
||||||
|
|
|
@ -30,6 +30,7 @@ import java.util.Map;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* @deprecated Replaced by MultiResponse
|
||||||
* Response class for MultiPut.
|
* Response class for MultiPut.
|
||||||
*/
|
*/
|
||||||
public class MultiPutResponse implements Writable {
|
public class MultiPutResponse implements Writable {
|
||||||
|
|
|
@ -29,5 +29,4 @@ public interface Row extends WritableComparable<Row> {
|
||||||
* @return The row.
|
* @return The row.
|
||||||
*/
|
*/
|
||||||
public byte [] getRow();
|
public byte [] getRow();
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,8 +46,12 @@ import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.MultiPut;
|
import org.apache.hadoop.hbase.client.MultiPut;
|
||||||
import org.apache.hadoop.hbase.client.MultiPutResponse;
|
import org.apache.hadoop.hbase.client.MultiPutResponse;
|
||||||
|
import org.apache.hadoop.hbase.client.MultiAction;
|
||||||
|
import org.apache.hadoop.hbase.client.Action;
|
||||||
|
import org.apache.hadoop.hbase.client.MultiResponse;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.Row;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
import org.apache.hadoop.hbase.filter.BinaryComparator;
|
||||||
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
|
import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
|
||||||
|
@ -187,6 +191,13 @@ public class HbaseObjectWritable implements Writable, Configurable {
|
||||||
|
|
||||||
addToMap(NavigableSet.class, code++);
|
addToMap(NavigableSet.class, code++);
|
||||||
addToMap(ColumnPrefixFilter.class, code++);
|
addToMap(ColumnPrefixFilter.class, code++);
|
||||||
|
|
||||||
|
// Multi
|
||||||
|
addToMap(Row.class, code++);
|
||||||
|
addToMap(Action.class, code++);
|
||||||
|
addToMap(MultiAction.class, code++);
|
||||||
|
addToMap(MultiResponse.class, code++);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private Class<?> declaredClass;
|
private Class<?> declaredClass;
|
||||||
|
|
|
@ -30,6 +30,8 @@ import org.apache.hadoop.hbase.NotServingRegionException;
|
||||||
import org.apache.hadoop.hbase.Stoppable;
|
import org.apache.hadoop.hbase.Stoppable;
|
||||||
import org.apache.hadoop.hbase.client.Delete;
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.MultiAction;
|
||||||
|
import org.apache.hadoop.hbase.client.MultiResponse;
|
||||||
import org.apache.hadoop.hbase.client.MultiPut;
|
import org.apache.hadoop.hbase.client.MultiPut;
|
||||||
import org.apache.hadoop.hbase.client.MultiPutResponse;
|
import org.apache.hadoop.hbase.client.MultiPutResponse;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
@ -266,6 +268,13 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion, Stoppable, Ab
|
||||||
*/
|
*/
|
||||||
public HServerInfo getHServerInfo() throws IOException;
|
public HServerInfo getHServerInfo() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Method used for doing multiple actions(Deletes, Gets and Puts) in one call
|
||||||
|
* @param multi
|
||||||
|
* @return MultiResult
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public MultiResponse multi(MultiAction multi) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Multi put for putting multiple regions worth of puts at once.
|
* Multi put for putting multiple regions worth of puts at once.
|
||||||
|
|
|
@ -82,6 +82,10 @@ import org.apache.hadoop.hbase.catalog.CatalogTracker;
|
||||||
import org.apache.hadoop.hbase.catalog.MetaEditor;
|
import org.apache.hadoop.hbase.catalog.MetaEditor;
|
||||||
import org.apache.hadoop.hbase.catalog.RootLocationEditor;
|
import org.apache.hadoop.hbase.catalog.RootLocationEditor;
|
||||||
import org.apache.hadoop.hbase.client.Delete;
|
import org.apache.hadoop.hbase.client.Delete;
|
||||||
|
import org.apache.hadoop.hbase.client.Action;
|
||||||
|
import org.apache.hadoop.hbase.client.MultiAction;
|
||||||
|
import org.apache.hadoop.hbase.client.MultiResponse;
|
||||||
|
import org.apache.hadoop.hbase.client.Row;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.MultiPut;
|
import org.apache.hadoop.hbase.client.MultiPut;
|
||||||
import org.apache.hadoop.hbase.client.MultiPutResponse;
|
import org.apache.hadoop.hbase.client.MultiPutResponse;
|
||||||
|
@ -2204,6 +2208,54 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
return serverInfo;
|
return serverInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MultiResponse multi(MultiAction multi) throws IOException {
|
||||||
|
MultiResponse response = new MultiResponse();
|
||||||
|
for (Map.Entry<byte[], List<Action>> e : multi.actions.entrySet()) {
|
||||||
|
byte[] regionName = e.getKey();
|
||||||
|
List<Action> actionsForRegion = e.getValue();
|
||||||
|
// sort based on the row id - this helps in the case where we reach the
|
||||||
|
// end of a region, so that we don't have to try the rest of the
|
||||||
|
// actions in the list.
|
||||||
|
Collections.sort(actionsForRegion);
|
||||||
|
Row action = null;
|
||||||
|
try {
|
||||||
|
for (Action a : actionsForRegion) {
|
||||||
|
action = a.getAction();
|
||||||
|
if (action instanceof Delete) {
|
||||||
|
delete(regionName, (Delete) action);
|
||||||
|
response.add(regionName, new Pair<Integer, Result>(
|
||||||
|
a.getOriginalIndex(), new Result()));
|
||||||
|
} else if (action instanceof Get) {
|
||||||
|
response.add(regionName, new Pair<Integer, Result>(
|
||||||
|
a.getOriginalIndex(), get(regionName, (Get) action)));
|
||||||
|
} else if (action instanceof Put) {
|
||||||
|
put(regionName, (Put) action);
|
||||||
|
response.add(regionName, new Pair<Integer, Result>(
|
||||||
|
a.getOriginalIndex(), new Result()));
|
||||||
|
} else {
|
||||||
|
LOG.debug("Error: invalid Action, row must be a Get, Delete or Put.");
|
||||||
|
throw new IllegalArgumentException("Invalid Action, row must be a Get, Delete or Put.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
if (multi.size() == 1) {
|
||||||
|
throw ioe;
|
||||||
|
} else {
|
||||||
|
LOG.error("Exception found while attempting " + action.toString()
|
||||||
|
+ " " + StringUtils.stringifyException(ioe));
|
||||||
|
response.add(regionName,null);
|
||||||
|
// stop processing on this region, continue to the next.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @deprecated Use HRegionServer.multi( MultiAction action) instead
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public MultiPutResponse multiPut(MultiPut puts) throws IOException {
|
public MultiPutResponse multiPut(MultiPut puts) throws IOException {
|
||||||
MultiPutResponse resp = new MultiPutResponse();
|
MultiPutResponse resp = new MultiPutResponse();
|
||||||
|
@ -2246,7 +2298,7 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
|
||||||
public CompactionRequestor getCompactionRequester() {
|
public CompactionRequestor getCompactionRequester() {
|
||||||
return this.compactSplitThread;
|
return this.compactSplitThread;
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
//
|
||||||
// Main program and support routines
|
// Main program and support routines
|
||||||
//
|
//
|
||||||
|
|
|
@ -1,117 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright 2009 The Apache Software Foundation
|
|
||||||
*
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.hbase;
|
|
||||||
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
|
||||||
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
|
||||||
import org.apache.hadoop.hbase.client.HTable;
|
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
|
||||||
|
|
||||||
public class TestMultiParallelPut extends MultiRegionTable {
|
|
||||||
final Log LOG = LogFactory.getLog(getClass());
|
|
||||||
private static final byte[] VALUE = Bytes.toBytes("value");
|
|
||||||
private static final byte[] QUALIFIER = Bytes.toBytes("qual");
|
|
||||||
private static final String FAMILY = "family";
|
|
||||||
private static final String TEST_TABLE = "test_table";
|
|
||||||
private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
|
|
||||||
|
|
||||||
|
|
||||||
public TestMultiParallelPut() {
|
|
||||||
super(2, FAMILY);
|
|
||||||
desc = new HTableDescriptor(TEST_TABLE);
|
|
||||||
desc.addFamily(new HColumnDescriptor(FAMILY));
|
|
||||||
makeKeys();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void makeKeys() {
|
|
||||||
for (byte [] k : KEYS) {
|
|
||||||
byte [] cp = new byte[k.length+1];
|
|
||||||
System.arraycopy(k, 0, cp, 0, k.length);
|
|
||||||
cp[k.length] = 1;
|
|
||||||
keys.add(cp);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
List<byte[]> keys = new ArrayList<byte[]>();
|
|
||||||
|
|
||||||
public void testParallelPut() throws Exception {
|
|
||||||
LOG.info("Starting testParallelPut");
|
|
||||||
doATest(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testParallelPutWithRSAbort() throws Exception {
|
|
||||||
LOG.info("Starting testParallelPutWithRSAbort");
|
|
||||||
doATest(true);
|
|
||||||
}
|
|
||||||
|
|
||||||
public void doATest(boolean doAbort) throws Exception {
|
|
||||||
conf.setInt("hbase.client.retries.number", 10);
|
|
||||||
HTable table = new HTable(conf, TEST_TABLE);
|
|
||||||
table.setAutoFlush(false);
|
|
||||||
table.setWriteBufferSize(10 * 1024 * 1024);
|
|
||||||
for ( byte [] k : keys ) {
|
|
||||||
Put put = new Put(k);
|
|
||||||
put.add(BYTES_FAMILY, QUALIFIER, VALUE);
|
|
||||||
table.put(put);
|
|
||||||
}
|
|
||||||
table.flushCommits();
|
|
||||||
|
|
||||||
if (doAbort) {
|
|
||||||
LOG.info("Aborting...");
|
|
||||||
cluster.abortRegionServer(0);
|
|
||||||
// try putting more keys after the abort.
|
|
||||||
for ( byte [] k : keys ) {
|
|
||||||
Put put = new Put(k);
|
|
||||||
put.add(BYTES_FAMILY, QUALIFIER, VALUE);
|
|
||||||
table.put(put);
|
|
||||||
}
|
|
||||||
table.flushCommits();
|
|
||||||
}
|
|
||||||
|
|
||||||
for (byte [] k : keys ) {
|
|
||||||
Get get = new Get(k);
|
|
||||||
get.addColumn(BYTES_FAMILY, QUALIFIER);
|
|
||||||
Result r = table.get(get);
|
|
||||||
assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
|
|
||||||
assertEquals(0,
|
|
||||||
Bytes.compareTo(VALUE,
|
|
||||||
r.getValue(BYTES_FAMILY, QUALIFIER)));
|
|
||||||
}
|
|
||||||
|
|
||||||
HBaseAdmin admin = new HBaseAdmin(conf);
|
|
||||||
ClusterStatus cs = admin.getClusterStatus();
|
|
||||||
int expectedServerCount = 2;
|
|
||||||
if (doAbort) expectedServerCount = 1;
|
|
||||||
LOG.info("Clusterstatus servers count " + cs.getServers());
|
|
||||||
assertEquals(expectedServerCount, cs.getServers());
|
|
||||||
for ( HServerInfo info : cs.getServerInfo()) {
|
|
||||||
LOG.info("Info from clusterstatus=" + info);
|
|
||||||
assertTrue(info.getLoad().getNumberOfRegions() > 8);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue