HBASE-9343 Implement stateless scanner for Stargate (Vandana Ayyalasomayajula)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1558994 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
ndimiduk 2014-01-17 02:31:00 +00:00
parent 608d873be0
commit a8ec7232cc
13 changed files with 1071 additions and 9 deletions

View File

@ -46,6 +46,10 @@ public interface MetricsRESTSource extends BaseSource {
String FAILED_PUT_KEY = "failedPut";
String FAILED_DELETE_KEY = "failedDelete";
String SUCCESSFUL_SCAN_KEY = "successfulScanCount";
String FAILED_SCAN_KEY = "failedScanCount";
/**
* Increment the number of requests
@ -95,4 +99,18 @@ public interface MetricsRESTSource extends BaseSource {
* @param inc The number of failed delete requests.
*/
void incrementFailedDeleteRequests(int inc);
/**
* Increment the number of successful scan requests.
*
* @param inc Number of successful scan requests.
*/
void incrementSucessfulScanRequests(final int inc);
/**
* Increment the number failed scan requests.
*
* @param inc the inc
*/
void incrementFailedScanRequests(final int inc);
}

View File

@ -33,9 +33,11 @@ public class MetricsRESTSourceImpl extends BaseSourceImpl implements MetricsREST
private MetricMutableCounterLong sucGet;
private MetricMutableCounterLong sucPut;
private MetricMutableCounterLong sucDel;
private MetricMutableCounterLong sucScan;
private MetricMutableCounterLong fGet;
private MetricMutableCounterLong fPut;
private MetricMutableCounterLong fDel;
private MetricMutableCounterLong fScan;
public MetricsRESTSourceImpl() {
this(METRICS_NAME, METRICS_DESCRIPTION, CONTEXT, JMX_CONTEXT);
@ -56,10 +58,12 @@ public class MetricsRESTSourceImpl extends BaseSourceImpl implements MetricsREST
sucGet = getMetricsRegistry().getLongCounter(SUCCESSFUL_GET_KEY, 0l);
sucPut = getMetricsRegistry().getLongCounter(SUCCESSFUL_PUT_KEY, 0l);
sucDel = getMetricsRegistry().getLongCounter(SUCCESSFUL_DELETE_KEY, 0l);
sucScan = getMetricsRegistry().getLongCounter(SUCCESSFUL_SCAN_KEY, 0L);
fGet = getMetricsRegistry().getLongCounter(FAILED_GET_KEY, 0l);
fPut = getMetricsRegistry().getLongCounter(FAILED_PUT_KEY, 0l);
fDel = getMetricsRegistry().getLongCounter(FAILED_DELETE_KEY, 0l);
fScan = getMetricsRegistry().getLongCounter(FAILED_SCAN_KEY, 0l);
}
@Override
@ -96,4 +100,14 @@ public class MetricsRESTSourceImpl extends BaseSourceImpl implements MetricsREST
public void incrementFailedDeleteRequests(int inc) {
fDel.incr(inc);
}
@Override
public void incrementSucessfulScanRequests(int inc) {
sucScan.incr(inc);
}
@Override
public void incrementFailedScanRequests(int inc) {
fScan.incr(inc);
}
}

View File

@ -35,9 +35,11 @@ public class MetricsRESTSourceImpl extends BaseSourceImpl implements MetricsREST
private MutableCounterLong sucGet;
private MutableCounterLong sucPut;
private MutableCounterLong sucDel;
private MutableCounterLong sucScan;
private MutableCounterLong fGet;
private MutableCounterLong fPut;
private MutableCounterLong fDel;
private MutableCounterLong fScan;
public MetricsRESTSourceImpl() {
this(METRICS_NAME, METRICS_DESCRIPTION, CONTEXT, JMX_CONTEXT);
@ -58,10 +60,12 @@ public class MetricsRESTSourceImpl extends BaseSourceImpl implements MetricsREST
sucGet = getMetricsRegistry().getLongCounter(SUCCESSFUL_GET_KEY, 0l);
sucPut = getMetricsRegistry().getLongCounter(SUCCESSFUL_PUT_KEY, 0l);
sucDel = getMetricsRegistry().getLongCounter(SUCCESSFUL_DELETE_KEY, 0l);
sucScan = getMetricsRegistry().getLongCounter(SUCCESSFUL_SCAN_KEY, 0L);
fGet = getMetricsRegistry().getLongCounter(FAILED_GET_KEY, 0l);
fPut = getMetricsRegistry().getLongCounter(FAILED_PUT_KEY, 0l);
fDel = getMetricsRegistry().getLongCounter(FAILED_DELETE_KEY, 0l);
fScan = getMetricsRegistry().getLongCounter(FAILED_SCAN_KEY, 0l);
}
@Override
@ -98,4 +102,14 @@ public class MetricsRESTSourceImpl extends BaseSourceImpl implements MetricsREST
public void incrementFailedDeleteRequests(int inc) {
fDel.incr(inc);
}
@Override
public void incrementSucessfulScanRequests(int inc) {
sucScan.incr(inc);
}
@Override
public void incrementFailedScanRequests(int inc) {
fScan.incr(inc);
}
}

