HBASE-2066 Perf: parallelize puts

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@909235 13f79535-47bb-0310-9956-ffa450edef68
Ryan Rawson 2010-02-12 05:06:06 +00:00
parent 94fdc559e7
commit c6ee51819a
13 changed files with 560 additions and 44 deletions


@@ -358,6 +358,7 @@ Release 0.21.0 - Unreleased
HBASE-2209 Support of List [ ] in HBaseOutputWritable for serialization
(Kay Kay via Stack)
HBASE-2177 Add timestamping to gc logging option
HBASE-2066 Perf: parallelize puts
NEW FEATURES
HBASE-1961 HBase EC2 scripts


@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.HServerAddress;
@@ -214,4 +215,9 @@ public interface HConnection {
*/
public int processBatchOfDeletes(List<Delete> list, byte[] tableName)
throws IOException;
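/**
 * Apply the given puts, grouping them by destination region server and
 * issuing each server's batch in parallel on the supplied pool. Failed
 * puts are retried internally, and the list is drained as puts succeed.
 * @param list puts to apply; emptied on success
 * @param tableName table the puts are destined for
 * @param pool thread pool that issues the per-server requests
 * @throws IOException if puts remain unapplied after all retries
 */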
public void processBatchOfPuts(List<Put> list,
final byte[] tableName, ExecutorService pool) throws IOException;
}
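As a usage sketch (not part of the patch): the caller owns both the connection and the pool, and hands them to the new entry point. The table name, pool size, and column coordinates below are illustrative, assuming the 0.20-era client API.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchPutSketch {
  public static void main(String[] args) throws Exception {
    HConnection conn =
        HConnectionManager.getConnection(new HBaseConfiguration());
    ExecutorService pool = Executors.newFixedThreadPool(4);
    try {
      List<Put> puts = new ArrayList<Put>();
      Put put = new Put(Bytes.toBytes("row1"));
      put.add(Bytes.toBytes("family"), Bytes.toBytes("qual"),
          Bytes.toBytes("value"));
      puts.add(put);
      // Retries internally; throws RetriesExhaustedException if any puts
      // are still outstanding after the retry budget is spent.
      conn.processBatchOfPuts(puts, Bytes.toBytes("test_table"), pool);
    } finally {
      pool.shutdown();
    }
  }
}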


@@ -29,6 +29,10 @@ import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -831,6 +835,7 @@ public class HConnectionManager implements HConstants {
*/
private void deleteCachedLocation(final byte [] tableName,
final byte [] row) {
synchronized (this.cachedRegionLocations) {
SoftValueSortedMap<byte [], HRegionLocation> tableLocations =
getTableLocations(tableName);
@@ -852,8 +857,9 @@ public class HConnectionManager implements HConstants {
// by nature of the map, we know that the start key has to be <
// otherwise it wouldn't be in the headMap.
if (KeyValue.getRowComparator(tableName).compareRows(endKey, 0, endKey.length,
row, 0, row.length) <= 0) {
if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
KeyValue.getRowComparator(tableName).compareRows(endKey, 0, endKey.length,
row, 0, row.length) > 0) {
// delete any matching entry
HRegionLocation rl =
tableLocations.remove(matchingRegions.lastKey());
@@ -866,6 +872,7 @@ public class HConnectionManager implements HConstants {
}
}
}
}
/*
* @param tableName
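The condition flip in the hunk above is a substantive fix: the old test dropped the cached entry when endKey <= row, i.e. exactly when the region could not contain the row, and it ignored the empty end key that marks a table's last region. A standalone restatement of the corrected predicate (the method name is mine; the patch itself uses the table-aware KeyValue row comparator, while plain Bytes.compareTo suffices for non-meta tables):

import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Bytes;

public class EndKeySketch {
  /** True when a region ending at endKey could contain row. */
  static boolean endKeyCoversRow(byte[] endKey, byte[] row) {
    // The empty end key marks the last region of the table; it covers
    // every row at or beyond its start key.
    return Bytes.equals(endKey, HConstants.EMPTY_END_ROW)
        || Bytes.compareTo(row, endKey) < 0;
  }

  public static void main(String[] args) {
    byte[] endKey = Bytes.toBytes("ccc");
    System.out.println(endKeyCoversRow(endKey, Bytes.toBytes("bbb"))); // true
    System.out.println(endKeyCoversRow(endKey, Bytes.toBytes("ddd"))); // false
    System.out.println(endKeyCoversRow(HConstants.EMPTY_END_ROW,
        Bytes.toBytes("zzz")));                                        // true
  }
}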
@@ -1295,5 +1302,127 @@ public class HConnectionManager implements HConstants {
}
}
}
public void processBatchOfPuts(List<Put> list,
final byte[] tableName, ExecutorService pool) throws IOException {
for ( int tries = 0 ; tries < numRetries && !list.isEmpty(); ++tries) {
Collections.sort(list);
Map<HServerAddress, MultiPut> regionPuts =
new HashMap<HServerAddress, MultiPut>();
// step 1:
// break up into regionserver-sized chunks and build the data structs
for ( Put put : list ) {
byte [] row = put.getRow();
HRegionLocation loc = locateRegion(tableName, row, true);
HServerAddress address = loc.getServerAddress();
byte [] regionName = loc.getRegionInfo().getRegionName();
MultiPut mput = regionPuts.get(address);
if (mput == null) {
mput = new MultiPut(address);
regionPuts.put(address, mput);
}
mput.add(regionName, put);
}
// step 2:
// make the requests
// Discard the map and just use a list from here on; it makes error recovery easier.
List<MultiPut> multiPuts = new ArrayList<MultiPut>(regionPuts.values());
List<Future<MultiPutResponse>> futures =
new ArrayList<Future<MultiPutResponse>>(regionPuts.size());
for ( MultiPut put : multiPuts ) {
futures.add(pool.submit(createPutCallable(put.address,
put,
tableName)));
}
// RUN!
List<Put> failed = new ArrayList<Put>();
// step 3:
// collect failures from the requests issued in step 2 and queue retries.
for (int i = 0; i < futures.size(); i++ ) {
Future<MultiPutResponse> future = futures.get(i);
MultiPut request = multiPuts.get(i);
try {
MultiPutResponse resp = future.get();
// For each region
for (Map.Entry<byte[], List<Put>> e : request.puts.entrySet()) {
Integer result = resp.getAnswer(e.getKey());
if (result == null) {
// failed
LOG.debug("Failed all for region: " +
Bytes.toStringBinary(e.getKey()) + ", removing from cache");
failed.addAll(e.getValue());
} else if (result >= 0) {
// partial failure: puts before index 'result' succeeded, the rest failed
List<Put> lst = e.getValue();
failed.addAll(lst.subList(result, lst.size()));
LOG.debug("Failed past " + result + " for region: " +
Bytes.toStringBinary(e.getKey()) + ", removing from cache");
}
}
} catch (InterruptedException e) {
// go into the failed list.
LOG.debug("Failed all from " + request.address, e);
failed.addAll(request.allPuts());
} catch (ExecutionException e) {
// all go into the failed list.
LOG.debug("Failed all from " + request.address, e);
failed.addAll(request.allPuts());
}
}
list.clear();
if (!failed.isEmpty()) {
for (Put failedPut: failed) {
deleteCachedLocation(tableName, failedPut.getRow());
}
list.addAll(failed);
long sleepTime = getPauseTime(tries);
LOG.debug("processBatchOfPuts had some failures, sleeping for " + sleepTime +
" ms!");
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
// restore the interrupt flag and move on to the next retry
Thread.currentThread().interrupt();
}
}
}
if (!list.isEmpty()) {
// ran out of retries and didn't succeed on everything!
// TODO: add detail on which regions failed the most
throw new RetriesExhaustedException("Still had " + list.size() +
" puts left after retrying " + numRetries + " times.");
}
private Callable<MultiPutResponse> createPutCallable(
final HServerAddress address, final MultiPut puts,
final byte [] tableName) {
final HConnection connection = this;
return new Callable<MultiPutResponse>() {
public MultiPutResponse call() throws IOException {
return getRegionServerWithRetries(
new ServerCallable<MultiPutResponse>(connection, tableName, null) {
public MultiPutResponse call() throws IOException {
MultiPutResponse resp = server.multiPut(puts);
resp.request = puts;
return resp;
}
@Override
public void instantiateServer(boolean reload) throws IOException {
server = connection.getHRegionConnection(address);
}
}
);
}
};
}
}
}
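To restate the per-region answer contract used in step 3 above (a hedged sketch; the helper name is mine): a null answer means every put for that region failed, an answer N >= 0 means puts before index N succeeded while the tail from N onward failed, and a negative answer reads as full success for the region.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.client.MultiPut;
import org.apache.hadoop.hbase.client.MultiPutResponse;
import org.apache.hadoop.hbase.client.Put;

public class AnswerSketch {
  /** Collect the puts that a response reports as failed. */
  static List<Put> collectFailed(MultiPut request, MultiPutResponse resp) {
    List<Put> failed = new ArrayList<Put>();
    for (Map.Entry<byte[], List<Put>> e : request.puts.entrySet()) {
      Integer answer = resp.getAnswer(e.getKey());
      if (answer == null) {
        failed.addAll(e.getValue());                      // whole region failed
      } else if (answer >= 0) {
        List<Put> lst = e.getValue();
        failed.addAll(lst.subList(answer, lst.size()));   // tail failed
      }
      // a negative answer: everything for this region succeeded
    }
    return failed;
  }
}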


@@ -27,6 +27,12 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -65,9 +71,10 @@ public class HTable implements HTableInterface {
private boolean autoFlush;
private long currentWriteBufferSize;
protected int scannerCaching;
private long maxScannerResultSize;
private int maxKeyValueSize;
private long maxScannerResultSize;
/**
* Creates an object to access an HBase table
*
@@ -102,6 +109,7 @@ public class HTable implements HTableInterface {
this(conf, Bytes.toBytes(tableName));
}
/**
* Creates an object to access an HBase table.
*
@@ -126,12 +134,37 @@ public class HTable implements HTableInterface {
this.autoFlush = true;
this.currentWriteBufferSize = 0;
this.scannerCaching = conf.getInt("hbase.client.scanner.caching", 1);
this.maxScannerResultSize = conf.getLong(
HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE);
this.maxKeyValueSize = conf.getInt("hbase.client.keyvalue.maxsize", -1);
int nrHRS = getCurrentNrHRS();
int nrThreads = conf.getInt("hbase.htable.threads.max", nrHRS);
// Unfortunately Executors.newCachedThreadPool does not allow us to
// set the maximum size of the pool, so we have to do it ourselves.
this.pool = new ThreadPoolExecutor(0, nrThreads,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new DaemonThreadFactory());
}
/**
* TODO: might want to make this public; it would also be nice if the
* number of threads changed automatically as servers are added and removed.
* @return the number of region servers that are currently running
* @throws IOException
*/
private int getCurrentNrHRS() throws IOException {
HBaseAdmin admin = new HBaseAdmin(this.configuration);
return admin.getClusterStatus().getServers();
}
// For multiput
private ExecutorService pool;
/**
* @param tableName name of table to check
* @return true if table is on-line
@@ -591,11 +624,11 @@ public class HTable implements HTableInterface {
* @throws IOException
*/
public void flushCommits() throws IOException {
int last = 0;
try {
last = connection.processBatchOfRows(writeBuffer, tableName);
connection.processBatchOfPuts(writeBuffer,
tableName, pool);
} finally {
writeBuffer.subList(0, last).clear();
// the write buffer was adjusted by processBatchOfPuts
currentWriteBufferSize = 0;
for (int i = 0; i < writeBuffer.size(); i++) {
currentWriteBufferSize += writeBuffer.get(i).heapSize();
@@ -1016,4 +1049,31 @@ public class HTable implements HTableInterface {
};
}
}
static class DaemonThreadFactory implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
DaemonThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null)? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (!t.isDaemon())
t.setDaemon(true);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
}
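Two notes on the pool construction above, with a self-contained sketch: the custom factory exists because Executors.newCachedThreadPool offers no way to cap the pool size, and daemon workers keep an idle flusher pool from blocking JVM shutdown. One JDK subtlety worth knowing (an observation, not a claim from the patch): maximumPoolSize only takes effect once the work queue rejects new tasks, which an unbounded LinkedBlockingQueue never does.

import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DaemonPoolSketch {
  public static void main(String[] args) {
    ThreadFactory daemonFactory = new ThreadFactory() {
      public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setDaemon(true); // do not keep the JVM alive for idle workers
        return t;
      }
    };
    // Same shape as the HTable pool: no core threads, 60s idle timeout.
    ThreadPoolExecutor pool = new ThreadPoolExecutor(0, 8,
        60, TimeUnit.SECONDS,
        new LinkedBlockingQueue<Runnable>(),
        daemonFactory);
    pool.submit(new Runnable() {
      public void run() { System.out.println("hello from a daemon worker"); }
    });
    pool.shutdown();
  }
}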


@@ -0,0 +1,107 @@
/*
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.HServerAddress;
import java.io.DataOutput;
import java.io.IOException;
import java.io.DataInput;
import java.util.List;
import java.util.Map;
import java.util.ArrayList;
import java.util.Collection;
import java.util.TreeMap;
public class MultiPut implements Writable {
public HServerAddress address; // client code ONLY
// map of regions to lists of puts for that region.
public Map<byte[], List<Put> > puts = new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR);
public MultiPut() {}
public MultiPut(HServerAddress a) {
address = a;
}
public int size() {
int size = 0;
for( List<Put> l : puts.values()) {
size += l.size();
}
return size;
}
public void add(byte[] regionName, Put aPut) {
List<Put> rsput = puts.get(regionName);
if (rsput == null) {
rsput = new ArrayList<Put>();
puts.put(regionName, rsput);
}
rsput.add(aPut);
}
public Collection<Put> allPuts() {
List<Put> res = new ArrayList<Put>();
for ( List<Put> pp : puts.values() ) {
res.addAll(pp);
}
return res;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(puts.size());
for( Map.Entry<byte[],List<Put>> e : puts.entrySet()) {
Bytes.writeByteArray(out, e.getKey());
List<Put> ps = e.getValue();
out.writeInt(ps.size());
for( Put p : ps ) {
p.write(out);
}
}
}
@Override
public void readFields(DataInput in) throws IOException {
puts.clear();
int mapSize = in.readInt();
for (int i = 0 ; i < mapSize; i++) {
byte[] key = Bytes.readByteArray(in);
int listSize = in.readInt();
List<Put> ps = new ArrayList<Put>(listSize);
for ( int j = 0 ; j < listSize; j++ ) {
Put put = new Put();
put.readFields(in);
ps.add(put);
}
puts.put(key, ps);
}
}
}
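A hedged round-trip sketch of the wire format just defined: a region count, then per region a name, a put count, and the puts themselves. Note that the HServerAddress field is client-side routing state and deliberately never crosses the wire. The region name below is illustrative.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.hbase.client.MultiPut;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiPutWireSketch {
  public static void main(String[] args) throws Exception {
    MultiPut mput = new MultiPut();
    Put put = new Put(Bytes.toBytes("row1"));
    put.add(Bytes.toBytes("family"), Bytes.toBytes("qual"),
        Bytes.toBytes("value"));
    mput.add(Bytes.toBytes("some_region_name"), put);

    // serialize: region count, then per region a name, count, and puts
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    mput.write(new DataOutputStream(bos));

    // deserialize into a fresh instance and verify the put came back
    MultiPut copy = new MultiPut();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(bos.toByteArray())));
    System.out.println(copy.size()); // 1
  }
}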


@@ -0,0 +1,71 @@
/*
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.DataOutput;
import java.io.IOException;
import java.io.DataInput;
import java.util.Map;
import java.util.List;
import java.util.ArrayList;
import java.util.TreeMap;
public class MultiPutResponse implements Writable {
public MultiPut request; // used in client code ONLY
public Map<byte[], Integer> answers = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
public MultiPutResponse() {}
public void addResult(byte[] regionName, int result) {
answers.put(regionName, result);
}
public Integer getAnswer(byte[] region) {
return answers.get(region);
}
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(answers.size());
for( Map.Entry<byte[],Integer> e : answers.entrySet()) {
Bytes.writeByteArray(out, e.getKey());
out.writeInt(e.getValue());
}
}
@Override
public void readFields(DataInput in) throws IOException {
answers.clear();
int mapSize = in.readInt();
for( int i = 0 ; i < mapSize ; i++ ) {
byte[] key = Bytes.readByteArray(in);
int value = in.readInt();
answers.put(key, value);
}
}
}
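For orientation, a sketch of how the server side is expected to fill one of these in; the region names and codes are illustrative, and the real implementation appears in the HRegionServer hunk below. By the client convention in processBatchOfPuts above, a negative answer reads as full success.

import org.apache.hadoop.hbase.client.MultiPutResponse;
import org.apache.hadoop.hbase.util.Bytes;

public class ResponseFillSketch {
  public static void main(String[] args) {
    MultiPutResponse resp = new MultiPutResponse();
    // -1: every put for this region succeeded
    resp.addResult(Bytes.toBytes("region-A"), -1);
    // 3: puts 0..2 succeeded, puts from index 3 onward failed
    resp.addResult(Bytes.toBytes("region-B"), 3);
    System.out.println(resp.getAnswer(Bytes.toBytes("region-B"))); // 3
  }
}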


@@ -48,6 +48,8 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.MultiPutResponse;
import org.apache.hadoop.hbase.client.MultiPut;
import org.apache.hadoop.hbase.filter.*;
import org.apache.hadoop.hbase.io.HbaseMapWritable;
import org.apache.hadoop.io.MapWritable;
@@ -157,6 +159,10 @@ public class HbaseObjectWritable implements Writable, Configurable {
addToMap(FirstKeyOnlyFilter.class, code++);
addToMap(Delete [].class, code++);
addToMap(MultiPut.class, code++);
addToMap(MultiPutResponse.class, code++);
addToMap(HLog.Entry.class, code++);
addToMap(HLog.Entry[].class, code++);
addToMap(HLogKey.class, code++);


@@ -74,7 +74,8 @@ public interface HBaseRPCProtocolVersion extends VersionedProtocol {
* <li>Version 20: Backed Transaction HBase out of HBase core.</li>
* <li>Version 21: HBASE-1665.</li>
* <li>Version 22: HBASE-2209. Added List support to RPC</li>
* <li>Version 23: HBASE-2066, multi-put.</li>
* </ul>
*/
public static final long versionID = 22L;
public static final long versionID = 23L;
}


@@ -29,6 +29,8 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.MultiPutResponse;
import org.apache.hadoop.hbase.client.MultiPut;
import org.apache.hadoop.hbase.regionserver.HRegion;
/**
@@ -245,4 +247,15 @@ public interface HRegionInterface extends HBaseRPCProtocolVersion {
* @throws IOException
*/
public HServerInfo getHServerInfo() throws IOException;
/**
* Multi-put for applying multiple regions' worth of puts at once.
*
* @param puts the request
* @return the reply
* @throws IOException
*/
public MultiPutResponse multiPut(MultiPut puts) throws IOException;
}


@@ -82,6 +82,8 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.ServerConnection;
import org.apache.hadoop.hbase.client.ServerConnectionManager;
import org.apache.hadoop.hbase.client.MultiPutResponse;
import org.apache.hadoop.hbase.client.MultiPut;
import org.apache.hadoop.hbase.io.hfile.LruBlockCache;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler;
@@ -1701,17 +1703,17 @@ public class HRegionServer implements HConstants, HRegionInterface,
if (!region.getRegionInfo().isMetaTable()) {
this.cacheFlusher.reclaimMemStoreMemory();
}
Integer[] locks = new Integer[puts.length];
for (i = 0; i < puts.length; i++) {
this.requestCount.incrementAndGet();
locks[i] = getLockFromId(puts[i].getLockId());
region.put(puts[i], locks[i]);
Integer lock = getLockFromId(puts[i].getLockId());
region.put(puts[i], lock);
}
} catch (WrongRegionException ex) {
LOG.debug("Batch puts: " + i, ex);
return i;
} catch (NotServingRegionException ex) {
LOG.debug("Batch puts: " + i, ex);
return i;
} catch (Throwable t) {
throw convertThrowableToIOE(cleanup(t));
@@ -2427,4 +2429,20 @@ public class HRegionServer implements HConstants, HRegionInterface,
doMain(args, regionServerClass);
}
@Override
public MultiPutResponse multiPut(MultiPut puts) throws IOException {
MultiPutResponse resp = new MultiPutResponse();
// do each region as its own batch.
for( Map.Entry<byte[],List<Put>> e: puts.puts.entrySet()) {
int result = put(e.getKey(), e.getValue().toArray(new Put[]{}));
resp.addResult(e.getKey(), result);
e.getValue().clear(); // clear some RAM
}
return resp;
}
}


@@ -82,6 +82,8 @@ public abstract class HBaseClusterTestCase extends HBaseTestCase {
}
/**
* Subclass hook.
*
* Run after DFS is ready but before the HBase cluster is started up.
*/
protected void preHBaseClusterSetup() throws Exception {


@@ -28,7 +28,7 @@ import org.apache.hadoop.hbase.util.Bytes;
* Utility class to build a table of multiple regions.
*/
public class MultiRegionTable extends HBaseClusterTestCase {
private static final byte [][] KEYS = {
protected static final byte [][] KEYS = {
HConstants.EMPTY_BYTE_ARRAY,
Bytes.toBytes("bbb"),
Bytes.toBytes("ccc"),
@@ -63,7 +63,12 @@ public class MultiRegionTable extends HBaseClusterTestCase {
* @param familyName the family to populate.
*/
public MultiRegionTable(final String familyName) {
super();
this(1, familyName);
}
public MultiRegionTable(int nServers, final String familyName) {
super(nServers);
this.columnFamily = Bytes.toBytes(familyName);
// These are needed for the new and improved Map/Reduce framework
System.setProperty("hadoop.log.dir", conf.get("hadoop.log.dir"));


@@ -0,0 +1,97 @@
/*
* Copyright 2009 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import java.util.List;
import java.util.ArrayList;
public class TestMultiParallelPut extends MultiRegionTable {
private static final byte[] VALUE = Bytes.toBytes("value");
private static final byte[] QUALIFIER = Bytes.toBytes("qual");
private static final String FAMILY = "family";
private static final String TEST_TABLE = "test_table";
private static final byte[] BYTES_FAMILY = Bytes.toBytes(FAMILY);
public TestMultiParallelPut() {
super(2, FAMILY);
desc = new HTableDescriptor(TEST_TABLE);
desc.addFamily(new HColumnDescriptor(FAMILY));
makeKeys();
}
private void makeKeys() {
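// Derive one probe row per region: copy each split key and append a
// trailing 0x01 byte so the row sorts just past its region's start key
// and is guaranteed to land inside that region.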
for (byte [] k : KEYS) {
byte [] cp = new byte[k.length+1];
System.arraycopy(k, 0, cp, 0, k.length);
cp[k.length] = 1;
keys.add(cp);
}
}
List<byte[]> keys = new ArrayList<byte[]>();
public void testMultiPut() throws Exception {
HTable table = new HTable(TEST_TABLE);
table.setAutoFlush(false);
table.setWriteBufferSize(10 * 1024 * 1024);
for ( byte [] k : keys ) {
Put put = new Put(k);
put.add(BYTES_FAMILY, QUALIFIER, VALUE);
table.put(put);
}
table.flushCommits();
for (byte [] k : keys ) {
Get get = new Get(k);
get.addColumn(BYTES_FAMILY, QUALIFIER);
Result r = table.get(get);
assertTrue(r.containsColumn(BYTES_FAMILY, QUALIFIER));
assertEquals(0,
Bytes.compareTo(VALUE,
r.getValue(BYTES_FAMILY, QUALIFIER)));
}
HBaseAdmin admin = new HBaseAdmin(conf);
ClusterStatus cs = admin.getClusterStatus();
assertEquals(2, cs.getServers());
for ( HServerInfo info : cs.getServerInfo()) {
System.out.println(info);
assertTrue( info.getLoad().getNumberOfRegions() > 10);
}
}
}