HBASE-2265 HFile and Memstore should maintain minimum and maximum timestamps
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@963862 13f79535-47bb-0310-9956-ffa450edef68
parent c45b3bc413
commit 3c080bb8cc
@@ -760,6 +760,8 @@ Release 0.21.0 - Unreleased
               in a single roundtrip (Kannan via Ryan)
   HBASE-2828  HTable unnecessarily coupled with HMaster
               (Nicolas Spiegelberg via Stack)
   HBASE-2265  HFile and Memstore should maintain minimum and maximum timestamps
               (Pranav via Ryan)

 NEW FEATURES
   HBASE-1961  HBase EC2 scripts

@ -924,6 +924,15 @@ public class KeyValue implements Writable, HeapSize {
|
|||
return getType() == Type.DeleteFamily.getCode();
|
||||
}
|
||||
|
||||
/**
|
||||
*
|
||||
* @return True if this KV is a delete family or column type.
|
||||
*/
|
||||
public boolean isDeleteColumnOrFamily() {
|
||||
int t = getType();
|
||||
return t == Type.DeleteColumn.getCode() || t == Type.DeleteFamily.getCode();
|
||||
}
|
||||
|
||||
/**
|
||||
* Primarily for use client-side. Returns the family of this KeyValue in a
|
||||
* new byte array.<p>
|
||||
|
|
|
@ -41,7 +41,7 @@ import java.util.PriorityQueue;
|
|||
* as an InternalScanner at the Store level, you will get runtime exceptions.
|
||||
*/
|
||||
public class KeyValueHeap implements KeyValueScanner, InternalScanner {
|
||||
private PriorityQueue<KeyValueScanner> heap;
|
||||
private PriorityQueue<KeyValueScanner> heap = null;
|
||||
private KeyValueScanner current = null;
|
||||
private KVScannerComparator comparator;
|
||||
|
||||
|
@ -51,8 +51,10 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
|
|||
* @param scanners
|
||||
* @param comparator
|
||||
*/
|
||||
public KeyValueHeap(List<? extends KeyValueScanner> scanners, KVComparator comparator) {
|
||||
public KeyValueHeap(List<? extends KeyValueScanner> scanners,
|
||||
KVComparator comparator) {
|
||||
this.comparator = new KVScannerComparator(comparator);
|
||||
if (!scanners.isEmpty()) {
|
||||
this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
|
||||
this.comparator);
|
||||
for (KeyValueScanner scanner : scanners) {
|
||||
|
@ -64,6 +66,7 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
|
|||
}
|
||||
this.current = heap.poll();
|
||||
}
|
||||
}
|
||||
|
||||
public KeyValue peek() {
|
||||
if (this.current == null) {
|
||||
|
@ -104,6 +107,9 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
|
|||
* @return true if there are more keys, false if all scanners are done
|
||||
*/
|
||||
public boolean next(List<KeyValue> result, int limit) throws IOException {
|
||||
if (this.current == null) {
|
||||
return false;
|
||||
}
|
||||
InternalScanner currentAsInternal = (InternalScanner)this.current;
|
||||
currentAsInternal.next(result, limit);
|
||||
KeyValue pee = this.current.peek();
|
||||
|
@ -163,11 +169,13 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner {
|
|||
if (this.current != null) {
|
||||
this.current.close();
|
||||
}
|
||||
if (this.heap != null) {
|
||||
KeyValueScanner scanner;
|
||||
while ((scanner = this.heap.poll()) != null) {
|
||||
scanner.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Seeks all scanners at or below the specified seek key. If we earlied-out
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.io.HeapSize;
|
||||
import org.apache.hadoop.hbase.regionserver.DeleteCompare.DeleteCode;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -81,6 +82,9 @@ public class MemStore implements HeapSize {
|
|||
// Used to track own heapSize
|
||||
final AtomicLong size;
|
||||
|
||||
TimeRangeTracker timeRangeTracker;
|
||||
TimeRangeTracker snapshotTimeRangeTracker;
|
||||
|
||||
/**
|
||||
* Default constructor. Used for tests.
|
||||
*/
|
||||
|
@ -99,6 +103,8 @@ public class MemStore implements HeapSize {
|
|||
this.comparatorIgnoreType = this.comparator.getComparatorIgnoringType();
|
||||
this.kvset = new KeyValueSkipListSet(c);
|
||||
this.snapshot = new KeyValueSkipListSet(c);
|
||||
timeRangeTracker = new TimeRangeTracker();
|
||||
snapshotTimeRangeTracker = new TimeRangeTracker();
|
||||
this.size = new AtomicLong(DEEP_OVERHEAD);
|
||||
}
|
||||
|
||||
|
@ -128,6 +134,8 @@ public class MemStore implements HeapSize {
|
|||
if (!this.kvset.isEmpty()) {
|
||||
this.snapshot = this.kvset;
|
||||
this.kvset = new KeyValueSkipListSet(this.comparator);
|
||||
this.snapshotTimeRangeTracker = this.timeRangeTracker;
|
||||
this.timeRangeTracker = new TimeRangeTracker();
|
||||
// Reset heap to not include any keys
|
||||
this.size.set(DEEP_OVERHEAD);
|
||||
}
|
||||
|
@ -167,6 +175,7 @@ public class MemStore implements HeapSize {
|
|||
// create a new snapshot and let the old one go.
|
||||
if (!ss.isEmpty()) {
|
||||
this.snapshot = new KeyValueSkipListSet(this.comparator);
|
||||
this.snapshotTimeRangeTracker = new TimeRangeTracker();
|
||||
}
|
||||
} finally {
|
||||
this.lock.writeLock().unlock();
|
||||
|
@ -183,6 +192,7 @@ public class MemStore implements HeapSize {
|
|||
this.lock.readLock().lock();
|
||||
try {
|
||||
s = heapSizeChange(kv, this.kvset.add(kv));
|
||||
timeRangeTracker.includeTimestamp(kv);
|
||||
this.size.addAndGet(s);
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
|
@ -198,9 +208,9 @@ public class MemStore implements HeapSize {
|
|||
long delete(final KeyValue delete) {
|
||||
long s = 0;
|
||||
this.lock.readLock().lock();
|
||||
|
||||
try {
|
||||
s += heapSizeChange(delete, this.kvset.add(delete));
|
||||
timeRangeTracker.includeTimestamp(delete);
|
||||
} finally {
|
||||
this.lock.readLock().unlock();
|
||||
}
|
||||
|
@ -487,6 +497,19 @@ public class MemStore implements HeapSize {
|
|||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if this memstore may contain the required keys
|
||||
* @param scan
|
||||
* @return False if the key definitely does not exist in this Memstore
|
||||
*/
|
||||
public boolean shouldSeek(Scan scan) {
|
||||
return timeRangeTracker.includesTimeRange(scan.getTimeRange()) ||
|
||||
snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange());
|
||||
}
|
||||
|
||||
public TimeRangeTracker getSnapshotTimeRangeTracker() {
|
||||
return this.snapshotTimeRangeTracker;
|
||||
}
|
||||
|
||||
/*
|
||||
* MemStoreScanner implements the KeyValueScanner.
|
||||
|
|
|
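The check above is the core of the memstore-side change: MemStore.shouldSeek() lets a scan skip the memstore only when neither the active set's tracker nor the snapshot's tracker can overlap the scan's time range. A minimal sketch of that predicate (not part of the patch; the timestamp values are made up), using the TimeRangeTracker and TimeRange types that appear later in this commit:

import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;

public class ShouldSeekSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical timestamp ranges for the active kvset and the snapshot.
    TimeRangeTracker active = new TimeRangeTracker(10, 20);
    TimeRangeTracker snapshot = new TimeRangeTracker(30, 80);

    // A scan over [21, 29) falls in the gap between the two trackers.
    TimeRange scanRange = new TimeRange(21, 29);
    boolean shouldSeek = active.includesTimeRange(scanRange)
        || snapshot.includesTimeRange(scanRange);
    System.out.println(shouldSeek); // false -> the memstore scanners can be skipped
  }
}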
@ -417,15 +417,17 @@ public class Store implements HeapSize {
|
|||
* Write out current snapshot. Presumes {@link #snapshot()} has been called
|
||||
* previously.
|
||||
* @param logCacheFlushId flush sequence number
|
||||
* @param snapshot
|
||||
* @return true if a compaction is needed
|
||||
* @throws IOException
|
||||
*/
|
||||
private StoreFile flushCache(final long logCacheFlushId,
|
||||
SortedSet<KeyValue> snapshot) throws IOException {
|
||||
SortedSet<KeyValue> snapshot,
|
||||
TimeRangeTracker snapshotTimeRangeTracker) throws IOException {
|
||||
// If an exception happens flushing, we let it out without clearing
|
||||
// the memstore snapshot. The old snapshot will be returned when we say
|
||||
// 'snapshot', the next time flush comes around.
|
||||
return internalFlushCache(snapshot, logCacheFlushId);
|
||||
return internalFlushCache(snapshot, logCacheFlushId, snapshotTimeRangeTracker);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -435,7 +437,8 @@ public class Store implements HeapSize {
|
|||
* @throws IOException
|
||||
*/
|
||||
private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
|
||||
final long logCacheFlushId)
|
||||
final long logCacheFlushId,
|
||||
TimeRangeTracker snapshotTimeRangeTracker)
|
||||
throws IOException {
|
||||
StoreFile.Writer writer = null;
|
||||
long flushed = 0;
|
||||
|
@ -450,6 +453,7 @@ public class Store implements HeapSize {
|
|||
synchronized (flushLock) {
|
||||
// A. Write the map out to the disk
|
||||
writer = createWriterInTmp(set.size());
|
||||
writer.setTimeRangeTracker(snapshotTimeRangeTracker);
|
||||
int entries = 0;
|
||||
try {
|
||||
for (KeyValue kv: set) {
|
||||
|
@ -1436,6 +1440,7 @@ public class Store implements HeapSize {
|
|||
private long cacheFlushId;
|
||||
private SortedSet<KeyValue> snapshot;
|
||||
private StoreFile storeFile;
|
||||
private TimeRangeTracker snapshotTimeRangeTracker;
|
||||
|
||||
private StoreFlusherImpl(long cacheFlushId) {
|
||||
this.cacheFlushId = cacheFlushId;
|
||||
|
@ -1445,11 +1450,12 @@ public class Store implements HeapSize {
|
|||
public void prepare() {
|
||||
memstore.snapshot();
|
||||
this.snapshot = memstore.getSnapshot();
|
||||
this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void flushCache() throws IOException {
|
||||
storeFile = Store.this.flushCache(cacheFlushId, snapshot);
|
||||
storeFile = Store.this.flushCache(cacheFlushId, snapshot, snapshotTimeRangeTracker);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -24,9 +24,9 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValue.KVComparator;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
|
||||
import org.apache.hadoop.hbase.io.Reference;
|
||||
import org.apache.hadoop.hbase.io.hfile.BlockCache;
|
||||
|
@ -38,7 +38,9 @@ import org.apache.hadoop.hbase.util.BloomFilter;
|
|||
import org.apache.hadoop.hbase.util.ByteBloomFilter;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Hash;
|
||||
import org.apache.hadoop.hbase.util.Writables;
|
||||
import org.apache.hadoop.io.RawComparator;
|
||||
import org.apache.hadoop.io.WritableUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
import com.google.common.base.Function;
|
||||
|
@ -105,6 +107,8 @@ public class StoreFile {
|
|||
public static final byte [] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");
|
||||
/** Bloom filter Type in FileInfo */
|
||||
static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");
|
||||
/** Key for Timerange information in metadata*/
|
||||
static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
|
||||
|
||||
/** Meta data block name for bloom filter meta-info (ie: bloom params/specs) */
|
||||
static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META";
|
||||
|
@ -411,6 +415,17 @@ public class StoreFile {
|
|||
this.reader.loadBloomfilter();
|
||||
}
|
||||
|
||||
try {
|
||||
byte [] timerangeBytes = metadataMap.get(TIMERANGE_KEY);
|
||||
if (timerangeBytes != null) {
|
||||
this.reader.timeRangeTracker = new TimeRangeTracker();
|
||||
Writables.copyWritable(timerangeBytes, this.reader.timeRangeTracker);
|
||||
}
|
||||
} catch (IllegalArgumentException e) {
|
||||
LOG.error("Error reading timestamp range data from meta -- " +
|
||||
"proceeding without", e);
|
||||
this.reader.timeRangeTracker = null;
|
||||
}
|
||||
return this.reader;
|
||||
}
|
||||
|
||||
|
@ -647,6 +662,14 @@ public class StoreFile {
|
|||
private KVComparator kvComparator;
|
||||
private KeyValue lastKv = null;
|
||||
private byte[] lastByteArray = null;
|
||||
TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
|
||||
/* isTimeRangeTrackerSet records whether the time range has already been set.
 * When flushing a memstore we pass in the snapshot's TimeRangeTracker, and this
 * flag indicates that the range does not need to be recalculated again while
 * appending KeyValues.
 * It is not set for compactions, where the range is recalculated using only
 * the appended KeyValues. */
|
||||
boolean isTimeRangeTrackerSet = false;
|
||||
|
||||
protected HFile.Writer writer;
|
||||
/**
|
||||
|
@ -693,7 +716,49 @@ public class StoreFile {
|
|||
public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
|
||||
throws IOException {
|
||||
writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
|
||||
writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
|
||||
writer.appendFileInfo(MAJOR_COMPACTION_KEY,
|
||||
Bytes.toBytes(majorCompaction));
|
||||
appendTimeRangeMetadata();
|
||||
}
|
||||
|
||||
/**
|
||||
* Add TimestampRange to Metadata
|
||||
*/
|
||||
public void appendTimeRangeMetadata() throws IOException {
|
||||
appendFileInfo(TIMERANGE_KEY,WritableUtils.toByteArray(timeRangeTracker));
|
||||
}
|
||||
|
||||
/**
|
||||
* Set TimeRangeTracker
|
||||
* @param trt
|
||||
*/
|
||||
public void setTimeRangeTracker(final TimeRangeTracker trt) {
|
||||
this.timeRangeTracker = trt;
|
||||
isTimeRangeTrackerSet = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* If the timeRangeTracker is not set,
|
||||
* update TimeRangeTracker to include the timestamp of this key
|
||||
* @param kv
|
||||
* @throws IOException
|
||||
*/
|
||||
public void includeInTimeRangeTracker(final KeyValue kv) {
|
||||
if (!isTimeRangeTrackerSet) {
|
||||
timeRangeTracker.includeTimestamp(kv);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If the timeRangeTracker is not set,
|
||||
* update TimeRangeTracker to include the timestamp of this key
|
||||
* @param key
|
||||
* @throws IOException
|
||||
*/
|
||||
public void includeInTimeRangeTracker(final byte [] key) {
|
||||
if (!isTimeRangeTrackerSet) {
|
||||
timeRangeTracker.includeTimestamp(key);
|
||||
}
|
||||
}
|
||||
|
||||
public void append(final KeyValue kv) throws IOException {
|
||||
|
@ -744,6 +809,7 @@ public class StoreFile {
|
|||
}
|
||||
}
|
||||
writer.append(kv);
|
||||
includeInTimeRangeTracker(kv);
|
||||
}
|
||||
|
||||
public Path getPath() {
|
||||
|
@ -759,6 +825,7 @@ public class StoreFile {
|
|||
}
|
||||
}
|
||||
writer.append(key, value);
|
||||
includeInTimeRangeTracker(key);
|
||||
}
|
||||
|
||||
public void close() throws IOException {
|
||||
|
@ -794,6 +861,7 @@ public class StoreFile {
|
|||
protected BloomFilter bloomFilter = null;
|
||||
protected BloomType bloomFilterType;
|
||||
private final HFile.Reader reader;
|
||||
protected TimeRangeTracker timeRangeTracker = null;
|
||||
|
||||
public Reader(FileSystem fs, Path path, BlockCache blockCache, boolean inMemory)
|
||||
throws IOException {
|
||||
|
@ -834,13 +902,28 @@ public class StoreFile {
|
|||
reader.close();
|
||||
}
|
||||
|
||||
public boolean shouldSeek(final byte[] row,
|
||||
final SortedSet<byte[]> columns) {
|
||||
|
||||
if (this.bloomFilter == null) {
|
||||
return true;
|
||||
public boolean shouldSeek(Scan scan, final SortedSet<byte[]> columns) {
|
||||
return (passesTimerangeFilter(scan) && passesBloomFilter(scan,columns));
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if this storeFile may contain keys within the TimeRange
|
||||
* @param scan
|
||||
* @return False if it definitely does not exist in this StoreFile
|
||||
*/
|
||||
private boolean passesTimerangeFilter(Scan scan) {
|
||||
if (timeRangeTracker == null) {
|
||||
return true;
|
||||
} else {
|
||||
return timeRangeTracker.includesTimeRange(scan.getTimeRange());
|
||||
}
|
||||
}
|
||||
|
||||
private boolean passesBloomFilter(Scan scan, final SortedSet<byte[]> columns) {
|
||||
if (this.bloomFilter == null || !scan.isGetScan()) {
|
||||
return true;
|
||||
}
|
||||
byte[] row = scan.getStartRow();
|
||||
byte[] key;
|
||||
switch (this.bloomFilterType) {
|
||||
case ROW:
|
||||
|
|
|
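Taken together, shouldSeek() for a store file is now the conjunction of the time-range check (passesTimerangeFilter above) and the existing bloom-filter check. The payoff is on time-restricted reads: a client Get or Scan that sets a time range lets the region server skip files whose recorded [min, max] range cannot overlap it. A hedged client-side sketch, not part of this commit; the row, family, and qualifier names are invented:

import java.io.IOException;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class TimeRangeScanExample {
  // Builds a scan restricted to cells with 1000 <= timestamp < 2000. Store files
  // and memstores whose tracked range cannot overlap this window are not seeked.
  public static Scan buildScan() throws IOException {
    Scan scan = new Scan(Bytes.toBytes("row1"));
    scan.addColumn(Bytes.toBytes("family"), Bytes.toBytes("qualifier"));
    scan.setTimeRange(1000L, 2000L);
    return scan;
  }
}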
@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -131,9 +132,8 @@ class StoreFileScanner implements KeyValueScanner {
|
|||
return true;
|
||||
}
|
||||
|
||||
// Bloom filter hook.
|
||||
public boolean shouldSeek(final byte[] row,
|
||||
final SortedSet<byte[]> columns) {
|
||||
return reader.shouldSeek(row, columns);
|
||||
// StoreFile filter hook.
|
||||
public boolean shouldSeek(Scan scan, final SortedSet<byte[]> columns) {
|
||||
return reader.shouldSeek(scan, columns);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -154,17 +154,17 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb
|
|||
List<KeyValueScanner> scanners =
|
||||
new ArrayList<KeyValueScanner>(sfScanners.size()+1);
|
||||
|
||||
// exclude scan files that have failed file filters
|
||||
// include only those scan files which pass all filters
|
||||
for (StoreFileScanner sfs : sfScanners) {
|
||||
if (isGet &&
|
||||
!sfs.shouldSeek(scan.getStartRow(), columns)) {
|
||||
continue; // exclude this hfs
|
||||
}
|
||||
if (sfs.shouldSeek(scan, columns)) {
|
||||
scanners.add(sfs);
|
||||
}
|
||||
}
|
||||
|
||||
// Then the memstore scanners
|
||||
if (this.store.memstore.shouldSeek(scan)) {
|
||||
scanners.addAll(this.store.memstore.getScanners());
|
||||
}
|
||||
return scanners;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,147 @@
|
|||
/**
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.KeyValue.Type;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
|
||||
/**
 * Stores the minimum and maximum timestamp values seen so far.
 * Can be used to check whether a given time range overlaps with this range.
 * MemStores use this class to track their minimum and maximum timestamps.
 * When writing StoreFiles, this information is stored in the file metadata and
 * used at read time to match against the requested TimeRange.
 */
|
||||
public class TimeRangeTracker implements Writable {
|
||||
|
||||
long minimumTimestamp = -1;
|
||||
long maximumTimestamp = -1;
|
||||
|
||||
/**
|
||||
 * Default constructor.
 * Leaves the range uninitialized; both timestamps start at -1.
|
||||
*/
|
||||
public TimeRangeTracker() {
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Copy Constructor
|
||||
* @param trt source TimeRangeTracker
|
||||
*/
|
||||
public TimeRangeTracker(final TimeRangeTracker trt) {
|
||||
this.minimumTimestamp = trt.getMinimumTimestamp();
|
||||
this.maximumTimestamp = trt.getMaximumTimestamp();
|
||||
}
|
||||
|
||||
public TimeRangeTracker(long minimumTimestamp, long maximumTimestamp) {
|
||||
this.minimumTimestamp = minimumTimestamp;
|
||||
this.maximumTimestamp = maximumTimestamp;
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the current TimestampRange to include the timestamp from KeyValue
|
||||
* If the Key is of type DeleteColumn or DeleteFamily, it includes the
|
||||
* entire time range from 0 to timestamp of the key.
|
||||
* @param kv the KeyValue to include
|
||||
*/
|
||||
public void includeTimestamp(final KeyValue kv) {
|
||||
includeTimestamp(kv.getTimestamp());
|
||||
if (kv.isDeleteColumnOrFamily()) {
|
||||
includeTimestamp(0);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the current TimestampRange to include the timestamp from Key.
|
||||
* If the Key is of type DeleteColumn or DeleteFamily, it includes the
|
||||
* entire time range from 0 to timestamp of the key.
|
||||
* @param key
|
||||
*/
|
||||
public void includeTimestamp(final byte[] key) {
|
||||
includeTimestamp(Bytes.toLong(key,key.length-KeyValue.TIMESTAMP_TYPE_SIZE));
|
||||
int type = key[key.length - 1];
|
||||
if (type == Type.DeleteColumn.getCode() ||
|
||||
type == Type.DeleteFamily.getCode()) {
|
||||
includeTimestamp(0);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* If required, update the current TimestampRange to include timestamp
|
||||
* @param timestamp the timestamp value to include
|
||||
*/
|
||||
private void includeTimestamp(final long timestamp) {
|
||||
if (maximumTimestamp==-1) {
|
||||
minimumTimestamp = timestamp;
|
||||
maximumTimestamp = timestamp;
|
||||
}
|
||||
else if (minimumTimestamp > timestamp) {
|
||||
minimumTimestamp = timestamp;
|
||||
}
|
||||
else if (maximumTimestamp < timestamp) {
|
||||
maximumTimestamp = timestamp;
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the range has any overlap with TimeRange
|
||||
* @param tr TimeRange
|
||||
* @return True if there is overlap, false otherwise
|
||||
*/
|
||||
public boolean includesTimeRange(final TimeRange tr) {
|
||||
return (this.minimumTimestamp < tr.getMax() &&
|
||||
this.maximumTimestamp >= tr.getMin());
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the minimumTimestamp
|
||||
*/
|
||||
public long getMinimumTimestamp() {
|
||||
return minimumTimestamp;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maximumTimestamp
|
||||
*/
|
||||
public long getMaximumTimestamp() {
|
||||
return maximumTimestamp;
|
||||
}
|
||||
|
||||
public void write(final DataOutput out) throws IOException {
|
||||
out.writeLong(minimumTimestamp);
|
||||
out.writeLong(maximumTimestamp);
|
||||
}
|
||||
|
||||
public void readFields(final DataInput in) throws IOException {
|
||||
this.minimumTimestamp = in.readLong();
|
||||
this.maximumTimestamp = in.readLong();
|
||||
}
|
||||
|
||||
}
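Because TimeRangeTracker is a Writable, the writer can persist it into the store file's file-info map under TIMERANGE_KEY (appendTimeRangeMetadata() above) and the reader can rebuild it when the file is opened. A small round-trip sketch, not part of the patch, using the same helpers the commit itself uses (the 5/20 timestamps are arbitrary):

import java.io.IOException;

import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.WritableUtils;

public class TimeRangeTrackerRoundTrip {
  public static void main(String[] args) throws IOException {
    TimeRangeTracker written = new TimeRangeTracker(5, 20);

    // Serialize the way StoreFile.Writer.appendTimeRangeMetadata() does.
    byte[] bytes = WritableUtils.toByteArray(written);

    // Deserialize the way the reader-opening code does when it finds TIMERANGE_KEY.
    TimeRangeTracker read = new TimeRangeTracker();
    Writables.copyWritable(bytes, read);

    System.out.println(read.getMinimumTimestamp() + ".." + read.getMaximumTimestamp()); // 5..20
  }
}

Note also that includeTimestamp(KeyValue) widens the range down to 0 for DeleteColumn and DeleteFamily markers, so a file carrying such a delete can never be filtered out of a scan over older timestamps that the delete has to mask.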
|
||||
|
|
@ -0,0 +1,304 @@
|
|||
/**
|
||||
* Copyright 2009 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.client;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.After;
|
||||
import org.junit.AfterClass;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
 * Run tests that read specific versions/time ranges using the HBase client APIs.
 * Sets up the HBase mini cluster once at start. Each test creates a table
 * named for the method and runs its checks against that.
|
||||
*/
|
||||
public class TestMultipleTimestamps {
|
||||
final Log LOG = LogFactory.getLog(getClass());
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.startMiniCluster(3);
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniCluster();
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
// Nothing to do.
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
// Nothing to do.
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithVersionDeletes() throws Exception {
|
||||
|
||||
// first test from memstore (without flushing).
|
||||
testWithVersionDeletes(false);
|
||||
|
||||
// run same test against HFiles (by forcing a flush).
|
||||
testWithVersionDeletes(true);
|
||||
}
|
||||
|
||||
public void testWithVersionDeletes(boolean flushTables) throws IOException {
|
||||
byte [] TABLE = Bytes.toBytes("testWithVersionDeletes_" +
|
||||
(flushTables ? "flush" : "noflush"));
|
||||
byte [] FAMILY = Bytes.toBytes("event_log");
|
||||
byte [][] FAMILIES = new byte[][] { FAMILY };
|
||||
|
||||
// create table; set versions to max...
|
||||
HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
|
||||
|
||||
// For row:0, col:0: insert versions 1 through 5.
|
||||
putNVersions(ht, FAMILY, 0, 0, 1, 5);
|
||||
|
||||
if (flushTables) {
|
||||
flush();
|
||||
}
|
||||
|
||||
// delete version 4.
|
||||
deleteOneVersion(ht, FAMILY, 0, 0, 4);
|
||||
|
||||
// request a bunch of versions including the deleted version. We should
|
||||
// only get back entries for the versions that exist.
|
||||
KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
|
||||
assertEquals(3, kvs.length);
|
||||
checkOneCell(kvs[0], FAMILY, 0, 0, 5);
|
||||
checkOneCell(kvs[1], FAMILY, 0, 0, 3);
|
||||
checkOneCell(kvs[2], FAMILY, 0, 0, 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithMultipleVersionDeletes() throws IOException {
|
||||
byte [] TABLE = Bytes.toBytes("testWithMultipleVersionDeletes");
|
||||
byte [] FAMILY = Bytes.toBytes("event_log");
|
||||
byte [][] FAMILIES = new byte[][] { FAMILY };
|
||||
|
||||
// create table; set versions to max...
|
||||
HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
|
||||
|
||||
// For row:0, col:0: insert versions 1 through 5.
|
||||
putNVersions(ht, FAMILY, 0, 0, 1, 5);
|
||||
|
||||
flush();
|
||||
|
||||
// delete all versions before 4.
|
||||
deleteAllVersionsBefore(ht, FAMILY, 0, 0, 4);
|
||||
|
||||
// request a bunch of versions including the deleted version. We should
|
||||
// only get back entries for the versions that exist.
|
||||
KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
|
||||
assertEquals(0, kvs.length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithColumnDeletes() throws IOException {
|
||||
byte [] TABLE = Bytes.toBytes("testWithColumnDeletes");
|
||||
byte [] FAMILY = Bytes.toBytes("event_log");
|
||||
byte [][] FAMILIES = new byte[][] { FAMILY };
|
||||
|
||||
// create table; set versions to max...
|
||||
HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
|
||||
|
||||
// For row:0, col:0: insert versions 1 through 5.
|
||||
putNVersions(ht, FAMILY, 0, 0, 1, 5);
|
||||
|
||||
flush();
|
||||
|
||||
// delete the column (all of its versions).
|
||||
deleteColumn(ht, FAMILY, 0, 0);
|
||||
|
||||
// request a bunch of versions including the deleted version. We should
|
||||
// only get back entries for the versions that exist.
|
||||
KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
|
||||
assertEquals(0, kvs.length);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithFamilyDeletes() throws IOException {
|
||||
byte [] TABLE = Bytes.toBytes("testWithFamilyDeletes");
|
||||
byte [] FAMILY = Bytes.toBytes("event_log");
|
||||
byte [][] FAMILIES = new byte[][] { FAMILY };
|
||||
|
||||
// create table; set versions to max...
|
||||
HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
|
||||
|
||||
// For row:0, col:0: insert versions 1 through 5.
|
||||
putNVersions(ht, FAMILY, 0, 0, 1, 5);
|
||||
|
||||
flush();
|
||||
|
||||
// delete the entire column family for the row.
|
||||
deleteFamily(ht, FAMILY, 0);
|
||||
|
||||
// request a bunch of versions including the deleted version. We should
|
||||
// only get back entries for the versions that exist.
|
||||
KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
|
||||
assertEquals(0, kvs.length);
|
||||
}
|
||||
|
||||
// Flush tables. Since flushing is asynchronous, sleep for a bit.
|
||||
private void flush() throws IOException {
|
||||
TEST_UTIL.flush();
|
||||
try {
|
||||
Thread.sleep(3000);
|
||||
} catch (InterruptedException i) {
|
||||
// ignore
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Assert that the passed in KeyValue has expected contents for the
|
||||
* specified row, column & timestamp.
|
||||
*/
|
||||
private void checkOneCell(KeyValue kv, byte[] cf,
|
||||
int rowIdx, int colIdx, long ts) {
|
||||
|
||||
String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
|
||||
|
||||
assertEquals("Row mismatch which checking: " + ctx,
|
||||
"row:"+ rowIdx, Bytes.toString(kv.getRow()));
|
||||
|
||||
assertEquals("ColumnFamily mismatch while checking: " + ctx,
|
||||
Bytes.toString(cf), Bytes.toString(kv.getFamily()));
|
||||
|
||||
assertEquals("Column qualifier mismatch while checking: " + ctx,
|
||||
"column:" + colIdx,
|
||||
Bytes.toString(kv.getQualifier()));
|
||||
|
||||
assertEquals("Timestamp mismatch while checking: " + ctx,
|
||||
ts, kv.getTimestamp());
|
||||
|
||||
assertEquals("Value mismatch while checking: " + ctx,
|
||||
"value-version-" + ts, Bytes.toString(kv.getValue()));
|
||||
}
|
||||
|
||||
/**
|
||||
 * Uses a Get with a time range to request a specified list of
|
||||
* versions for the row/column specified by rowIdx & colIdx.
|
||||
*
|
||||
*/
|
||||
private KeyValue[] getNVersions(HTable ht, byte[] cf, int rowIdx,
|
||||
int colIdx, List<Long> versions)
|
||||
throws IOException {
|
||||
byte row[] = Bytes.toBytes("row:" + rowIdx);
|
||||
byte column[] = Bytes.toBytes("column:" + colIdx);
|
||||
Get get = new Get(row);
|
||||
get.addColumn(cf, column);
|
||||
get.setMaxVersions();
|
||||
get.setTimeRange(Collections.min(versions), Collections.max(versions)+1);
|
||||
Result result = ht.get(get);
|
||||
|
||||
return result.raw();
|
||||
}
|
||||
|
||||
/**
|
||||
* Insert in specific row/column versions with timestamps
|
||||
* versionStart..versionEnd.
|
||||
*/
|
||||
private void putNVersions(HTable ht, byte[] cf, int rowIdx, int colIdx,
|
||||
long versionStart, long versionEnd)
|
||||
throws IOException {
|
||||
byte row[] = Bytes.toBytes("row:" + rowIdx);
|
||||
byte column[] = Bytes.toBytes("column:" + colIdx);
|
||||
Put put = new Put(row);
|
||||
|
||||
for (long idx = versionStart; idx <= versionEnd; idx++) {
|
||||
put.add(cf, column, idx, Bytes.toBytes("value-version-" + idx));
|
||||
}
|
||||
|
||||
ht.put(put);
|
||||
}
|
||||
|
||||
/**
|
||||
* For row/column specified by rowIdx/colIdx, delete the cell
|
||||
* corresponding to the specified version.
|
||||
*/
|
||||
private void deleteOneVersion(HTable ht, byte[] cf, int rowIdx,
|
||||
int colIdx, long version)
|
||||
throws IOException {
|
||||
byte row[] = Bytes.toBytes("row:" + rowIdx);
|
||||
byte column[] = Bytes.toBytes("column:" + colIdx);
|
||||
Delete del = new Delete(row);
|
||||
del.deleteColumn(cf, column, version);
|
||||
ht.delete(del);
|
||||
}
|
||||
|
||||
/**
|
||||
* For row/column specified by rowIdx/colIdx, delete all cells
|
||||
 * preceding the specified version.
|
||||
*/
|
||||
private void deleteAllVersionsBefore(HTable ht, byte[] cf, int rowIdx,
|
||||
int colIdx, long version)
|
||||
throws IOException {
|
||||
byte row[] = Bytes.toBytes("row:" + rowIdx);
|
||||
byte column[] = Bytes.toBytes("column:" + colIdx);
|
||||
Delete del = new Delete(row);
|
||||
del.deleteColumns(cf, column, version);
|
||||
ht.delete(del);
|
||||
}
|
||||
|
||||
private void deleteColumn(HTable ht, byte[] cf, int rowIdx, int colIdx) throws IOException {
|
||||
byte row[] = Bytes.toBytes("row:" + rowIdx);
|
||||
byte column[] = Bytes.toBytes("column:" + colIdx);
|
||||
Delete del = new Delete(row);
|
||||
del.deleteColumns(cf, column);
|
||||
ht.delete(del);
|
||||
}
|
||||
|
||||
private void deleteFamily(HTable ht, byte[] cf, int rowIdx) throws IOException {
|
||||
byte row[] = Bytes.toBytes("row:" + rowIdx);
|
||||
Delete del = new Delete(row);
|
||||
del.deleteFamily(cf);
|
||||
ht.delete(del);
|
||||
}
|
||||
}
|
||||
|
|
@ -884,6 +884,39 @@ public class TestMemStore extends TestCase {
|
|||
}
|
||||
|
||||
|
||||
////////////////////////////////////
|
||||
//Test for timestamps
|
||||
////////////////////////////////////
|
||||
|
||||
/**
|
||||
* Test to ensure correctness when using Memstore with multiple timestamps
|
||||
*/
|
||||
public void testMultipleTimestamps() throws IOException {
|
||||
long[] timestamps = new long[] {20,10,5,1};
|
||||
Scan scan = new Scan();
|
||||
|
||||
for (long timestamp: timestamps)
|
||||
addRows(memstore,timestamp);
|
||||
|
||||
scan.setTimeRange(0, 2);
|
||||
assertTrue(memstore.shouldSeek(scan));
|
||||
|
||||
scan.setTimeRange(20, 82);
|
||||
assertTrue(memstore.shouldSeek(scan));
|
||||
|
||||
scan.setTimeRange(10, 20);
|
||||
assertTrue(memstore.shouldSeek(scan));
|
||||
|
||||
scan.setTimeRange(8, 12);
|
||||
assertTrue(memstore.shouldSeek(scan));
|
||||
|
||||
/* This check is not required for correctness, but it should pass when the
 * timestamp range optimization is on. */
|
||||
//scan.setTimeRange(28, 42);
|
||||
//assertTrue(!memstore.shouldSeek(scan));
|
||||
}
|
||||
|
||||
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
// Helpers
|
||||
//////////////////////////////////////////////////////////////////////////////
|
||||
|
|
|
@ -49,10 +49,12 @@ import org.apache.hadoop.hbase.HConstants;
|
|||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.regionserver.wal.HLog;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.security.UnixUserGroupInformation;
|
||||
|
||||
import com.google.common.base.Joiner;
|
||||
|
@ -420,4 +422,88 @@ public class TestStore extends TestCase {
|
|||
storeFlusher.flushCache();
|
||||
storeFlusher.commit();
|
||||
}
|
||||
|
||||
|
||||
|
||||
/**
|
||||
* Generate a list of KeyValues for testing based on given parameters
|
||||
* @param timestamps
|
||||
* @param numRows
|
||||
* @param qualifier
|
||||
* @param family
|
||||
* @return
|
||||
*/
|
||||
List<KeyValue> getKeyValueSet(long[] timestamps, int numRows,
|
||||
byte[] qualifier, byte[] family) {
|
||||
List<KeyValue> kvList = new ArrayList<KeyValue>();
|
||||
for (int i=1;i<=numRows;i++) {
|
||||
byte[] b = Bytes.toBytes(i);
|
||||
for (long timestamp: timestamps) {
|
||||
kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
|
||||
}
|
||||
}
|
||||
return kvList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to ensure correctness when using Stores with multiple timestamps
|
||||
* @throws IOException
|
||||
*/
|
||||
public void testMultipleTimestamps() throws IOException {
|
||||
int numRows = 1;
|
||||
long[] timestamps1 = new long[] {1,5,10,20};
|
||||
long[] timestamps2 = new long[] {30,80};
|
||||
|
||||
init(this.getName());
|
||||
|
||||
List<KeyValue> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
|
||||
for (KeyValue kv : kvList1) {
|
||||
this.store.add(kv);
|
||||
}
|
||||
|
||||
this.store.snapshot();
|
||||
flushStore(store, id++);
|
||||
|
||||
List<KeyValue> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family);
|
||||
for(KeyValue kv : kvList2) {
|
||||
this.store.add(kv);
|
||||
}
|
||||
|
||||
NavigableSet<byte[]> columns = new ConcurrentSkipListSet<byte[]>(
|
||||
Bytes.BYTES_COMPARATOR);
|
||||
columns.add(qf1);
|
||||
List<KeyValue> result;
|
||||
Get get = new Get(Bytes.toBytes(1));
|
||||
get.addColumn(family,qf1);
|
||||
|
||||
get.setTimeRange(0,15);
|
||||
result = new ArrayList<KeyValue>();
|
||||
this.store.get(get, columns, result);
|
||||
assertTrue(result.size()>0);
|
||||
|
||||
get.setTimeRange(40,90);
|
||||
result = new ArrayList<KeyValue>();
|
||||
this.store.get(get, columns, result);
|
||||
assertTrue(result.size()>0);
|
||||
|
||||
get.setTimeRange(10,45);
|
||||
result = new ArrayList<KeyValue>();
|
||||
this.store.get(get, columns, result);
|
||||
assertTrue(result.size()>0);
|
||||
|
||||
get.setTimeRange(80,145);
|
||||
result = new ArrayList<KeyValue>();
|
||||
this.store.get(get, columns, result);
|
||||
assertTrue(result.size()>0);
|
||||
|
||||
get.setTimeRange(1,2);
|
||||
result = new ArrayList<KeyValue>();
|
||||
this.store.get(get, columns, result);
|
||||
assertTrue(result.size()>0);
|
||||
|
||||
get.setTimeRange(90,200);
|
||||
result = new ArrayList<KeyValue>();
|
||||
this.store.get(get, columns, result);
|
||||
assertTrue(result.size()==0);
|
||||
}
|
||||
}
|
|
@ -25,6 +25,7 @@ import java.util.ArrayList;
|
|||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.List;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.HBaseTestCase;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeyValue;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.io.Reference.Range;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFile;
|
||||
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
|
||||
|
@ -354,7 +356,9 @@ public class TestStoreFile extends HBaseTestCase {
|
|||
TreeSet<byte[]> columns = new TreeSet<byte[]>();
|
||||
columns.add("family:col".getBytes());
|
||||
|
||||
boolean exists = scanner.shouldSeek(row.getBytes(), columns);
|
||||
Scan scan = new Scan(row.getBytes(),row.getBytes());
|
||||
scan.addColumn("family".getBytes(), "family:col".getBytes());
|
||||
boolean exists = scanner.shouldSeek(scan, columns);
|
||||
if (i % 2 == 0) {
|
||||
if (!exists) falseNeg++;
|
||||
} else {
|
||||
|
@ -428,7 +432,9 @@ public class TestStoreFile extends HBaseTestCase {
|
|||
TreeSet<byte[]> columns = new TreeSet<byte[]>();
|
||||
columns.add(("col" + col).getBytes());
|
||||
|
||||
boolean exists = scanner.shouldSeek(row.getBytes(), columns);
|
||||
Scan scan = new Scan(row.getBytes(),row.getBytes());
|
||||
scan.addColumn("family".getBytes(), ("col"+col).getBytes());
|
||||
boolean exists = scanner.shouldSeek(scan, columns);
|
||||
boolean shouldRowExist = i % 2 == 0;
|
||||
boolean shouldColExist = j % 2 == 0;
|
||||
shouldColExist = shouldColExist || bt[x] == StoreFile.BloomType.ROW;
|
||||
|
@ -497,4 +503,76 @@ public class TestStoreFile extends HBaseTestCase {
|
|||
return mock;
|
||||
}
|
||||
|
||||
/**
|
||||
 * Generate a list of KeyValues for testing based on given parameters
|
||||
* @param timestamps
|
||||
* @param numRows
|
||||
* @param qualifier
|
||||
* @param family
|
||||
* @return
|
||||
*/
|
||||
List<KeyValue> getKeyValueSet(long[] timestamps, int numRows,
|
||||
byte[] qualifier, byte[] family) {
|
||||
List<KeyValue> kvList = new ArrayList<KeyValue>();
|
||||
for (int i=1;i<=numRows;i++) {
|
||||
byte[] b = Bytes.toBytes(i);
LOG.info(Bytes.toString(b));
|
||||
for (long timestamp: timestamps)
|
||||
{
|
||||
kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
|
||||
}
|
||||
}
|
||||
return kvList;
|
||||
}
|
||||
|
||||
/**
|
||||
* Test to ensure correctness when using StoreFile with multiple timestamps
|
||||
* @throws IOException
|
||||
*/
|
||||
public void testMultipleTimestamps() throws IOException {
|
||||
byte[] family = Bytes.toBytes("familyname");
|
||||
byte[] qualifier = Bytes.toBytes("qualifier");
|
||||
int numRows = 10;
|
||||
long[] timestamps = new long[] {20,10,5,1};
|
||||
Scan scan = new Scan();
|
||||
|
||||
Path storedir = new Path(new Path(this.testDir, "regionname"),
|
||||
"familyname");
|
||||
Path dir = new Path(storedir, "1234567890");
|
||||
StoreFile.Writer writer = StoreFile.createWriter(this.fs, dir, 8 * 1024);
|
||||
|
||||
List<KeyValue> kvList = getKeyValueSet(timestamps,numRows,
|
||||
family, qualifier);
|
||||
|
||||
for (KeyValue kv : kvList) {
|
||||
writer.append(kv);
|
||||
}
|
||||
writer.appendMetadata(0, false);
|
||||
writer.close();
|
||||
|
||||
StoreFile hsf = new StoreFile(this.fs, writer.getPath(), true, conf,
|
||||
StoreFile.BloomType.NONE, false);
|
||||
StoreFile.Reader reader = hsf.createReader();
|
||||
StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
|
||||
TreeSet<byte[]> columns = new TreeSet<byte[]>();
|
||||
columns.add(qualifier);
|
||||
|
||||
scan.setTimeRange(20, 100);
|
||||
assertTrue(scanner.shouldSeek(scan, columns));
|
||||
|
||||
scan.setTimeRange(1, 2);
|
||||
assertTrue(scanner.shouldSeek(scan, columns));
|
||||
|
||||
scan.setTimeRange(8, 10);
|
||||
assertTrue(scanner.shouldSeek(scan, columns));
|
||||
|
||||
scan.setTimeRange(7, 50);
|
||||
assertTrue(scanner.shouldSeek(scan, columns));
|
||||
|
||||
/* This check is not required for correctness, but it should pass when the
 * timestamp range optimization is on. */
|
||||
//scan.setTimeRange(27, 50);
|
||||
//assertTrue(!scanner.shouldSeek(scan, columns));
|
||||
}
|
||||
}
|
||||
|
|