View File

@ -57,5 +57,16 @@ public interface Constants {
static final String REST_DNS_NAMESERVER = "hbase.rest.dns.nameserver";
static final String REST_DNS_INTERFACE = "hbase.rest.dns.interface";
public static final String FILTER_CLASSES = "hbase.rest.filter.classes";
public static final String SCAN_START_ROW = "startrow";
public static final String SCAN_END_ROW = "endrow";
public static final String SCAN_COLUMN = "column";
public static final String SCAN_START_TIME = "starttime";
public static final String SCAN_END_TIME = "endtime";
public static final String SCAN_MAX_VERSIONS = "maxversions";
public static final String SCAN_BATCH_SIZE = "batchsize";
public static final String SCAN_LIMIT = "limit";
public static final String SCAN_FETCH_SIZE = "hbase.rest.scan.fetchsize";
}

View File

@ -78,12 +78,26 @@ public class MetricsREST {
public void incrementSucessfulDeleteRequests(final int inc) {
source.incrementSucessfulDeleteRequests(inc);
}
/**
* @param inc How much to add to failedDeleteCount.
*/
public void incrementFailedDeleteRequests(final int inc) {
source.incrementFailedDeleteRequests(inc);
}
/**
* @param inc How much to add to sucessfulScanCount.
*/
public synchronized void incrementSucessfulScanRequests(final int inc) {
source.incrementSucessfulScanRequests(inc);
}
/**
* @param inc How much to add to failedScanCount.
*/
public void incrementFailedScanRequests(final int inc) {
source.incrementFailedScanRequests(inc);
}
}

View File

@ -0,0 +1,102 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.rest;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.StreamingOutput;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.rest.model.CellModel;
import org.apache.hadoop.hbase.rest.model.CellSetModel;
import org.apache.hadoop.hbase.rest.model.RowModel;
import org.apache.hadoop.hbase.util.Bytes;
public class ProtobufStreamingUtil implements StreamingOutput {
private static final Log LOG = LogFactory.getLog(ProtobufStreamingUtil.class);
private String contentType;
private ResultScanner resultScanner;
private int limit;
private int fetchSize;
protected ProtobufStreamingUtil(ResultScanner scanner, String type, int limit, int fetchSize) {
this.resultScanner = scanner;
this.contentType = type;
this.limit = limit;
this.fetchSize = fetchSize;
LOG.debug("Created ScanStreamingUtil with content type = " + this.contentType + " user limit : "
+ this.limit + " scan fetch size : " + this.fetchSize);
}
@Override
public void write(OutputStream outStream) throws IOException, WebApplicationException {
Result[] rowsToSend;
if(limit < fetchSize){
rowsToSend = this.resultScanner.next(limit);
writeToStream(createModelFromResults(rowsToSend), this.contentType, outStream);
} else {
int count = limit;
while (count > 0) {
if (count < fetchSize) {
rowsToSend = this.resultScanner.next(count);
} else {
rowsToSend = this.resultScanner.next(this.fetchSize);
}
if(rowsToSend.length == 0){
break;
}
count = count - rowsToSend.length;
writeToStream(createModelFromResults(rowsToSend), this.contentType, outStream);
}
}
}
private void writeToStream(CellSetModel model, String contentType, OutputStream outStream)
throws IOException {
byte[] objectBytes = model.createProtobufOutput();
outStream.write(Bytes.toBytes((short)objectBytes.length));
outStream.write(objectBytes);
outStream.flush();
LOG.trace("Wrote " + model.getRows().size() + " rows to stream successfully.");
}
private CellSetModel createModelFromResults(Result[] results) {
CellSetModel cellSetModel = new CellSetModel();
for (Result rs : results) {
byte[] rowKey = rs.getRow();
RowModel rModel = new RowModel(rowKey);
List<Cell> kvs = rs.listCells();
for (Cell kv : kvs) {
rModel.addCell(new CellModel(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv), kv
.getTimestamp(), CellUtil.cloneValue(kv)));
}
cellSetModel.addRow(rModel);
}
return cellSetModel;
}
}

View File

