HBASE-7503 Add exists(List) in HTableInterface to allow multiple parallel exists at one time (Jean-Marc)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1440454 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2013-01-30 15:10:40 +00:00
parent 023a37dd25
commit 1641957baa
11 changed files with 2255 additions and 127 deletions

View File

@ -71,8 +71,22 @@ message GetRequest {
optional bool closestRowBefore = 3;
// The result isn't asked for, just check for
// the existence. If specified, closestRowBefore
// will be ignored
// the existence. If closestRowBefore specified,
// this will be ignored
optional bool existenceOnly = 4;
}
message MultiGetRequest {
required RegionSpecifier region = 1;
repeated Get get = 2;
// If the row to get doesn't exist, return the
// closest row before.
optional bool closestRowBefore = 3;
// The result isn't asked for, just check for
// the existence. If closestRowBefore specified,
// this will be ignored
optional bool existenceOnly = 4;
}
@ -83,6 +97,13 @@ message GetResponse {
optional bool exists = 2;
}
message MultiGetResponse {
repeated Result result = 1;
// used for Get to check existence only
repeated bool exists = 2;
}
/**
* Condition to check if the value of a given cell (row,
* family, qualifier) matches a value via a given comparator.
@ -299,6 +320,9 @@ service ClientService {
rpc get(GetRequest)
returns(GetResponse);
rpc multiGet(MultiGetRequest)
returns(MultiGetResponse);
rpc mutate(MutateRequest)
returns(MutateResponse);

View File

@ -24,6 +24,7 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@ -57,6 +58,8 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
@ -871,6 +874,147 @@ public class HTable implements HTableInterface {
}.withRetries();
}
/**
* Goal of this inner class is to keep track of the initial position of a get in a list before
* sorting it. This is used to send back results in the same orders we got the Gets before we sort
* them.
*/
private static class SortedGet implements Comparable<SortedGet> {
protected int initialIndex = -1; // Used to store the get initial index in a list.
protected Get get; // Encapsulated Get instance.
public SortedGet (Get get, int initialIndex) {
this.get = get;
this.initialIndex = initialIndex;
}
public int getInitialIndex() {
return initialIndex;
}
@Override
public int compareTo(SortedGet o) {
return get.compareTo(o.get);
}
public Get getGet() {
return get;
}
@Override
public int hashCode() {
return get.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof SortedGet)
return get.equals(((SortedGet)obj).get);
else
return false;
}
}
/**
* {@inheritDoc}
*/
@Override
public Boolean[] exists(final List<Get> gets) throws IOException {
// Prepare the sorted list of gets. Take the list of gets received, and encapsulate them into
// a list of SortedGet instances. Simple list parsing, so complexity here is O(n)
// The list is later used to recreate the response order based on the order the Gets
// got received.
ArrayList<SortedGet> sortedGetsList = new ArrayList<HTable.SortedGet>();
for (int indexGet = 0; indexGet < gets.size(); indexGet++) {
sortedGetsList.add(new SortedGet (gets.get(indexGet), indexGet));
}
// Sorting the list to get the Gets ordered based on the key.
Collections.sort(sortedGetsList); // O(n log n)
// step 1: sort the requests by regions to send them bundled.
// Map key is startKey index. Map value is the list of Gets related to the region starting
// with the startKey.
Map<Integer, List<Get>> getsByRegion = new HashMap<Integer, List<Get>>();
// Reference map to quickly find back in which region a get belongs.
Map<Get, Integer> getToRegionIndexMap = new HashMap<Get, Integer>();
Pair<byte[][], byte[][]> startEndKeys = getStartEndKeys();
int regionIndex = 0;
for (final SortedGet get : sortedGetsList) {
// Progress on the regions until we find the one the current get resides in.
while ((regionIndex < startEndKeys.getSecond().length) && ((Bytes.compareTo(startEndKeys.getSecond()[regionIndex], get.getGet().getRow()) <= 0))) {
regionIndex++;
}
List<Get> regionGets = getsByRegion.get(regionIndex);
if (regionGets == null) {
regionGets = new ArrayList<Get>();
getsByRegion.put(regionIndex, regionGets);
}
regionGets.add(get.getGet());
getToRegionIndexMap.put(get.getGet(), regionIndex);
}
// step 2: make the requests
Map<Integer, Future<List<Boolean>>> futures =
new HashMap<Integer, Future<List<Boolean>>>(sortedGetsList.size());
for (final Map.Entry<Integer, List<Get>> getsByRegionEntry : getsByRegion.entrySet()) {
Callable<List<Boolean>> callable = new Callable<List<Boolean>>() {
public List<Boolean> call() throws Exception {
return new ServerCallable<List<Boolean>>(connection, tableName, getsByRegionEntry.getValue()
.get(0).getRow(), operationTimeout) {
public List<Boolean> call() throws IOException {
try {
MultiGetRequest requests = RequestConverter.buildMultiGetRequest(location
.getRegionInfo().getRegionName(), getsByRegionEntry.getValue(), true, false);
MultiGetResponse responses = server.multiGet(null, requests);
return responses.getExistsList();
} catch (ServiceException se) {
throw ProtobufUtil.getRemoteException(se);
}
}
}.withRetries();
}
};
futures.put(getsByRegionEntry.getKey(), pool.submit(callable));
}
// step 3: collect the failures and successes
Map<Integer, List<Boolean>> responses = new HashMap<Integer, List<Boolean>>();
for (final Map.Entry<Integer, List<Get>> sortedGetEntry : getsByRegion.entrySet()) {
try {
Future<List<Boolean>> future = futures.get(sortedGetEntry.getKey());
List<Boolean> resp = future.get();
if (resp == null) {
LOG.warn("Failed for gets on region: " + sortedGetEntry.getKey());
}
responses.put(sortedGetEntry.getKey(), resp);
} catch (ExecutionException e) {
LOG.warn("Failed for gets on region: " + sortedGetEntry.getKey());
} catch (InterruptedException e) {
LOG.warn("Failed for gets on region: " + sortedGetEntry.getKey());
Thread.currentThread().interrupt();
}
}
Boolean[] results = new Boolean[sortedGetsList.size()];
// step 4: build the response.
Map<Integer, Integer> indexes = new HashMap<Integer, Integer>();
for (int i = 0; i < sortedGetsList.size(); i++) {
Integer regionInfoIndex = getToRegionIndexMap.get(sortedGetsList.get(i).getGet());
Integer index = indexes.get(regionInfoIndex);
if (index == null) {
index = 0;
}
results[sortedGetsList.get(i).getInitialIndex()] = responses.get(regionInfoIndex).get(index);
indexes.put(regionInfoIndex, index + 1);
}
return results;
}
/**
* {@inheritDoc}
*/

View File

@ -64,7 +64,7 @@ public interface HTableInterface extends Closeable {
HTableDescriptor getTableDescriptor() throws IOException;
/**
* Test for the existence of columns in the table, as specified in the Get.
* Test for the existence of columns in the table, as specified by the Get.
* <p>
*
* This will return true if the Get matches one or more keys, false if not.
@ -79,6 +79,23 @@ public interface HTableInterface extends Closeable {
*/
boolean exists(Get get) throws IOException;
/**
* Test for the existence of columns in the table, as specified by the Gets.
* <p>
*
* This will return an array of booleans. Each value will be true if the related Get matches
* one or more keys, false if not.
* <p>
*
* This is a server-side call so it prevents any data from being transfered to
* the client.
*
* @param gets the Gets
* @return Array of Boolean true if the specified Get matches one or more keys, false if not
* @throws IOException e
*/
Boolean[] exists(List<Get> gets) throws IOException;
/**
* Method that does a batch call on Deletes, Gets, Puts, Increments, Appends and RowMutations.
* The ordering of execution of the actions is not defined. Meaning if you do a Put and a

View File

@ -350,6 +350,11 @@ public class HTablePool implements Closeable {
return table.exists(get);
}
@Override
public Boolean[] exists(List<Get> gets) throws IOException {
return table.exists(gets);
}
@Override
public void batch(List<? extends Row> actions, Object[] results) throws IOException,
InterruptedException {

View File

@ -47,6 +47,8 @@ import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URL;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.*;
import java.util.concurrent.ConcurrentMap;
import java.util.jar.JarEntry;
@ -232,7 +234,7 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
// NOTE: Path.toURL is deprecated (toURI instead) but the URLClassLoader
// unsurprisingly wants URLs, not URIs; so we will use the deprecated
// method which returns URLs for as long as it is available
List<URL> paths = new ArrayList<URL>();
final List<URL> paths = new ArrayList<URL>();
URL url = dst.getCanonicalFile().toURL();
paths.add(url);
@ -250,7 +252,13 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
}
jarFile.close();
cl = new CoprocessorClassLoader(paths, this.getClass().getClassLoader());
cl = AccessController.doPrivileged(new PrivilegedAction<CoprocessorClassLoader>() {
@Override
public CoprocessorClassLoader run() {
return new CoprocessorClassLoader(paths, this.getClass().getClassLoader());
}
});
// cache cp classloader as a weak value, will be GC'ed when no reference left
ClassLoader prev = classLoadersCache.putIfAbsent(path, cl);
if (prev != null) {
@ -472,6 +480,10 @@ public abstract class CoprocessorHost<E extends CoprocessorEnvironment> {
return table.exists(get);
}
public Boolean[] exists(List<Get> gets) throws IOException{
return table.exists(gets);
}
public void put(Put put) throws IOException {
table.put(put);
}

View File

@ -68,6 +68,7 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Column;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Condition;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiAction;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate.ColumnValue;
@ -166,7 +167,7 @@ public final class RequestConverter {
* @param regionName the name of the region to get
* @param get the client Get
* @param existenceOnly indicate if check row existence only
* @return a protocol buffer GetReuqest
* @return a protocol buffer GetRequest
*/
public static GetRequest buildGetRequest(final byte[] regionName,
final Get get, final boolean existenceOnly) throws IOException {
@ -179,6 +180,27 @@ public final class RequestConverter {
return builder.build();
}
/**
* Create a protocol buffer MultiGetRequest for client Gets All gets are going to be run against
* the same region.
* @param regionName the name of the region to get from
* @param gets the client Gets
* @param existenceOnly indicate if check rows existence only
* @return a protocol buffer MultiGetRequest
*/
public static MultiGetRequest buildMultiGetRequest(final byte[] regionName, final List<Get> gets,
final boolean existenceOnly, final boolean closestRowBefore) throws IOException {
MultiGetRequest.Builder builder = MultiGetRequest.newBuilder();
RegionSpecifier region = buildRegionSpecifier(RegionSpecifierType.REGION_NAME, regionName);
builder.setExistenceOnly(existenceOnly);
builder.setClosestRowBefore(closestRowBefore);
builder.setRegion(region);
for (Get get : gets) {
builder.addGet(ProtobufUtil.toGet(get));
}
return builder.build();
}
/**
* Create a protocol buffer MutateRequest for a client increment
*

View File

@ -150,6 +150,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServic
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.Mutate;
@ -221,7 +223,7 @@ import com.google.protobuf.ServiceException;
*/
@InterfaceAudience.Private
@SuppressWarnings("deprecation")
public class HRegionServer implements ClientProtocol,
public class HRegionServer implements ClientProtocol,
AdminProtocol, Runnable, RegionServerServices, HBaseRPCErrorHandler, LastSequenceId {
public static final Log LOG = LogFactory.getLog(HRegionServer.class);
@ -2728,6 +2730,64 @@ public class HRegionServer implements ClientProtocol,
}
}
/**
* Get multi data from a table.
*
* @param controller the RPC controller
* @param request multi-the get request
* @throws ServiceException
*/
@Override
public MultiGetResponse multiGet(final RpcController controller, final MultiGetRequest request)
throws ServiceException {
long before = EnvironmentEdgeManager.currentTimeMillis();
try {
requestCount.add(request.getGetCount());
HRegion region = getRegion(request.getRegion());
MultiGetResponse.Builder builder = MultiGetResponse.newBuilder();
for (ClientProtos.Get get: request.getGetList())
{
Boolean existence = null;
Result r = null;
if (request.getClosestRowBefore()) {
if (get.getColumnCount() != 1) {
throw new DoNotRetryIOException(
"get ClosestRowBefore supports one and only one family now, not "
+ get.getColumnCount() + " families");
}
byte[] row = get.getRow().toByteArray();
byte[] family = get.getColumn(0).getFamily().toByteArray();
r = region.getClosestRowBefore(row, family);
} else {
Get clientGet = ProtobufUtil.toGet(get);
if (request.getExistenceOnly() && region.getCoprocessorHost() != null) {
existence = region.getCoprocessorHost().preExists(clientGet);
}
if (existence == null) {
r = region.get(clientGet);
if (request.getExistenceOnly()) {
boolean exists = r != null && !r.isEmpty();
if (region.getCoprocessorHost() != null) {
exists = region.getCoprocessorHost().postExists(clientGet, exists);
}
existence = exists;
}
}
}
if (existence != null) {
builder.addExists(existence.booleanValue());
} else if (r != null) {
builder.addResult(ProtobufUtil.toResult(r));
}
}
return builder.build();
} catch (IOException ie) {
throw new ServiceException(ie);
} finally {
metricsRegionServer.updateGet(EnvironmentEdgeManager.currentTimeMillis() - before);
}
}
/**
* Mutate data in a table.
*

View File

@ -344,6 +344,19 @@ public class RemoteHTable implements HTableInterface {
return (result != null && !(result.isEmpty()));
}
/**
* exists(List) is really a list of get() calls. Just use get().
* @param gets list of Get to test for the existence
*/
public Boolean[] exists(List<Get> gets) throws IOException {
LOG.warn("exists(List<Get>) is really list of get() calls, just use get()");
Boolean[] results = new Boolean[gets.size()];
for (int i = 0; i < results.length; i++) {
results[i] = exists(gets.get(i));
}
return results;
}
public void put(Put put) throws IOException {
CellSetModel model = buildModelFromPut(put);
StringBuilder sb = new StringBuilder();

View File

@ -19,9 +19,11 @@
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
@ -52,10 +54,15 @@ public class TestFromClientSide3 {
private static byte[] FAMILY = Bytes.toBytes("testFamily");
private static Random random = new Random();
private static int SLAVES = 3;
private static byte [] ROW = Bytes.toBytes("testRow");
private static final byte[] ANOTHERROW = Bytes.toBytes("anotherrow");
private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte [] VALUE = Bytes.toBytes("testValue");
private final static byte[] COL_QUAL = Bytes.toBytes("f1");
private final static byte[] VAL_BYTES = Bytes.toBytes("v1");
private final static byte[] ROW_BYTES = Bytes.toBytes("r1");
/**
* @throws java.lang.Exception
*/
@ -260,6 +267,116 @@ public class TestFromClientSide3 {
"hbase.hstore.compaction.min"));
}
@Test
public void testHTableExistsMethodSingleRegionSingleGet() throws Exception {
// Test with a single region table.
HTable table = TEST_UTIL.createTable(
Bytes.toBytes("testHTableExistsMethodSingleRegionSingleGet"), new byte[][] { FAMILY });
Put put = new Put(ROW);
put.add(FAMILY, QUALIFIER, VALUE);
Get get = new Get(ROW);
boolean exist = table.exists(get);
assertEquals(exist, false);
table.put(put);
exist = table.exists(get);
assertEquals(exist, true);
}
public void testHTableExistsMethodSingleRegionMultipleGets() throws Exception {
HTable table = TEST_UTIL.createTable(
Bytes.toBytes("testHTableExistsMethodSingleRegionMultipleGets"), new byte[][] { FAMILY });
Put put = new Put(ROW);
put.add(FAMILY, QUALIFIER, VALUE);
table.put(put);
List<Get> gets = new ArrayList<Get>();
gets.add(new Get(ROW));
gets.add(null);
gets.add(new Get(ANOTHERROW));
Boolean[] results = table.exists(gets);
assertEquals(results[0], true);
assertEquals(results[1], false);
assertEquals(results[2], false);
}
@Test
public void testHTableExistsMethodMultipleRegionsSingleGet() throws Exception {
HTable table = TEST_UTIL.createTable(
Bytes.toBytes("testHTableExistsMethodMultipleRegionsSingleGet"), new byte[][] { FAMILY }, 1,
new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255);
Put put = new Put(ROW);
put.add(FAMILY, QUALIFIER, VALUE);
Get get = new Get(ROW);
boolean exist = table.exists(get);
assertEquals(exist, false);
table.put(put);
exist = table.exists(get);
assertEquals(exist, true);
}
@Test
public void testHTableExistsMethodMultipleRegionsMultipleGets() throws Exception {
HTable table = TEST_UTIL.createTable(
Bytes.toBytes("testHTableExistsMethodMultipleRegionsMultipleGets"), new byte[][] { FAMILY },
1, new byte[] { 0x00 }, new byte[] { (byte) 0xff }, 255);
Put put = new Put(ROW);
put.add(FAMILY, QUALIFIER, VALUE);
table.put (put);
List<Get> gets = new ArrayList<Get>();
gets.add(new Get(ANOTHERROW));
gets.add(new Get(Bytes.add(ROW, new byte[] { 0x00 })));
gets.add(new Get(ROW));
gets.add(new Get(Bytes.add(ANOTHERROW, new byte[] { 0x00 })));
Boolean[] results = table.exists(gets);
assertEquals(results[0], false);
assertEquals(results[1], false);
assertEquals(results[2], true);
assertEquals(results[3], false);
// Test with the first region.
put = new Put(new byte[] { 0x00 });
put.add(FAMILY, QUALIFIER, VALUE);
table.put(put);
gets = new ArrayList<Get>();
gets.add(new Get(new byte[] { 0x00 }));
gets.add(new Get(new byte[] { 0x00, 0x00 }));
results = table.exists(gets);
assertEquals(results[0], true);
assertEquals(results[1], false);
// Test with the last region
put = new Put(new byte[] { (byte) 0xff, (byte) 0xff });
put.add(FAMILY, QUALIFIER, VALUE);
table.put(put);
gets = new ArrayList<Get>();
gets.add(new Get(new byte[] { (byte) 0xff }));
gets.add(new Get(new byte[] { (byte) 0xff, (byte) 0xff }));
gets.add(new Get(new byte[] { (byte) 0xff, (byte) 0xff, (byte) 0xff }));
results = table.exists(gets);
assertEquals(results[0], false);
assertEquals(results[1], true);
assertEquals(results[2], false);
}
@Test
public void testGetEmptyRow() throws Exception {
//Create a table and put in 1 row

View File

@ -67,6 +67,8 @@ import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileRequ
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.BulkLoadHFileResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetResponse;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
@ -324,6 +326,22 @@ class MockRegionServer implements AdminProtocol, ClientProtocol, RegionServerSer
return builder.build();
}
@Override
public MultiGetResponse multiGet(RpcController controller, MultiGetRequest requests)
throws ServiceException {
byte[] regionName = requests.getRegion().getValue().toByteArray();
Map<byte [], Result> m = this.gets.get(regionName);
MultiGetResponse.Builder builder = MultiGetResponse.newBuilder();
if (m != null) {
for (ClientProtos.Get get: requests.getGetList()) {
byte[] row = get.getRow().toByteArray();
builder.addResult(ProtobufUtil.toResult(m.get(row)));
}
}
return builder.build();
}
@Override
public MutateResponse mutate(RpcController controller, MutateRequest request)
throws ServiceException {