HBASE-2985 HRegionServer.multi() no longer calls HRegion.put(List) when possible

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1024074 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Ryan Rawson 2010-10-19 01:01:52 +00:00
parent 113d533ad7
commit 1eede65ae1
6 changed files with 81 additions and 47 deletions

View File

@ -596,6 +596,8 @@ Release 0.21.0 - Unreleased
HBASE-3121 [rest] Do not perform cache control when returning results HBASE-3121 [rest] Do not perform cache control when returning results
HBASE-2669 HCM.shutdownHook causes data loss with HBASE-2669 HCM.shutdownHook causes data loss with
hbase.client.write.buffer != 0 hbase.client.write.buffer != 0
HBASE-2985 HRegionServer.multi() no longer calls HRegion.put(List) when
possible
IMPROVEMENTS IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable HBASE-1760 Cleanup TODOs in HTable

View File

@ -1068,7 +1068,7 @@ public class HConnectionManager {
} }
private Callable<MultiResponse> createCallable( private Callable<MultiResponse> createCallable(
final HServerAddress address, final HServerAddress address,
final MultiAction multi, final MultiAction multi,
final byte [] tableName) { final byte [] tableName) {
final HConnection connection = this; final HConnection connection = this;
@ -1098,11 +1098,11 @@ public class HConnectionManager {
if (results.length != list.size()) { if (results.length != list.size()) {
throw new IllegalArgumentException("argument results must be the same size as argument list"); throw new IllegalArgumentException("argument results must be the same size as argument list");
} }
if (list.size() == 0) { if (list.size() == 0) {
return; return;
} }
List<Row> workingList = new ArrayList<Row>(list); List<Row> workingList = new ArrayList<Row>(list);
final boolean singletonList = (list.size() == 1); final boolean singletonList = (list.size() == 1);
boolean retry = true; boolean retry = true;
@ -1114,8 +1114,8 @@ public class HConnectionManager {
if (tries >= 1) { if (tries >= 1) {
long sleepTime = getPauseTime(tries); long sleepTime = getPauseTime(tries);
LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!"); LOG.debug("Retry " +tries+ ", sleep for " +sleepTime+ "ms!");
try { try {
Thread.sleep(sleepTime); Thread.sleep(sleepTime);
} catch (InterruptedException ignore) { } catch (InterruptedException ignore) {
LOG.debug("Interupted"); LOG.debug("Interupted");
Thread.currentThread().interrupt(); Thread.currentThread().interrupt();
@ -1132,38 +1132,38 @@ public class HConnectionManager {
HRegionLocation loc = locateRegion(tableName, row.getRow(), true); HRegionLocation loc = locateRegion(tableName, row.getRow(), true);
HServerAddress address = loc.getServerAddress(); HServerAddress address = loc.getServerAddress();
byte[] regionName = loc.getRegionInfo().getRegionName(); byte[] regionName = loc.getRegionInfo().getRegionName();
MultiAction actions = actionsByServer.get(address); MultiAction actions = actionsByServer.get(address);
if (actions == null) { if (actions == null) {
actions = new MultiAction(); actions = new MultiAction();
actionsByServer.put(address, actions); actionsByServer.put(address, actions);
} }
Action action = new Action(regionName, row, i); Action action = new Action(regionName, row, i);
actions.add(regionName, action); actions.add(regionName, action);
} }
} }
// step 2: make the requests // step 2: make the requests
Map<HServerAddress,Future<MultiResponse>> futures = Map<HServerAddress,Future<MultiResponse>> futures =
new HashMap<HServerAddress, Future<MultiResponse>>(actionsByServer.size()); new HashMap<HServerAddress, Future<MultiResponse>>(actionsByServer.size());
for (Entry<HServerAddress, MultiAction> e : actionsByServer.entrySet()) { for (Entry<HServerAddress, MultiAction> e : actionsByServer.entrySet()) {
futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName))); futures.put(e.getKey(), pool.submit(createCallable(e.getKey(), e.getValue(), tableName)));
} }
// step 3: collect the failures and successes and prepare for retry // step 3: collect the failures and successes and prepare for retry
for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures.entrySet()) { for (Entry<HServerAddress, Future<MultiResponse>> responsePerServer : futures.entrySet()) {
HServerAddress address = responsePerServer.getKey(); HServerAddress address = responsePerServer.getKey();
try { try {
// Gather the results for one server // Gather the results for one server
Future<MultiResponse> future = responsePerServer.getValue(); Future<MultiResponse> future = responsePerServer.getValue();
// Not really sure what a reasonable timeout value is. Here's a first try. // Not really sure what a reasonable timeout value is. Here's a first try.
MultiResponse resp = future.get(); MultiResponse resp = future.get();
if (resp == null) { if (resp == null) {
@ -1176,7 +1176,7 @@ public class HConnectionManager {
List<Pair<Integer, Result>> regionResults = e.getValue(); List<Pair<Integer, Result>> regionResults = e.getValue();
for (Pair<Integer, Result> regionResult : regionResults) { for (Pair<Integer, Result> regionResult : regionResults) {
if (regionResult == null) { if (regionResult == null) {
// failed // if the first/only record is 'null' the entire region failed.
LOG.debug("Failures for region: " + Bytes.toStringBinary(regionName) + ", removing from cache"); LOG.debug("Failures for region: " + Bytes.toStringBinary(regionName) + ", removing from cache");
} else { } else {
// success // success
@ -1196,12 +1196,12 @@ public class HConnectionManager {
if (e.getCause() instanceof DoNotRetryIOException) { if (e.getCause() instanceof DoNotRetryIOException) {
throw (DoNotRetryIOException) e.getCause(); throw (DoNotRetryIOException) e.getCause();
} }
if (singletonList) { if (singletonList) {
// be richer for reporting in a 1 row case. // be richer for reporting in a 1 row case.
singleRowCause = e.getCause(); singleRowCause = e.getCause();
} }
} }
} }
// Find failures (i.e. null Result), and add them to the workingList (in // Find failures (i.e. null Result), and add them to the workingList (in
@ -1224,7 +1224,7 @@ public class HConnectionManager {
if (Thread.currentThread().isInterrupted()) { if (Thread.currentThread().isInterrupted()) {
throw new IOException("Aborting attempt because of a thread interruption"); throw new IOException("Aborting attempt because of a thread interruption");
} }
if (retry) { if (retry) {
// ran out of retries and didn't successfully finish everything! // ran out of retries and didn't successfully finish everything!
if (singleRowCause != null) { if (singleRowCause != null) {

View File

@ -57,7 +57,7 @@ import org.apache.hadoop.hbase.util.Writables;
/** /**
* Used to communicate with a single HBase table. * Used to communicate with a single HBase table.
* *
* This class is not thread safe for updates; the underlying write buffer can * This class is not thread safe for updates; the underlying write buffer can
* be corrupted if multiple threads contend over a single HTable instance. * be corrupted if multiple threads contend over a single HTable instance.
* *
@ -100,7 +100,7 @@ public class HTable implements HTableInterface {
/** /**
* Creates an object to access a HBase table. * Creates an object to access a HBase table.
* Internally it creates a new instance of {@link Configuration} and a new * Internally it creates a new instance of {@link Configuration} and a new
* client to zookeeper as well as other resources. It also comes up with * client to zookeeper as well as other resources. It also comes up with
* a fresh view of the cluster and must do discovery from scratch of region * a fresh view of the cluster and must do discovery from scratch of region
* locations; i.e. it will not make use of already-cached region locations if * locations; i.e. it will not make use of already-cached region locations if
* available. Use only when being quick and dirty. * available. Use only when being quick and dirty.
@ -115,7 +115,7 @@ public class HTable implements HTableInterface {
/** /**
* Creates an object to access a HBase table. * Creates an object to access a HBase table.
* Internally it creates a new instance of {@link Configuration} and a new * Internally it creates a new instance of {@link Configuration} and a new
* client to zookeeper as well as other resources. It also comes up with * client to zookeeper as well as other resources. It also comes up with
* a fresh view of the cluster and must do discovery from scratch of region * a fresh view of the cluster and must do discovery from scratch of region
* locations; i.e. it will not make use of already-cached region locations if * locations; i.e. it will not make use of already-cached region locations if
* available. Use only when being quick and dirty. * available. Use only when being quick and dirty.
@ -176,7 +176,7 @@ public class HTable implements HTableInterface {
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE); HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1); this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
int nrThreads = conf.getInt("hbase.htable.threads.max", getCurrentNrHRS()); int nrThreads = conf.getInt("hbase.htable.threads.max", getCurrentNrHRS());
if (nrThreads == 0) { if (nrThreads == 0) {
nrThreads = 1; // is there a better default? nrThreads = 1; // is there a better default?
@ -551,7 +551,10 @@ public class HTable implements HTableInterface {
} }
/** /**
* Method that does a batch call on Deletes, Gets and Puts. * Method that does a batch call on Deletes, Gets and Puts. The ordering of
* execution of the actions is not defined. Meaning if you do a Put and a
* Get in the same {@link #batch} call, you will not necessarily be
* guaranteed that the Get returns what the Put had put.
* *
* @param actions list of Get, Put, Delete objects * @param actions list of Get, Put, Delete objects
* @param results Empty Result[], same size as actions. Provides access to partial * @param results Empty Result[], same size as actions. Provides access to partial
@ -566,7 +569,7 @@ public class HTable implements HTableInterface {
/** /**
* Method that does a batch call on Deletes, Gets and Puts. * Method that does a batch call on Deletes, Gets and Puts.
* *
* @param actions list of Get, Put, Delete objects * @param actions list of Get, Put, Delete objects
* @return the results from the actions. A null in the return array means that * @return the results from the actions. A null in the return array means that
* the call for that action failed, even after retries * the call for that action failed, even after retries
@ -581,7 +584,7 @@ public class HTable implements HTableInterface {
/** /**
* Deletes the specified cells/row. * Deletes the specified cells/row.
* *
* @param delete The object that specifies what to delete. * @param delete The object that specifies what to delete.
* @throws IOException if a remote or network exception occurs. * @throws IOException if a remote or network exception occurs.
* @since 0.20.0 * @since 0.20.0
@ -602,7 +605,7 @@ public class HTable implements HTableInterface {
/** /**
* Deletes the specified cells/rows in bulk. * Deletes the specified cells/rows in bulk.
* @param deletes List of things to delete. As a side effect, it will be modified: * @param deletes List of things to delete. As a side effect, it will be modified:
* successful {@link Delete}s are removed. The ordering of the list will not change. * successful {@link Delete}s are removed. The ordering of the list will not change.
* @throws IOException if a remote or network exception occurs. In that case * @throws IOException if a remote or network exception occurs. In that case
* the {@code deletes} argument will contain the {@link Delete} instances * the {@code deletes} argument will contain the {@link Delete} instances
* that have not be successfully applied. * that have not be successfully applied.

View File

@ -42,8 +42,8 @@ public class MultiResponse implements Writable {
// map of regionName to list of (Results paired to the original index for that // map of regionName to list of (Results paired to the original index for that
// Result) // Result)
private Map<byte[], List<Pair<Integer, Result>>> results = new TreeMap<byte[], List<Pair<Integer, Result>>>( private Map<byte[], List<Pair<Integer, Result>>> results =
Bytes.BYTES_COMPARATOR); new TreeMap<byte[], List<Pair<Integer, Result>>>(Bytes.BYTES_COMPARATOR);
public MultiResponse() { public MultiResponse() {
} }
@ -111,7 +111,7 @@ public class MultiResponse implements Writable {
for (int j = 0; j < listSize; j++) { for (int j = 0; j < listSize; j++) {
Integer idx = in.readInt(); Integer idx = in.readInt();
if (idx == -1) { if (idx == -1) {
lst.add(null); lst.add(null);
} else { } else {
Result r = (Result) HbaseObjectWritable.readObject(in, null); Result r = (Result) HbaseObjectWritable.readObject(in, null);
lst.add(new Pair<Integer, Result>(idx, r)); lst.add(new Pair<Integer, Result>(idx, r));

View File

@ -2330,9 +2330,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// actions in the list. // actions in the list.
Collections.sort(actionsForRegion); Collections.sort(actionsForRegion);
Row action = null; Row action = null;
List<Action> puts = new ArrayList<Action>();
try { try {
for (Action a : actionsForRegion) { for (Action a : actionsForRegion) {
action = a.getAction(); action = a.getAction();
// TODO catch exceptions so we can report them on a per-item basis.
if (action instanceof Delete) { if (action instanceof Delete) {
delete(regionName, (Delete) action); delete(regionName, (Delete) action);
response.add(regionName, new Pair<Integer, Result>( response.add(regionName, new Pair<Integer, Result>(
@ -2341,14 +2343,50 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
response.add(regionName, new Pair<Integer, Result>( response.add(regionName, new Pair<Integer, Result>(
a.getOriginalIndex(), get(regionName, (Get) action))); a.getOriginalIndex(), get(regionName, (Get) action)));
} else if (action instanceof Put) { } else if (action instanceof Put) {
put(regionName, (Put) action); puts.add(a);
response.add(regionName, new Pair<Integer, Result>(
a.getOriginalIndex(), new Result()));
} else { } else {
LOG.debug("Error: invalid Action, row must be a Get, Delete or Put."); LOG.debug("Error: invalid Action, row must be a Get, Delete or Put.");
throw new IllegalArgumentException("Invalid Action, row must be a Get, Delete or Put."); throw new IllegalArgumentException("Invalid Action, row must be a Get, Delete or Put.");
} }
} }
// We do the puts with result.put so we can get the batching efficiency
// we so need. All this data munging doesn't seem great, but at least
// we arent copying bytes or anything.
if (!puts.isEmpty()) {
HRegion region = getRegion(regionName);
if (!region.getRegionInfo().isMetaTable()) {
this.cacheFlusher.reclaimMemStoreMemory();
}
Pair<Put,Integer> [] putsWithLocks = new Pair[puts.size()];
int i = 0;
for (Action a : puts) {
Put p = (Put) a.getAction();
Integer lock = getLockFromId(p.getLockId());
putsWithLocks[i++] = new Pair<Put, Integer>(p, lock);
}
this.requestCount.addAndGet(puts.size());
OperationStatusCode[] codes = region.put(putsWithLocks);
for( i = 0 ; i < codes.length ; i++) {
OperationStatusCode code = codes[i];
Action theAction = puts.get(i);
Result result = null;
if (code == OperationStatusCode.SUCCESS) {
result = new Result();
}
// TODO turning the alternate exception into a different result
response.add(regionName,
new Pair<Integer, Result>(
theAction.getOriginalIndex(), result));
}
}
} catch (IOException ioe) { } catch (IOException ioe) {
if (multi.size() == 1) throw ioe; if (multi.size() == 1) throw ioe;
LOG.debug("Exception processing " + LOG.debug("Exception processing " +

View File

@ -140,7 +140,7 @@ public class TestMultiParallel {
/** /**
* Only run one Multi test with a forced RegionServer abort. Otherwise, the * Only run one Multi test with a forced RegionServer abort. Otherwise, the
* unit tests will take an unnecessarily long time to run. * unit tests will take an unnecessarily long time to run.
* *
* @throws Exception * @throws Exception
*/ */
@Test public void testFlushCommitsWithAbort() throws Exception { @Test public void testFlushCommitsWithAbort() throws Exception {
@ -354,17 +354,11 @@ public class TestMultiParallel {
get.addColumn(BYTES_FAMILY, QUALIFIER); get.addColumn(BYTES_FAMILY, QUALIFIER);
actions.add(get); actions.add(get);
// 5 get of the put in #2 (entire family) // There used to be a 'get' of a previous put here, but removed
get = new Get(KEYS[10]); // since this API really cannot guarantee order in terms of mixed
get.addFamily(BYTES_FAMILY); // get/puts.
actions.add(get);
// 6 get of the delete from #3 // 5 put of new column
get = new Get(KEYS[20]);
get.addColumn(BYTES_FAMILY, QUALIFIER);
actions.add(get);
// 7 put of new column
put = new Put(KEYS[40]); put = new Put(KEYS[40]);
put.add(BYTES_FAMILY, qual2, val2); put.add(BYTES_FAMILY, qual2, val2);
actions.add(put); actions.add(put);
@ -378,10 +372,7 @@ public class TestMultiParallel {
validateEmpty(results[2]); validateEmpty(results[2]);
validateEmpty(results[3]); validateEmpty(results[3]);
validateResult(results[4]); validateResult(results[4]);
validateResult(results[5]); validateEmpty(results[5]);
validateResult(results[5], qual2, val2); // testing second column in #5
validateEmpty(results[6]); // deleted
validateEmpty(results[7]);
// validate last put, externally from the batch // validate last put, externally from the batch
get = new Get(KEYS[40]); get = new Get(KEYS[40]);