@ -20,18 +20,32 @@
package org.apache.hadoop.hbase.rest;
import java.io.IOException;
import java.util.List;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.UriInfo;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
@InterfaceAudience.Private
public class TableResource extends ResourceBase {
String table;
private static final Log LOG = LogFactory.getLog(TableResource.class);
/**
* Constructor
@ -82,7 +96,7 @@ public class TableResource extends ResourceBase {
return new MultiRowResource(this, versions);
}
@Path("{rowspec: .+}")
@Path("{rowspec: [^*]+}")
public RowResource getRowResource(
// We need the @Encoded decorator so Jersey won't urldecode before
// the RowSpec constructor has a chance to parse
@ -91,4 +105,76 @@ public class TableResource extends ResourceBase {
final @QueryParam("check") String check) throws IOException {
return new RowResource(this, rowspec, versions, check);
}
@Path("{suffixglobbingspec: .*\\*/.+}")
public RowResource getRowResourceWithSuffixGlobbing(
// We need the @Encoded decorator so Jersey won't urldecode before
// the RowSpec constructor has a chance to parse
final @PathParam("suffixglobbingspec") @Encoded String suffixglobbingspec,
final @QueryParam("v") String versions,
final @QueryParam("check") String check) throws IOException {
return new RowResource(this, suffixglobbingspec, versions, check);
}
@Path("{scanspec: .*[*]$}")
public TableScanResource getScanResource(
final @Context UriInfo uriInfo,
final @PathParam("scanspec") String scanSpec,
final @HeaderParam("Accept") String contentType,
@DefaultValue(Integer.MAX_VALUE + "")
@QueryParam(Constants.SCAN_LIMIT) int userRequestedLimit,
@DefaultValue("") @QueryParam(Constants.SCAN_START_ROW) String startRow,
@DefaultValue("") @QueryParam(Constants.SCAN_END_ROW) String endRow,
@DefaultValue("") @QueryParam(Constants.SCAN_COLUMN) List<String> column,
@DefaultValue("1") @QueryParam(Constants.SCAN_MAX_VERSIONS) int maxVersions,
@DefaultValue("-1") @QueryParam(Constants.SCAN_BATCH_SIZE) int batchSize,
@DefaultValue("0") @QueryParam(Constants.SCAN_START_TIME) long startTime,
@DefaultValue(Long.MAX_VALUE + "") @QueryParam(Constants.SCAN_END_TIME) long endTime,
@DefaultValue("true") @QueryParam(Constants.SCAN_BATCH_SIZE) boolean cacheBlocks) {
try {
Filter filter = null;
if (scanSpec.indexOf('*') > 0) {
String prefix = scanSpec.substring(0, scanSpec.indexOf('*'));
filter = new PrefixFilter(Bytes.toBytes(prefix));
}
LOG.debug("Query parameters : Table Name = > " + this.table + " Start Row => " + startRow
+ " End Row => " + endRow + " Columns => " + column + " Start Time => " + startTime
+ " End Time => " + endTime + " Cache Blocks => " + cacheBlocks + " Max Versions => "
+ maxVersions + " Batch Size => " + batchSize);
HTableInterface hTable = RESTServlet.getInstance().getTable(this.table);
Scan tableScan = new Scan();
tableScan.setBatch(batchSize);
tableScan.setMaxVersions(maxVersions);
tableScan.setTimeRange(startTime, endTime);
tableScan.setStartRow(Bytes.toBytes(startRow));
tableScan.setStopRow(Bytes.toBytes(endRow));
for (String csplit : column) {
String[] familysplit = csplit.trim().split(":");
if (familysplit.length == 2) {
if (familysplit[1].length() > 0) {
LOG.debug("Scan family and column : " + familysplit[0] + " " + familysplit[1]);
tableScan.addColumn(Bytes.toBytes(familysplit[0]), Bytes.toBytes(familysplit[1]));
} else {
tableScan.addFamily(Bytes.toBytes(familysplit[0]));
LOG.debug("Scan family : " + familysplit[0] + " and empty qualifier.");
tableScan.addColumn(Bytes.toBytes(familysplit[0]), null);
}
} else if (StringUtils.isNotEmpty(familysplit[0])){
LOG.debug("Scan family : " + familysplit[0]);
tableScan.addFamily(Bytes.toBytes(familysplit[0]));
}
}
if (filter != null) {
tableScan.setFilter(filter);
}
int fetchSize = this.servlet.getConfiguration().getInt(Constants.SCAN_FETCH_SIZE, 10);
tableScan.setCaching(fetchSize);
return new TableScanResource(hTable.getScanner(tableScan), userRequestedLimit);
} catch (Exception exp) {
servlet.getMetrics().incrementFailedScanRequests(1);
processException(exp);
LOG.warn(exp);
return null;
}
}
}

View File

@ -0,0 +1,168 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.rest;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.Response.ResponseBuilder;
import javax.ws.rs.core.UriInfo;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.rest.model.CellModel;
import org.apache.hadoop.hbase.rest.model.RowModel;
import org.codehaus.jackson.annotate.JsonIgnore;
import org.codehaus.jackson.annotate.JsonProperty;
@InterfaceAudience.Private
public class TableScanResource extends ResourceBase {
private static final Log LOG = LogFactory.getLog(TableScanResource.class);
TableResource tableResource;
ResultScanner results;
int userRequestedLimit;
public TableScanResource(ResultScanner scanner, int userRequestedLimit) throws IOException {
super();
this.results = scanner;
this.userRequestedLimit = userRequestedLimit;
}
@GET
@Produces({ Constants.MIMETYPE_XML, Constants.MIMETYPE_JSON })
public CellSetModelStream get(final @Context UriInfo uriInfo) {
servlet.getMetrics().incrementRequests(1);
final int rowsToSend = userRequestedLimit;
servlet.getMetrics().incrementSucessfulScanRequests(1);
final Iterator<Result> itr = results.iterator();
return new CellSetModelStream(new ArrayList<RowModel>() {
public Iterator<RowModel> iterator() {
return new Iterator<RowModel>() {
int count = rowsToSend;
@Override
public boolean hasNext() {
if (count > 0) {
return itr.hasNext();
} else {
return false;
}
}
@Override
public void remove() {
throw new UnsupportedOperationException(
"Remove method cannot be used in CellSetModelStream");
}
@Override
public RowModel next() {
Result rs = itr.next();
if ((rs == null) || (count <= 0)) {
return null;
}
byte[] rowKey = rs.getRow();
RowModel rModel = new RowModel(rowKey);
List<Cell> kvs = rs.listCells();
for (Cell kv : kvs) {
rModel.addCell(new CellModel(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
kv.getTimestamp(), CellUtil.cloneValue(kv)));
}
count--;
return rModel;
}
};
}
});
}
@GET
@Produces({ Constants.MIMETYPE_PROTOBUF, Constants.MIMETYPE_PROTOBUF_IETF })
public Response getProtobuf(
final @Context UriInfo uriInfo,
final @PathParam("scanspec") String scanSpec,
final @HeaderParam("Accept") String contentType,
@DefaultValue(Integer.MAX_VALUE + "") @QueryParam(Constants.SCAN_LIMIT) int userRequestedLimit,
@DefaultValue("") @QueryParam(Constants.SCAN_START_ROW) String startRow,
@DefaultValue("") @QueryParam(Constants.SCAN_END_ROW) String endRow,
@DefaultValue("column") @QueryParam(Constants.SCAN_COLUMN) List<String> column,
@DefaultValue("1") @QueryParam(Constants.SCAN_MAX_VERSIONS) int maxVersions,
@DefaultValue("-1") @QueryParam(Constants.SCAN_BATCH_SIZE) int batchSize,
@DefaultValue("0") @QueryParam(Constants.SCAN_START_TIME) long startTime,
@DefaultValue(Long.MAX_VALUE + "") @QueryParam(Constants.SCAN_END_TIME) long endTime,
@DefaultValue("true") @QueryParam(Constants.SCAN_BATCH_SIZE) boolean cacheBlocks) {
servlet.getMetrics().incrementRequests(1);
try {
int fetchSize = this.servlet.getConfiguration().getInt(Constants.SCAN_FETCH_SIZE, 10);
ProtobufStreamingUtil stream = new ProtobufStreamingUtil(this.results, contentType,
userRequestedLimit, fetchSize);
servlet.getMetrics().incrementSucessfulScanRequests(1);
ResponseBuilder response = Response.ok(stream);
response.header("content-type", contentType);
return response.build();
} catch (Exception exp) {
servlet.getMetrics().incrementFailedScanRequests(1);
processException(exp);
LOG.warn(exp);
return null;
}
}
@XmlRootElement(name = "CellSet")
@XmlAccessorType(XmlAccessType.FIELD)
public static class CellSetModelStream {
// JAXB needs an arraylist for streaming
@XmlElement(name = "Row")
@JsonIgnore
private ArrayList<RowModel> Row;
public CellSetModelStream() {
}
public CellSetModelStream(final ArrayList<RowModel> rowList) {
this.Row = rowList;
}
// jackson needs an iterator for streaming
@JsonProperty("Row")
public Iterator<RowModel> getIterator() {
return Row.iterator();
}
}
}

View File

@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.rest.client;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@ -333,7 +334,8 @@ public class Client {
int code = execute(c, method, headers, path);
headers = method.getResponseHeaders();
byte[] body = method.getResponseBody();
return new Response(code, headers, body);
InputStream in = method.getResponseBodyAsStream();
return new Response(code, headers, body, in);
} finally {
method.releaseConnection();
}

View File

