From a8ec7232cc6217c9310e23ef2088bc67c8596216 Mon Sep 17 00:00:00 2001
From: ndimiduk
Date: Fri, 17 Jan 2014 02:31:00 +0000
Subject: [PATCH] HBASE-9343 Implement stateless scanner for Stargate (Vandana
 Ayyalasomayajula)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1558994 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop/hbase/rest/MetricsRESTSource.java  |  18 +
 .../hbase/rest/MetricsRESTSourceImpl.java     |  14 +
 .../hbase/rest/MetricsRESTSourceImpl.java     |  14 +
 .../apache/hadoop/hbase/rest/Constants.java   |  11 +
 .../apache/hadoop/hbase/rest/MetricsREST.java |  18 +-
 .../hbase/rest/ProtobufStreamingUtil.java     | 102 ++++
 .../hadoop/hbase/rest/TableResource.java      |  88 ++-
 .../hadoop/hbase/rest/TableScanResource.java  | 168 ++++++
 .../hadoop/hbase/rest/client/Client.java      |   4 +-
 .../hadoop/hbase/rest/client/Response.java    |  26 +
 .../hbase/rest/TestGetAndPutResource.java     |  98 ++++
 .../hbase/rest/TestScannerResource.java       |  11 +-
 .../hadoop/hbase/rest/TestTableScan.java      | 508 ++++++++++++++++++
 13 files changed, 1071 insertions(+), 9 deletions(-)
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java
 create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java
 create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestTableScan.java

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java
index c1629f788b3..4ecd73bbbb8 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSource.java
@@ -46,6 +46,10 @@ public interface MetricsRESTSource extends BaseSource {
   String FAILED_PUT_KEY = "failedPut";
 
   String FAILED_DELETE_KEY = "failedDelete";
+
+  String SUCCESSFUL_SCAN_KEY = "successfulScanCount";
+
+  String FAILED_SCAN_KEY = "failedScanCount";
 
   /**
    * Increment the number of requests
    * @param inc The number of requests
@@ -95,4 +99,18 @@ public interface MetricsRESTSource extends BaseSource {
    * @param inc The number of failed delete requests.
    */
   void incrementFailedDeleteRequests(int inc);
+
+  /**
+   * Increment the number of successful scan requests.
+   *
+   * @param inc Number of successful scan requests.
+   */
+  void incrementSucessfulScanRequests(final int inc);
+
+  /**
+   * Increment the number of failed scan requests.
+   *
+   * @param inc Number of failed scan requests.
+   */
+  void incrementFailedScanRequests(final int inc);
 }
diff --git a/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java b/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java
index 94551a61720..64316bb5152 100644
--- a/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java
+++ b/hbase-hadoop1-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java
@@ -33,9 +33,11 @@ public class MetricsRESTSourceImpl extends BaseSourceImpl implements MetricsREST
   private MetricMutableCounterLong sucGet;
   private MetricMutableCounterLong sucPut;
   private MetricMutableCounterLong sucDel;
+  private MetricMutableCounterLong sucScan;
   private MetricMutableCounterLong fGet;
   private MetricMutableCounterLong fPut;
   private MetricMutableCounterLong fDel;
+  private MetricMutableCounterLong fScan;
 
   public MetricsRESTSourceImpl() {
     this(METRICS_NAME, METRICS_DESCRIPTION, CONTEXT, JMX_CONTEXT);
@@ -56,10 +58,12 @@ public class MetricsRESTSourceImpl extends BaseSourceImpl implements MetricsREST
     sucGet = getMetricsRegistry().getLongCounter(SUCCESSFUL_GET_KEY, 0l);
     sucPut = getMetricsRegistry().getLongCounter(SUCCESSFUL_PUT_KEY, 0l);
     sucDel = getMetricsRegistry().getLongCounter(SUCCESSFUL_DELETE_KEY, 0l);
+    sucScan = getMetricsRegistry().getLongCounter(SUCCESSFUL_SCAN_KEY, 0L);
 
     fGet = getMetricsRegistry().getLongCounter(FAILED_GET_KEY, 0l);
     fPut = getMetricsRegistry().getLongCounter(FAILED_PUT_KEY, 0l);
     fDel = getMetricsRegistry().getLongCounter(FAILED_DELETE_KEY, 0l);
+    fScan = getMetricsRegistry().getLongCounter(FAILED_SCAN_KEY, 0L);
   }
 
   @Override
@@ -96,4 +100,14 @@ public class MetricsRESTSourceImpl extends BaseSourceImpl implements MetricsREST
   public void incrementFailedDeleteRequests(int inc) {
     fDel.incr(inc);
   }
+
+  @Override
+  public void incrementSucessfulScanRequests(int inc) {
+    sucScan.incr(inc);
+  }
+
+  @Override
+  public void incrementFailedScanRequests(int inc) {
+    fScan.incr(inc);
+  }
 }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java
index 4a4eb080012..52d76a25a53 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/rest/MetricsRESTSourceImpl.java
@@ -35,9 +35,11 @@ public class MetricsRESTSourceImpl extends BaseSourceImpl implements MetricsREST
   private MutableCounterLong sucGet;
   private MutableCounterLong sucPut;
   private MutableCounterLong sucDel;
+  private MutableCounterLong sucScan;
   private MutableCounterLong fGet;
   private MutableCounterLong fPut;
   private MutableCounterLong fDel;
+  private MutableCounterLong fScan;
 
   public MetricsRESTSourceImpl() {
     this(METRICS_NAME, METRICS_DESCRIPTION, CONTEXT, JMX_CONTEXT);
@@ -58,10 +60,12 @@ public class MetricsRESTSourceImpl extends BaseSourceImpl implements MetricsREST
     sucGet = getMetricsRegistry().getLongCounter(SUCCESSFUL_GET_KEY, 0l);
     sucPut = getMetricsRegistry().getLongCounter(SUCCESSFUL_PUT_KEY, 0l);
     sucDel = getMetricsRegistry().getLongCounter(SUCCESSFUL_DELETE_KEY, 0l);
+    sucScan = getMetricsRegistry().getLongCounter(SUCCESSFUL_SCAN_KEY, 0L);
 
     fGet = getMetricsRegistry().getLongCounter(FAILED_GET_KEY, 0l);
     fPut = getMetricsRegistry().getLongCounter(FAILED_PUT_KEY, 0l);
     fDel = getMetricsRegistry().getLongCounter(FAILED_DELETE_KEY, 0l);
+    fScan = getMetricsRegistry().getLongCounter(FAILED_SCAN_KEY, 0L);
   }
 
   @Override
@@ -98,4 +102,14 @@ public class MetricsRESTSourceImpl extends BaseSourceImpl implements MetricsREST
   public void incrementFailedDeleteRequests(int inc) {
     fDel.incr(inc);
   }
+
+  @Override
+  public void incrementSucessfulScanRequests(int inc) {
+    sucScan.incr(inc);
+  }
+
+  @Override
+  public void incrementFailedScanRequests(int inc) {
+    fScan.incr(inc);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/Constants.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/Constants.java
index ec99e08a04e..8c101d838b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/Constants.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/Constants.java
@@ -57,5 +57,16 @@ public interface Constants {
 
   static final String REST_DNS_NAMESERVER = "hbase.rest.dns.nameserver";
   static final String REST_DNS_INTERFACE = "hbase.rest.dns.interface";
 
+  public static final String FILTER_CLASSES = "hbase.rest.filter.classes";
+  public static final String SCAN_START_ROW = "startrow";
+  public static final String SCAN_END_ROW = "endrow";
+  public static final String SCAN_COLUMN = "column";
+  public static final String SCAN_START_TIME = "starttime";
+  public static final String SCAN_END_TIME = "endtime";
+  public static final String SCAN_MAX_VERSIONS = "maxversions";
+  public static final String SCAN_BATCH_SIZE = "batchsize";
+  public static final String SCAN_LIMIT = "limit";
+  public static final String SCAN_FETCH_SIZE = "hbase.rest.scan.fetchsize";
+  public static final String SCAN_CACHE_BLOCKS = "cacheblocks";
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java
index 98f48a70180..82ccfa5a284 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/MetricsREST.java
@@ -78,12 +78,26 @@ public class MetricsREST {
   public void incrementSucessfulDeleteRequests(final int inc) {
     source.incrementSucessfulDeleteRequests(inc);
   }
-  
+
   /**
    * @param inc How much to add to failedDeleteCount.
    */
   public void incrementFailedDeleteRequests(final int inc) {
     source.incrementFailedDeleteRequests(inc);
   }
-  
+
+  /**
+   * @param inc How much to add to successfulScanCount.
+   */
+  public synchronized void incrementSucessfulScanRequests(final int inc) {
+    source.incrementSucessfulScanRequests(inc);
+  }
+
+  /**
+   * @param inc How much to add to failedScanCount.
+   */
+  public void incrementFailedScanRequests(final int inc) {
+    source.incrementFailedScanRequests(inc);
+  }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java
new file mode 100644
index 00000000000..93bb94017cf
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/ProtobufStreamingUtil.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.rest;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+
+import javax.ws.rs.WebApplicationException;
+import javax.ws.rs.core.StreamingOutput;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.rest.model.CellModel;
+import org.apache.hadoop.hbase.rest.model.CellSetModel;
+import org.apache.hadoop.hbase.rest.model.RowModel;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Streams scan results to the client as a sequence of length-prefixed,
+ * protobuf-encoded CellSetModel messages, pulling rows from the scanner in
+ * batches of fetchSize until the user-requested limit is exhausted.
+ */
+public class ProtobufStreamingUtil implements StreamingOutput {
+
+  private static final Log LOG = LogFactory.getLog(ProtobufStreamingUtil.class);
+  private String contentType;
+  private ResultScanner resultScanner;
+  private int limit;
+  private int fetchSize;
+
+  protected ProtobufStreamingUtil(ResultScanner scanner, String type, int limit, int fetchSize) {
+    this.resultScanner = scanner;
+    this.contentType = type;
+    this.limit = limit;
+    this.fetchSize = fetchSize;
+    LOG.debug("Created ProtobufStreamingUtil with content type = " + this.contentType
+        + " user limit : " + this.limit + " scan fetch size : " + this.fetchSize);
+  }
+
+  @Override
+  public void write(OutputStream outStream) throws IOException, WebApplicationException {
+    Result[] rowsToSend;
+    if (limit < fetchSize) {
+      rowsToSend = this.resultScanner.next(limit);
+      writeToStream(createModelFromResults(rowsToSend), this.contentType, outStream);
+    } else {
+      int count = limit;
+      while (count > 0) {
+        if (count < fetchSize) {
+          rowsToSend = this.resultScanner.next(count);
+        } else {
+          rowsToSend = this.resultScanner.next(this.fetchSize);
+        }
+        if (rowsToSend.length == 0) {
+          break;
+        }
+        count = count - rowsToSend.length;
+        writeToStream(createModelFromResults(rowsToSend), this.contentType, outStream);
+      }
+    }
+  }
+
+  private void writeToStream(CellSetModel model, String contentType, OutputStream outStream)
+      throws IOException {
+    byte[] objectBytes = model.createProtobufOutput();
+    // Each message is framed with a two-byte length prefix so the client can
+    // delimit the stream of CellSetModel messages.
+    outStream.write(Bytes.toBytes((short) objectBytes.length));
+    outStream.write(objectBytes);
+    outStream.flush();
+    LOG.trace("Wrote " + model.getRows().size() + " rows to stream successfully.");
+  }
+
+  private CellSetModel createModelFromResults(Result[] results) {
+    CellSetModel cellSetModel = new CellSetModel();
+    for (Result rs : results) {
+      byte[] rowKey = rs.getRow();
+      RowModel rModel = new RowModel(rowKey);
+      List<Cell> kvs = rs.listCells();
+      for (Cell kv : kvs) {
+        rModel.addCell(new CellModel(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
+            kv.getTimestamp(), CellUtil.cloneValue(kv)));
+      }
+      cellSetModel.addRow(rModel);
+    }
+    return cellSetModel;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableResource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableResource.java
index 5172fa3cd95..963e900d4c0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableResource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableResource.java
@@ -20,18 +20,32 @@
 package org.apache.hadoop.hbase.rest;
 
 import java.io.IOException;
+import java.util.List;
 
+import javax.ws.rs.DefaultValue;
 import javax.ws.rs.Encoded;
+import javax.ws.rs.HeaderParam;
 import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.UriInfo;
 
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.util.Bytes;
 
 @InterfaceAudience.Private
 public class TableResource extends ResourceBase {
 
   String table;
+  private static final Log LOG = LogFactory.getLog(TableResource.class);
 
   /**
    * Constructor
@@ -82,7 +96,7 @@ public class TableResource extends ResourceBase {
     return new MultiRowResource(this, versions);
   }
 
-  @Path("{rowspec: .+}")
+  @Path("{rowspec: [^*]+}")
   public RowResource getRowResource(
       // We need the @Encoded decorator so Jersey won't urldecode before
       // the RowSpec constructor has a chance to parse
@@ -91,4 +105,76 @@
       final @PathParam("rowspec") @Encoded String rowspec,
       final @QueryParam("v") String versions,
       final @QueryParam("check") String check) throws IOException {
     return new RowResource(this, rowspec, versions, check);
   }
+
+  @Path("{suffixglobbingspec: .*\\*/.+}")
+  public RowResource getRowResourceWithSuffixGlobbing(
+      // We need the @Encoded decorator so Jersey won't urldecode before
+      // the RowSpec constructor has a chance to parse
+      final @PathParam("suffixglobbingspec") @Encoded String suffixglobbingspec,
+      final @QueryParam("v") String versions,
+      final @QueryParam("check") String check) throws IOException {
+    return new RowResource(this, suffixglobbingspec, versions, check);
+  }
+
+  @Path("{scanspec: .*[*]$}")
+  public TableScanResource getScanResource(
+      final @Context UriInfo uriInfo,
+      final @PathParam("scanspec") String scanSpec,
+      final @HeaderParam("Accept") String contentType,
+      @DefaultValue(Integer.MAX_VALUE + "")
+      @QueryParam(Constants.SCAN_LIMIT) int userRequestedLimit,
+      @DefaultValue("") @QueryParam(Constants.SCAN_START_ROW) String startRow,
+      @DefaultValue("") @QueryParam(Constants.SCAN_END_ROW) String endRow,
+      @DefaultValue("") @QueryParam(Constants.SCAN_COLUMN) List<String> column,
+      @DefaultValue("1") @QueryParam(Constants.SCAN_MAX_VERSIONS) int maxVersions,
+      @DefaultValue("-1") @QueryParam(Constants.SCAN_BATCH_SIZE) int batchSize,
+      @DefaultValue("0") @QueryParam(Constants.SCAN_START_TIME) long startTime,
+      @DefaultValue(Long.MAX_VALUE + "") @QueryParam(Constants.SCAN_END_TIME) long endTime,
+      @DefaultValue("true") @QueryParam(Constants.SCAN_CACHE_BLOCKS) boolean cacheBlocks) {
+    try {
+      Filter filter = null;
+      // A bare "*" scans the whole table; "prefix*" becomes a PrefixFilter.
+      if (scanSpec.indexOf('*') > 0) {
+        String prefix = scanSpec.substring(0, scanSpec.indexOf('*'));
+        filter = new PrefixFilter(Bytes.toBytes(prefix));
+      }
+      LOG.debug("Query parameters : Table Name => " + this.table + " Start Row => " + startRow
+          + " End Row => " + endRow + " Columns => " + column + " Start Time => " + startTime
+          + " End Time => " + endTime + " Cache Blocks => " + cacheBlocks + " Max Versions => "
+          + maxVersions + " Batch Size => " + batchSize);
+      HTableInterface hTable = RESTServlet.getInstance().getTable(this.table);
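+      // Translate the query string into a server-side Scan: the ResultScanner
+      // opened below is wrapped by TableScanResource and streamed out in one
+      // response, so no scanner state survives the request (hence "stateless").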
+      Scan tableScan = new Scan();
+      tableScan.setBatch(batchSize);
+      tableScan.setMaxVersions(maxVersions);
+      tableScan.setTimeRange(startTime, endTime);
+      tableScan.setStartRow(Bytes.toBytes(startRow));
+      tableScan.setStopRow(Bytes.toBytes(endRow));
+      tableScan.setCacheBlocks(cacheBlocks);
+      for (String csplit : column) {
+        String[] familysplit = csplit.trim().split(":");
+        if (familysplit.length == 2) {
+          if (familysplit[1].length() > 0) {
+            LOG.debug("Scan family and column : " + familysplit[0] + " " + familysplit[1]);
+            tableScan.addColumn(Bytes.toBytes(familysplit[0]), Bytes.toBytes(familysplit[1]));
+          } else {
+            tableScan.addFamily(Bytes.toBytes(familysplit[0]));
+            LOG.debug("Scan family : " + familysplit[0] + " and empty qualifier.");
+            tableScan.addColumn(Bytes.toBytes(familysplit[0]), null);
+          }
+        } else if (StringUtils.isNotEmpty(familysplit[0])) {
+          LOG.debug("Scan family : " + familysplit[0]);
+          tableScan.addFamily(Bytes.toBytes(familysplit[0]));
+        }
+      }
+      if (filter != null) {
+        tableScan.setFilter(filter);
+      }
+      int fetchSize = this.servlet.getConfiguration().getInt(Constants.SCAN_FETCH_SIZE, 10);
+      tableScan.setCaching(fetchSize);
+      return new TableScanResource(hTable.getScanner(tableScan), userRequestedLimit);
+    } catch (Exception exp) {
+      servlet.getMetrics().incrementFailedScanRequests(1);
+      LOG.warn(exp);
+      processException(exp);
+      return null;
+    }
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java
new file mode 100644
index 00000000000..8ff52de5600
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/TableScanResource.java
@@ -0,0 +1,168 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.rest;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.HeaderParam;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.core.UriInfo;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.rest.model.CellModel;
+import org.apache.hadoop.hbase.rest.model.RowModel;
+import org.codehaus.jackson.annotate.JsonIgnore;
+import org.codehaus.jackson.annotate.JsonProperty;
+
+@InterfaceAudience.Private
+public class TableScanResource extends ResourceBase {
+
+  private static final Log LOG = LogFactory.getLog(TableScanResource.class);
+  TableResource tableResource;
+  ResultScanner results;
+  int userRequestedLimit;
+
+  public TableScanResource(ResultScanner scanner, int userRequestedLimit) throws IOException {
+    super();
+    this.results = scanner;
+    this.userRequestedLimit = userRequestedLimit;
+  }
+
+  @GET
+  @Produces({ Constants.MIMETYPE_XML, Constants.MIMETYPE_JSON })
+  public CellSetModelStream get(final @Context UriInfo uriInfo) {
+    servlet.getMetrics().incrementRequests(1);
+    final int rowsToSend = userRequestedLimit;
+    servlet.getMetrics().incrementSucessfulScanRequests(1);
+    final Iterator<Result> itr = results.iterator();
+    // Wrap the scanner in a lazy Iterable so JAXB/Jackson marshal rows as they
+    // are pulled from the region server instead of materializing them all.
+    return new CellSetModelStream(new ArrayList<RowModel>() {
+      @Override
+      public Iterator<RowModel> iterator() {
+        return new Iterator<RowModel>() {
+          int count = rowsToSend;
+
+          @Override
+          public boolean hasNext() {
+            if (count > 0) {
+              return itr.hasNext();
+            } else {
+              return false;
+            }
+          }
+
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException(
+                "Remove method cannot be used in CellSetModelStream");
+          }
+
+          @Override
+          public RowModel next() {
+            Result rs = itr.next();
+            if ((rs == null) || (count <= 0)) {
+              return null;
+            }
+            byte[] rowKey = rs.getRow();
+            RowModel rModel = new RowModel(rowKey);
+            List<Cell> kvs = rs.listCells();
+            for (Cell kv : kvs) {
+              rModel.addCell(new CellModel(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
+                  kv.getTimestamp(), CellUtil.cloneValue(kv)));
+            }
+            count--;
+            return rModel;
+          }
+        };
+      }
+    });
+  }
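+
+  /**
+   * Streams the scan results back as a sequence of two-byte-length-prefixed
+   * protobuf CellSet messages. Only the user limit and the Accept header are
+   * consumed here; the remaining query parameters were already applied when
+   * the parent TableResource built the scanner handed to this resource.
+   */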
+  @GET
+  @Produces({ Constants.MIMETYPE_PROTOBUF, Constants.MIMETYPE_PROTOBUF_IETF })
+  public Response getProtobuf(
+      final @Context UriInfo uriInfo,
+      final @PathParam("scanspec") String scanSpec,
+      final @HeaderParam("Accept") String contentType,
+      @DefaultValue(Integer.MAX_VALUE + "") @QueryParam(Constants.SCAN_LIMIT) int userRequestedLimit,
+      @DefaultValue("") @QueryParam(Constants.SCAN_START_ROW) String startRow,
+      @DefaultValue("") @QueryParam(Constants.SCAN_END_ROW) String endRow,
+      @DefaultValue("") @QueryParam(Constants.SCAN_COLUMN) List<String> column,
+      @DefaultValue("1") @QueryParam(Constants.SCAN_MAX_VERSIONS) int maxVersions,
+      @DefaultValue("-1") @QueryParam(Constants.SCAN_BATCH_SIZE) int batchSize,
+      @DefaultValue("0") @QueryParam(Constants.SCAN_START_TIME) long startTime,
+      @DefaultValue(Long.MAX_VALUE + "") @QueryParam(Constants.SCAN_END_TIME) long endTime,
+      @DefaultValue("true") @QueryParam(Constants.SCAN_CACHE_BLOCKS) boolean cacheBlocks) {
+    servlet.getMetrics().incrementRequests(1);
+    try {
+      int fetchSize = this.servlet.getConfiguration().getInt(Constants.SCAN_FETCH_SIZE, 10);
+      ProtobufStreamingUtil stream = new ProtobufStreamingUtil(this.results, contentType,
+          userRequestedLimit, fetchSize);
+      servlet.getMetrics().incrementSucessfulScanRequests(1);
+      ResponseBuilder response = Response.ok(stream);
+      response.header("content-type", contentType);
+      return response.build();
+    } catch (Exception exp) {
+      servlet.getMetrics().incrementFailedScanRequests(1);
+      LOG.warn(exp);
+      processException(exp);
+      return null;
+    }
+  }
+
+  @XmlRootElement(name = "CellSet")
+  @XmlAccessorType(XmlAccessType.FIELD)
+  public static class CellSetModelStream {
+    // JAXB needs an arraylist for streaming
+    @XmlElement(name = "Row")
+    @JsonIgnore
+    private ArrayList<RowModel> Row;
+
+    public CellSetModelStream() {
+    }
+
+    public CellSetModelStream(final ArrayList<RowModel> rowList) {
+      this.Row = rowList;
+    }
+
+    // jackson needs an iterator for streaming
+    @JsonProperty("Row")
+    public Iterator<RowModel> getIterator() {
+      return Row.iterator();
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java
index 3cb6fcd2f4b..77feb244d87 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Client.java
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.rest.client;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -333,7 +334,8 @@ public class Client {
       int code = execute(c, method, headers, path);
       headers = method.getResponseHeaders();
       byte[] body = method.getResponseBody();
-      return new Response(code, headers, body);
+      InputStream in = method.getResponseBodyAsStream();
+      return new Response(code, headers, body, in);
     } finally {
       method.releaseConnection();
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java
index 16ee9d2cc9c..c2810fe12b2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/rest/client/Response.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hbase.rest.client;
 
+import java.io.InputStream;
+
 import org.apache.commons.httpclient.Header;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -32,6 +34,7 @@ public class Response {
   private int code;
   private Header[] headers;
   private byte[] body;
+  private InputStream stream;
 
   /**
    * Constructor
@@ -61,6 +64,20 @@ public class Response {
     this.headers = headers;
     this.body = body;
   }
+
+  /**
+   * Constructor
+   * @param code the HTTP response code
+   * @param headers the HTTP response headers
+   * @param body the response body, can be null
+   * @param in the InputStream, if the response has one
+   */
+  public Response(int code, Header[] headers, byte[] body, InputStream in) {
+    this.code = code;
+    this.headers = headers;
+    this.body = body;
+    this.stream = in;
+  }
 
   /**
    * @return the HTTP response code
@@ -68,6 +85,15 @@ public class Response {
   public int getCode() {
     return code;
   }
+
+  /**
+   * Gets the input stream instance.
+   *
+   * @return the response InputStream, or null if none was set
+   */
+  public InputStream getStream() {
+    return this.stream;
+  }
 
   /**
    * @return the HTTP response headers
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java
index 6c64f8d5f55..53e603f0045 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestGetAndPutResource.java
@@ -482,5 +482,103 @@ public class TestGetAndPutResource extends RowResourceBase {
     }
     return contains;
   }
+
+  @Test
+  public void testSuffixGlobbingXMLWithNewScanner() throws IOException, JAXBException {
+    String path = "/" + TABLE + "/fakerow"; // deliberate nonexistent row
+
+    CellSetModel cellSetModel = new CellSetModel();
+    RowModel rowModel = new RowModel(ROW_1);
+    rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_1),
+      Bytes.toBytes(VALUE_1)));
+    rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_2),
+      Bytes.toBytes(VALUE_2)));
+    cellSetModel.addRow(rowModel);
+    rowModel = new RowModel(ROW_2);
+    rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_1),
+      Bytes.toBytes(VALUE_3)));
+    rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_2),
+      Bytes.toBytes(VALUE_4)));
+    cellSetModel.addRow(rowModel);
+    StringWriter writer = new StringWriter();
+    xmlMarshaller.marshal(cellSetModel, writer);
+    Response response = client.put(path, Constants.MIMETYPE_XML,
+      Bytes.toBytes(writer.toString()));
+    Thread.yield();
+
+    // make sure the fake row was not actually created
+    response = client.get(path, Constants.MIMETYPE_XML);
+    assertEquals(response.getCode(), 404);
+
+    // check that all of the values were created
+    StringBuilder query = new StringBuilder();
+    query.append('/');
+    query.append(TABLE);
+    query.append('/');
+    query.append("testrow*");
+    response = client.get(query.toString(), Constants.MIMETYPE_XML);
+    assertEquals(response.getCode(), 200);
+    assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
+    CellSetModel cellSet = (CellSetModel)
+      xmlUnmarshaller.unmarshal(new ByteArrayInputStream(response.getBody()));
+    assertTrue(cellSet.getRows().size() == 2);
+
+    response = deleteRow(TABLE, ROW_1);
+    assertEquals(response.getCode(), 200);
+    response = deleteRow(TABLE, ROW_2);
+    assertEquals(response.getCode(), 200);
+  }
+
+  @Test
+  public void testSuffixGlobbingXML() throws IOException, JAXBException {
+    String path = "/" + TABLE + "/fakerow"; // deliberate nonexistent row
+
+    CellSetModel cellSetModel = new CellSetModel();
+    RowModel rowModel = new RowModel(ROW_1);
+    rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_1),
+      Bytes.toBytes(VALUE_1)));
+    rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_2),
+      Bytes.toBytes(VALUE_2)));
+    cellSetModel.addRow(rowModel);
+    rowModel = new RowModel(ROW_2);
+    rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_1),
+      Bytes.toBytes(VALUE_3)));
+    rowModel.addCell(new CellModel(Bytes.toBytes(COLUMN_2),
+      Bytes.toBytes(VALUE_4)));
+    cellSetModel.addRow(rowModel);
+    StringWriter writer = new StringWriter();
+    xmlMarshaller.marshal(cellSetModel, writer);
+    Response response = client.put(path, Constants.MIMETYPE_XML,
+      Bytes.toBytes(writer.toString()));
+    Thread.yield();
+
+    // make sure the fake row was not actually created
+    response = client.get(path, Constants.MIMETYPE_XML);
+    assertEquals(response.getCode(), 404);
+
+    // check that all of the values were created
+    StringBuilder query = new StringBuilder();
+    query.append('/');
+    query.append(TABLE);
+    query.append('/');
+    query.append("testrow*");
+    query.append('/');
+    query.append(COLUMN_1);
+    response = client.get(query.toString(), Constants.MIMETYPE_XML);
+    assertEquals(response.getCode(), 200);
+    assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
+    CellSetModel cellSet = (CellSetModel)
+      xmlUnmarshaller.unmarshal(new ByteArrayInputStream(response.getBody()));
+    List<RowModel> rows = cellSet.getRows();
+    assertTrue(rows.size() == 2);
+    for (RowModel row : rows) {
+      assertTrue(row.getCells().size() == 1);
+      assertEquals(COLUMN_1, Bytes.toString(row.getCells().get(0).getColumn()));
+    }
+    response = deleteRow(TABLE, ROW_1);
+    assertEquals(response.getCode(), 200);
+    response = deleteRow(TABLE, ROW_2);
+    assertEquals(response.getCode(), 200);
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java
index e932a800c2d..84aa994749d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestScannerResource.java
@@ -73,11 +73,11 @@ public class TestScannerResource {
   private static int expectedRows2;
   private static Configuration conf;
 
-  private static int insertData(String tableName, String column, double prob)
+  static int insertData(Configuration conf, String tableName, String column, double prob)
       throws IOException {
     Random rng = new Random();
     int count = 0;
-    HTable table = new HTable(TEST_UTIL.getConfiguration(), tableName);
+    HTable table = new HTable(conf, tableName);
     byte[] k = new byte[3];
     byte [][] famAndQf = KeyValue.parseColumn(Bytes.toBytes(column));
     for (byte b1 = 'a'; b1 < 'z'; b1++) {
@@ -97,10 +97,11 @@ public class TestScannerResource {
       }
     }
     table.flushCommits();
+    table.close();
     return count;
   }
 
-  private static int countCellSet(CellSetModel model) {
+  static int countCellSet(CellSetModel model) {
     int count = 0;
     Iterator<RowModel> rows = model.getRows().iterator();
     while (rows.hasNext()) {
@@ -170,8 +171,8 @@ public class TestScannerResource {
     htd.addFamily(new HColumnDescriptor(CFA));
     htd.addFamily(new HColumnDescriptor(CFB));
     admin.createTable(htd);
-    expectedRows1 = insertData(TABLE, COLUMN_1, 1.0);
-    expectedRows2 = insertData(TABLE, COLUMN_2, 0.5);
+    expectedRows1 = insertData(TEST_UTIL.getConfiguration(), TABLE, COLUMN_1, 1.0);
+    expectedRows2 = insertData(TEST_UTIL.getConfiguration(), TABLE, COLUMN_2, 0.5);
   }
 
   @AfterClass
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestTableScan.java
new file mode 100644
index 00000000000..2e5518105bb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/rest/TestTableScan.java
@@ -0,0 +1,508 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.rest;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.ws.rs.core.MediaType;
+import javax.xml.bind.JAXBContext;
+import javax.xml.bind.JAXBException;
+import javax.xml.bind.Unmarshaller;
+import javax.xml.bind.annotation.XmlAccessType;
+import javax.xml.bind.annotation.XmlAccessorType;
+import javax.xml.bind.annotation.XmlElement;
+import javax.xml.bind.annotation.XmlRootElement;
+import javax.xml.parsers.SAXParserFactory;
+import javax.xml.stream.XMLStreamException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.rest.client.Client;
+import org.apache.hadoop.hbase.rest.client.Cluster;
+import org.apache.hadoop.hbase.rest.client.Response;
+import org.apache.hadoop.hbase.rest.model.CellModel;
+import org.apache.hadoop.hbase.rest.model.CellSetModel;
+import org.apache.hadoop.hbase.rest.model.RowModel;
+import org.apache.hadoop.hbase.rest.provider.JacksonProvider;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.codehaus.jackson.JsonFactory;
+import org.codehaus.jackson.JsonParser;
+import org.codehaus.jackson.JsonToken;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.xml.sax.InputSource;
+import org.xml.sax.XMLReader;
+
+@Category(MediumTests.class)
+public class TestTableScan {
+
+  private static final String TABLE = "TestScanResource";
+  private static final String CFA = "a";
+  private static final String CFB = "b";
+  private static final String COLUMN_1 = CFA + ":1";
+  private static final String COLUMN_2 = CFB + ":2";
+  private static Client client;
+  private static int expectedRows1;
+  private static int expectedRows2;
+  private static Configuration conf;
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final HBaseRESTTestingUtility REST_TEST_UTIL =
+    new HBaseRESTTestingUtility();
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    conf = TEST_UTIL.getConfiguration();
+    TEST_UTIL.startMiniCluster();
+    REST_TEST_UTIL.startServletContainer(conf);
+    client = new Client(new Cluster().add("localhost",
+      REST_TEST_UTIL.getServletPort()));
+    HBaseAdmin admin = TEST_UTIL.getHBaseAdmin();
+    if (!admin.tableExists(TABLE)) {
+      HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(TABLE));
+      htd.addFamily(new HColumnDescriptor(CFA));
+      htd.addFamily(new HColumnDescriptor(CFB));
+      admin.createTable(htd);
+      expectedRows1 = TestScannerResource.insertData(conf, TABLE, COLUMN_1, 1.0);
+      expectedRows2 = TestScannerResource.insertData(conf, TABLE, COLUMN_2, 0.5);
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.getHBaseAdmin().disableTable(TABLE);
+    TEST_UTIL.getHBaseAdmin().deleteTable(TABLE);
+    REST_TEST_UTIL.shutdownServletContainer();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testSimpleScannerXML() throws IOException, JAXBException, XMLStreamException {
+    // Test scanning particular columns
+    StringBuilder builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    builder.append("&");
+    builder.append(Constants.SCAN_LIMIT + "=10");
+    Response response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_XML);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
+    JAXBContext ctx = JAXBContext.newInstance(CellSetModel.class);
+    Unmarshaller ush = ctx.createUnmarshaller();
+    CellSetModel model = (CellSetModel) ush.unmarshal(response.getStream());
+    int count = TestScannerResource.countCellSet(model);
+    assertEquals(10, count);
+    checkRowsNotNull(model);
+
+    // Test with no limit.
+    builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_XML);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
+    model = (CellSetModel) ush.unmarshal(response.getStream());
+    count = TestScannerResource.countCellSet(model);
+    assertEquals(expectedRows1, count);
+    checkRowsNotNull(model);
+
+    // Test with start and end row.
+    builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    builder.append("&");
+    builder.append(Constants.SCAN_START_ROW + "=aaa");
+    builder.append("&");
+    builder.append(Constants.SCAN_END_ROW + "=aay");
+    response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_XML);
+    assertEquals(200, response.getCode());
+    model = (CellSetModel) ush.unmarshal(response.getStream());
+    count = TestScannerResource.countCellSet(model);
+    RowModel startRow = model.getRows().get(0);
+    assertEquals("aaa", Bytes.toString(startRow.getKey()));
+    RowModel endRow = model.getRows().get(model.getRows().size() - 1);
+    assertEquals("aax", Bytes.toString(endRow.getKey()));
+    assertEquals(24, count);
+    checkRowsNotNull(model);
+
+    // Test with start row and limit.
+    builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    builder.append("&");
+    builder.append(Constants.SCAN_START_ROW + "=aaa");
+    builder.append("&");
+    builder.append(Constants.SCAN_LIMIT + "=15");
+    response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_XML);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
+    model = (CellSetModel) ush.unmarshal(response.getStream());
+    startRow = model.getRows().get(0);
+    assertEquals("aaa", Bytes.toString(startRow.getKey()));
+    count = TestScannerResource.countCellSet(model);
+    assertEquals(15, count);
+    checkRowsNotNull(model);
+  }
+
+  @Test
+  public void testSimpleScannerJson() throws IOException, JAXBException {
+    // Test scanning particular columns with limit.
+    StringBuilder builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    builder.append("&");
+    builder.append(Constants.SCAN_LIMIT + "=20");
+    Response response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_JSON);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
+    ObjectMapper mapper = new JacksonProvider()
+      .locateMapper(CellSetModel.class, MediaType.APPLICATION_JSON_TYPE);
+    CellSetModel model = mapper.readValue(response.getStream(), CellSetModel.class);
+    int count = TestScannerResource.countCellSet(model);
+    assertEquals(20, count);
+    checkRowsNotNull(model);
+
+    // Test scanning with no limit.
+    builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_2);
+    response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_JSON);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
+    model = mapper.readValue(response.getStream(), CellSetModel.class);
+    count = TestScannerResource.countCellSet(model);
+    assertEquals(expectedRows2, count);
+    checkRowsNotNull(model);
+
+    // Test with start row and end row.
+    builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    builder.append("&");
+    builder.append(Constants.SCAN_START_ROW + "=aaa");
+    builder.append("&");
+    builder.append(Constants.SCAN_END_ROW + "=aay");
+    response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_JSON);
+    assertEquals(200, response.getCode());
+    model = mapper.readValue(response.getStream(), CellSetModel.class);
+    RowModel startRow = model.getRows().get(0);
+    assertEquals("aaa", Bytes.toString(startRow.getKey()));
+    RowModel endRow = model.getRows().get(model.getRows().size() - 1);
+    assertEquals("aax", Bytes.toString(endRow.getKey()));
+    count = TestScannerResource.countCellSet(model);
+    assertEquals(24, count);
+    checkRowsNotNull(model);
+  }
+
+  /**
+   * Scans the table with a JAXB Unmarshaller.Listener installed, so each row
+   * is handled as it is unmarshalled from the XML stream.
+   * @throws Exception if the scan or the XML parsing fails
+   */
+  @Test
+  public void testScanUsingListenerUnmarshallerXML() throws Exception {
+    StringBuilder builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    builder.append("&");
+    builder.append(Constants.SCAN_LIMIT + "=10");
+    Response response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_XML);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_XML, response.getHeader("content-type"));
+    JAXBContext context = JAXBContext.newInstance(ClientSideCellSetModel.class, RowModel.class,
+      CellModel.class);
+    Unmarshaller unmarshaller = context.createUnmarshaller();
+
+    final ClientSideCellSetModel.Listener listener = new ClientSideCellSetModel.Listener() {
+      @Override
+      public void handleRowModel(ClientSideCellSetModel helper, RowModel row) {
+        assertTrue(row.getKey() != null);
+        assertTrue(row.getCells().size() > 0);
+      }
+    };
+
+    // install the callback on all ClientSideCellSetModel instances
+    unmarshaller.setListener(new Unmarshaller.Listener() {
+      @Override
+      public void beforeUnmarshal(Object target, Object parent) {
+        if (target instanceof ClientSideCellSetModel) {
+          ((ClientSideCellSetModel) target).setCellSetModelListener(listener);
+        }
+      }
+
+      @Override
+      public void afterUnmarshal(Object target, Object parent) {
+        if (target instanceof ClientSideCellSetModel) {
+          ((ClientSideCellSetModel) target).setCellSetModelListener(null);
+        }
+      }
+    });
+
+    // create a new XML parser
+    SAXParserFactory factory = SAXParserFactory.newInstance();
+    factory.setNamespaceAware(true);
+    XMLReader reader = factory.newSAXParser().getXMLReader();
+    reader.setContentHandler(unmarshaller.getUnmarshallerHandler());
+    assertFalse(ClientSideCellSetModel.listenerInvoked);
+    reader.parse(new InputSource(response.getStream()));
+    assertTrue(ClientSideCellSetModel.listenerInvoked);
+  }
+
+  @Test
+  public void testStreamingJSON() throws Exception {
+    // Test scanning particular columns with limit.
+    StringBuilder builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    builder.append("&");
+    builder.append(Constants.SCAN_LIMIT + "=20");
+    Response response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_JSON);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
+    ObjectMapper mapper = new JacksonProvider()
+      .locateMapper(CellSetModel.class, MediaType.APPLICATION_JSON_TYPE);
+    CellSetModel model = mapper.readValue(response.getStream(), CellSetModel.class);
+    int count = TestScannerResource.countCellSet(model);
+    assertEquals(20, count);
+    checkRowsNotNull(model);
+
+    // Test scanning with no limit.
+    builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_2);
+    response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_JSON);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
+    model = mapper.readValue(response.getStream(), CellSetModel.class);
+    count = TestScannerResource.countCellSet(model);
+    assertEquals(expectedRows2, count);
+    checkRowsNotNull(model);
+
+    // Test with start row and end row.
+    builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    builder.append("&");
+    builder.append(Constants.SCAN_START_ROW + "=aaa");
+    builder.append("&");
+    builder.append(Constants.SCAN_END_ROW + "=aay");
+    response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_JSON);
+    assertEquals(200, response.getCode());
+
+    count = 0;
+    JsonFactory jfactory = new JsonFactory(mapper);
+    JsonParser jParser = jfactory.createJsonParser(response.getStream());
+    boolean found = false;
+    // Stream the response: each object inside the "Row" array is read as one
+    // RowModel, without materializing the whole cell set in memory.
+    while (jParser.nextToken() != JsonToken.END_OBJECT) {
+      if (jParser.getCurrentToken() == JsonToken.START_OBJECT && found) {
+        RowModel row = jParser.readValueAs(RowModel.class);
+        assertNotNull(row.getKey());
+        for (int i = 0; i < row.getCells().size(); i++) {
+          if (count == 0) {
+            assertEquals("aaa", Bytes.toString(row.getKey()));
+          }
+          if (count == 23) {
+            assertEquals("aax", Bytes.toString(row.getKey()));
+          }
+          count++;
+        }
+        jParser.skipChildren();
+      } else {
+        found = jParser.getCurrentToken() == JsonToken.START_ARRAY;
+      }
+    }
+    assertEquals(24, count);
+  }
+
+  @Test
+  public void testSimpleScannerProtobuf() throws Exception {
+    StringBuilder builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    builder.append("&");
+    builder.append(Constants.SCAN_LIMIT + "=15");
+    Response response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_PROTOBUF);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type"));
+    int rowCount = readProtobufStream(response.getStream());
+    assertEquals(15, rowCount);
+
+    // Test with start row and end row.
+    builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=" + COLUMN_1);
+    builder.append("&");
+    builder.append(Constants.SCAN_START_ROW + "=aaa");
+    builder.append("&");
+    builder.append(Constants.SCAN_END_ROW + "=aay");
+    response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_PROTOBUF);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_PROTOBUF, response.getHeader("content-type"));
+    rowCount = readProtobufStream(response.getStream());
+    assertEquals(24, rowCount);
+  }
+
+  private void checkRowsNotNull(CellSetModel model) {
+    for (RowModel row : model.getRows()) {
+      assertTrue(row.getKey() != null);
+      assertTrue(row.getCells().size() > 0);
+    }
+  }
+
+  /**
+   * Reads a stream of length-prefixed protobuf CellSetModel messages, as
+   * written by ProtobufStreamingUtil.
+   * @param inputStream the input stream
+   * @return The number of rows in the cell set model.
+   * @throws IOException if reading from the stream fails
+   */
+  public int readProtobufStream(InputStream inputStream) throws IOException {
+    DataInputStream stream = new DataInputStream(inputStream);
+    CellSetModel model = null;
+    int rowCount = 0;
+    try {
+      while (true) {
+        // Two-byte length prefix, then the protobuf-encoded CellSetModel.
+        byte[] lengthBytes = new byte[2];
+        int readBytes = stream.read(lengthBytes);
+        if (readBytes == -1) {
+          break;
+        }
+        assertEquals(2, readBytes);
+        int length = Bytes.toShort(lengthBytes);
+        byte[] cellset = new byte[length];
+        stream.readFully(cellset);
+        model = new CellSetModel();
+        model.getObjectFromMessage(cellset);
+        checkRowsNotNull(model);
+        rowCount = rowCount + TestScannerResource.countCellSet(model);
+      }
+    } catch (EOFException exp) {
+      exp.printStackTrace();
+    } finally {
+      stream.close();
+    }
+    return rowCount;
+  }
+
+  @Test
+  public void testScanningUnknownColumnJson() throws IOException, JAXBException {
+    // Test scanning particular columns with limit.
+    StringBuilder builder = new StringBuilder();
+    builder.append("/*");
+    builder.append("?");
+    builder.append(Constants.SCAN_COLUMN + "=a:test");
+    Response response = client.get("/" + TABLE + builder.toString(),
+      Constants.MIMETYPE_JSON);
+    assertEquals(200, response.getCode());
+    assertEquals(Constants.MIMETYPE_JSON, response.getHeader("content-type"));
+    ObjectMapper mapper = new JacksonProvider().locateMapper(CellSetModel.class,
+      MediaType.APPLICATION_JSON_TYPE);
+    CellSetModel model = mapper.readValue(response.getStream(), CellSetModel.class);
+    int count = TestScannerResource.countCellSet(model);
+    assertEquals(0, count);
+  }
+
+  /**
+   * Mimics the server-side cell set model and notifies a listener for each
+   * row model as it is unmarshalled.
+   */
+  @XmlRootElement(name = "CellSet")
+  @XmlAccessorType(XmlAccessType.FIELD)
+  public static class ClientSideCellSetModel implements Serializable {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * This list is not a real list; instead it will notify a listener whenever JAXB has
+     * unmarshalled the next row.
+     */
+    @XmlElement(name = "Row")
+    private List<RowModel> row;
+
+    static boolean listenerInvoked = false;
+
+    /**
+     * Install a listener for row model on this object. If l is null, the listener
+     * is removed again.
+     */
+    public void setCellSetModelListener(final Listener l) {
+      row = (l == null) ? null : new ArrayList<RowModel>() {
+        private static final long serialVersionUID = 1L;
+
+        @Override
+        public boolean add(RowModel o) {
+          l.handleRowModel(ClientSideCellSetModel.this, o);
+          listenerInvoked = true;
+          return false;
+        }
+      };
+    }
+
+    /**
+     * This listener is invoked every time a new row model is unmarshalled.
+     */
+    public static interface Listener {
+      void handleRowModel(ClientSideCellSetModel helper, RowModel rowModel);
+    }
+  }
+}
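-- 
Editor's note, not part of the patch: a minimal sketch of how a client might
drive the new stateless scanner endpoint using the REST client classes touched
above. The host/port ("localhost", 8080), table name ("mytable"), column
("a:1"), and row keys ("aaa"/"aay") are assumptions for illustration; the
query parameter names (startrow, endrow, column, limit) come from the
Constants this patch adds.

    import javax.xml.bind.JAXBContext;
    import org.apache.hadoop.hbase.rest.Constants;
    import org.apache.hadoop.hbase.rest.client.Client;
    import org.apache.hadoop.hbase.rest.client.Cluster;
    import org.apache.hadoop.hbase.rest.client.Response;
    import org.apache.hadoop.hbase.rest.model.CellSetModel;

    public class StatelessScanExample {
      public static void main(String[] args) throws Exception {
        // One round trip; no scanner state is kept on the server between requests.
        Client client = new Client(new Cluster().add("localhost", 8080)); // assumed host:port
        // Rows from "aaa" (inclusive) to "aay" (exclusive), column a:1, at most 10 rows.
        String path = "/mytable/*?startrow=aaa&endrow=aay&column=a:1&limit=10";
        Response response = client.get(path, Constants.MIMETYPE_XML);
        if (response.getCode() == 200) {
          CellSetModel cells = (CellSetModel) JAXBContext.newInstance(CellSetModel.class)
              .createUnmarshaller().unmarshal(response.getStream());
          System.out.println("rows: " + cells.getRows().size());
        }
      }
    }

The same path returns JSON or a stream of length-prefixed protobuf CellSet
messages when requested with Constants.MIMETYPE_JSON or
Constants.MIMETYPE_PROTOBUF, as exercised by TestTableScan above.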