HBASE-2793 Add ability to extract a specified list of versions of a column in a single roundtrip
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@962700 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b5a5aa6381
commit
c36ed07685
|
@ -754,6 +754,8 @@ Release 0.21.0 - Unreleased
|
|||
(Nicolas Spiegelberg via JD)
|
||||
HBASE-2786 TestHLog.testSplit hangs (Nicolas Spiegelberg via JD)
|
||||
HBASE-2790 Purge apache-forrest from TRUNK
|
||||
HBASE-2793 Add ability to extract a specified list of versions of a column
|
||||
in a single roundtrip (Kannan via Ryan)
|
||||
|
||||
NEW FEATURES
|
||||
HBASE-1961 HBase EC2 scripts
|
||||
|
|
|
@ -101,6 +101,10 @@ public interface Filter extends Writable {
|
|||
* Skip this KeyValue
|
||||
*/
|
||||
SKIP,
|
||||
/**
|
||||
* Skip this column. Go to the next column in this row.
|
||||
*/
|
||||
NEXT_COL,
|
||||
/**
|
||||
* Done with columns, skip to next row. Note that filterRow() will
|
||||
* still be called.
|
||||
|
|
|
@ -0,0 +1,91 @@
|
|||
package org.apache.hadoop.hbase.filter;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
|
||||
/**
|
||||
* Filter that returns only cells whose timestamp (version) is
|
||||
* in the specified list of timestamps (versions).
|
||||
* <p>
|
||||
* Note: Use of this filter overrides any time range/time stamp
|
||||
* options specified using {@link Get#setTimeRange(long, long)},
|
||||
* {@link Scan#setTimeRange(long, long)}, {@link Get#setTimeStamp(long)},
|
||||
* or {@link Scan#setTimeStamp(long)}.
|
||||
*/
|
||||
public class TimestampsFilter extends FilterBase {
|
||||
|
||||
TreeSet<Long> timestamps;
|
||||
|
||||
// Used during scans to hint the scan to stop early
|
||||
// once the timestamps fall below the minTimeStamp.
|
||||
long minTimeStamp = Long.MAX_VALUE;
|
||||
|
||||
/**
|
||||
* Used during deserialization. Do not use otherwise.
|
||||
*/
|
||||
public TimestampsFilter() {
|
||||
super();
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor for filter that retains only those
|
||||
* cells whose timestamp (version) is in the specified
|
||||
* list of timestamps.
|
||||
*
|
||||
* @param timestamps
|
||||
*/
|
||||
public TimestampsFilter(List<Long> timestamps) {
|
||||
this.timestamps = new TreeSet<Long>(timestamps);
|
||||
init();
|
||||
}
|
||||
|
||||
private void init() {
|
||||
if (this.timestamps.size() > 0) {
|
||||
minTimeStamp = this.timestamps.first();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Gets the minimum timestamp requested by filter.
|
||||
* @return minimum timestamp requested by filter.
|
||||
*/
|
||||
public long getMin() {
|
||||
return minTimeStamp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ReturnCode filterKeyValue(KeyValue v) {
|
||||
if (this.timestamps.contains(v.getTimestamp())) {
|
||||
return ReturnCode.INCLUDE;
|
||||
} else if (v.getTimestamp() < minTimeStamp) {
|
||||
// The remaining versions of this column are guaranteed
|
||||
// to be lesser than all of the other values.
|
||||
return ReturnCode.NEXT_COL;
|
||||
}
|
||||
return ReturnCode.SKIP;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void readFields(DataInput in) throws IOException {
|
||||
int numTimestamps = in.readInt();
|
||||
this.timestamps = new TreeSet<Long>();
|
||||
for (int idx = 0; idx < numTimestamps; idx++) {
|
||||
this.timestamps.add(in.readLong());
|
||||
}
|
||||
init();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void write(DataOutput out) throws IOException {
|
||||
int numTimestamps = this.timestamps.size();
|
||||
out.writeInt(numTimestamps);
|
||||
for (Long timestamp : this.timestamps) {
|
||||
out.writeLong(timestamp);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -173,8 +173,10 @@ public class ScanQueryMatcher extends QueryMatcher {
|
|||
|
||||
if (filterResponse == ReturnCode.SKIP)
|
||||
return MatchCode.SKIP;
|
||||
|
||||
else if (filterResponse == ReturnCode.NEXT_COL)
|
||||
return MatchCode.SEEK_NEXT_COL;
|
||||
// else if (filterResponse == ReturnCode.NEXT_ROW)
|
||||
|
||||
stickyNextRow = true;
|
||||
return MatchCode.SEEK_NEXT_ROW;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,342 @@
|
|||
/**
|
||||
* Copyright 2009 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.filter.Filter;
|
||||
import org.apache.hadoop.hbase.filter.TimestampsFilter;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Run tests related to {@link TimestampsFilter} using HBase client APIs.
|
||||
* Sets up the HBase mini cluster once at start. Each creates a table
|
||||
* named for the method and does its stuff against that.
|
||||
*/
|
||||
public class TestTimestampsFilter {
|
||||
final Log LOG = LogFactory.getLog(getClass());
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.startMiniCluster(3);
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
// Nothing to do.
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
// Nothing to do.
|
||||
}
|
||||
|
||||
/**
|
||||
* Test from client side for TimestampsFilter.
|
||||
*
|
||||
* The TimestampsFilter provides the ability to request cells (KeyValues)
|
||||
* whose timestamp/version is in the specified list of timestamps/version.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testTimestampsFilter() throws Exception {
|
||||
byte [] TABLE = Bytes.toBytes("testTimestampsFilter");
|
||||
byte [] FAMILY = Bytes.toBytes("event_log");
|
||||
byte [][] FAMILIES = new byte[][] { FAMILY };
|
||||
KeyValue kvs[];
|
||||
|
||||
// create table; set versions to max...
|
||||
HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
|
||||
|
||||
for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
|
||||
for (int colIdx = 0; colIdx < 5; colIdx++) {
|
||||
// insert versions 201..300
|
||||
putNVersions(ht, FAMILY, rowIdx, colIdx, 201, 300);
|
||||
// insert versions 1..100
|
||||
putNVersions(ht, FAMILY, rowIdx, colIdx, 1, 100);
|
||||
}
|
||||
}
|
||||
|
||||
// do some verification before flush
|
||||
verifyInsertedValues(ht, FAMILY);
|
||||
|
||||
flush();
|
||||
|
||||
// do some verification after flush
|
||||
verifyInsertedValues(ht, FAMILY);
|
||||
|
||||
// Insert some more versions after flush. These should be in memstore.
|
||||
// After this we should have data in both memstore & HFiles.
|
||||
for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
|
||||
for (int colIdx = 0; colIdx < 5; colIdx++) {
|
||||
putNVersions(ht, FAMILY, rowIdx, colIdx, 301, 400);
|
||||
putNVersions(ht, FAMILY, rowIdx, colIdx, 101, 200);
|
||||
}
|
||||
}
|
||||
|
||||
for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
|
||||
for (int colIdx = 0; colIdx < 5; colIdx++) {
|
||||
kvs = getNVersions(ht, FAMILY, rowIdx, colIdx,
|
||||
Arrays.asList(505L, 5L, 105L, 305L, 205L));
|
||||
assertEquals(4, kvs.length);
|
||||
checkOneCell(kvs[0], FAMILY, rowIdx, colIdx, 305);
|
||||
checkOneCell(kvs[1], FAMILY, rowIdx, colIdx, 205);
|
||||
checkOneCell(kvs[2], FAMILY, rowIdx, colIdx, 105);
|
||||
checkOneCell(kvs[3], FAMILY, rowIdx, colIdx, 5);
|
||||
}
|
||||
}
|
||||
|
||||
// Request an empty list of versions using the Timestamps filter;
|
||||
// Should return none.
|
||||
kvs = getNVersions(ht, FAMILY, 2, 2, new ArrayList<Long>());
|
||||
assertEquals(0, kvs.length);
|
||||
|
||||
//
|
||||
// Test the filter using a Scan operation
|
||||
// Scan rows 0..4. For each row, get all its columns, but only
|
||||
// those versions of the columns with the specified timestamps.
|
||||
Result[] results = scanNVersions(ht, FAMILY, 0, 4,
|
||||
Arrays.asList(6L, 106L, 306L));
|
||||
assertEquals("# of rows returned from scan", 5, results.length);
|
||||
for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
|
||||
kvs = results[rowIdx].raw();
|
||||
// each row should have 5 columns.
|
||||
// And we have requested 3 versions for each.
|
||||
assertEquals("Number of KeyValues in result for row:" + rowIdx,
|
||||
3*5, kvs.length);
|
||||
for (int colIdx = 0; colIdx < 5; colIdx++) {
|
||||
int offset = colIdx * 3;
|
||||
checkOneCell(kvs[offset + 0], FAMILY, rowIdx, colIdx, 306);
|
||||
checkOneCell(kvs[offset + 1], FAMILY, rowIdx, colIdx, 106);
|
||||
checkOneCell(kvs[offset + 2], FAMILY, rowIdx, colIdx, 6);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test TimestampsFilter in the presence of version deletes.
|
||||
*
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testWithVersionDeletes() throws Exception {
|
||||
|
||||
// first test from memstore (without flushing).
|
||||
testWithVersionDeletes(false);
|
||||
|
||||
// run same test against HFiles (by forcing a flush).
|
||||
testWithVersionDeletes(true);
|
||||
}
|
||||
|
||||
private void testWithVersionDeletes(boolean flushTables) throws IOException {
|
||||
byte [] TABLE = Bytes.toBytes("testWithVersionDeletes_" +
|
||||
(flushTables ? "flush" : "noflush"));
|
||||
byte [] FAMILY = Bytes.toBytes("event_log");
|
||||
byte [][] FAMILIES = new byte[][] { FAMILY };
|
||||
|
||||
// create table; set versions to max...
|
||||
HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
|
||||
|
||||
// For row:0, col:0: insert versions 1 through 5.
|
||||
putNVersions(ht, FAMILY, 0, 0, 1, 5);
|
||||
|
||||
// delete version 4.
|
||||
deleteOneVersion(ht, FAMILY, 0, 0, 4);
|
||||
|
||||
if (flushTables) {
|
||||
flush();
|
||||
}
|
||||
|
||||
// request a bunch of versions including the deleted version. We should
|
||||
// only get back entries for the versions that exist.
|
||||
KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
|
||||
assertEquals(3, kvs.length);
|
||||
checkOneCell(kvs[0], FAMILY, 0, 0, 5);
|
||||
checkOneCell(kvs[1], FAMILY, 0, 0, 3);
|
||||
checkOneCell(kvs[2], FAMILY, 0, 0, 2);
|
||||
}
|
||||
|
||||
private void verifyInsertedValues(HTable ht, byte[] cf) throws IOException {
|
||||
for (int rowIdx = 0; rowIdx < 5; rowIdx++) {
|
||||
for (int colIdx = 0; colIdx < 5; colIdx++) {
|
||||
// ask for versions that exist.
|
||||
KeyValue[] kvs = getNVersions(ht, cf, rowIdx, colIdx,
|
||||
Arrays.asList(5L, 300L, 6L, 80L));
|
||||
assertEquals(4, kvs.length);
|
||||
checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
|
||||
checkOneCell(kvs[1], cf, rowIdx, colIdx, 80);
|
||||
checkOneCell(kvs[2], cf, rowIdx, colIdx, 6);
|
||||
checkOneCell(kvs[3], cf, rowIdx, colIdx, 5);
|
||||
|
||||
// ask for versions that do not exist.
|
||||
kvs = getNVersions(ht, cf, rowIdx, colIdx,
|
||||
Arrays.asList(101L, 102L));
|
||||
assertEquals(0, kvs.length);
|
||||
|
||||
// ask for some versions that exist and some that do not.
|
||||
kvs = getNVersions(ht, cf, rowIdx, colIdx,
|
||||
Arrays.asList(1L, 300L, 105L, 70L, 115L));
|
||||
assertEquals(3, kvs.length);
|
||||
checkOneCell(kvs[0], cf, rowIdx, colIdx, 300);
|
||||
checkOneCell(kvs[1], cf, rowIdx, colIdx, 70);
|
||||
checkOneCell(kvs[2], cf, rowIdx, colIdx, 1);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Flush tables. Since flushing is asynchronous, sleep for a bit.
|
||||
private void flush() throws IOException {
|
||||
TEST_UTIL.flush();
|
||||
try {
|
||||
Thread.sleep(3000);
|
||||
} catch (InterruptedException i) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that the passed in KeyValue has expected contents for the
|
||||
* specified row, column & timestamp.
|
||||
*/
|
||||
private void checkOneCell(KeyValue kv, byte[] cf,
|
||||
int rowIdx, int colIdx, long ts) {
|
||||
|
||||
String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
|
||||
|
||||
assertEquals("Row mismatch which checking: " + ctx,
|
||||
"row:"+ rowIdx, Bytes.toString(kv.getRow()));
|
||||
|
||||
assertEquals("ColumnFamily mismatch while checking: " + ctx,
|
||||
Bytes.toString(cf), Bytes.toString(kv.getFamily()));
|
||||
|
||||
assertEquals("Column qualifier mismatch while checking: " + ctx,
|
||||
"column:" + colIdx,
|
||||
Bytes.toString(kv.getQualifier()));
|
||||
|
||||
assertEquals("Timestamp mismatch while checking: " + ctx,
|
||||
ts, kv.getTimestamp());
|
||||
|
||||
assertEquals("Value mismatch while checking: " + ctx,
|
||||
"value-version-" + ts, Bytes.toString(kv.getValue()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Uses the TimestampFilter on a Get to request a specified list of
|
||||
* versions for the row/column specified by rowIdx & colIdx.
|
||||
*
|
||||
*/
|
||||
private KeyValue[] getNVersions(HTable ht, byte[] cf, int rowIdx,
|
||||
int colIdx, List<Long> versions)
|
||||
throws IOException {
|
||||
byte row[] = Bytes.toBytes("row:" + rowIdx);
|
||||
byte column[] = Bytes.toBytes("column:" + colIdx);
|
||||
Filter filter = new TimestampsFilter(versions);
|
||||
Get get = new Get(row);
|
||||
get.addColumn(cf, column);
|
||||
get.setFilter(filter);
|
||||
get.setMaxVersions();
|
||||
Result result = ht.get(get);
|
||||
|
||||
return result.raw();
|
||||
}
|
||||
|
||||
/**
|
||||
* Uses the TimestampFilter on a Scan to request a specified list of
|
||||
* versions for the rows from startRowIdx to endRowIdx (both inclusive).
|
||||
*/
|
||||
private Result[] scanNVersions(HTable ht, byte[] cf, int startRowIdx,
|
||||
int endRowIdx, List<Long> versions)
|
||||
throws IOException {
|
||||
byte startRow[] = Bytes.toBytes("row:" + startRowIdx);
|
||||
byte endRow[] = Bytes.toBytes("row:" + endRowIdx + 1); // exclusive
|
||||
Filter filter = new TimestampsFilter(versions);
|
||||
Scan scan = new Scan(startRow, endRow);
|
||||
scan.setFilter(filter);
|
||||
scan.setMaxVersions();
|
||||
ResultScanner scanner = ht.getScanner(scan);
|
||||
return scanner.next(endRowIdx - startRowIdx + 1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert in specific row/column versions with timestamps
|
||||
* versionStart..versionEnd.
|
||||
*/
|
||||
private void putNVersions(HTable ht, byte[] cf, int rowIdx, int colIdx,
|
||||
long versionStart, long versionEnd)
|
||||
throws IOException {
|
||||
byte row[] = Bytes.toBytes("row:" + rowIdx);
|
||||
byte column[] = Bytes.toBytes("column:" + colIdx);
|
||||
Put put = new Put(row);
|
||||
|
||||
for (long idx = versionStart; idx <= versionEnd; idx++) {
|
||||
put.add(cf, column, idx, Bytes.toBytes("value-version-" + idx));
|
||||
}
|
||||
|
||||
ht.put(put);
|
||||
}
|
||||
|
||||
/**
|
||||
* For row/column specified by rowIdx/colIdx, delete the cell
|
||||
* corresponding to the specified version.
|
||||
*/
|
||||
private void deleteOneVersion(HTable ht, byte[] cf, int rowIdx,
|
||||
int colIdx, long version)
|
||||
throws IOException {
|
||||
byte row[] = Bytes.toBytes("row:" + rowIdx);
|
||||
byte column[] = Bytes.toBytes("column:" + colIdx);
|
||||
Delete del = new Delete(row);
|
||||
del.deleteColumn(cf, column, version);
|
||||
ht.delete(del);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue