HBASE-2403 [stargate] client HTable interface to REST connector

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@930491 13f79535-47bb-0310-9956-ffa450edef68
Andrew Kyle Purtell 2010-04-03 08:02:11 +00:00
parent 62db1f1cf6
commit 53f0d0db3e
7 changed files with 975 additions and 21 deletions

View File

@@ -479,6 +479,7 @@ Release 0.21.0 - Unreleased
HBASE-2388 Give a very explicit message when we figure a big GC pause
HBASE-2270 Improve how we handle recursive calls in ExplicitColumnTracker
and WildcardColumnTracker
HBASE-2402 [stargate] set maxVersions on gets
HBASE-2087 The wait on compaction because "Too many store files"
holds up all flushing
@@ -509,7 +510,7 @@ Release 0.21.0 - Unreleased
(Alexey Kovyrin via Stack)
HBASE-2327 [EC2] Allocate elastic IP addresses for ZK and master nodes
HBASE-2319 [stargate] multiuser mode: request shaping
HBASE-2402 [stargate] set maxVersions on gets
HBASE-2403 [stargate] client HTable interface to REST connector
OPTIMIZATIONS
HBASE-410 [testing] Speed up the test suite

View File

@@ -96,6 +96,7 @@ public class RowResource implements Constants {
if (!generator.hasNext()) {
throw new WebApplicationException(Response.Status.NOT_FOUND);
}
int count = 0;
CellSetModel model = new CellSetModel();
KeyValue value = generator.next();
byte[] rowKey = value.getRow();
@@ -109,6 +110,9 @@ public class RowResource implements Constants {
rowModel.addCell(
new CellModel(value.getFamily(), value.getQualifier(),
value.getTimestamp(), value.getValue()));
if (++count > rowspec.getMaxValues()) {
break;
}
value = generator.next();
} while (value != null);
model.addRow(rowModel);

View File

@@ -37,8 +37,6 @@ public class RowSpec {
public static final long DEFAULT_START_TIMESTAMP = 0;
public static final long DEFAULT_END_TIMESTAMP = Long.MAX_VALUE;
private static final String versionPrefix = "?v=";
private byte[] row = HConstants.EMPTY_START_ROW;
private byte[] endRow = null;
private TreeSet<byte[]> columns =
@@ -46,6 +44,7 @@ public class RowSpec {
private long startTime = DEFAULT_START_TIMESTAMP;
private long endTime = DEFAULT_END_TIMESTAMP;
private int maxVersions = HColumnDescriptor.DEFAULT_VERSIONS;
private int maxValues = Integer.MAX_VALUE;
public RowSpec(String path) throws IllegalArgumentException {
int i = 0;
@@ -55,7 +54,7 @@ public class RowSpec {
i = parseRowKeys(path, i);
i = parseColumns(path, i);
i = parseTimestamp(path, i);
i = parseMaxVersions(path, i);
i = parseQueryParams(path, i);
}
private int parseRowKeys(final String path, int i)
@@ -201,14 +200,54 @@ public class RowSpec {
return i;
}
private int parseMaxVersions(final String path, int i) {
if (i >= path.length()) {
return i;
}
String s = path.substring(i);
if (s.startsWith(versionPrefix)) {
this.maxVersions = Integer.valueOf(s.substring(versionPrefix.length()));
i += s.length();
private int parseQueryParams(final String path, int i) {
while (i < path.length()) {
char c = path.charAt(i);
if (c != '?' && c != '&') {
break;
}
if (++i > path.length()) {
break;
}
char what = path.charAt(i);
if (++i > path.length()) {
break;
}
c = path.charAt(i);
if (c != '=') {
throw new IllegalArgumentException("malformed query parameter");
}
if (++i > path.length()) {
break;
}
switch (what) {
case 'm': {
StringBuilder sb = new StringBuilder();
while (i <= path.length()) {
c = path.charAt(i);
if (c < '0' || c > '9') {
i--;
break;
}
sb.append(c);
}
maxVersions = Integer.valueOf(sb.toString());
} break;
case 'n': {
StringBuilder sb = new StringBuilder();
while (i <= path.length()) {
c = path.charAt(i);
if (c < '0' || c > '9') {
i--;
break;
}
sb.append(c);
}
maxValues = Integer.valueOf(sb.toString());
} break;
default:
throw new IllegalArgumentException("unknown parameter '" + c + "'");
}
}
return i;
}
@@ -251,6 +290,14 @@ public class RowSpec {
this.maxVersions = maxVersions;
}
public int getMaxValues() {
return maxValues;
}
public void setMaxValues(final int maxValues) {
this.maxValues = maxValues;
}
public boolean hasColumns() {
return !columns.isEmpty();
}
@@ -325,6 +372,8 @@ public class RowSpec {
result.append(Long.toString(endTime));
result.append(", maxVersions => ");
result.append(Integer.toString(maxVersions));
result.append(", maxValues => ");
result.append(Integer.toString(maxValues));
result.append("}");
return result.toString();
}
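
Illustration, not part of the patch: the parser above recognizes single-letter query parameters, where 'm' followed by '=' and a run of digits sets maxVersions and 'n' sets the new maxValues limit. Below is a simplified standalone sketch of that scheme, assuming well-formed input; RowSpec's own parser works on an index into the full row-specification string rather than a separate query string.

import java.util.Map;
import java.util.TreeMap;

public final class QueryParamSketch {
  // Parse "?m=<digits>&n=<digits>"-style parameters into a name -> value map.
  // Assumes well-formed input; the real parser reports bad input with
  // IllegalArgumentException("malformed query parameter").
  static Map<Character, Integer> parse(String query) {
    Map<Character, Integer> params = new TreeMap<Character, Integer>();
    int i = 0;
    while (i < query.length() && (query.charAt(i) == '?' || query.charAt(i) == '&')) {
      char what = query.charAt(++i);       // single-letter parameter name
      if (query.charAt(++i) != '=') {
        throw new IllegalArgumentException("malformed query parameter");
      }
      i++;                                 // step past '='
      StringBuilder sb = new StringBuilder();
      while (i < query.length() && Character.isDigit(query.charAt(i))) {
        sb.append(query.charAt(i++));
      }
      params.put(what, Integer.valueOf(sb.toString()));
    }
    return params;
  }

  public static void main(String[] args) {
    // maxVersions = 3, maxValues = 100
    System.out.println(parse("?m=3&n=100"));   // prints {m=3, n=100}
  }
}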

View File

@@ -25,6 +25,7 @@ import java.io.IOException;
import javax.ws.rs.DELETE;
import javax.ws.rs.GET;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.CacheControl;
import javax.ws.rs.core.Context;
@@ -50,7 +51,7 @@ public class ScannerInstanceResource implements Constants {
User user;
ResultGenerator generator;
String id;
int batch;
int batch = 1;
RESTServlet servlet;
CacheControl cacheControl;
@@ -68,7 +69,9 @@ public class ScannerInstanceResource implements Constants {
@GET
@Produces({MIMETYPE_XML, MIMETYPE_JSON, MIMETYPE_PROTOBUF})
public Response get(final @Context UriInfo uriInfo) throws IOException {
public Response get(final @Context UriInfo uriInfo,
@QueryParam("n") int maxRows, final @QueryParam("c") int maxValues)
throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("GET " + uriInfo.getAbsolutePath());
}
@@ -76,7 +79,11 @@ public class ScannerInstanceResource implements Constants {
CellSetModel model = new CellSetModel();
RowModel rowModel = null;
byte[] rowKey = null;
int count = batch;
int limit = batch;
if (maxValues > 0) {
limit = maxValues;
}
int count = limit;
do {
KeyValue value = null;
try {
@@ -89,7 +96,7 @@ public class ScannerInstanceResource implements Constants {
LOG.info("generator exhausted");
// respond with 204 (No Content) if an empty cell set would be
// returned
if (count == batch) {
if (count == limit) {
return Response.noContent().build();
}
break;
@@ -105,6 +112,14 @@ public class ScannerInstanceResource implements Constants {
generator.putBack(value);
break;
}
// if maxRows was given as a query param, stop if we would exceed the
// specified number of rows
if (maxRows > 0) {
if (--maxRows == 0) {
generator.putBack(value);
break;
}
}
model.addRow(rowModel);
rowKey = value.getRow();
rowModel = new RowModel(rowKey);
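
Illustration, not part of the patch: the scanner GET now honors two query parameters, "n" capping the number of rows and "c" capping the number of cells returned in one batch. Below is a sketch of driving this directly with the Stargate client; the endpoint, table name, and the combined "?n=2&c=10" query string are assumptions for the example.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.stargate.Constants;
import org.apache.hadoop.hbase.stargate.client.Client;
import org.apache.hadoop.hbase.stargate.client.Cluster;
import org.apache.hadoop.hbase.stargate.client.Response;
import org.apache.hadoop.hbase.stargate.model.CellSetModel;
import org.apache.hadoop.hbase.stargate.model.ScannerModel;

public class ScannerBatchFetch {
  public static void main(String[] args) throws Exception {
    // Placeholder Stargate endpoint and table name.
    Client client = new Client(new Cluster().add("localhost", 8080));
    ScannerModel model = ScannerModel.fromScan(new Scan());
    Response response = client.post("/mytable/scanner",
      Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
    if (response.getCode() != 201) {
      throw new RuntimeException("scanner creation failed: " + response.getCode());
    }
    String scannerUri = response.getLocation();
    // Fetch one batch, asking for at most 2 rows ("n") and 10 cells ("c").
    response = client.get(scannerUri + "?n=2&c=10", Constants.MIMETYPE_PROTOBUF);
    if (response.getCode() == 200) {
      CellSetModel cells = new CellSetModel();
      cells.getObjectFromMessage(response.getBody());
      System.out.println(cells.getRows().size() + " row(s) in this batch");
    } else {
      // A non-200 status means the scanner is already exhausted or the request failed.
      System.out.println("no cells returned: HTTP " + response.getCode());
    }
    client.delete(scannerUri);  // release the server-side scanner
    client.shutdown();
  }
}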

View File

@@ -0,0 +1,549 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.stargate.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import javax.xml.namespace.QName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowLock;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.stargate.Constants;
import org.apache.hadoop.hbase.stargate.model.CellModel;
import org.apache.hadoop.hbase.stargate.model.CellSetModel;
import org.apache.hadoop.hbase.stargate.model.ColumnSchemaModel;
import org.apache.hadoop.hbase.stargate.model.RowModel;
import org.apache.hadoop.hbase.stargate.model.ScannerModel;
import org.apache.hadoop.hbase.stargate.model.TableSchemaModel;
import org.apache.hadoop.hbase.util.Bytes;
/**
* HTable interface to remote tables accessed via Stargate
*/
public class RemoteHTable implements HTableInterface {
private static final Log LOG = LogFactory.getLog(RemoteHTable.class);
Client client;
Configuration conf;
byte[] name;
String accessToken;
@SuppressWarnings("unchecked")
protected String buildRowSpec(final byte[] row, final Map familyMap,
final long startTime, final long endTime, final int maxVersions) {
StringBuffer sb = new StringBuffer();
sb.append('/');
if (accessToken != null) {
sb.append(accessToken);
sb.append('/');
}
sb.append(Bytes.toStringBinary(name));
sb.append('/');
sb.append(Bytes.toStringBinary(row));
Set families = familyMap.entrySet();
if (families != null) {
Iterator i = familyMap.entrySet().iterator();
if (i.hasNext()) {
sb.append('/');
}
while (i.hasNext()) {
Map.Entry e = (Map.Entry)i.next();
Collection quals = (Collection)e.getValue();
if (quals != null && !quals.isEmpty()) {
Iterator ii = quals.iterator();
while (ii.hasNext()) {
sb.append(Bytes.toStringBinary((byte[])e.getKey()));
sb.append(':');
Object o = ii.next();
// Puts use byte[] but Deletes use KeyValue
if (o instanceof byte[]) {
sb.append(Bytes.toStringBinary((byte[])o));
} else if (o instanceof KeyValue) {
sb.append(Bytes.toStringBinary(((KeyValue)o).getQualifier()));
} else {
throw new RuntimeException("object type not handled");
}
if (ii.hasNext()) {
sb.append(',');
}
}
} else {
sb.append(Bytes.toStringBinary((byte[])e.getKey()));
sb.append(':');
}
if (i.hasNext()) {
sb.append(',');
}
}
}
if (startTime != 0 && endTime != Long.MAX_VALUE) {
sb.append('/');
sb.append(startTime);
if (startTime != endTime) {
sb.append(',');
sb.append(endTime);
}
} else if (endTime != Long.MAX_VALUE) {
sb.append('/');
sb.append(endTime);
}
if (maxVersions > 1) {
sb.append("?v=");
sb.append(maxVersions);
}
return sb.toString();
}
protected Result[] buildResultFromModel(final CellSetModel model) {
List<Result> results = new ArrayList<Result>();
for (RowModel row: model.getRows()) {
List<KeyValue> kvs = new ArrayList<KeyValue>();
for (CellModel cell: row.getCells()) {
byte[][] split = KeyValue.parseColumn(cell.getColumn());
byte[] column = split[0];
byte[] qualifier = split.length > 1 ? split[1] : null;
kvs.add(new KeyValue(row.getKey(), column, qualifier,
cell.getTimestamp(), cell.getValue()));
}
results.add(new Result(kvs));
}
return results.toArray(new Result[results.size()]);
}
protected CellSetModel buildModelFromPut(Put put) {
RowModel row = new RowModel(put.getRow());
long ts = put.getTimeStamp();
for (List<KeyValue> kvs: put.getFamilyMap().values()) {
for (KeyValue kv: kvs) {
row.addCell(new CellModel(kv.getFamily(), kv.getQualifier(),
ts != HConstants.LATEST_TIMESTAMP ? ts : kv.getTimestamp(),
kv.getValue()));
}
}
CellSetModel model = new CellSetModel();
model.addRow(row);
return model;
}
/**
* Constructor
* @param client
* @param name
*/
public RemoteHTable(Client client, String name) {
this(client, HBaseConfiguration.create(), Bytes.toBytes(name), null);
}
/**
* Constructor
* @param client
* @param name
* @param accessToken
*/
public RemoteHTable(Client client, String name, String accessToken) {
this(client, HBaseConfiguration.create(), Bytes.toBytes(name), accessToken);
}
/**
* Constructor
* @param client
* @param conf
* @param name
* @param accessToken
*/
public RemoteHTable(Client client, Configuration conf, String name,
String accessToken) {
this(client, conf, Bytes.toBytes(name), accessToken);
}
/**
* Constructor
* @param conf
*/
public RemoteHTable(Client client, Configuration conf, byte[] name,
String accessToken) {
this.client = client;
this.conf = conf;
this.name = name;
this.accessToken = accessToken;
}
@Override
public byte[] getTableName() {
return name.clone();
}
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public HTableDescriptor getTableDescriptor() throws IOException {
StringBuilder sb = new StringBuilder();
sb.append('/');
if (accessToken != null) {
sb.append(accessToken);
sb.append('/');
}
sb.append(Bytes.toStringBinary(name));
sb.append('/');
sb.append("schema");
Response response = client.get(sb.toString(), Constants.MIMETYPE_PROTOBUF);
if (response.getCode() != 200) {
throw new IOException("schema request returned " + response.getCode());
}
TableSchemaModel schema = new TableSchemaModel();
schema.getObjectFromMessage(response.getBody());
HTableDescriptor htd = new HTableDescriptor(schema.getName());
for (Map.Entry<QName, Object> e: schema.getAny().entrySet()) {
htd.setValue(e.getKey().getLocalPart(), e.getValue().toString());
}
for (ColumnSchemaModel column: schema.getColumns()) {
HColumnDescriptor hcd = new HColumnDescriptor(column.getName());
for (Map.Entry<QName, Object> e: column.getAny().entrySet()) {
hcd.setValue(e.getKey().getLocalPart(), e.getValue().toString());
}
htd.addFamily(hcd);
}
return htd;
}
@Override
public void close() throws IOException {
client.shutdown();
}
@Override
public Result get(Get get) throws IOException {
TimeRange range = get.getTimeRange();
String spec = buildRowSpec(get.getRow(), get.getFamilyMap(),
range.getMin(), range.getMax(), get.getMaxVersions());
if (get.getFilter() != null) {
LOG.warn("filters not supported on gets");
}
Response response = client.get(spec, Constants.MIMETYPE_PROTOBUF);
int code = response.getCode();
if (code == 404) {
return new Result();
}
if (code != 200) {
throw new IOException("get request returned " + code);
}
CellSetModel model = new CellSetModel();
model.getObjectFromMessage(response.getBody());
Result[] results = buildResultFromModel(model);
if (results.length > 0) {
if (results.length > 1) {
LOG.warn("too many results for get (" + results.length + ")");
}
return results[0];
}
return new Result();
}
@Override
public boolean exists(Get get) throws IOException {
LOG.warn("exists() is really get(), just use get()");
Result result = get(get);
return (result != null && !(result.isEmpty()));
}
@Override
public void put(Put put) throws IOException {
CellSetModel model = buildModelFromPut(put);
StringBuilder sb = new StringBuilder();
sb.append('/');
if (accessToken != null) {
sb.append(accessToken);
sb.append('/');
}
sb.append(Bytes.toStringBinary(name));
sb.append('/');
sb.append(Bytes.toStringBinary(put.getRow()));
Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF,
model.createProtobufOutput());
if (response.getCode() != 200) {
throw new IOException("put failed with " + response.getCode());
}
}
@Override
public void put(List<Put> puts) throws IOException {
// this is a trick: Stargate accepts multiple rows in a cell set and
// ignores the row specification in the URI
// separate puts by row
TreeMap<byte[],List<KeyValue>> map =
new TreeMap<byte[],List<KeyValue>>(Bytes.BYTES_COMPARATOR);
for (Put put: puts) {
byte[] row = put.getRow();
List<KeyValue> kvs = map.get(row);
if (kvs == null) {
kvs = new ArrayList<KeyValue>();
map.put(row, kvs);
}
for (List<KeyValue> l: put.getFamilyMap().values()) {
kvs.addAll(l);
}
}
// build the cell set
CellSetModel model = new CellSetModel();
for (Map.Entry<byte[], List<KeyValue>> e: map.entrySet()) {
RowModel row = new RowModel(e.getKey());
for (KeyValue kv: e.getValue()) {
row.addCell(new CellModel(kv));
}
model.addRow(row);
}
// build path for multiput
StringBuilder sb = new StringBuilder();
sb.append('/');
if (accessToken != null) {
sb.append(accessToken);
sb.append('/');
}
sb.append(Bytes.toStringBinary(name));
sb.append("/$multiput"); // can be any nonexistent row
Response response = client.put(sb.toString(), Constants.MIMETYPE_PROTOBUF,
model.createProtobufOutput());
if (response.getCode() != 200) {
throw new IOException("multiput failed with " + response.getCode());
}
}
@Override
public void delete(Delete delete) throws IOException {
String spec = buildRowSpec(delete.getRow(), delete.getFamilyMap(),
delete.getTimeStamp(), delete.getTimeStamp(), 1);
Response response = client.delete(spec);
if (response.getCode() != 200) {
throw new IOException("delete() returned " + response.getCode());
}
}
@Override
public void delete(List<Delete> deletes) throws IOException {
for (Delete delete: deletes) {
delete(delete);
}
}
@Override
public void flushCommits() throws IOException {
// no-op
}
class Scanner implements ResultScanner {
String uri;
public Scanner(Scan scan) throws IOException {
StringBuffer sb = new StringBuffer();
sb.append('/');
if (accessToken != null) {
sb.append(accessToken);
sb.append('/');
}
sb.append(Bytes.toStringBinary(name));
sb.append('/');
sb.append("scanner");
try {
ScannerModel model = ScannerModel.fromScan(scan);
Response response = client.post(sb.toString(),
Constants.MIMETYPE_PROTOBUF, model.createProtobufOutput());
if (response.getCode() != 201) {
throw new IOException("scan request failed with " +
response.getCode());
}
uri = response.getLocation();
} catch (Exception e) {
throw new IOException(e);
}
}
@Override
public Result[] next(int nbRows) throws IOException {
StringBuilder sb = new StringBuilder(uri);
sb.append("?n=");
sb.append(nbRows);
Response response = client.get(sb.toString(),
Constants.MIMETYPE_PROTOBUF);
if (response.getCode() == 206) {
return null;
}
if (response.getCode() != 200) {
LOG.error("scanner.next failed with " + response.getCode());
return null;
}
CellSetModel model = new CellSetModel();
model.getObjectFromMessage(response.getBody());
return buildResultFromModel(model);
}
@Override
public Result next() throws IOException {
Result[] results = next(1);
if (results == null || results.length < 1) {
return null;
}
return results[0];
}
class Iter implements Iterator<Result> {
Result cache;
public Iter() {
try {
cache = Scanner.this.next();
} catch (IOException e) {
LOG.warn(StringUtils.stringifyException(e));
}
}
@Override
public boolean hasNext() {
return cache != null;
}
@Override
public Result next() {
Result result = cache;
try {
cache = Scanner.this.next();
} catch (IOException e) {
LOG.warn(StringUtils.stringifyException(e));
cache = null;
}
return result;
}
@Override
public void remove() {
throw new RuntimeException("remove() not supported");
}
}
@Override
public Iterator<Result> iterator() {
return new Iter();
}
@Override
public void close() {
try {
client.delete(uri);
} catch (IOException e) {
LOG.warn(StringUtils.stringifyException(e));
}
}
}
@Override
public ResultScanner getScanner(Scan scan) throws IOException {
return new Scanner(scan);
}
@Override
public ResultScanner getScanner(byte[] family) throws IOException {
Scan scan = new Scan();
scan.addFamily(family);
return new Scanner(scan);
}
@Override
public ResultScanner getScanner(byte[] family, byte[] qualifier)
throws IOException {
Scan scan = new Scan();
scan.addColumn(family, qualifier);
return new Scanner(scan);
}
@Override
public boolean isAutoFlush() {
return true;
}
@Override
public Result getRowOrBefore(byte[] row, byte[] family) throws IOException {
throw new IOException("getRowOrBefore not supported");
}
@Override
public RowLock lockRow(byte[] row) throws IOException {
throw new IOException("lockRow not implemented");
}
@Override
public void unlockRow(RowLock rl) throws IOException {
throw new IOException("unlockRow not implemented");
}
@Override
public boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
byte[] value, Put put) throws IOException {
throw new IOException("checkAndPut not supported");
}
@Override
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
long amount) throws IOException {
throw new IOException("incrementColumnValue not supported");
}
@Override
public long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
long amount, boolean writeToWAL) throws IOException {
throw new IOException("incrementColumnValue not supported");
}
}
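
Illustration, not part of the patch: a minimal usage sketch for the new RemoteHTable client, mirroring the calls exercised by TestRemoteTable below. The Stargate endpoint, table name, and column names are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.stargate.client.Client;
import org.apache.hadoop.hbase.stargate.client.Cluster;
import org.apache.hadoop.hbase.stargate.client.RemoteHTable;
import org.apache.hadoop.hbase.util.Bytes;

public class RemoteHTableUsage {
  public static void main(String[] args) throws Exception {
    // Placeholder endpoint: point this at a running Stargate instance.
    Client client = new Client(new Cluster().add("localhost", 8080));
    Configuration conf = HBaseConfiguration.create();
    // No access token (single-user mode).
    RemoteHTable table = new RemoteHTable(client, conf, "mytable", null);
    try {
      // Write one cell through the REST gateway, then read it back.
      Put put = new Put(Bytes.toBytes("row1"));
      put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("value"));
      table.put(put);

      Result result = table.get(new Get(Bytes.toBytes("row1")));
      byte[] value = result.getValue(Bytes.toBytes("f"), Bytes.toBytes("q"));
      System.out.println(Bytes.toString(value));
    } finally {
      // close() also shuts down the underlying HTTP client.
      table.close();
    }
  }
}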

View File

@@ -110,11 +110,11 @@ public class ScannerModel implements ProtobufMessageHandler, Serializable {
private byte[] startRow = HConstants.EMPTY_START_ROW;
private byte[] endRow = HConstants.EMPTY_END_ROW;
private List<byte[]> columns = new ArrayList<byte[]>();
private int batch = 1;
private int batch = Integer.MAX_VALUE;
private long startTime = 0;
private long endTime = Long.MAX_VALUE;
private String filter;
private int maxVersions = 1;
private String filter = null;
private int maxVersions = Integer.MAX_VALUE;
/**
* @param o the JSONObject under construction
@@ -343,8 +343,14 @@ public class ScannerModel implements ProtobufMessageHandler, Serializable {
}
model.setStartTime(scan.getTimeRange().getMin());
model.setEndTime(scan.getTimeRange().getMax());
model.setBatch(scan.getCaching());
model.setMaxVersions(scan.getMaxVersions());
int caching = scan.getCaching();
if (caching > 0) {
model.setBatch(caching);
}
int maxVersions = scan.getMaxVersions();
if (maxVersions > 0) {
model.setMaxVersions(maxVersions);
}
Filter filter = scan.getFilter();
if (filter != null) {
model.setFilter(stringifyFilter(new JSONStringer(), filter).toString());
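
Illustration, not part of the patch: with the guards above, a Scan's caching and max-versions settings are only copied into the ScannerModel when they are positive; otherwise the model keeps its new Integer.MAX_VALUE defaults. A small sketch with placeholder column names follows.

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.stargate.model.ScannerModel;
import org.apache.hadoop.hbase.util.Bytes;

public class ScannerModelFromScan {
  public static void main(String[] args) throws Exception {
    Scan scan = new Scan();
    scan.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"));
    scan.setCaching(100);    // positive, so it becomes the model's batch size
    scan.setMaxVersions(3);  // positive, so it becomes the model's maxVersions
    ScannerModel model = ScannerModel.fromScan(scan);
    byte[] message = model.createProtobufOutput();
    System.out.println("scanner spec serialized to " + message.length + " bytes");
  }
}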

View File

@@ -0,0 +1,330 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.stargate.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.stargate.MiniClusterTestBase;
import org.apache.hadoop.hbase.stargate.client.Client;
import org.apache.hadoop.hbase.stargate.client.Cluster;
import org.apache.hadoop.hbase.stargate.client.RemoteHTable;
import org.apache.hadoop.hbase.util.Bytes;
public class TestRemoteTable extends MiniClusterTestBase {
static final String TABLE = "TestRemoteTable";
static final byte[] ROW_1 = Bytes.toBytes("testrow1");
static final byte[] ROW_2 = Bytes.toBytes("testrow2");
static final byte[] ROW_3 = Bytes.toBytes("testrow3");
static final byte[] ROW_4 = Bytes.toBytes("testrow4");
static final byte[] COLUMN_1 = Bytes.toBytes("a");
static final byte[] COLUMN_2 = Bytes.toBytes("b");
static final byte[] COLUMN_3 = Bytes.toBytes("c");
static final byte[] QUALIFIER_1 = Bytes.toBytes("1");
static final byte[] QUALIFIER_2 = Bytes.toBytes("2");
static final byte[] QUALIFIER_3 = Bytes.toBytes("3");
static final byte[] VALUE_1 = Bytes.toBytes("testvalue1");
static final byte[] VALUE_2 = Bytes.toBytes("testvalue2");
static final byte[] VALUE_3 = Bytes.toBytes("testvalue3");
static final long ONE_HOUR = 60 * 60 * 1000;
static final long TS_2 = System.currentTimeMillis();
static final long TS_1 = TS_2 - ONE_HOUR;
Client client;
HBaseAdmin admin;
RemoteHTable remoteTable;
@Override
protected void setUp() throws Exception {
super.setUp();
admin = new HBaseAdmin(conf);
if (!admin.tableExists(TABLE)) {
HTableDescriptor htd = new HTableDescriptor(TABLE);
htd.addFamily(new HColumnDescriptor(COLUMN_1));
htd.addFamily(new HColumnDescriptor(COLUMN_2));
htd.addFamily(new HColumnDescriptor(COLUMN_3));
admin.createTable(htd);
HTable table = new HTable(TABLE);
Put put = new Put(ROW_1);
put.add(COLUMN_1, QUALIFIER_1, TS_2, VALUE_1);
table.put(put);
put = new Put(ROW_2);
put.add(COLUMN_1, QUALIFIER_1, TS_1, VALUE_1);
put.add(COLUMN_1, QUALIFIER_1, TS_2, VALUE_2);
put.add(COLUMN_2, QUALIFIER_2, TS_2, VALUE_2);
table.put(put);
table.flushCommits();
}
remoteTable = new RemoteHTable(
new Client(new Cluster().add("localhost", testServletPort)),
conf, TABLE, null);
}
@Override
protected void tearDown() throws Exception {
remoteTable.close();
super.tearDown();
}
public void testGetTableDescriptor() throws IOException {
HTableDescriptor local = new HTable(conf, TABLE).getTableDescriptor();
assertEquals(remoteTable.getTableDescriptor(), local);
}
public void testGet() throws IOException {
Get get = new Get(ROW_1);
Result result = remoteTable.get(get);
byte[] value1 = result.getValue(COLUMN_1, QUALIFIER_1);
byte[] value2 = result.getValue(COLUMN_2, QUALIFIER_2);
assertNotNull(value1);
assertTrue(Bytes.equals(VALUE_1, value1));
assertNull(value2);
get = new Get(ROW_1);
get.addFamily(COLUMN_3);
result = remoteTable.get(get);
value1 = result.getValue(COLUMN_1, QUALIFIER_1);
value2 = result.getValue(COLUMN_2, QUALIFIER_2);
assertNull(value1);
assertNull(value2);
get = new Get(ROW_1);
get.addColumn(COLUMN_1, QUALIFIER_1);
get.addColumn(COLUMN_2, QUALIFIER_2);
result = remoteTable.get(get);
value1 = result.getValue(COLUMN_1, QUALIFIER_1);
value2 = result.getValue(COLUMN_2, QUALIFIER_2);
assertNotNull(value1);
assertTrue(Bytes.equals(VALUE_1, value1));
assertNull(value2);
get = new Get(ROW_2);
result = remoteTable.get(get);
value1 = result.getValue(COLUMN_1, QUALIFIER_1);
value2 = result.getValue(COLUMN_2, QUALIFIER_2);
assertNotNull(value1);
assertTrue(Bytes.equals(VALUE_2, value1)); // @TS_2
assertNotNull(value2);
assertTrue(Bytes.equals(VALUE_2, value2));
get = new Get(ROW_2);
get.addFamily(COLUMN_1);
result = remoteTable.get(get);
value1 = result.getValue(COLUMN_1, QUALIFIER_1);
value2 = result.getValue(COLUMN_2, QUALIFIER_2);
assertNotNull(value1);
assertTrue(Bytes.equals(VALUE_2, value1)); // @TS_2
assertNull(value2);
get = new Get(ROW_2);
get.addColumn(COLUMN_1, QUALIFIER_1);
get.addColumn(COLUMN_2, QUALIFIER_2);
result = remoteTable.get(get);
value1 = result.getValue(COLUMN_1, QUALIFIER_1);
value2 = result.getValue(COLUMN_2, QUALIFIER_2);
assertNotNull(value1);
assertTrue(Bytes.equals(VALUE_2, value1)); // @TS_2
assertNotNull(value2);
assertTrue(Bytes.equals(VALUE_2, value2));
// test timestamp
get = new Get(ROW_2);
get.addFamily(COLUMN_1);
get.addFamily(COLUMN_2);
get.setTimeStamp(TS_1);
result = remoteTable.get(get);
value1 = result.getValue(COLUMN_1, QUALIFIER_1);
value2 = result.getValue(COLUMN_2, QUALIFIER_2);
assertNotNull(value1);
assertTrue(Bytes.equals(VALUE_1, value1)); // @TS_1
assertNull(value2);
// test timerange
get = new Get(ROW_2);
get.addFamily(COLUMN_1);
get.addFamily(COLUMN_2);
get.setTimeRange(0, TS_1 + 1);
result = remoteTable.get(get);
value1 = result.getValue(COLUMN_1, QUALIFIER_1);
value2 = result.getValue(COLUMN_2, QUALIFIER_2);
assertNotNull(value1);
assertTrue(Bytes.equals(VALUE_1, value1)); // @TS_1
assertNull(value2);
// test maxVersions
get = new Get(ROW_2);
get.addFamily(COLUMN_1);
get.setMaxVersions(2);
result = remoteTable.get(get);
int count = 0;
for (KeyValue kv: result.list()) {
if (Bytes.equals(COLUMN_1, kv.getFamily()) && TS_1 == kv.getTimestamp()) {
assertTrue(Bytes.equals(VALUE_1, kv.getValue())); // @TS_1
count++;
}
if (Bytes.equals(COLUMN_1, kv.getFamily()) && TS_2 == kv.getTimestamp()) {
assertTrue(Bytes.equals(VALUE_2, kv.getValue())); // @TS_2
count++;
}
}
assertEquals(2, count);
}
public void testPut() throws IOException {
Put put = new Put(ROW_3);
put.add(COLUMN_1, QUALIFIER_1, VALUE_1);
remoteTable.put(put);
Get get = new Get(ROW_3);
get.addFamily(COLUMN_1);
Result result = remoteTable.get(get);
byte[] value = result.getValue(COLUMN_1, QUALIFIER_1);
assertNotNull(value);
assertTrue(Bytes.equals(VALUE_1, value));
// multiput
List<Put> puts = new ArrayList<Put>();
put = new Put(ROW_3);
put.add(COLUMN_2, QUALIFIER_2, VALUE_2);
puts.add(put);
put = new Put(ROW_4);
put.add(COLUMN_1, QUALIFIER_1, VALUE_1);
puts.add(put);
put = new Put(ROW_4);
put.add(COLUMN_2, QUALIFIER_2, VALUE_2);
puts.add(put);
remoteTable.put(puts);
get = new Get(ROW_3);
get.addFamily(COLUMN_2);
result = remoteTable.get(get);
value = result.getValue(COLUMN_2, QUALIFIER_2);
assertNotNull(value);
assertTrue(Bytes.equals(VALUE_2, value));
get = new Get(ROW_4);
result = remoteTable.get(get);
value = result.getValue(COLUMN_1, QUALIFIER_1);
assertNotNull(value);
assertTrue(Bytes.equals(VALUE_1, value));
value = result.getValue(COLUMN_2, QUALIFIER_2);
assertNotNull(value);
assertTrue(Bytes.equals(VALUE_2, value));
}
public void testDelete() throws IOException {
Put put = new Put(ROW_3);
put.add(COLUMN_1, QUALIFIER_1, VALUE_1);
put.add(COLUMN_2, QUALIFIER_2, VALUE_2);
remoteTable.put(put);
Get get = new Get(ROW_3);
get.addFamily(COLUMN_1);
get.addFamily(COLUMN_2);
Result result = remoteTable.get(get);
byte[] value1 = result.getValue(COLUMN_1, QUALIFIER_1);
byte[] value2 = result.getValue(COLUMN_2, QUALIFIER_2);
assertNotNull(value1);
assertTrue(Bytes.equals(VALUE_1, value1));
assertNotNull(value2);
assertTrue(Bytes.equals(VALUE_2, value2));
Delete delete = new Delete(ROW_3);
delete.deleteColumn(COLUMN_2, QUALIFIER_2);
remoteTable.delete(delete);
get = new Get(ROW_3);
get.addFamily(COLUMN_1);
get.addFamily(COLUMN_2);
result = remoteTable.get(get);
value1 = result.getValue(COLUMN_1, QUALIFIER_1);
value2 = result.getValue(COLUMN_2, QUALIFIER_2);
assertNotNull(value1);
assertTrue(Bytes.equals(VALUE_1, value1));
assertNull(value2);
delete = new Delete(ROW_3);
remoteTable.delete(delete);
get = new Get(ROW_3);
get.addFamily(COLUMN_1);
get.addFamily(COLUMN_2);
result = remoteTable.get(get);
value1 = result.getValue(COLUMN_1, QUALIFIER_1);
value2 = result.getValue(COLUMN_2, QUALIFIER_2);
assertNull(value1);
assertNull(value2);
}
public void testScanner() throws IOException {
List<Put> puts = new ArrayList<Put>();
Put put = new Put(ROW_1);
put.add(COLUMN_1, QUALIFIER_1, VALUE_1);
puts.add(put);
put = new Put(ROW_2);
put.add(COLUMN_1, QUALIFIER_1, VALUE_1);
puts.add(put);
put = new Put(ROW_3);
put.add(COLUMN_1, QUALIFIER_1, VALUE_1);
puts.add(put);
put = new Put(ROW_4);
put.add(COLUMN_1, QUALIFIER_1, VALUE_1);
puts.add(put);
remoteTable.put(puts);
ResultScanner scanner = remoteTable.getScanner(new Scan());
Result[] results = scanner.next(1);
assertNotNull(results);
assertEquals(1, results.length);
assertTrue(Bytes.equals(ROW_1, results[0].getRow()));
results = scanner.next(3);
assertNotNull(results);
assertEquals(3, results.length);
assertTrue(Bytes.equals(ROW_2, results[0].getRow()));
assertTrue(Bytes.equals(ROW_3, results[1].getRow()));
assertTrue(Bytes.equals(ROW_4, results[2].getRow()));
results = scanner.next(1);
assertNull(results);
scanner.close();
}
}