@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.rest.client;
import java.io.InputStream;
import org.apache.commons.httpclient.Header;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -32,6 +34,7 @@ public class Response {
private int code;
private Header[] headers;
private byte[] body;
private InputStream stream;
/**
* Constructor
@ -61,6 +64,20 @@ public class Response {
this.headers = headers;
this.body = body;
}
/**
* Constructor
* @param code the HTTP response code
* @param headers headers the HTTP response headers
* @param body the response body, can be null
* @param in Inputstream if the response had one.
*/
public Response(int code, Header[] headers, byte[] body, InputStream in) {
this.code = code;
this.headers = headers;
this.body = body;
this.stream = in;
}
/**
* @return the HTTP response code
@ -68,6 +85,15 @@ public class Response {
public int getCode() {
return code;
}
/**
* Gets the input stream instance.
*
* @return an instance of InputStream class.
*/
public InputStream getStream(){
return this.stream;
}
/**
* @return the HTTP response headers

View File

@ -482,5 +482,103 @@ public class TestGetAndPutResource extends RowResourceBase {
}
return contains;
}
@Test
public void testSuffixGlobbingXMLWithNewScanner() throws IOException, JAXBException {
String path = "/" + TABLE + "/fakerow"; // deliberate nonexistent row
CellSetModel cellSetModel = new CellSetModel();
RowModel rowModel = new RowModel(ROW_1);
rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_1),
Bytes.toBytes(VALUE_1)));
rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_2),
Bytes.toBytes(VALUE_2)));
cellSetModel.addRow(rowModel);
rowModel = new RowModel(ROW_2);
rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_1),
Bytes.toBytes(VALUE_3)));
rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_2),
Bytes.toBytes(VALUE_4)));
cellSetModel.addRow(rowModel);
StringWriter writer = new StringWriter();
xmlMarshaller.marshal(cellSetModel, writer);
Response response = client.put(path, Constants.MIMETYPE_XML,
Bytes.toBytes(writer.toString()));
Thread.yield();
// make sure the fake row was not actually created
response = client.get(path, Constants.MIMETYPE_XML);
assertEquals(response.getCode(), 404);
// check that all of the values were created
StringBuilder query = new StringBuilder();
query.append('/');
query.append(TABLE);
query.append('/');
query.append("testrow*");
response = client.get(query.toString(), Constants.MIMETYPE_XML);
assertEquals(response.getCode(), 200);
assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
CellSetModel cellSet = (CellSetModel)
xmlUnmarshaller.unmarshal(new ByteArrayInputStream(response.getBody()));
assertTrue(cellSet.getRows().size() == 2);
response = deleteRow(TABLE, ROW_1);
assertEquals(response.getCode(), 200);
response = deleteRow(TABLE, ROW_2);
assertEquals(response.getCode(), 200);
}
@Test
public void testSuffixGlobbingXML() throws IOException, JAXBException {
String path = "/" + TABLE + "/fakerow"; // deliberate nonexistent row
CellSetModel cellSetModel = new CellSetModel();
RowModel rowModel = new RowModel(ROW_1);
rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_1),
Bytes.toBytes(VALUE_1)));
rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_2),
Bytes.toBytes(VALUE_2)));
cellSetModel.addRow(rowModel);
rowModel = new RowModel(ROW_2);
rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_1),
Bytes.toBytes(VALUE_3)));
rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_2),
Bytes.toBytes(VALUE_4)));
cellSetModel.addRow(rowModel);
StringWriter writer = new StringWriter();
xmlMarshaller.marshal(cellSetModel, writer);
Response response = client.put(path, Constants.MIMETYPE_XML,
Bytes.toBytes(writer.toString()));
Thread.yield();
// make sure the fake row was not actually created
response = client.get(path, Constants.MIMETYPE_XML);
assertEquals(response.getCode(), 404);
// check that all of the values were created
StringBuilder query = new StringBuilder();
query.append('/');
query.append(TABLE);
query.append('/');
query.append("testrow*");
query.append('/');
query.append(COLUMN_1);
response = client.get(query.toString(), Constants.MIMETYPE_XML);
assertEquals(response.getCode(), 200);
assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
CellSetModel cellSet = (CellSetModel)
xmlUnmarshaller.unmarshal(new ByteArrayInputStream(response.getBody()));
List<RowModel> rows = cellSet.getRows();
assertTrue(rows.size() == 2);
for (RowModel row : rows) {
assertTrue(row.getCells().size() == 1);
assertEquals(COLUMN_1, Bytes.toString(row.getCells().get(0).getColumn()));
}
response = deleteRow(TABLE, ROW_1);
assertEquals(response.getCode(), 200);
response = deleteRow(TABLE, ROW_2);
assertEquals(response.getCode(), 200);
}
}

View File

@ -73,11 +73,11 @@ public class TestScannerResource {
private static int expectedRows2;
private static Configuration conf;
private static int insertData(String tableName, String column, double prob)
static int insertData(Configuration conf, String tableName, String column, double prob)
throws IOException {
Random rng = new Random();
int count = 0;
HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
HTable table = new HTable(conf, tableName);
byte[] k = new byte[3];
byte [][] famAndQf = KeyValue.parseColumn(Bytes.toBytes(column));
for (byte b1 = 'a'; b1 < 'z'; b1++) {
@ -97,10 +97,11 @@ public class TestScannerResource {
}
}
table.flushCommits();
table.close();
return count;
}
private static int countCellSet(CellSetModel model) {
static int countCellSet(CellSetModel model) {
int count = 0;
Iterator<RowModel> rows = model.getRows().iterator();
while (rows.hasNext()) {
@ -170,8 +171,8 @@ public class TestScannerResource {
htd.addFamily(new HColumnDescriptor(CFA));
htd.addFamily(new HColumnDescriptor(CFB));
admin.createTable(htd);
expectedRows1 = insertData(TABLE, COLUMN_1, 1.0);
expectedRows2 = insertData(TABLE, COLUMN_2, 0.5);
expectedRows1 = insertData(TEST_UTIL.getConfiguration(), TABLE, COLUMN_1, 1.0);
expectedRows2 = insertData(TEST_UTIL.getConfiguration(), TABLE, COLUMN_2, 0.5);
}
@AfterClass

View File

@ -0,0 +1,508 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.rest;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import javax.ws.rs.core.MediaType;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.JAXBException;
import javax.xml.bind.Unmarshaller;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import javax.xml.parsers.SAXParserFactory;
import javax.xml.stream.XMLStreamException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.rest.client.Client;
import org.apache.hadoop.hbase.rest.client.Cluster;
import org.apache.hadoop.hbase.rest.client.Response;
import org.apache.hadoop.hbase.rest.model.CellModel;
import org.apache.hadoop.hbase.rest.model.CellSetModel;
import org.apache.hadoop.hbase.rest.model.RowModel;
import org.apache.hadoop.hbase.rest.provider.JacksonProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.codehaus.jackson.JsonFactory;
import org.codehaus.jackson.JsonParser;
import org.codehaus.jackson.JsonToken;
import org.codehaus.jackson.map.ObjectMapper;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.xml.sax.InputSource;
import org.xml.sax.XMLReader;
@Category(MediumTests.class)
public class TestTableScan {
private static final String TABLE = "TestScanResource";
private static final String CFA = "a";
private static final String CFB = "b";
private static final String COLUMN_1 = CFA + ":1";
private static final String COLUMN_2 = CFB + ":2";
private static Client client;
private static int expectedRows1;
private static int expectedRows2;
private static Configuration conf;
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final HBaseRESTTestingUtility REST_TEST_UTIL =
new HBaseRESTTestingUtility();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf = TEST_UTIL.getConfiguration();
TEST_UTIL.startMiniCluster();
REST_TEST_UTIL.startServletContainer(conf);
client = new Client(new Cluster().add("localhost",
REST_TEST_UTIL.getServletPort()));
HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
if (!admin.tableExists(TABLE)) {
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
htd.addFamily(new HColumnDescriptor(CFA));
htd.addFamily(new HColumnDescriptor(CFB));
admin.createTable(htd);
expectedRows1 = TestScannerResource.insertData(conf, TABLE, COLUMN_1, 1.0);
expectedRows2 = TestScannerResource.insertData(conf, TABLE, COLUMN_2, 0.5);
}
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.getHBaseAdmin().disableTable(TABLE);
TEST_UTIL.getHBaseAdmin().deleteTable(TABLE);
REST_TEST_UTIL.shutdownServletContainer();
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testSimpleScannerXML() throws IOException, JAXBException, XMLStreamException {
// Test scanning particular columns
StringBuilder builder = new StringBuilder();
builder.append("/*");
builder.append("?");
builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
builder.append("&");
builder.append(Constants.SCAN_LIMIT + "=10");
Response response = client.get("/" + TABLE + builder.toString(),
Constants.MIMETYPE_XML);
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
JAXBContext ctx = JAXBContext.newInstance(CellSetModel.class);
Unmarshaller ush = ctx.createUnmarshaller();
CellSetModel model = (CellSetModel) ush.unmarshal(response.getStream());
int count = TestScannerResource.countCellSet(model);
assertEquals(10, count);
checkRowsNotNull(model);
//Test with no limit.
builder = new StringBuilder();
builder.append("/*");
builder.append("?");
builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
response = client.get("/" + TABLE + builder.toString(),
Constants.MIMETYPE_XML);
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
model = (CellSetModel) ush.unmarshal(response.getStream());
count = TestScannerResource.countCellSet(model);
assertEquals(expectedRows1, count);
checkRowsNotNull(model);
//Test with start and end row.
builder = new StringBuilder();
builder.append("/*");
builder.append("?");
builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
builder.append("&");
builder.append(Constants.SCAN_START_ROW + "=aaa");
builder.append("&");
builder.append(Constants.SCAN_END_ROW + "=aay");
response = client.get("/" + TABLE + builder.toString(),
Constants.MIMETYPE_XML);
assertEquals(200, response.getCode());
model = (CellSetModel) ush.unmarshal(response.getStream());
count = TestScannerResource.countCellSet(model);
RowModel startRow = model.getRows().get(0);
assertEquals("aaa", Bytes.toString(startRow.getKey()));
RowModel endRow = model.getRows().get(model.getRows().size() - 1);
assertEquals("aax", Bytes.toString(endRow.getKey()));
assertEquals(24, count);
checkRowsNotNull(model);
//Test with start row and limit.
builder = new StringBuilder();
builder.append("/*");
builder.append("?");
builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
builder.append("&");
builder.append(Constants.SCAN_START_ROW + "=aaa");
builder.append("&");
builder.append(Constants.SCAN_LIMIT + "=15");
response = client.get("/" + TABLE + builder.toString(),
Constants.MIMETYPE_XML);
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
model = (CellSetModel) ush.unmarshal(response.getStream());
startRow = model.getRows().get(0);
assertEquals("aaa", Bytes.toString(startRow.getKey()));
count = TestScannerResource.countCellSet(model);
assertEquals(15, count);
checkRowsNotNull(model);
}
@Test
public void testSimpleScannerJson() throws IOException, JAXBException {
// Test scanning particular columns with limit.
StringBuilder builder = new StringBuilder();
builder.append("/*");
builder.append("?");
builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
builder.append("&");
builder.append(Constants.SCAN_LIMIT + "=20");
Response response = client.get("/" + TABLE + builder.toString(),
Constants.MIMETYPE_JSON);
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
ObjectMapper mapper = new JacksonProvider()
.locateMapper(CellSetModel.class, MediaType.APPLICATION_JSON_TYPE);
CellSetModel model = mapper.readValue(response.getStream(), CellSetModel.class);
int count = TestScannerResource.countCellSet(model);
assertEquals(20, count);
checkRowsNotNull(model);
//Test scanning with no limit.
builder = new StringBuilder();
builder.append("/*");
builder.append("?");
builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_2);
response = client.get("/" + TABLE + builder.toString(),
Constants.MIMETYPE_JSON);
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
model = mapper.readValue(response.getStream(), CellSetModel.class);
count = TestScannerResource.countCellSet(model);
assertEquals(expectedRows2, count);
checkRowsNotNull(model);
//Test with start row and end row.
builder = new StringBuilder();
builder.append("/*");
builder.append("?");
builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
builder.append("&");
builder.append(Constants.SCAN_START_ROW + "=aaa");
builder.append("&");
builder.append(Constants.SCAN_END_ROW + "=aay");
response = client.get("/" + TABLE + builder.toString(),
Constants.MIMETYPE_JSON);
assertEquals(200, response.getCode());
model = mapper.readValue(response.getStream(), CellSetModel.class);
RowModel startRow = model.getRows().get(0);
assertEquals("aaa", Bytes.toString(startRow.getKey()));
RowModel endRow = model.getRows().get(model.getRows().size() - 1);
assertEquals("aax", Bytes.toString(endRow.getKey()));
count = TestScannerResource.countCellSet(model);
assertEquals(24, count);
checkRowsNotNull(model);
}
/**
* An example to scan using listener in unmarshaller for XML.
* @throws Exception the exception
*/
@Test
public void testScanUsingListenerUnmarshallerXML() throws Exception {
StringBuilder builder = new StringBuilder();
builder.append("/*");
builder.append("?");
builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
builder.append("&");
builder.append(Constants.SCAN_LIMIT + "=10");
Response response = client.get("/" + TABLE + builder.toString(),
Constants.MIMETYPE_XML);
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
JAXBContext context = JAXBContext.newInstance(ClientSideCellSetModel.class, RowModel.class,
CellModel.class);
Unmarshaller unmarshaller = context.createUnmarshaller();
final ClientSideCellSetModel.Listener listener = new ClientSideCellSetModel.Listener() {
@Override
public void handleRowModel(ClientSideCellSetModel helper, RowModel row) {
assertTrue(row.getKey() != null);
assertTrue(row.getCells().size() > 0);
}
};
// install the callback on all ClientSideCellSetModel instances
unmarshaller.setListener(new Unmarshaller.Listener() {
public void beforeUnmarshal(Object target, Object parent) {
if (target instanceof ClientSideCellSetModel) {
((ClientSideCellSetModel) target).setCellSetModelListener(listener);
}
}
public void afterUnmarshal(Object target, Object parent) {
if (target instanceof ClientSideCellSetModel) {
((ClientSideCellSetModel) target).setCellSetModelListener(null);
}
}
});
// create a new XML parser
SAXParserFactory factory = SAXParserFactory.newInstance();
factory.setNamespaceAware(true);
XMLReader reader = factory.newSAXParser().getXMLReader();
reader.setContentHandler(unmarshaller.getUnmarshallerHandler());
assertFalse(ClientSideCellSetModel.listenerInvoked);
reader.parse(new InputSource(response.getStream()));
assertTrue(ClientSideCellSetModel.listenerInvoked);
}
@Test
public void testStreamingJSON() throws Exception {
// Test scanning particular columns with limit.
StringBuilder builder = new StringBuilder();
builder.append("/*");
builder.append("?");
builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
builder.append("&");
builder.append(Constants.SCAN_LIMIT + "=20");
Response response = client.get("/" + TABLE + builder.toString(),
Constants.MIMETYPE_JSON);
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
ObjectMapper mapper = new JacksonProvider()
.locateMapper(CellSetModel.class, MediaType.APPLICATION_JSON_TYPE);
CellSetModel model = mapper.readValue(response.getStream(), CellSetModel.class);
int count = TestScannerResource.countCellSet(model);
assertEquals(20, count);
checkRowsNotNull(model);
//Test scanning with no limit.
builder = new StringBuilder();
builder.append("/*");
builder.append("?");
builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_2);
response = client.get("/" + TABLE + builder.toString(),
Constants.MIMETYPE_JSON);
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
model = mapper.readValue(response.getStream(), CellSetModel.class);
count = TestScannerResource.countCellSet(model);
assertEquals(expectedRows2, count);
checkRowsNotNull(model);
//Test with start row and end row.
builder = new StringBuilder();
builder.append("/*");
builder.append("?");
builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
builder.append("&");
builder.append(Constants.SCAN_START_ROW + "=aaa");
builder.append("&");
builder.append(Constants.SCAN_END_ROW + "=aay");
response = client.get("/" + TABLE + builder.toString(),
Constants.MIMETYPE_JSON);
assertEquals(200, response.getCode());
count = 0;
JsonFactory jfactory = new JsonFactory(mapper);
JsonParser jParser = jfactory.createJsonParser(response.getStream());
boolean found = false;
while (jParser.nextToken() != JsonToken.END_OBJECT) {
if(jParser.getCurrentToken() == JsonToken.START_OBJECT && found) {
RowModel row = jParser.readValueAs(RowModel.class);
assertNotNull(row.getKey());
for (int i = 0; i < row.getCells().size(); i++) {
if (count == 0) {
assertEquals("aaa", Bytes.toString(row.getKey()));
}
if (count == 23) {
assertEquals("aax", Bytes.toString(row.getKey()));
}
count++;
}
jParser.skipChildren();
} else {
found = jParser.getCurrentToken() == JsonToken.START_ARRAY;
}
}
assertEquals(24, count);
}
@Test
public void testSimpleScannerProtobuf() throws Exception {
StringBuilder builder = new StringBuilder();
builder.append("/*");
builder.append("?");
builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
builder.append("&");
builder.append(Constants.SCAN_LIMIT + "=15");
Response response = client.get("/" + TABLE + builder.toString(),
Constants.MIMETYPE_PROTOBUF);
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type"));
int rowCount = readProtobufStream(response.getStream());
assertEquals(15, rowCount);
//Test with start row and end row.
builder = new StringBuilder();
builder.append("/*");
builder.append("?");
builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
builder.append("&");
builder.append(Constants.SCAN_START_ROW + "=aaa");
builder.append("&");
builder.append(Constants.SCAN_END_ROW + "=aay");
response = client.get("/" + TABLE + builder.toString(),
Constants.MIMETYPE_PROTOBUF);
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type"));
rowCount = readProtobufStream(response.getStream());
assertEquals(24, rowCount);
}
private void checkRowsNotNull(CellSetModel model) {
for (RowModel row: model.getRows()) {
assertTrue(row.getKey() != null);
assertTrue(row.getCells().size() > 0);
}
}
/**
* Read protobuf stream.
* @param inputStream the input stream
* @return The number of rows in the cell set model.
* @throws IOException Signals that an I/O exception has occurred.
*/
public int readProtobufStream(InputStream inputStream) throws IOException{
DataInputStream stream = new DataInputStream(inputStream);
CellSetModel model = null;
int rowCount = 0;
try {
while (true) {
byte[] lengthBytes = new byte[2];
int readBytes = stream.read(lengthBytes);
if (readBytes == -1) {
break;
}
assertEquals(2, readBytes);
int length = Bytes.toShort(lengthBytes);
byte[] cellset = new byte[length];
stream.read(cellset);
model = new CellSetModel();
model.getObjectFromMessage(cellset);
checkRowsNotNull(model);
rowCount = rowCount + TestScannerResource.countCellSet(model);
}
} catch (EOFException exp) {
exp.printStackTrace();
} finally {
stream.close();
}
return rowCount;
}
@Test
public void testScanningUnknownColumnJson() throws IOException, JAXBException {
// Test scanning particular columns with limit.
StringBuilder builder = new StringBuilder();
builder.append("/*");
builder.append("?");
builder.append(Constants.SCAN_COLUMN + "=a:test");
Response response = client.get("/" + TABLE + builder.toString(),
Constants.MIMETYPE_JSON);
assertEquals(200, response.getCode());
assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
ObjectMapper mapper = new JacksonProvider().locateMapper(CellSetModel.class,
MediaType.APPLICATION_JSON_TYPE);
CellSetModel model = mapper.readValue(response.getStream(), CellSetModel.class);
int count = TestScannerResource.countCellSet(model);
assertEquals(0, count);
}
/**
* The Class ClientSideCellSetModel which mimics cell set model, and contains listener to perform
* user defined operations on the row model.
*/
@XmlRootElement(name = "CellSet")
@XmlAccessorType(XmlAccessType.FIELD)
public static class ClientSideCellSetModel implements Serializable {
private static final long serialVersionUID = 1L;
/**
* This list is not a real list; instead it will notify a listener whenever JAXB has
* unmarshalled the next row.
*/
@XmlElement(name="Row")
private List<RowModel> row;
static boolean listenerInvoked = false;
/**
* Install a listener for row model on this object. If l is null, the listener
* is removed again.
*/
public void setCellSetModelListener(final Listener l) {
row = (l == null) ? null : new ArrayList<RowModel>() {
private static final long serialVersionUID = 1L;
public boolean add(RowModel o) {
l.handleRowModel(ClientSideCellSetModel.this, o);
listenerInvoked = true;
return false;
}
};
}
/**
* This listener is invoked every time a new row model is unmarshalled.
*/
public static interface Listener {
void handleRowModel(ClientSideCellSetModel helper, RowModel rowModel);
}
}
}