diff --git a/CHANGES.txt b/CHANGES.txt index f1b005849a4..ec4341f6542 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -760,6 +760,8 @@ Release 0.21.0 - Unreleased in a single roundtrip (Kannan via Ryan) HBASE-2828 HTable unnecessarily coupled with HMaster (Nicolas Spiegelberg via Stack) + HBASE-2265 HFile and Memstore should maintain minimum and maximum timestamps + (Pranav via Ryan) NEW FEATURES HBASE-1961 HBase EC2 scripts diff --git a/src/main/java/org/apache/hadoop/hbase/KeyValue.java b/src/main/java/org/apache/hadoop/hbase/KeyValue.java index 702e4e710a6..e32d6838f4a 100644 --- a/src/main/java/org/apache/hadoop/hbase/KeyValue.java +++ b/src/main/java/org/apache/hadoop/hbase/KeyValue.java @@ -924,6 +924,15 @@ public class KeyValue implements Writable, HeapSize { return getType() == Type.DeleteFamily.getCode(); } + /** + * + * @return True if this KV is a delete family or column type. + */ + public boolean isDeleteColumnOrFamily() { + int t = getType(); + return t == Type.DeleteColumn.getCode() || t == Type.DeleteFamily.getCode(); + } + /** * Primarily for use client-side. Returns the family of this KeyValue in a * new byte array.

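The new isDeleteColumnOrFamily() helper exists so that the time-range tracking added later in this patch can treat column- and family-delete markers specially. A minimal illustrative sketch (not part of the patch; it assumes the existing KeyValue(row, family, qualifier, timestamp, Type) client constructor, and the class name is only for illustration):

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.util.Bytes;

    public class DeleteMarkerSketch {
      public static void main(String[] args) {
        KeyValue del = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("cf"),
            Bytes.toBytes("q"), 100L, KeyValue.Type.DeleteFamily);
        // True only for DeleteColumn and DeleteFamily markers; false for Put
        // and single-version Delete markers.
        System.out.println(del.isDeleteColumnOrFamily());
        // TimeRangeTracker.includeTimestamp(del), added below, therefore
        // records the range [0, 100] rather than the single timestamp 100.
      }
    }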
diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index b0c7e95059b..7749ccf32d9 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -41,7 +41,7 @@ import java.util.PriorityQueue; * as an InternalScanner at the Store level, you will get runtime exceptions. */ public class KeyValueHeap implements KeyValueScanner, InternalScanner { - private PriorityQueue heap; + private PriorityQueue heap = null; private KeyValueScanner current = null; private KVScannerComparator comparator; @@ -51,22 +51,25 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner { * @param scanners * @param comparator */ - public KeyValueHeap(List scanners, KVComparator comparator) { + public KeyValueHeap(List scanners, + KVComparator comparator) { this.comparator = new KVScannerComparator(comparator); - this.heap = new PriorityQueue(scanners.size(), - this.comparator); - for (KeyValueScanner scanner : scanners) { - if (scanner.peek() != null) { - this.heap.add(scanner); - } else { - scanner.close(); + if (!scanners.isEmpty()) { + this.heap = new PriorityQueue(scanners.size(), + this.comparator); + for (KeyValueScanner scanner : scanners) { + if (scanner.peek() != null) { + this.heap.add(scanner); + } else { + scanner.close(); + } } + this.current = heap.poll(); } - this.current = heap.poll(); } public KeyValue peek() { - if(this.current == null) { + if (this.current == null) { return null; } return this.current.peek(); @@ -78,12 +81,12 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner { } KeyValue kvReturn = this.current.next(); KeyValue kvNext = this.current.peek(); - if(kvNext == null) { + if (kvNext == null) { this.current.close(); this.current = this.heap.poll(); } else { KeyValueScanner topScanner = this.heap.peek(); - if(topScanner == null || + if (topScanner == null || this.comparator.compare(kvNext, topScanner.peek()) > 0) { this.heap.add(this.current); this.current = this.heap.poll(); @@ -104,6 +107,9 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner { * @return true if there are more keys, false if all scanners are done */ public boolean next(List result, int limit) throws IOException { + if (this.current == null) { + return false; + } InternalScanner currentAsInternal = (InternalScanner)this.current; currentAsInternal.next(result, limit); KeyValue pee = this.current.peek(); @@ -160,12 +166,14 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner { } public void close() { - if(this.current != null) { + if (this.current != null) { this.current.close(); } - KeyValueScanner scanner; - while((scanner = this.heap.poll()) != null) { - scanner.close(); + if (this.heap != null) { + KeyValueScanner scanner; + while ((scanner = this.heap.poll()) != null) { + scanner.close(); + } } } @@ -178,10 +186,10 @@ public class KeyValueHeap implements KeyValueScanner, InternalScanner { * automatically closed and removed from the heap. 
* @param seekKey KeyValue to seek at or after * @return true if KeyValues exist at or after specified key, false if not - * @throws IOException + * @throws IOException */ public boolean seek(KeyValue seekKey) throws IOException { - if(this.current == null) { + if (this.current == null) { return false; } this.heap.add(this.current); diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index 7eac0295a22..bab7d494bc5 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -39,6 +39,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.regionserver.DeleteCompare.DeleteCode; import org.apache.hadoop.hbase.util.Bytes; @@ -81,6 +82,9 @@ public class MemStore implements HeapSize { // Used to track own heapSize final AtomicLong size; + TimeRangeTracker timeRangeTracker; + TimeRangeTracker snapshotTimeRangeTracker; + /** * Default constructor. Used for tests. */ @@ -99,6 +103,8 @@ public class MemStore implements HeapSize { this.comparatorIgnoreType = this.comparator.getComparatorIgnoringType(); this.kvset = new KeyValueSkipListSet(c); this.snapshot = new KeyValueSkipListSet(c); + timeRangeTracker = new TimeRangeTracker(); + snapshotTimeRangeTracker = new TimeRangeTracker(); this.size = new AtomicLong(DEEP_OVERHEAD); } @@ -128,6 +134,8 @@ public class MemStore implements HeapSize { if (!this.kvset.isEmpty()) { this.snapshot = this.kvset; this.kvset = new KeyValueSkipListSet(this.comparator); + this.snapshotTimeRangeTracker = this.timeRangeTracker; + this.timeRangeTracker = new TimeRangeTracker(); // Reset heap to not include any keys this.size.set(DEEP_OVERHEAD); } @@ -167,6 +175,7 @@ public class MemStore implements HeapSize { // create a new snapshot and let the old one go. if (!ss.isEmpty()) { this.snapshot = new KeyValueSkipListSet(this.comparator); + this.snapshotTimeRangeTracker = new TimeRangeTracker(); } } finally { this.lock.writeLock().unlock(); @@ -183,6 +192,7 @@ public class MemStore implements HeapSize { this.lock.readLock().lock(); try { s = heapSizeChange(kv, this.kvset.add(kv)); + timeRangeTracker.includeTimestamp(kv); this.size.addAndGet(s); } finally { this.lock.readLock().unlock(); @@ -198,9 +208,9 @@ public class MemStore implements HeapSize { long delete(final KeyValue delete) { long s = 0; this.lock.readLock().lock(); - try { s += heapSizeChange(delete, this.kvset.add(delete)); + timeRangeTracker.includeTimestamp(delete); } finally { this.lock.readLock().unlock(); } @@ -487,6 +497,19 @@ public class MemStore implements HeapSize { return false; } + /** + * Check if this memstore may contain the required keys + * @param scan + * @return False if the key definitely does not exist in this Memstore + */ + public boolean shouldSeek(Scan scan) { + return timeRangeTracker.includesTimeRange(scan.getTimeRange()) || + snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange()); + } + + public TimeRangeTracker getSnapshotTimeRangeTracker() { + return this.snapshotTimeRangeTracker; + } /* * MemStoreScanner implements the KeyValueScanner. @@ -520,7 +543,7 @@ public class MemStore implements HeapSize { StoreScanner level with coordination with MemStoreScanner. 
*/ - + MemStoreScanner() { super(); @@ -531,7 +554,7 @@ public class MemStore implements HeapSize { KeyValue ret = null; long readPoint = ReadWriteConsistencyControl.getThreadReadPoint(); //DebugPrint.println( " MS@" + hashCode() + ": threadpoint = " + readPoint); - + while (ret == null && it.hasNext()) { KeyValue v = it.next(); if (v.getMemstoreTS() <= readPoint) { @@ -566,7 +589,7 @@ public class MemStore implements HeapSize { //DebugPrint.println( " MS@" + hashCode() + " snapshot seek: " + snapshotNextRow + " with size = " + // snapshot.size() + " threadread = " + readPoint); - + KeyValue lowest = getLowest(); // has data := (lowest != null) @@ -631,7 +654,7 @@ public class MemStore implements HeapSize { public final static long FIXED_OVERHEAD = ClassSize.align( ClassSize.OBJECT + (7 * ClassSize.REFERENCE)); - + public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG + ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST + diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java index 9c720b141f6..f671d02fce0 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -212,7 +212,7 @@ public class Store implements HeapSize { return new Path(tabledir, new Path(encodedName, new Path(Bytes.toString(family)))); } - + /** * Return the directory in which this store stores its * StoreFiles @@ -417,15 +417,17 @@ public class Store implements HeapSize { * Write out current snapshot. Presumes {@link #snapshot()} has been called * previously. * @param logCacheFlushId flush sequence number + * @param snapshot * @return true if a compaction is needed * @throws IOException */ private StoreFile flushCache(final long logCacheFlushId, - SortedSet snapshot) throws IOException { + SortedSet snapshot, + TimeRangeTracker snapshotTimeRangeTracker) throws IOException { // If an exception happens flushing, we let it out without clearing // the memstore snapshot. The old snapshot will be returned when we say // 'snapshot', the next time flush comes around. - return internalFlushCache(snapshot, logCacheFlushId); + return internalFlushCache(snapshot, logCacheFlushId, snapshotTimeRangeTracker); } /* @@ -435,7 +437,8 @@ public class Store implements HeapSize { * @throws IOException */ private StoreFile internalFlushCache(final SortedSet set, - final long logCacheFlushId) + final long logCacheFlushId, + TimeRangeTracker snapshotTimeRangeTracker) throws IOException { StoreFile.Writer writer = null; long flushed = 0; @@ -450,6 +453,7 @@ public class Store implements HeapSize { synchronized (flushLock) { // A. 
Write the map out to the disk writer = createWriterInTmp(set.size()); + writer.setTimeRangeTracker(snapshotTimeRangeTracker); int entries = 0; try { for (KeyValue kv: set) { @@ -466,12 +470,12 @@ public class Store implements HeapSize { writer.close(); } } - + // Write-out finished successfully, move into the right spot Path dstPath = StoreFile.getUniqueFile(fs, homedir); LOG.info("Renaming flushed file at " + writer.getPath() + " to " + dstPath); fs.rename(writer.getPath(), dstPath); - + StoreFile sf = new StoreFile(this.fs, dstPath, blockcache, this.conf, this.family.getBloomFilterType(), this.inMemory); StoreFile.Reader r = sf.createReader(); @@ -1436,6 +1440,7 @@ public class Store implements HeapSize { private long cacheFlushId; private SortedSet snapshot; private StoreFile storeFile; + private TimeRangeTracker snapshotTimeRangeTracker; private StoreFlusherImpl(long cacheFlushId) { this.cacheFlushId = cacheFlushId; @@ -1445,11 +1450,12 @@ public class Store implements HeapSize { public void prepare() { memstore.snapshot(); this.snapshot = memstore.getSnapshot(); + this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker(); } @Override public void flushCache() throws IOException { - storeFile = Store.this.flushCache(cacheFlushId, snapshot); + storeFile = Store.this.flushCache(cacheFlushId, snapshot, snapshotTimeRangeTracker); } @Override diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index 674726e2fe0..133eb092fca 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -24,9 +24,9 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue.KVComparator; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.HalfStoreFileReader; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.BlockCache; @@ -38,7 +38,9 @@ import org.apache.hadoop.hbase.util.BloomFilter; import org.apache.hadoop.hbase.util.ByteBloomFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Hash; +import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.io.RawComparator; +import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.util.StringUtils; import com.google.common.base.Function; @@ -105,6 +107,8 @@ public class StoreFile { public static final byte [] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY"); /** Bloom filter Type in FileInfo */ static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE"); + /** Key for Timerange information in metadata*/ + static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE"); /** Meta data block name for bloom filter meta-info (ie: bloom params/specs) */ static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META"; @@ -411,6 +415,17 @@ public class StoreFile { this.reader.loadBloomfilter(); } + try { + byte [] timerangeBytes = metadataMap.get(TIMERANGE_KEY); + if (timerangeBytes != null) { + this.reader.timeRangeTracker = new TimeRangeTracker(); + Writables.copyWritable(timerangeBytes, this.reader.timeRangeTracker); + } + } catch (IllegalArgumentException e) { + LOG.error("Error reading timestamp range data from 
meta -- " + + "proceeding without", e); + this.reader.timeRangeTracker = null; + } return this.reader; } @@ -647,6 +662,14 @@ public class StoreFile { private KVComparator kvComparator; private KeyValue lastKv = null; private byte[] lastByteArray = null; + TimeRangeTracker timeRangeTracker = new TimeRangeTracker(); + /* isTimeRangeTrackerSet keeps track if the timeRange has already been set + * When flushing a memstore, we set TimeRange and use this variable to + * indicate that it doesn't need to be calculated again while + * appending KeyValues. + * It is not set in cases of compactions when it is recalculated using only + * the appended KeyValues*/ + boolean isTimeRangeTrackerSet = false; protected HFile.Writer writer; /** @@ -693,7 +716,49 @@ public class StoreFile { public void appendMetadata(final long maxSequenceId, final boolean majorCompaction) throws IOException { writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId)); - writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction)); + writer.appendFileInfo(MAJOR_COMPACTION_KEY, + Bytes.toBytes(majorCompaction)); + appendTimeRangeMetadata(); + } + + /** + * Add TimestampRange to Metadata + */ + public void appendTimeRangeMetadata() throws IOException { + appendFileInfo(TIMERANGE_KEY,WritableUtils.toByteArray(timeRangeTracker)); + } + + /** + * Set TimeRangeTracker + * @param trt + */ + public void setTimeRangeTracker(final TimeRangeTracker trt) { + this.timeRangeTracker = trt; + isTimeRangeTrackerSet = true; + } + + /** + * If the timeRangeTracker is not set, + * update TimeRangeTracker to include the timestamp of this key + * @param kv + * @throws IOException + */ + public void includeInTimeRangeTracker(final KeyValue kv) { + if (!isTimeRangeTrackerSet) { + timeRangeTracker.includeTimestamp(kv); + } + } + + /** + * If the timeRangeTracker is not set, + * update TimeRangeTracker to include the timestamp of this key + * @param key + * @throws IOException + */ + public void includeInTimeRangeTracker(final byte [] key) { + if (!isTimeRangeTrackerSet) { + timeRangeTracker.includeTimestamp(key); + } } public void append(final KeyValue kv) throws IOException { @@ -744,6 +809,7 @@ public class StoreFile { } } writer.append(kv); + includeInTimeRangeTracker(kv); } public Path getPath() { @@ -759,6 +825,7 @@ public class StoreFile { } } writer.append(key, value); + includeInTimeRangeTracker(key); } public void close() throws IOException { @@ -794,6 +861,7 @@ public class StoreFile { protected BloomFilter bloomFilter = null; protected BloomType bloomFilterType; private final HFile.Reader reader; + protected TimeRangeTracker timeRangeTracker = null; public Reader(FileSystem fs, Path path, BlockCache blockCache, boolean inMemory) throws IOException { @@ -834,13 +902,28 @@ public class StoreFile { reader.close(); } - public boolean shouldSeek(final byte[] row, - final SortedSet columns) { + public boolean shouldSeek(Scan scan, final SortedSet columns) { + return (passesTimerangeFilter(scan) && passesBloomFilter(scan,columns)); + } - if (this.bloomFilter == null) { + /** + * Check if this storeFile may contain keys within the TimeRange + * @param scan + * @return False if it definitely does not exist in this StoreFile + */ + private boolean passesTimerangeFilter(Scan scan) { + if (timeRangeTracker == null) { + return true; + } else { + return timeRangeTracker.includesTimeRange(scan.getTimeRange()); + } + } + + private boolean passesBloomFilter(Scan scan, final SortedSet columns) { + if (this.bloomFilter == null || 
!scan.isGetScan()) { return true; } - + byte[] row = scan.getStartRow(); byte[] key; switch (this.bloomFilterType) { case ROW: diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index 83a4acb90ab..a28ab44dca4 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.regionserver; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.hfile.HFileScanner; import java.io.IOException; @@ -131,9 +132,8 @@ class StoreFileScanner implements KeyValueScanner { return true; } - // Bloom filter hook. - public boolean shouldSeek(final byte[] row, - final SortedSet columns) { - return reader.shouldSeek(row, columns); + // StoreFile filter hook. + public boolean shouldSeek(Scan scan, final SortedSet columns) { + return reader.shouldSeek(scan, columns); } } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index 91eafd06276..7ee10316f21 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -154,17 +154,17 @@ class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersOb List scanners = new ArrayList(sfScanners.size()+1); - // exclude scan files that have failed file filters + // include only those scan files which pass all filters for (StoreFileScanner sfs : sfScanners) { - if (isGet && - !sfs.shouldSeek(scan.getStartRow(), columns)) { - continue; // exclude this hfs + if (sfs.shouldSeek(scan, columns)) { + scanners.add(sfs); } - scanners.add(sfs); } // Then the memstore scanners - scanners.addAll(this.store.memstore.getScanners()); + if (this.store.memstore.shouldSeek(scan)) { + scanners.addAll(this.store.memstore.getScanners()); + } return scanners; } diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java b/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java new file mode 100644 index 00000000000..8d2c90d6be3 --- /dev/null +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java @@ -0,0 +1,147 @@ +/** + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValue.Type; +import org.apache.hadoop.hbase.io.TimeRange; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.Writable; + +/** + * Stores the minimum and maximum timestamp values. + * Can be used to find if any given time range overlaps with its time range + * MemStores use this class to track its minimum and maximum timestamps. + * When writing StoreFiles, this information is stored in meta blocks and used + * at read time to match against the required TimeRange + */ +public class TimeRangeTracker implements Writable { + + long minimumTimestamp = -1; + long maximumTimestamp = -1; + + /** + * Default constructor. + * Initializes TimeRange to be null + */ + public TimeRangeTracker() { + + } + + /** + * Copy Constructor + * @param trt source TimeRangeTracker + */ + public TimeRangeTracker(final TimeRangeTracker trt) { + this.minimumTimestamp = trt.getMinimumTimestamp(); + this.maximumTimestamp = trt.getMaximumTimestamp(); + } + + public TimeRangeTracker(long minimumTimestamp, long maximumTimestamp) { + this.minimumTimestamp = minimumTimestamp; + this.maximumTimestamp = maximumTimestamp; + } + + /** + * Update the current TimestampRange to include the timestamp from KeyValue + * If the Key is of type DeleteColumn or DeleteFamily, it includes the + * entire time range from 0 to timestamp of the key. + * @param kv the KeyValue to include + */ + public void includeTimestamp(final KeyValue kv) { + includeTimestamp(kv.getTimestamp()); + if (kv.isDeleteColumnOrFamily()) { + includeTimestamp(0); + } + } + + /** + * Update the current TimestampRange to include the timestamp from Key. + * If the Key is of type DeleteColumn or DeleteFamily, it includes the + * entire time range from 0 to timestamp of the key. 
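+   * For example, a DeleteFamily key written at timestamp 100 widens the
+   * tracked range to [0, 100], since that delete can mask values at any
+   * earlier timestamp.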
+ * @param key + */ + public void includeTimestamp(final byte[] key) { + includeTimestamp(Bytes.toLong(key,key.length-KeyValue.TIMESTAMP_TYPE_SIZE)); + int type = key[key.length - 1]; + if (type == Type.DeleteColumn.getCode() || + type == Type.DeleteFamily.getCode()) { + includeTimestamp(0); + } + } + + /** + * If required, update the current TimestampRange to include timestamp + * @param timestamp the timestamp value to include + */ + private void includeTimestamp(final long timestamp) { + if (maximumTimestamp==-1) { + minimumTimestamp = timestamp; + maximumTimestamp = timestamp; + } + else if (minimumTimestamp > timestamp) { + minimumTimestamp = timestamp; + } + else if (maximumTimestamp < timestamp) { + maximumTimestamp = timestamp; + } + return; + } + + /** + * Check if the range has any overlap with TimeRange + * @param tr TimeRange + * @return True if there is overlap, false otherwise + */ + public boolean includesTimeRange(final TimeRange tr) { + return (this.minimumTimestamp < tr.getMax() && + this.maximumTimestamp >= tr.getMin()); + } + + /** + * @return the minimumTimestamp + */ + public long getMinimumTimestamp() { + return minimumTimestamp; + } + + /** + * @return the maximumTimestamp + */ + public long getMaximumTimestamp() { + return maximumTimestamp; + } + + public void write(final DataOutput out) throws IOException { + out.writeLong(minimumTimestamp); + out.writeLong(maximumTimestamp); + } + + public void readFields(final DataInput in) throws IOException { + this.minimumTimestamp = in.readLong(); + this.maximumTimestamp = in.readLong(); + } + +} + diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java b/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java new file mode 100644 index 00000000000..4a653ab84a0 --- /dev/null +++ b/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java @@ -0,0 +1,304 @@ +/** + * Copyright 2009 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.*; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * Run tests related to {@link TimestampsFilter} using HBase client APIs. + * Sets up the HBase mini cluster once at start. Each creates a table + * named for the method and does its stuff against that. 
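+ * testWithVersionDeletes runs both against the memstore (no flush) and
+ * against flushed StoreFiles, so the time-range pruning introduced by this
+ * patch is exercised on both read paths.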
+ */ +public class TestMultipleTimestamps { + final Log LOG = LogFactory.getLog(getClass()); + private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); + + /** + * @throws java.lang.Exception + */ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(3); + } + + /** + * @throws java.lang.Exception + */ + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + /** + * @throws java.lang.Exception + */ + @Before + public void setUp() throws Exception { + // Nothing to do. + } + + /** + * @throws java.lang.Exception + */ + @After + public void tearDown() throws Exception { + // Nothing to do. + } + + @Test + public void testWithVersionDeletes() throws Exception { + + // first test from memstore (without flushing). + testWithVersionDeletes(false); + + // run same test against HFiles (by forcing a flush). + testWithVersionDeletes(true); + } + + public void testWithVersionDeletes(boolean flushTables) throws IOException { + byte [] TABLE = Bytes.toBytes("testWithVersionDeletes_" + + (flushTables ? "flush" : "noflush")); + byte [] FAMILY = Bytes.toBytes("event_log"); + byte [][] FAMILIES = new byte[][] { FAMILY }; + + // create table; set versions to max... + HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE); + + // For row:0, col:0: insert versions 1 through 5. + putNVersions(ht, FAMILY, 0, 0, 1, 5); + + if (flushTables) { + flush(); + } + + // delete version 4. + deleteOneVersion(ht, FAMILY, 0, 0, 4); + + // request a bunch of versions including the deleted version. We should + // only get back entries for the versions that exist. + KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L)); + assertEquals(3, kvs.length); + checkOneCell(kvs[0], FAMILY, 0, 0, 5); + checkOneCell(kvs[1], FAMILY, 0, 0, 3); + checkOneCell(kvs[2], FAMILY, 0, 0, 2); + } + + @Test + public void testWithMultipleVersionDeletes() throws IOException { + byte [] TABLE = Bytes.toBytes("testWithMultipleVersionDeletes"); + byte [] FAMILY = Bytes.toBytes("event_log"); + byte [][] FAMILIES = new byte[][] { FAMILY }; + + // create table; set versions to max... + HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE); + + // For row:0, col:0: insert versions 1 through 5. + putNVersions(ht, FAMILY, 0, 0, 1, 5); + + flush(); + + // delete all versions before 4. + deleteAllVersionsBefore(ht, FAMILY, 0, 0, 4); + + // request a bunch of versions including the deleted version. We should + // only get back entries for the versions that exist. + KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L)); + assertEquals(0, kvs.length); + } + + @Test + public void testWithColumnDeletes() throws IOException { + byte [] TABLE = Bytes.toBytes("testWithColumnDeletes"); + byte [] FAMILY = Bytes.toBytes("event_log"); + byte [][] FAMILIES = new byte[][] { FAMILY }; + + // create table; set versions to max... + HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE); + + // For row:0, col:0: insert versions 1 through 5. + putNVersions(ht, FAMILY, 0, 0, 1, 5); + + flush(); + + // delete all versions before 4. + deleteColumn(ht, FAMILY, 0, 0); + + // request a bunch of versions including the deleted version. We should + // only get back entries for the versions that exist. 
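+    // Here the entire column was deleted, so no cells at all should come back.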
+ KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L)); + assertEquals(0, kvs.length); + } + + @Test + public void testWithFamilyDeletes() throws IOException { + byte [] TABLE = Bytes.toBytes("testWithFamilyDeletes"); + byte [] FAMILY = Bytes.toBytes("event_log"); + byte [][] FAMILIES = new byte[][] { FAMILY }; + + // create table; set versions to max... + HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE); + + // For row:0, col:0: insert versions 1 through 5. + putNVersions(ht, FAMILY, 0, 0, 1, 5); + + flush(); + + // delete all versions before 4. + deleteFamily(ht, FAMILY, 0); + + // request a bunch of versions including the deleted version. We should + // only get back entries for the versions that exist. + KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L)); + assertEquals(0, kvs.length); + } + + // Flush tables. Since flushing is asynchronous, sleep for a bit. + private void flush() throws IOException { + TEST_UTIL.flush(); + try { + Thread.sleep(3000); + } catch (InterruptedException i) { + // ignore + } + } + + /** + * Assert that the passed in KeyValue has expected contents for the + * specified row, column & timestamp. + */ + private void checkOneCell(KeyValue kv, byte[] cf, + int rowIdx, int colIdx, long ts) { + + String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts; + + assertEquals("Row mismatch which checking: " + ctx, + "row:"+ rowIdx, Bytes.toString(kv.getRow())); + + assertEquals("ColumnFamily mismatch while checking: " + ctx, + Bytes.toString(cf), Bytes.toString(kv.getFamily())); + + assertEquals("Column qualifier mismatch while checking: " + ctx, + "column:" + colIdx, + Bytes.toString(kv.getQualifier())); + + assertEquals("Timestamp mismatch while checking: " + ctx, + ts, kv.getTimestamp()); + + assertEquals("Value mismatch while checking: " + ctx, + "value-version-" + ts, Bytes.toString(kv.getValue())); + } + + /** + * Uses the TimestampFilter on a Get to request a specified list of + * versions for the row/column specified by rowIdx & colIdx. + * + */ + private KeyValue[] getNVersions(HTable ht, byte[] cf, int rowIdx, + int colIdx, List versions) + throws IOException { + byte row[] = Bytes.toBytes("row:" + rowIdx); + byte column[] = Bytes.toBytes("column:" + colIdx); + Get get = new Get(row); + get.addColumn(cf, column); + get.setMaxVersions(); + get.setTimeRange(Collections.min(versions), Collections.max(versions)+1); + Result result = ht.get(get); + + return result.raw(); + } + + /** + * Insert in specific row/column versions with timestamps + * versionStart..versionEnd. + */ + private void putNVersions(HTable ht, byte[] cf, int rowIdx, int colIdx, + long versionStart, long versionEnd) + throws IOException { + byte row[] = Bytes.toBytes("row:" + rowIdx); + byte column[] = Bytes.toBytes("column:" + colIdx); + Put put = new Put(row); + + for (long idx = versionStart; idx <= versionEnd; idx++) { + put.add(cf, column, idx, Bytes.toBytes("value-version-" + idx)); + } + + ht.put(put); + } + + /** + * For row/column specified by rowIdx/colIdx, delete the cell + * corresponding to the specified version. 
+ */ + private void deleteOneVersion(HTable ht, byte[] cf, int rowIdx, + int colIdx, long version) + throws IOException { + byte row[] = Bytes.toBytes("row:" + rowIdx); + byte column[] = Bytes.toBytes("column:" + colIdx); + Delete del = new Delete(row); + del.deleteColumn(cf, column, version); + ht.delete(del); + } + + /** + * For row/column specified by rowIdx/colIdx, delete all cells + * preceeding the specified version. + */ + private void deleteAllVersionsBefore(HTable ht, byte[] cf, int rowIdx, + int colIdx, long version) + throws IOException { + byte row[] = Bytes.toBytes("row:" + rowIdx); + byte column[] = Bytes.toBytes("column:" + colIdx); + Delete del = new Delete(row); + del.deleteColumns(cf, column, version); + ht.delete(del); + } + + private void deleteColumn(HTable ht, byte[] cf, int rowIdx, int colIdx) throws IOException { + byte row[] = Bytes.toBytes("row:" + rowIdx); + byte column[] = Bytes.toBytes("column:" + colIdx); + Delete del = new Delete(row); + del.deleteColumns(cf, column); + ht.delete(del); + } + + private void deleteFamily(HTable ht, byte[] cf, int rowIdx) throws IOException { + byte row[] = Bytes.toBytes("row:" + rowIdx); + Delete del = new Delete(row); + del.deleteFamily(cf); + ht.delete(del); + } +} + diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java index 4f27ea51777..ef99c9dedfd 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java @@ -163,7 +163,7 @@ public class TestMemStore extends TestCase { /** * A simple test which verifies the 3 possible states when scanning across snapshot. - * @throws IOException + * @throws IOException */ public void testScanAcrossSnapshot2() throws IOException { // we are going to the scanning across snapshot with two kvs @@ -210,7 +210,7 @@ public class TestMemStore extends TestCase { throws IOException { scanner.seek(KeyValue.createFirstOnRow(new byte[]{})); List returned = Lists.newArrayList(); - + while (true) { KeyValue next = scanner.next(); if (next == null) break; @@ -313,15 +313,15 @@ public class TestMemStore extends TestCase { // COMPLETE INSERT 2 rwcc.completeMemstoreInsert(w); - + // NOW SHOULD SEE NEW KVS IN ADDITION TO OLD KVS. 
// See HBASE-1485 for discussion about what we should do with // the duplicate-TS inserts ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); s = this.memstore.getScanners().get(0); - assertScannerResults(s, new KeyValue[]{kv21, kv11, kv22, kv12}); + assertScannerResults(s, new KeyValue[]{kv21, kv11, kv22, kv12}); } - + /** * When we insert a higher-memstoreTS deletion of a cell but with * the same timestamp, we still need to provide consistent reads @@ -369,9 +369,9 @@ public class TestMemStore extends TestCase { // NOW WE SHOULD SEE DELETE ReadWriteConsistencyControl.resetThreadReadPoint(rwcc); s = this.memstore.getScanners().get(0); - assertScannerResults(s, new KeyValue[]{kv11, kvDel, kv12}); + assertScannerResults(s, new KeyValue[]{kv11, kvDel, kv12}); } - + private static class ReadOwnWritesTester extends Thread { static final int NUM_TRIES = 1000; @@ -454,7 +454,7 @@ public class TestMemStore extends TestCase { } } - /** + /** * Test memstore snapshots * @throws IOException */ @@ -785,7 +785,7 @@ public class TestMemStore extends TestCase { expected.add(put2); expected.add(put1); - + assertEquals(4, memstore.kvset.size()); int i = 0; for (KeyValue kv: memstore.kvset) { @@ -825,7 +825,7 @@ public class TestMemStore extends TestCase { expected.add(put3); - + assertEquals(5, memstore.kvset.size()); int i = 0; for (KeyValue kv: memstore.kvset) { @@ -884,9 +884,42 @@ public class TestMemStore extends TestCase { } + //////////////////////////////////// + //Test for timestamps + //////////////////////////////////// + + /** + * Test to ensure correctness when using Memstore with multiple timestamps + */ + public void testMultipleTimestamps() throws IOException { + long[] timestamps = new long[] {20,10,5,1}; + Scan scan = new Scan(); + + for (long timestamp: timestamps) + addRows(memstore,timestamp); + + scan.setTimeRange(0, 2); + assertTrue(memstore.shouldSeek(scan)); + + scan.setTimeRange(20, 82); + assertTrue(memstore.shouldSeek(scan)); + + scan.setTimeRange(10, 20); + assertTrue(memstore.shouldSeek(scan)); + + scan.setTimeRange(8, 12); + assertTrue(memstore.shouldSeek(scan)); + + /*This test is not required for correctness but it should pass when + * timestamp range optimization is on*/ + //scan.setTimeRange(28, 42); + //assertTrue(!memstore.shouldSeek(scan)); + } + + ////////////////////////////////////////////////////////////////////////////// // Helpers - ////////////////////////////////////////////////////////////////////////////// + ////////////////////////////////////////////////////////////////////////////// private static byte [] makeQualifier(final int i1, final int i2){ return Bytes.toBytes(Integer.toString(i1) + ";" + Integer.toString(i2)); @@ -1008,5 +1041,5 @@ public class TestMemStore extends TestCase { } - + } diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index b15ae53af57..81a2c8c35cd 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -49,10 +49,12 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.wal.HLog; import 
org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.security.UnixUserGroupInformation; import com.google.common.base.Joiner; @@ -420,4 +422,88 @@ public class TestStore extends TestCase { storeFlusher.flushCache(); storeFlusher.commit(); } + + + + /** + * Generate a list of KeyValues for testing based on given parameters + * @param timestamps + * @param numRows + * @param qualifier + * @param family + * @return + */ + List getKeyValueSet(long[] timestamps, int numRows, + byte[] qualifier, byte[] family) { + List kvList = new ArrayList(); + for (int i=1;i<=numRows;i++) { + byte[] b = Bytes.toBytes(i); + for (long timestamp: timestamps) { + kvList.add(new KeyValue(b, family, qualifier, timestamp, b)); + } + } + return kvList; + } + + /** + * Test to ensure correctness when using Stores with multiple timestamps + * @throws IOException + */ + public void testMultipleTimestamps() throws IOException { + int numRows = 1; + long[] timestamps1 = new long[] {1,5,10,20}; + long[] timestamps2 = new long[] {30,80}; + + init(this.getName()); + + List kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family); + for (KeyValue kv : kvList1) { + this.store.add(kv); + } + + this.store.snapshot(); + flushStore(store, id++); + + List kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family); + for(KeyValue kv : kvList2) { + this.store.add(kv); + } + + NavigableSet columns = new ConcurrentSkipListSet( + Bytes.BYTES_COMPARATOR); + columns.add(qf1); + List result; + Get get = new Get(Bytes.toBytes(1)); + get.addColumn(family,qf1); + + get.setTimeRange(0,15); + result = new ArrayList(); + this.store.get(get, columns, result); + assertTrue(result.size()>0); + + get.setTimeRange(40,90); + result = new ArrayList(); + this.store.get(get, columns, result); + assertTrue(result.size()>0); + + get.setTimeRange(10,45); + result = new ArrayList(); + this.store.get(get, columns, result); + assertTrue(result.size()>0); + + get.setTimeRange(80,145); + result = new ArrayList(); + this.store.get(get, columns, result); + assertTrue(result.size()>0); + + get.setTimeRange(1,2); + result = new ArrayList(); + this.store.get(get, columns, result); + assertTrue(result.size()>0); + + get.setTimeRange(90,200); + result = new ArrayList(); + this.store.get(get, columns, result); + assertTrue(result.size()==0); + } } \ No newline at end of file diff --git a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java index 57269ef5f4c..38ef52001ee 100644 --- a/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java +++ b/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java @@ -25,6 +25,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.Comparator; +import java.util.List; import java.util.TreeSet; import org.apache.commons.logging.Log; @@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.HBaseTestCase; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.Reference.Range; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileScanner; @@ -354,7 +356,9 @@ public class TestStoreFile extends HBaseTestCase { TreeSet columns = new TreeSet(); columns.add("family:col".getBytes()); - boolean exists = scanner.shouldSeek(row.getBytes(), 
columns); + Scan scan = new Scan(row.getBytes(),row.getBytes()); + scan.addColumn("family".getBytes(), "family:col".getBytes()); + boolean exists = scanner.shouldSeek(scan, columns); if (i % 2 == 0) { if (!exists) falseNeg++; } else { @@ -428,7 +432,9 @@ public class TestStoreFile extends HBaseTestCase { TreeSet columns = new TreeSet(); columns.add(("col" + col).getBytes()); - boolean exists = scanner.shouldSeek(row.getBytes(), columns); + Scan scan = new Scan(row.getBytes(),row.getBytes()); + scan.addColumn("family".getBytes(), ("col"+col).getBytes()); + boolean exists = scanner.shouldSeek(scan, columns); boolean shouldRowExist = i % 2 == 0; boolean shouldColExist = j % 2 == 0; shouldColExist = shouldColExist || bt[x] == StoreFile.BloomType.ROW; @@ -496,5 +502,77 @@ public class TestStoreFile extends HBaseTestCase { Mockito.doReturn(name).when(mock).toString(); return mock; } - + + /** + *Generate a list of KeyValues for testing based on given parameters + * @param timestamps + * @param numRows + * @param qualifier + * @param family + * @return + */ + List getKeyValueSet(long[] timestamps, int numRows, + byte[] qualifier, byte[] family) { + List kvList = new ArrayList(); + for (int i=1;i<=numRows;i++) { + byte[] b = Bytes.toBytes(i) ; + LOG.info(Bytes.toString(b)); + LOG.info(Bytes.toString(b)); + for (long timestamp: timestamps) + { + kvList.add(new KeyValue(b, family, qualifier, timestamp, b)); + } + } + return kvList; + } + + /** + * Test to ensure correctness when using StoreFile with multiple timestamps + * @throws IOException + */ + public void testMultipleTimestamps() throws IOException { + byte[] family = Bytes.toBytes("familyname"); + byte[] qualifier = Bytes.toBytes("qualifier"); + int numRows = 10; + long[] timestamps = new long[] {20,10,5,1}; + Scan scan = new Scan(); + + Path storedir = new Path(new Path(this.testDir, "regionname"), + "familyname"); + Path dir = new Path(storedir, "1234567890"); + StoreFile.Writer writer = StoreFile.createWriter(this.fs, dir, 8 * 1024); + + List kvList = getKeyValueSet(timestamps,numRows, + family, qualifier); + + for (KeyValue kv : kvList) { + writer.append(kv); + } + writer.appendMetadata(0, false); + writer.close(); + + StoreFile hsf = new StoreFile(this.fs, writer.getPath(), true, conf, + StoreFile.BloomType.NONE, false); + StoreFile.Reader reader = hsf.createReader(); + StoreFileScanner scanner = reader.getStoreFileScanner(false, false); + TreeSet columns = new TreeSet(); + columns.add(qualifier); + + scan.setTimeRange(20, 100); + assertTrue(scanner.shouldSeek(scan, columns)); + + scan.setTimeRange(1, 2); + assertTrue(scanner.shouldSeek(scan, columns)); + + scan.setTimeRange(8, 10); + assertTrue(scanner.shouldSeek(scan, columns)); + + scan.setTimeRange(7, 50); + assertTrue(scanner.shouldSeek(scan, columns)); + + /*This test is not required for correctness but it should pass when + * timestamp range optimization is on*/ + //scan.setTimeRange(27, 50); + //assertTrue(!scanner.shouldSeek(scan, columns)); + } }
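
For reference, here is a minimal sketch of the pruning check that MemStore.shouldSeek() and StoreFile.Reader.shouldSeek() now rely on. It is illustrative only, not part of the patch; it assumes the TimeRangeTracker class added above and the existing org.apache.hadoop.hbase.io.TimeRange, whose upper bound (as set by Scan.setTimeRange()) is exclusive:

    import java.io.IOException;

    import org.apache.hadoop.hbase.io.TimeRange;
    import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;

    public class TimeRangePruningSketch {
      public static void main(String[] args) throws IOException {
        // A flushed file whose KeyValues all had timestamps in [1, 20] carries
        // this tracker in its TIMERANGE file-info entry.
        TimeRangeTracker trt = new TimeRangeTracker(1, 20);

        // Overlapping range: the file must be read.
        System.out.println(trt.includesTimeRange(new TimeRange(10, 30))); // true
        // Disjoint range above the maximum: the file can be skipped.
        System.out.println(trt.includesTimeRange(new TimeRange(25, 30))); // false
        // [0, 1) ends before the minimum timestamp 1: also skipped.
        System.out.println(trt.includesTimeRange(new TimeRange(0, 1)));   // false
      }
    }

A memstore or store file is read whenever its tracked range merely overlaps the scan's range; the tracker only proves absence, never presence, which is why the commented-out assertions in the tests above are marked as optimization checks rather than correctness checks.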