HBASE-2208 TableServers # processBatchOfRows - converts from List to [ ] - Expensive copy

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@953802 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Michael Stack 2010-06-11 18:09:48 +00:00
parent 4193875580
commit 53b3274251
4 changed files with 59 additions and 58 deletions

View File

@ -682,6 +682,8 @@ Release 0.21.0 - Unreleased
HBASE-2558 Our javadoc overview -- "Getting Started", requirements, etc. --
is not carried across by mvn javadoc:javadoc target
HBASE-2618 Don't inherit from HConstants (Benoit Sigoure via Stack)
HBASE-2208 TableServers # processBatchOfRows - converts from List to [ ]
- Expensive copy
NEW FEATURES
HBASE-1961 HBase EC2 scripts

View File

@ -70,10 +70,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
*
* Used by {@link HTable} and {@link HBaseAdmin}
*/
@SuppressWarnings("serial")
public class HConnectionManager {
private static final Delete[] DELETE_ARRAY_TYPE = new Delete[] {};
private static final Put[] PUT_ARRAY_TYPE = new Put[] {};
// Register a shutdown hook, one that cleans up RPC and closes zk sessions.
static {
Runtime.getRuntime().addShutdownHook(new Thread("HCM.shutdownHook") {
@ -652,7 +650,6 @@ public class HConnectionManager {
* Search one of the meta tables (-ROOT- or .META.) for the HRegionLocation
* info that contains the table and row we're seeking.
*/
@SuppressWarnings({"ConstantConditions"})
private HRegionLocation locateRegionInMeta(final byte [] parentTable,
final byte [] tableName, final byte [] row, boolean useCache,
Object regionLockObject)
@ -1061,7 +1058,6 @@ public class HConnectionManager {
HRegionInfo.ROOT_REGIONINFO, rootRegionAddress);
}
@SuppressWarnings({"ConstantConditions"})
public <T> T getRegionServerWithRetries(ServerCallable<T> callable)
throws IOException, RuntimeException {
List<Throwable> exceptions = new ArrayList<Throwable>();
@ -1101,7 +1097,6 @@ public class HConnectionManager {
}
}
@SuppressWarnings({"ConstantConditions"})
private HRegionLocation
getRegionLocationForRowWithRetries(byte[] tableName, byte[] rowKey,
boolean reload)
@ -1258,11 +1253,12 @@ public class HConnectionManager {
if (list.isEmpty()) return 0;
if (list.size() > 1) Collections.sort(list);
Batch b = new Batch(this) {
@SuppressWarnings("unchecked")
@Override
int doCall(final List<? extends Row> currentList, final byte [] row,
final byte [] tableName)
throws IOException, RuntimeException {
final Put [] puts = currentList.toArray(PUT_ARRAY_TYPE);
final List<Put> puts = (List<Put>)currentList;
return getRegionServerWithRetries(new ServerCallable<Integer>(this.c,
tableName, row) {
public Integer call() throws IOException {
@ -1280,11 +1276,12 @@ public class HConnectionManager {
if (list.isEmpty()) return 0;
if (list.size() > 1) Collections.sort(list);
Batch b = new Batch(this) {
@SuppressWarnings("unchecked")
@Override
int doCall(final List<? extends Row> currentList, final byte [] row,
final byte [] tableName)
throws IOException, RuntimeException {
final Delete [] deletes = currentList.toArray(DELETE_ARRAY_TYPE);
final List<Delete> deletes = (List<Delete>)currentList;
return getRegionServerWithRetries(new ServerCallable<Integer>(this.c,
tableName, row) {
public Integer call() throws IOException {

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import java.io.IOException;
import java.util.List;
/**
* Clients interact with HRegionServers using a handle to the HRegionInterface.
@ -102,12 +103,12 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion {
* Put an array of puts into the specified region
*
* @param regionName region name
* @param puts array of puts to execute
* @param puts List of puts to execute
* @return The number of processed put's. Returns -1 if all Puts
* processed successfully.
* @throws IOException e
*/
public int put(final byte[] regionName, final Put [] puts)
public int put(final byte[] regionName, final List<Put> puts)
throws IOException;
/**
@ -125,12 +126,12 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion {
* Put an array of deletes into the specified region
*
* @param regionName region name
* @param deletes delete array to execute
* @param deletes delete List to execute
* @return The number of processed deletes. Returns -1 if all Deletes
* processed successfully.
* @throws IOException e
*/
public int delete(final byte[] regionName, final Delete [] deletes)
public int delete(final byte[] regionName, final List<Delete> deletes)
throws IOException;
/**

View File

@ -19,6 +19,37 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.lang.management.RuntimeMXBean;
import java.lang.reflect.Constructor;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -28,7 +59,6 @@ import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HMsg;
import org.apache.hadoop.hbase.HMsg.Type;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HServerAddress;
@ -38,14 +68,14 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LeaseListener;
import org.apache.hadoop.hbase.Leases;
import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.PleaseHoldException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.UnknownRowLockException;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.YouAreDeadException;
import org.apache.hadoop.hbase.HMsg.Type;
import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.MultiPut;
@ -80,37 +110,6 @@ import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryUsage;
import java.lang.management.RuntimeMXBean;
import java.lang.reflect.Constructor;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* HRegionServer makes a set of HRegions available to clients. It checks in with
* the HMaster. There are many HRegionServers in a single HBase deployment.
@ -1659,7 +1658,7 @@ public class HRegionServer implements HRegionInterface,
}
}
public int put(final byte[] regionName, final Put [] puts)
public int put(final byte[] regionName, final List<Put> puts)
throws IOException {
// Count of Puts processed.
int i = 0;
@ -1671,11 +1670,11 @@ public class HRegionServer implements HRegionInterface,
if (!region.getRegionInfo().isMetaTable()) {
this.cacheFlusher.reclaimMemStoreMemory();
}
for (i = 0; i < puts.length; i++) {
for (Put put: puts) {
this.requestCount.incrementAndGet();
Integer lock = getLockFromId(puts[i].getLockId());
writeToWAL &= puts[i].getWriteToWAL();
region.put(puts[i], lock);
Integer lock = getLockFromId(put.getLockId());
writeToWAL &= put.getWriteToWAL();
region.put(put, lock);
}
} catch (WrongRegionException ex) {
@ -1898,7 +1897,7 @@ public class HRegionServer implements HRegionInterface,
}
}
public int delete(final byte[] regionName, final Delete [] deletes)
public int delete(final byte[] regionName, final List<Delete> deletes)
throws IOException {
// Count of Deletes processed.
int i = 0;
@ -1910,11 +1909,13 @@ public class HRegionServer implements HRegionInterface,
if (!region.getRegionInfo().isMetaTable()) {
this.cacheFlusher.reclaimMemStoreMemory();
}
Integer[] locks = new Integer[deletes.length];
for (i = 0; i < deletes.length; i++) {
int size = deletes.size();
Integer[] locks = new Integer[size];
for (Delete delete: deletes) {
this.requestCount.incrementAndGet();
locks[i] = getLockFromId(deletes[i].getLockId());
region.delete(deletes[i], locks[i], writeToWAL);
locks[i] = getLockFromId(delete.getLockId());
region.delete(delete, locks[i], writeToWAL);
i++;
}
} catch (WrongRegionException ex) {
LOG.debug("Batch deletes: " + i, ex);
@ -2335,8 +2336,8 @@ public class HRegionServer implements HRegionInterface,
MultiPutResponse resp = new MultiPutResponse();
// do each region as it's own.
for( Map.Entry<byte[],List<Put>> e: puts.puts.entrySet()) {
int result = put(e.getKey(), e.getValue().toArray(new Put[]{}));
for( Map.Entry<byte[], List<Put>> e: puts.puts.entrySet()) {
int result = put(e.getKey(), e.getValue());
resp.addResult(e.getKey(), result);
e.getValue().clear(); // clear some RAM