diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java new file mode 100644 index 00000000000..18d2f8a526a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractMemStore.java @@ -0,0 +1,497 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import com.google.common.annotations.VisibleForTesting; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableSet; +import java.util.SortedSet; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; + +/** + * An abstract class, which implements the behaviour shared by all concrete memstore instances. + */ +@InterfaceAudience.Private +public abstract class AbstractMemStore implements MemStore { + + private static final long NO_SNAPSHOT_ID = -1; + + private final Configuration conf; + private final CellComparator comparator; + + // active segment absorbs write operations + private volatile MutableSegment active; + // Snapshot of memstore. Made for flusher. 
+ private volatile ImmutableSegment snapshot; + protected volatile long snapshotId; + // Used to track when to flush + private volatile long timeOfOldestEdit; + + public final static long FIXED_OVERHEAD = ClassSize.align( + ClassSize.OBJECT + + (4 * ClassSize.REFERENCE) + + (2 * Bytes.SIZEOF_LONG)); + + public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + + 2 * (ClassSize.ATOMIC_LONG + ClassSize.TIMERANGE_TRACKER + + ClassSize.CELL_SKIPLIST_SET + ClassSize.CONCURRENT_SKIPLISTMAP)); + + + protected AbstractMemStore(final Configuration conf, final CellComparator c) { + this.conf = conf; + this.comparator = c; + resetCellSet(); + this.snapshot = SegmentFactory.instance().createImmutableSegment(conf, c, 0); + this.snapshotId = NO_SNAPSHOT_ID; + } + + protected void resetCellSet() { + // Reset heap to not include any keys + this.active = SegmentFactory.instance().createMutableSegment( + conf, comparator, DEEP_OVERHEAD); + this.timeOfOldestEdit = Long.MAX_VALUE; + } + + /* + * Calculate how the MemStore size has changed. Includes overhead of the + * backing Map. + * @param cell + * @param notPresent True if the cell was NOT present in the set. + * @return change in size + */ + static long heapSizeChange(final Cell cell, final boolean notPresent) { + return notPresent ? 
ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + + CellUtil.estimatedHeapSizeOf(cell)) : 0; + } + + /** + * Updates the wal with the lowest sequence id (oldest entry) that is still in memory + * @param onlyIfMoreRecent a flag that marks whether to update the sequence id no matter what or + * only if it is greater than the previous sequence id + */ + public abstract void updateLowestUnflushedSequenceIdInWal(boolean onlyIfMoreRecent); + + /** + * Write an update + * @param cell the cell to be added + * @return approximate size of the passed cell & newly added cell which may be different than the + * passed-in cell + */ + @Override + public long add(Cell cell) { + Cell toAdd = maybeCloneWithAllocator(cell); + return internalAdd(toAdd); + } + + /** + * Update or insert the specified Cells. + *

+ * For each Cell, insert into MemStore. This will atomically upsert the + * value for that row/family/qualifier. If a Cell did already exist, + * it will then be removed. + *

+ * Currently the memstoreTS is kept at 0 so as each insert happens, it will + * be immediately visible. May want to change this so it is atomic across + * all Cells. + *

+ * This is called under row lock, so Get operations will still see updates + * atomically. Scans will only see each Cell update as atomic. + * + * @param cells the cells to be updated + * @param readpoint readpoint below which we can safely remove duplicate KVs + * @return change in memstore size + */ + @Override + public long upsert(Iterable cells, long readpoint) { + long size = 0; + for (Cell cell : cells) { + size += upsert(cell, readpoint); + } + return size; + } + + /** + * @return Oldest timestamp of all the Cells in the MemStore + */ + @Override + public long timeOfOldestEdit() { + return timeOfOldestEdit; + } + + + /** + * Write a delete + * @param deleteCell the cell to be deleted + * @return approximate size of the passed key and value. + */ + @Override + public long delete(Cell deleteCell) { + Cell toAdd = maybeCloneWithAllocator(deleteCell); + long s = internalAdd(toAdd); + return s; + } + + /** + * An override on snapshot so the no arg version of the method implies zero seq num, + * like for cases without wal + */ + public MemStoreSnapshot snapshot() { + return snapshot(0); + } + + /** + * The passed snapshot was successfully persisted; it can be let go. + * @param id Id of the snapshot to clean out. + * @see MemStore#snapshot(long) + */ + @Override + public void clearSnapshot(long id) throws UnexpectedStateException { + if (this.snapshotId != id) { + throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed " + + id); + } + // OK. Passed in snapshot is same as current snapshot. If not-empty, + // create a new snapshot and let the old one go. + Segment oldSnapshot = this.snapshot; + if (!this.snapshot.isEmpty()) { + this.snapshot = SegmentFactory.instance().createImmutableSegment( + getComparator(), 0); + } + this.snapshotId = NO_SNAPSHOT_ID; + oldSnapshot.close(); + } + + /** + * Get the entire heap usage for this MemStore not including keys in the + * snapshot. 
+ */ + @Override + public long heapSize() { + return getActive().getSize(); + } + + /** + * On flush, how much memory we will clear from the active cell set. + * + * @return size of data that is going to be flushed from active set + */ + @Override + public long getFlushableSize() { + long snapshotSize = getSnapshot().getSize(); + return snapshotSize > 0 ? snapshotSize : keySize(); + } + + + /** + * @return a list containing a single memstore scanner. + */ + @Override + public List getScanners(long readPt) throws IOException { + return Collections. singletonList(new MemStoreScanner(this, readPt)); + } + + @Override + public long getSnapshotSize() { + return getSnapshot().getSize(); + } + + @Override + public String toString() { + StringBuffer buf = new StringBuffer(); + int i = 1; + try { + for (Segment segment : getListOfSegments()) { + buf.append("Segment (" + i + ") " + segment.toString() + "; "); + i++; + } + } catch (IOException e){ + return e.toString(); + } + return buf.toString(); + } + + protected void rollbackInSnapshot(Cell cell) { + // If the key is in the snapshot, delete it. We should not update + // this.size, because that tracks the size of only the memstore and + // not the snapshot. The flush of this snapshot to disk has not + // yet started because Store.flush() waits for all rwcc transactions to + // commit before starting the flush to disk. + snapshot.rollback(cell); + } + + protected void rollbackInActive(Cell cell) { + // If the key is in the memstore, delete it. Update this.size. + long sz = active.rollback(cell); + if (sz != 0) { + setOldestEditTimeToNow(); + } + } + + protected Configuration getConfiguration() { + return conf; + } + + protected void dump(Log log) { + active.dump(log); + snapshot.dump(log); + } + + + /** + * Inserts the specified Cell into MemStore and deletes any existing + * versions of the same row/family/qualifier as the specified Cell. + *

+ * First, the specified Cell is inserted into the Memstore. + *

+ * If there are any existing Cell in this MemStore with the same row, + * family, and qualifier, they are removed. + *

+ * Callers must hold the read lock. + * + * @param cell the cell to be updated + * @param readpoint readpoint below which we can safely remove duplicate KVs + * @return change in size of MemStore + */ + private long upsert(Cell cell, long readpoint) { + // Add the Cell to the MemStore + // Use the internalAdd method here since we (a) already have a lock + // and (b) cannot safely use the MSLAB here without potentially + // hitting OOME - see TestMemStore.testUpsertMSLAB for a + // test that triggers the pathological case if we don't avoid MSLAB + // here. + long addedSize = internalAdd(cell); + + // Get the Cells for the row/family/qualifier regardless of timestamp. + // For this case we want to clean up any other puts + Cell firstCell = KeyValueUtil.createFirstOnRow( + cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), + cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), + cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + SortedSet ss = active.tailSet(firstCell); + Iterator it = ss.iterator(); + // versions visible to oldest scanner + int versionsVisible = 0; + while (it.hasNext()) { + Cell cur = it.next(); + + if (cell == cur) { + // ignore the one just put in + continue; + } + // check that this is the row and column we are interested in, otherwise bail + if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) { + // only remove Puts that concurrent scanners cannot possibly see + if (cur.getTypeByte() == KeyValue.Type.Put.getCode() && + cur.getSequenceId() <= readpoint) { + if (versionsVisible >= 1) { + // if we get here we have seen at least one version visible to the oldest scanner, + // which means we can prove that no scanner will see this version + + // passing true (notPresent) makes heapSizeChange return the cell's full size to subtract.
+ long delta = heapSizeChange(cur, true); + addedSize -= delta; + active.incSize(-delta); + it.remove(); + setOldestEditTimeToNow(); + } else { + versionsVisible++; + } + } + } else { + // past the row or column, done + break; + } + } + return addedSize; + } + + /* + * @param a + * @param b + * @return Return lowest of a or b or null if both a and b are null + */ + protected Cell getLowest(final Cell a, final Cell b) { + if (a == null) { + return b; + } + if (b == null) { + return a; + } + return comparator.compareRows(a, b) <= 0? a: b; + } + + /* + * @param key Find row that follows this one. If null, return first. + * @param set Set to look in for a row beyond row. + * @return Next row or null if none found. If one found, will be a new + * KeyValue -- can be destroyed by subsequent calls to this method. + */ + protected Cell getNextRow(final Cell key, + final NavigableSet set) { + Cell result = null; + SortedSet tail = key == null? set: set.tailSet(key); + // Iterate until we fall into the next row; i.e. move off current row + for (Cell cell: tail) { + if (comparator.compareRows(cell, key) <= 0) { + continue; + } + // Note: Not suppressing deletes or expired cells. Needs to be handled + // by higher up functions. + result = cell; + break; + } + return result; + } + + /** + * Given the specs of a column, update it, first by inserting a new record, + * then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS + * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying + * store will ensure that the insert/delete each are atomic. A scanner/reader will either + * get the new value, or the old value and all readers will eventually only see the new + * value after the old was removed. 
+ */ + @VisibleForTesting + @Override + public long updateColumnValue(byte[] row, byte[] family, byte[] qualifier, + long newValue, long now) { + Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier); + // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit. + Cell snc = snapshot.getFirstAfter(firstCell); + if(snc != null) { + // is there a matching Cell in the snapshot? + if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) { + if (snc.getTimestamp() == now) { + now += 1; + } + } + } + // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary. + // But the timestamp should also be max(now, mostRecentTsInMemstore) + + // so we cant add the new Cell w/o knowing what's there already, but we also + // want to take this chance to delete some cells. So two loops (sad) + + SortedSet ss = getActive().tailSet(firstCell); + for (Cell cell : ss) { + // if this isnt the row we are interested in, then bail: + if (!CellUtil.matchingColumn(cell, family, qualifier) + || !CellUtil.matchingRow(cell, firstCell)) { + break; // rows dont match, bail. + } + + // if the qualifier matches and it's a put with a newer timestamp, advance 'now' to it. + if (cell.getTypeByte() == KeyValue.Type.Put.getCode() && + cell.getTimestamp() > now && CellUtil.matchingQualifier(firstCell, cell)) { + now = cell.getTimestamp(); + } + } + + // create or update (upsert) a new Cell with + // 'now' and a 0 memstoreTS == immediately visible + List cells = new ArrayList(1); + cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue))); + return upsert(cells, 1L); + } + + private Cell maybeCloneWithAllocator(Cell cell) { + return active.maybeCloneWithAllocator(cell); + } + + /** + * Internal version of add() that doesn't clone Cells with the + * allocator, and doesn't take the lock.
+ * + * Callers should ensure they already have the read lock taken + */ + private long internalAdd(final Cell toAdd) { + long s = active.add(toAdd); + setOldestEditTimeToNow(); + checkActiveSize(); + return s; + } + + private void setOldestEditTimeToNow() { + if (timeOfOldestEdit == Long.MAX_VALUE) { + timeOfOldestEdit = EnvironmentEdgeManager.currentTime(); + } + } + + protected long keySize() { + return heapSize() - DEEP_OVERHEAD; + } + + protected CellComparator getComparator() { + return comparator; + } + + protected MutableSegment getActive() { + return active; + } + + protected ImmutableSegment getSnapshot() { + return snapshot; + } + + protected AbstractMemStore setSnapshot(ImmutableSegment snapshot) { + this.snapshot = snapshot; + return this; + } + + protected void setSnapshotSize(long snapshotSize) { + getSnapshot().setSize(snapshotSize); + } + + /** + * Check whether anything need to be done based on the current active set size + */ + protected abstract void checkActiveSize(); + + /** + * Returns a list of Store segment scanners, one per each store segment + * @param readPt the version number required to initialize the scanners + * @return a list of Store segment scanners, one per each store segment + */ + protected abstract List getListOfScanners(long readPt) throws IOException; + + /** + * Returns an ordered list of segments from most recent to oldest in memstore + * @return an ordered list of segments from most recent to oldest in memstore + */ + protected abstract List getListOfSegments() throws IOException; + + public long getActiveSize() { + return getActive().getSize(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSkipListSet.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java similarity index 79% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSkipListSet.java rename to 
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java index e9941b35a50..443330283a1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSkipListSet.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CellSet.java @@ -31,28 +31,26 @@ import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.classification.InterfaceAudience; /** - * A {@link java.util.Set} of {@link Cell}s implemented on top of a - * {@link java.util.concurrent.ConcurrentSkipListMap}. Works like a - * {@link java.util.concurrent.ConcurrentSkipListSet} in all but one regard: - * An add will overwrite if already an entry for the added key. In other words, - * where CSLS does "Adds the specified element to this set if it is not already - * present.", this implementation "Adds the specified element to this set EVEN - * if it is already present overwriting what was there previous". The call to - * add returns true if no value in the backing map or false if there was an - * entry with same key (though value may be different). - *

Otherwise, - * has same attributes as ConcurrentSkipListSet: e.g. tolerant of concurrent - * get and set and won't throw ConcurrentModificationException when iterating. + * A {@link java.util.Set} of {@link Cell}s, where an add will overwrite the entry if it already + * exists in the set. The call to add returns true if no value in the backing map or false if + * there was an entry with same key (though value may be different). + * This implementation is tolerant of concurrent get and set and won't throw + * ConcurrentModificationException when iterating. */ @InterfaceAudience.Private -public class CellSkipListSet implements NavigableSet { +public class CellSet implements NavigableSet { + // Implemented on top of a {@link java.util.concurrent.ConcurrentSkipListMap} + // Differ from CSLS in one respect, where CSLS does "Adds the specified element to this set if it + // is not already present.", this implementation "Adds the specified element to this set EVEN + // if it is already present overwriting what was there previous".
+ // Otherwise, has same attributes as ConcurrentSkipListSet private final ConcurrentNavigableMap delegatee; - CellSkipListSet(final CellComparator c) { + CellSet(final CellComparator c) { this.delegatee = new ConcurrentSkipListMap(c); } - CellSkipListSet(final ConcurrentNavigableMap m) { + CellSet(final ConcurrentNavigableMap m) { this.delegatee = m; } @@ -78,7 +76,7 @@ public class CellSkipListSet implements NavigableSet { public NavigableSet headSet(final Cell toElement, boolean inclusive) { - return new CellSkipListSet(this.delegatee.headMap(toElement, inclusive)); + return new CellSet(this.delegatee.headMap(toElement, inclusive)); } public Cell higher(Cell e) { @@ -115,7 +113,7 @@ public class CellSkipListSet implements NavigableSet { } public NavigableSet tailSet(Cell fromElement, boolean inclusive) { - return new CellSkipListSet(this.delegatee.tailMap(fromElement, inclusive)); + return new CellSet(this.delegatee.tailMap(fromElement, inclusive)); } public Comparator comparator() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java index f61d871be85..82d40b6a0f3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultMemStore.java @@ -19,35 +19,22 @@ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; import java.util.ArrayList; -import java.util.Collections; -import java.util.Iterator; import java.util.List; -import java.util.NavigableSet; -import java.util.SortedSet; -import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import 
org.apache.hadoop.hbase.CellComparator; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.io.TimeRange; -import org.apache.hadoop.hbase.util.ByteRange; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ClassSize; -import org.apache.hadoop.hbase.util.CollectionBackedScanner; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.ReflectionUtils; -import org.apache.htrace.Trace; /** * The MemStore holds in-memory modifications to the Store. Modifications @@ -66,40 +53,8 @@ import org.apache.htrace.Trace; * in KV size. */ @InterfaceAudience.Private -public class DefaultMemStore implements MemStore { +public class DefaultMemStore extends AbstractMemStore { private static final Log LOG = LogFactory.getLog(DefaultMemStore.class); - static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled"; - private static final boolean USEMSLAB_DEFAULT = true; - static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class"; - - private Configuration conf; - - // MemStore. Use a CellSkipListSet rather than SkipListSet because of the - // better semantics. The Map will overwrite if passed a key it already had - // whereas the Set will not add new Cell if key is same though value might be - // different. Value is not important -- just make sure always same - // reference passed. - volatile CellSkipListSet cellSet; - - // Snapshot of memstore. Made for flusher. 
- volatile CellSkipListSet snapshot; - - final CellComparator comparator; - - // Used to track own heapSize - final AtomicLong size; - private volatile long snapshotSize; - - // Used to track when to flush - volatile long timeOfOldestEdit = Long.MAX_VALUE; - - TimeRangeTracker timeRangeTracker; - TimeRangeTracker snapshotTimeRangeTracker; - - volatile MemStoreLAB allocator; - volatile MemStoreLAB snapshotAllocator; - volatile long snapshotId; - volatile boolean tagsPresent; /** * Default constructor. Used for tests. @@ -112,183 +67,54 @@ public class DefaultMemStore implements MemStore { * Constructor. * @param c Comparator */ - public DefaultMemStore(final Configuration conf, - final CellComparator c) { - this.conf = conf; - this.comparator = c; - this.cellSet = new CellSkipListSet(c); - this.snapshot = new CellSkipListSet(c); - timeRangeTracker = new TimeRangeTracker(); - snapshotTimeRangeTracker = new TimeRangeTracker(); - this.size = new AtomicLong(DEEP_OVERHEAD); - this.snapshotSize = 0; - if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) { - String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName()); - this.allocator = ReflectionUtils.instantiateWithCustomCtor(className, - new Class[] { Configuration.class }, new Object[] { conf }); - } else { - this.allocator = null; - } + public DefaultMemStore(final Configuration conf, final CellComparator c) { + super(conf, c); } void dump() { - for (Cell cell: this.cellSet) { - LOG.info(cell); - } - for (Cell cell: this.snapshot) { - LOG.info(cell); - } + super.dump(LOG); } /** * Creates a snapshot of the current memstore. * Snapshot must be cleared by call to {@link #clearSnapshot(long)} + * @param flushOpSeqId the sequence id that is attached to the flush operation in the wal */ @Override - public MemStoreSnapshot snapshot() { + public MemStoreSnapshot snapshot(long flushOpSeqId) { // If snapshot currently has entries, then flusher failed or didn't call // cleanup. Log a warning. 
- if (!this.snapshot.isEmpty()) { + if (!getSnapshot().isEmpty()) { LOG.warn("Snapshot called again without clearing previous. " + "Doing nothing. Another ongoing flush or did we fail last attempt?"); } else { this.snapshotId = EnvironmentEdgeManager.currentTime(); - this.snapshotSize = keySize(); - if (!this.cellSet.isEmpty()) { - this.snapshot = this.cellSet; - this.cellSet = new CellSkipListSet(this.comparator); - this.snapshotTimeRangeTracker = this.timeRangeTracker; - this.timeRangeTracker = new TimeRangeTracker(); - // Reset heap to not include any keys - this.size.set(DEEP_OVERHEAD); - this.snapshotAllocator = this.allocator; - // Reset allocator so we get a fresh buffer for the new memstore - if (allocator != null) { - String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName()); - this.allocator = ReflectionUtils.instantiateWithCustomCtor(className, - new Class[] { Configuration.class }, new Object[] { conf }); - } - timeOfOldestEdit = Long.MAX_VALUE; + if (!getActive().isEmpty()) { + ImmutableSegment immutableSegment = SegmentFactory.instance(). + createImmutableSegment(getConfiguration(), getActive()); + setSnapshot(immutableSegment); + setSnapshotSize(keySize()); + resetCellSet(); } } - MemStoreSnapshot memStoreSnapshot = new MemStoreSnapshot(this.snapshotId, snapshot.size(), this.snapshotSize, - this.snapshotTimeRangeTracker, new CollectionBackedScanner(snapshot, this.comparator), - this.tagsPresent); - this.tagsPresent = false; - return memStoreSnapshot; - } + return new MemStoreSnapshot(this.snapshotId, getSnapshot()); - /** - * The passed snapshot was successfully persisted; it can be let go. - * @param id Id of the snapshot to clean out. 
- * @throws UnexpectedStateException - * @see #snapshot() - */ - @Override - public void clearSnapshot(long id) throws UnexpectedStateException { - MemStoreLAB tmpAllocator = null; - if (this.snapshotId != id) { - throw new UnexpectedStateException("Current snapshot id is " + this.snapshotId + ",passed " - + id); - } - // OK. Passed in snapshot is same as current snapshot. If not-empty, - // create a new snapshot and let the old one go. - if (!this.snapshot.isEmpty()) { - this.snapshot = new CellSkipListSet(this.comparator); - this.snapshotTimeRangeTracker = new TimeRangeTracker(); - } - this.snapshotSize = 0; - this.snapshotId = -1; - if (this.snapshotAllocator != null) { - tmpAllocator = this.snapshotAllocator; - this.snapshotAllocator = null; - } - if (tmpAllocator != null) { - tmpAllocator.close(); - } } @Override - public long getFlushableSize() { - return this.snapshotSize > 0 ? this.snapshotSize : keySize(); + protected List getListOfScanners(long readPt) throws IOException { + List list = new ArrayList(2); + list.add(0, getActive().getSegmentScanner(readPt)); + list.add(1, getSnapshot().getSegmentScanner(readPt)); + return list; } @Override - public long getSnapshotSize() { - return this.snapshotSize; - } - - /** - * Write an update - * @param cell - * @return approximate size of the passed Cell. - */ - @Override - public long add(Cell cell) { - Cell toAdd = maybeCloneWithAllocator(cell); - return internalAdd(toAdd); - } - - @Override - public long timeOfOldestEdit() { - return timeOfOldestEdit; - } - - private boolean addToCellSet(Cell e) { - boolean b = this.cellSet.add(e); - // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call. - // When we use ACL CP or Visibility CP which deals with Tags during - // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not - // parse the byte[] to identify the tags length. 
- if(e.getTagsLength() > 0) { - tagsPresent = true; - } - setOldestEditTimeToNow(); - return b; - } - - private boolean removeFromCellSet(Cell e) { - boolean b = this.cellSet.remove(e); - setOldestEditTimeToNow(); - return b; - } - - void setOldestEditTimeToNow() { - if (timeOfOldestEdit == Long.MAX_VALUE) { - timeOfOldestEdit = EnvironmentEdgeManager.currentTime(); - } - } - - /** - * Internal version of add() that doesn't clone Cells with the - * allocator, and doesn't take the lock. - * - * Callers should ensure they already have the read lock taken - */ - private long internalAdd(final Cell toAdd) { - long s = heapSizeChange(toAdd, addToCellSet(toAdd)); - timeRangeTracker.includeTimestamp(toAdd); - this.size.addAndGet(s); - return s; - } - - private Cell maybeCloneWithAllocator(Cell cell) { - if (allocator == null) { - return cell; - } - - int len = KeyValueUtil.length(cell); - ByteRange alloc = allocator.allocateBytes(len); - if (alloc == null) { - // The allocation was too large, allocator decided - // not to do anything with it. - return cell; - } - assert alloc.getBytes() != null; - KeyValueUtil.appendToByteArray(cell, alloc.getBytes(), alloc.getOffset()); - KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len); - newKv.setSequenceId(cell.getSequenceId()); - return newKv; + protected List getListOfSegments() throws IOException { + List list = new ArrayList(2); + list.add(0, getActive()); + list.add(1, getSnapshot()); + return list; } /** @@ -301,39 +127,8 @@ public class DefaultMemStore implements MemStore { */ @Override public void rollback(Cell cell) { - // If the key is in the snapshot, delete it. We should not update - // this.size, because that tracks the size of only the memstore and - // not the snapshot. The flush of this snapshot to disk has not - // yet started because Store.flush() waits for all rwcc transactions to - // commit before starting the flush to disk. 
- Cell found = this.snapshot.get(cell); - if (found != null && found.getSequenceId() == cell.getSequenceId()) { - this.snapshot.remove(cell); - long sz = heapSizeChange(cell, true); - this.snapshotSize -= sz; - } - // If the key is in the memstore, delete it. Update this.size. - found = this.cellSet.get(cell); - if (found != null && found.getSequenceId() == cell.getSequenceId()) { - removeFromCellSet(cell); - long s = heapSizeChange(cell, true); - this.size.addAndGet(-s); - } - } - - /** - * Write a delete - * @param deleteCell - * @return approximate size of the passed key and value. - */ - @Override - public long delete(Cell deleteCell) { - long s = 0; - Cell toAdd = maybeCloneWithAllocator(deleteCell); - s += heapSizeChange(toAdd, addToCellSet(toAdd)); - timeRangeTracker.includeTimestamp(toAdd); - this.size.addAndGet(s); - return s; + rollbackInSnapshot(cell); + rollbackInActive(cell); } /** @@ -342,606 +137,31 @@ public class DefaultMemStore implements MemStore { * @return Next row or null if none found. */ Cell getNextRow(final Cell cell) { - return getLowest(getNextRow(cell, this.cellSet), getNextRow(cell, this.snapshot)); + return getLowest( + getNextRow(cell, getActive().getCellSet()), + getNextRow(cell, getSnapshot().getCellSet())); } - /* - * @param a - * @param b - * @return Return lowest of a or b or null if both a and b are null - */ - private Cell getLowest(final Cell a, final Cell b) { - if (a == null) { - return b; - } - if (b == null) { - return a; - } - return comparator.compareRows(a, b) <= 0? a: b; - } - - /* - * @param key Find row that follows this one. If null, return first. - * @param map Set to look in for a row beyond row. - * @return Next row or null if none found. If one found, will be a new - * KeyValue -- can be destroyed by subsequent calls to this method. - */ - private Cell getNextRow(final Cell key, - final NavigableSet set) { - Cell result = null; - SortedSet tail = key == null? 
set: set.tailSet(key); - // Iterate until we fall into the next row; i.e. move off current row - for (Cell cell: tail) { - if (comparator.compareRows(cell, key) <= 0) - continue; - // Note: Not suppressing deletes or expired cells. Needs to be handled - // by higher up functions. - result = cell; - break; - } - return result; + @Override public void updateLowestUnflushedSequenceIdInWal(boolean onlyIfMoreRecent) { } /** - * Only used by tests. TODO: Remove - * - * Given the specs of a column, update it, first by inserting a new record, - * then removing the old one. Since there is only 1 KeyValue involved, the memstoreTS - * will be set to 0, thus ensuring that they instantly appear to anyone. The underlying - * store will ensure that the insert/delete each are atomic. A scanner/reader will either - * get the new value, or the old value and all readers will eventually only see the new - * value after the old was removed. - * - * @param row - * @param family - * @param qualifier - * @param newValue - * @param now - * @return Timestamp + * @return Total memory occupied by this MemStore. */ - @Override - public long updateColumnValue(byte[] row, - byte[] family, - byte[] qualifier, - long newValue, - long now) { - Cell firstCell = KeyValueUtil.createFirstOnRow(row, family, qualifier); - // Is there a Cell in 'snapshot' with the same TS? If so, upgrade the timestamp a bit. - SortedSet snSs = snapshot.tailSet(firstCell); - if (!snSs.isEmpty()) { - Cell snc = snSs.first(); - // is there a matching Cell in the snapshot? - if (CellUtil.matchingRow(snc, firstCell) && CellUtil.matchingQualifier(snc, firstCell)) { - if (snc.getTimestamp() == now) { - // poop, - now += 1; - } - } - } - - // logic here: the new ts MUST be at least 'now'. But it could be larger if necessary. - // But the timestamp should also be max(now, mostRecentTsInMemstore) - - // so we cant add the new Cell w/o knowing what's there already, but we also - // want to take this chance to delete some cells. 
So two loops (sad) - - SortedSet ss = cellSet.tailSet(firstCell); - for (Cell cell : ss) { - // if this isnt the row we are interested in, then bail: - if (!CellUtil.matchingColumn(cell, family, qualifier) - || !CellUtil.matchingRow(cell, firstCell)) { - break; // rows dont match, bail. - } - - // if the qualifier matches and it's a put, just RM it out of the cellSet. - if (cell.getTypeByte() == KeyValue.Type.Put.getCode() && - cell.getTimestamp() > now && CellUtil.matchingQualifier(firstCell, cell)) { - now = cell.getTimestamp(); - } - } - - // create or update (upsert) a new Cell with - // 'now' and a 0 memstoreTS == immediately visible - List cells = new ArrayList(1); - cells.add(new KeyValue(row, family, qualifier, now, Bytes.toBytes(newValue))); - return upsert(cells, 1L); - } - - /** - * Update or insert the specified KeyValues. - *

- * For each KeyValue, insert into MemStore. This will atomically upsert the - * value for that row/family/qualifier. If a KeyValue did already exist, - * it will then be removed. - *

- * This is called under row lock, so Get operations will still see updates - * atomically. Scans will only see each KeyValue update as atomic. - * - * @param readpoint readpoint below which we can safely remove duplicate KVs - * @return change in memstore size - */ - @Override - public long upsert(Iterable cells, long readpoint) { - long size = 0; - for (Cell cell : cells) { - size += upsert(cell, readpoint); - } - return size; - } - - /** - * Inserts the specified KeyValue into MemStore and deletes any existing - * versions of the same row/family/qualifier as the specified KeyValue. - *

- * First, the specified KeyValue is inserted into the Memstore. - *

- * If there are any existing KeyValues in this MemStore with the same row, - * family, and qualifier, they are removed. - *

- * Callers must hold the read lock. - * @param readpoint Smallest outstanding readpoint; below which we can remove duplicate Cells. - * @return change in size of MemStore - */ - private long upsert(Cell cell, long readpoint) { - // Add the Cell to the MemStore - // Use the internalAdd method here since we (a) already have a lock - // and (b) cannot safely use the MSLAB here without potentially - // hitting OOME - see TestMemStore.testUpsertMSLAB for a - // test that triggers the pathological case if we don't avoid MSLAB - // here. - long addedSize = internalAdd(cell); - - // Get the Cells for the row/family/qualifier regardless of timestamp. - // For this case we want to clean up any other puts - Cell firstCell = KeyValueUtil.createFirstOnRow( - cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), - cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(), - cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); - SortedSet ss = cellSet.tailSet(firstCell); - Iterator it = ss.iterator(); - // Versions visible to oldest scanner. - int versionsVisible = 0; - while ( it.hasNext() ) { - Cell cur = it.next(); - - if (cell == cur) { - // ignore the one just put in - continue; - } - // check that this is the row and column we are interested in, otherwise bail - if (CellUtil.matchingRow(cell, cur) && CellUtil.matchingQualifier(cell, cur)) { - // only remove Puts that concurrent scanners cannot possibly see - if (cur.getTypeByte() == KeyValue.Type.Put.getCode() && - cur.getSequenceId() <= readpoint) { - if (versionsVisible >= 1) { - // if we get here we have seen at least one version visible to the oldest scanner, - // which means we can prove that no scanner will see this version - - // false means there was a change, so give us the size. 
- long delta = heapSizeChange(cur, true); - addedSize -= delta; - this.size.addAndGet(-delta); - it.remove(); - setOldestEditTimeToNow(); - } else { - versionsVisible++; - } - } - } else { - // past the row or column, done - break; - } - } - return addedSize; - } - - /** - * @return scanner on memstore and snapshot in this order. - */ - @Override - public List getScanners(long readPt) { - return Collections. singletonList(new MemStoreScanner(readPt)); - } - - /** - * Check if this memstore may contain the required keys - * @param scan scan - * @param store holds reference to cf - * @param oldestUnexpiredTS - * @return False if the key definitely does not exist in this Memstore - */ - public boolean shouldSeek(Scan scan, Store store, long oldestUnexpiredTS) { - byte[] cf = store.getFamily().getName(); - TimeRange timeRange = scan.getColumnFamilyTimeRange().get(cf); - if (timeRange == null) { - timeRange = scan.getTimeRange(); - } - return (timeRangeTracker.includesTimeRange(timeRange) || - snapshotTimeRangeTracker.includesTimeRange(timeRange)) - && (Math.max(timeRangeTracker.getMaximumTimestamp(), - snapshotTimeRangeTracker.getMaximumTimestamp()) >= - oldestUnexpiredTS); - } - - /* - * MemStoreScanner implements the KeyValueScanner. - * It lets the caller scan the contents of a memstore -- both current - * map and snapshot. - * This behaves as if it were a real scanner but does not maintain position. - */ - protected class MemStoreScanner extends NonLazyKeyValueScanner { - // Next row information for either cellSet or snapshot - private Cell cellSetNextRow = null; - private Cell snapshotNextRow = null; - - // last iterated Cells for cellSet and snapshot (to restore iterator state after reseek) - private Cell cellSetItRow = null; - private Cell snapshotItRow = null; - - // iterator based scanning. 
- private Iterator cellSetIt; - private Iterator snapshotIt; - - // The cellSet and snapshot at the time of creating this scanner - private CellSkipListSet cellSetAtCreation; - private CellSkipListSet snapshotAtCreation; - - // the pre-calculated Cell to be returned by peek() or next() - private Cell theNext; - - // The allocator and snapshot allocator at the time of creating this scanner - volatile MemStoreLAB allocatorAtCreation; - volatile MemStoreLAB snapshotAllocatorAtCreation; - - // A flag represents whether could stop skipping Cells for MVCC - // if have encountered the next row. Only used for reversed scan - private boolean stopSkippingCellsIfNextRow = false; - - private long readPoint; - - /* - Some notes... - - So memstorescanner is fixed at creation time. this includes pointers/iterators into - existing kvset/snapshot. during a snapshot creation, the kvset is null, and the - snapshot is moved. since kvset is null there is no point on reseeking on both, - we can save us the trouble. During the snapshot->hfile transition, the memstore - scanner is re-created by StoreScanner#updateReaders(). StoreScanner should - potentially do something smarter by adjusting the existing memstore scanner. - - But there is a greater problem here, that being once a scanner has progressed - during a snapshot scenario, we currently iterate past the kvset then 'finish' up. - if a scan lasts a little while, there is a chance for new entries in kvset to - become available but we will never see them. This needs to be handled at the - StoreScanner level with coordination with MemStoreScanner. - - Currently, this problem is only partly managed: during the small amount of time - when the StoreScanner has not yet created a new MemStoreScanner, we will miss - the adds to kvset in the MemStoreScanner. 
- */ - - MemStoreScanner(long readPoint) { - super(); - - this.readPoint = readPoint; - cellSetAtCreation = cellSet; - snapshotAtCreation = snapshot; - if (allocator != null) { - this.allocatorAtCreation = allocator; - this.allocatorAtCreation.incScannerCount(); - } - if (snapshotAllocator != null) { - this.snapshotAllocatorAtCreation = snapshotAllocator; - this.snapshotAllocatorAtCreation.incScannerCount(); - } - if (Trace.isTracing() && Trace.currentSpan() != null) { - Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner"); - } - } - - /** - * Lock on 'this' must be held by caller. - * @param it - * @return Next Cell - */ - private Cell getNext(Iterator it) { - Cell startCell = theNext; - Cell v = null; - try { - while (it.hasNext()) { - v = it.next(); - if (v.getSequenceId() <= this.readPoint) { - return v; - } - if (stopSkippingCellsIfNextRow && startCell != null - && comparator.compareRows(v, startCell) > 0) { - return null; - } - } - - return null; - } finally { - if (v != null) { - // in all cases, remember the last Cell iterated to - if (it == snapshotIt) { - snapshotItRow = v; - } else { - cellSetItRow = v; - } - } - } - } - - /** - * Set the scanner at the seek key. - * Must be called only once: there is no thread safety between the scanner - * and the memStore. - * @param key seek value - * @return false if the key is null or if there is no data - */ - @Override - public synchronized boolean seek(Cell key) { - if (key == null) { - close(); - return false; - } - // kvset and snapshot will never be null. - // if tailSet can't find anything, SortedSet is empty (not null). - cellSetIt = cellSetAtCreation.tailSet(key).iterator(); - snapshotIt = snapshotAtCreation.tailSet(key).iterator(); - cellSetItRow = null; - snapshotItRow = null; - - return seekInSubLists(key); - } - - - /** - * (Re)initialize the iterators after a seek or a reseek. 
- */ - private synchronized boolean seekInSubLists(Cell key){ - cellSetNextRow = getNext(cellSetIt); - snapshotNextRow = getNext(snapshotIt); - - // Calculate the next value - theNext = getLowest(cellSetNextRow, snapshotNextRow); - - // has data - return (theNext != null); - } - - - /** - * Move forward on the sub-lists set previously by seek. - * @param key seek value (should be non-null) - * @return true if there is at least one KV to read, false otherwise - */ - @Override - public synchronized boolean reseek(Cell key) { - /* - See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation. - This code is executed concurrently with flush and puts, without locks. - Two points must be known when working on this code: - 1) It's not possible to use the 'kvTail' and 'snapshot' - variables, as they are modified during a flush. - 2) The ideal implementation for performance would use the sub skip list - implicitly pointed by the iterators 'kvsetIt' and - 'snapshotIt'. Unfortunately the Java API does not offer a method to - get it. So we remember the last keys we iterated to and restore - the reseeked set to at least that point. 
- */ - cellSetIt = cellSetAtCreation.tailSet(getHighest(key, cellSetItRow)).iterator(); - snapshotIt = snapshotAtCreation.tailSet(getHighest(key, snapshotItRow)).iterator(); - - return seekInSubLists(key); - } - - - @Override - public synchronized Cell peek() { - //DebugPrint.println(" MS@" + hashCode() + " peek = " + getLowest()); - return theNext; - } - - @Override - public synchronized Cell next() { - if (theNext == null) { - return null; - } - - final Cell ret = theNext; - - // Advance one of the iterators - if (theNext == cellSetNextRow) { - cellSetNextRow = getNext(cellSetIt); - } else { - snapshotNextRow = getNext(snapshotIt); - } - - // Calculate the next value - theNext = getLowest(cellSetNextRow, snapshotNextRow); - - //long readpoint = ReadWriteConsistencyControl.getThreadReadPoint(); - //DebugPrint.println(" MS@" + hashCode() + " next: " + theNext + " next_next: " + - // getLowest() + " threadpoint=" + readpoint); - return ret; - } - - /* - * Returns the lower of the two key values, or null if they are both null. - * This uses comparator.compare() to compare the KeyValue using the memstore - * comparator. - */ - private Cell getLowest(Cell first, Cell second) { - if (first == null && second == null) { - return null; - } - if (first != null && second != null) { - int compare = comparator.compare(first, second); - return (compare <= 0 ? first : second); - } - return (first != null ? first : second); - } - - /* - * Returns the higher of the two cells, or null if they are both null. - * This uses comparator.compare() to compare the Cell using the memstore - * comparator. - */ - private Cell getHighest(Cell first, Cell second) { - if (first == null && second == null) { - return null; - } - if (first != null && second != null) { - int compare = comparator.compare(first, second); - return (compare > 0 ? first : second); - } - return (first != null ? 
first : second); - } - - public synchronized void close() { - this.cellSetNextRow = null; - this.snapshotNextRow = null; - - this.cellSetIt = null; - this.snapshotIt = null; - - if (allocatorAtCreation != null) { - this.allocatorAtCreation.decScannerCount(); - this.allocatorAtCreation = null; - } - if (snapshotAllocatorAtCreation != null) { - this.snapshotAllocatorAtCreation.decScannerCount(); - this.snapshotAllocatorAtCreation = null; - } - - this.cellSetItRow = null; - this.snapshotItRow = null; - } - - /** - * MemStoreScanner returns max value as sequence id because it will - * always have the latest data among all files. - */ - @Override - public long getSequenceID() { - return Long.MAX_VALUE; - } - - @Override - public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { - return shouldSeek(scan, store, oldestUnexpiredTS); - } - - /** - * Seek scanner to the given key first. If it returns false(means - * peek()==null) or scanner's peek row is bigger than row of given key, seek - * the scanner to the previous row of given key - */ - @Override - public synchronized boolean backwardSeek(Cell key) { - seek(key); - if (peek() == null || comparator.compareRows(peek(), key) > 0) { - return seekToPreviousRow(key); - } - return true; - } - - /** - * Separately get the KeyValue before the specified key from kvset and - * snapshotset, and use the row of higher one as the previous row of - * specified key, then seek to the first KeyValue of previous row - */ - @Override - public synchronized boolean seekToPreviousRow(Cell originalKey) { - boolean keepSeeking = false; - Cell key = originalKey; - do { - Cell firstKeyOnRow = CellUtil.createFirstOnRow(key); - SortedSet cellHead = cellSetAtCreation.headSet(firstKeyOnRow); - Cell cellSetBeforeRow = cellHead.isEmpty() ? null : cellHead.last(); - SortedSet snapshotHead = snapshotAtCreation - .headSet(firstKeyOnRow); - Cell snapshotBeforeRow = snapshotHead.isEmpty() ? 
null : snapshotHead - .last(); - Cell lastCellBeforeRow = getHighest(cellSetBeforeRow, snapshotBeforeRow); - if (lastCellBeforeRow == null) { - theNext = null; - return false; - } - Cell firstKeyOnPreviousRow = CellUtil.createFirstOnRow(lastCellBeforeRow); - this.stopSkippingCellsIfNextRow = true; - seek(firstKeyOnPreviousRow); - this.stopSkippingCellsIfNextRow = false; - if (peek() == null - || comparator.compareRows(peek(), firstKeyOnPreviousRow) > 0) { - keepSeeking = true; - key = firstKeyOnPreviousRow; - continue; - } else { - keepSeeking = false; - } - } while (keepSeeking); - return true; - } - - @Override - public synchronized boolean seekToLastRow() { - Cell first = cellSetAtCreation.isEmpty() ? null : cellSetAtCreation - .last(); - Cell second = snapshotAtCreation.isEmpty() ? null - : snapshotAtCreation.last(); - Cell higherCell = getHighest(first, second); - if (higherCell == null) { - return false; - } - Cell firstCellOnLastRow = CellUtil.createFirstOnRow(higherCell); - if (seek(firstCellOnLastRow)) { - return true; - } else { - return seekToPreviousRow(higherCell); - } - - } - } - - public final static long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT - + (9 * ClassSize.REFERENCE) + (3 * Bytes.SIZEOF_LONG) + Bytes.SIZEOF_BOOLEAN); - - public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD + - ClassSize.ATOMIC_LONG + (2 * ClassSize.TIMERANGE_TRACKER) + - (2 * ClassSize.CELL_SKIPLIST_SET) + (2 * ClassSize.CONCURRENT_SKIPLISTMAP)); - - /* - * Calculate how the MemStore size has changed. Includes overhead of the - * backing Map. - * @param cell - * @param notpresent True if the cell was NOT present in the set. - * @return Size - */ - static long heapSizeChange(final Cell cell, final boolean notpresent) { - return notpresent ? 
ClassSize.align(ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY - + CellUtil.estimatedHeapSizeOf(cell)) : 0; - } - - private long keySize() { - return heapSize() - DEEP_OVERHEAD; - } - - /** - * Get the entire heap usage for this MemStore not including keys in the - * snapshot. - */ - @Override - public long heapSize() { - return size.get(); - } - @Override public long size() { return heapSize(); } + /** + * Check whether anything need to be done based on the current active set size + * Nothing need to be done for the DefaultMemStore + */ + @Override + protected void checkActiveSize() { + return; + } + /** * Code to help figure if our approximation of object heap sizes is close * enough. See hbase-900. Fills memstores then waits so user can heap @@ -978,9 +198,6 @@ public class DefaultMemStore implements MemStore { LOG.info("memstore2 estimated size=" + size); final int seconds = 30; LOG.info("Waiting " + seconds + " seconds while heap dump is taken"); - for (int i = 0; i < seconds; i++) { - // Thread.sleep(1000); - } LOG.info("Exiting."); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index c65326aa49c..5c29fb41ed1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -18,6 +18,13 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableCollection; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; + import java.io.IOException; import java.io.InterruptedIOException; import java.net.InetSocketAddress; @@ -91,13 +98,6 @@ import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.hadoop.util.StringUtils; import 
org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableCollection; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - /** * A Store holds a column family in a Region. Its a memstore and a set of zero * or more StoreFiles, which stretch backwards over time. @@ -1636,7 +1636,7 @@ public class HStore implements Store { this.lock.readLock().unlock(); } - LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() + LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() + ": Initiating " + (request.isMajor() ? "major" : "minor") + " compaction" + (request.isAllFiles() ? " (all files)" : "")); this.region.reportCompactionRequestStart(request.isMajor()); @@ -1990,8 +1990,6 @@ public class HStore implements Store { } /** - * Used in tests. TODO: Remove - * * Updates the value for the given row/family/qualifier. This function will always be seen as * atomic by other readers because it only puts a single KV to memstore. Thus no read/write * control necessary. 
@@ -2002,6 +2000,7 @@ public class HStore implements Store { * @return memstore size delta * @throws IOException */ + @VisibleForTesting public long updateColumnValue(byte [] row, byte [] f, byte [] qualifier, long newValue) throws IOException { @@ -2055,7 +2054,8 @@ public class HStore implements Store { */ @Override public void prepare() { - this.snapshot = memstore.snapshot(); + // passing the current sequence number of the wal - to allow bookkeeping in the memstore + this.snapshot = memstore.snapshot(cacheFlushSeqNum); this.cacheFlushCount = snapshot.getCellsCount(); this.cacheFlushSize = snapshot.getSize(); committedFiles = new ArrayList(1); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java new file mode 100644 index 00000000000..cfcd81e0ea6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegment.java @@ -0,0 +1,72 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * ImmutableSegment is an abstract class that extends the API supported by a {@link Segment}, + * and is not needed for a {@link MutableSegment}. Specifically, the method + * {@link ImmutableSegment#getKeyValueScanner()} builds a special scanner for the + * {@link MemStoreSnapshot} object. + * In addition, this class overrides methods that are not likely to be supported by an immutable + * segment, e.g. {@link Segment#rollback(Cell)} and {@link Segment#getCellSet()}, which + * can be very inefficient. + */ +@InterfaceAudience.Private +public abstract class ImmutableSegment extends Segment { + + public ImmutableSegment(Segment segment) { + super(segment); + } + + /** + * Removes the given cell from this segment. + * By default immutable store segment can not rollback + * It may be invoked by tests in specific cases where it is known to be supported {@link + * ImmutableSegmentAdapter} + */ + @Override + public long rollback(Cell cell) { + return 0; + } + + /** + * Returns a set of all the cells in the segment. + * The implementation of this method might be very inefficient for some immutable segments + * that do not maintain a cell set. Therefore by default this method is not supported. + * It may be invoked by tests in specific cases where it is known to be supported {@link + * ImmutableSegmentAdapter} + */ + @Override + public CellSet getCellSet() { + throw new NotImplementedException("Immutable Segment does not support this operation by " + + "default"); + } + + /** + * Builds a special scanner for the MemStoreSnapshot object that may be different than the + * general segment scanner. 
+ * @return a special scanner for the MemStoreSnapshot object + */ + public abstract KeyValueScanner getKeyValueScanner(); + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegmentAdapter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegmentAdapter.java new file mode 100644 index 00000000000..058865a93c6 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ImmutableSegmentAdapter.java @@ -0,0 +1,107 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.CollectionBackedScanner; + +/** + * This segment is adapting a mutable segment making it into an immutable segment. + * This is used when a mutable segment is moved to being a snapshot or pushed into a compaction + * pipeline, that consists only of immutable segments. 
+ * The compaction may generate different type of immutable segment + */ +@InterfaceAudience.Private +public class ImmutableSegmentAdapter extends ImmutableSegment { + + final private MutableSegment adaptee; + + public ImmutableSegmentAdapter(MutableSegment segment) { + super(segment); + this.adaptee = segment; + } + + @Override + public KeyValueScanner getKeyValueScanner() { + return new CollectionBackedScanner(adaptee.getCellSet(), adaptee.getComparator()); + } + + @Override + public SegmentScanner getSegmentScanner(long readPoint) { + return adaptee.getSegmentScanner(readPoint); + } + + @Override + public boolean isEmpty() { + return adaptee.isEmpty(); + } + + @Override + public int getCellsCount() { + return adaptee.getCellsCount(); + } + + @Override + public long add(Cell cell) { + return adaptee.add(cell); + } + + @Override + public Cell getFirstAfter(Cell cell) { + return adaptee.getFirstAfter(cell); + } + + @Override + public void close() { + adaptee.close(); + } + + @Override + public Cell maybeCloneWithAllocator(Cell cell) { + return adaptee.maybeCloneWithAllocator(cell); + } + + @Override + public Segment setSize(long size) { + adaptee.setSize(size); + return this; + } + + @Override + public long getSize() { + return adaptee.getSize(); + } + + @Override + public long rollback(Cell cell) { + return adaptee.rollback(cell); + } + + @Override + public CellSet getCellSet() { + return adaptee.getCellSet(); + } + + @Override + public void dump(Log log) { + adaptee.dump(log); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java index e9f810392b9..a10ccd990e7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java @@ -17,10 +17,11 @@ */ package org.apache.hadoop.hbase.regionserver; +import java.io.IOException; import 
java.util.List; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.io.HeapSize; /** @@ -40,11 +41,20 @@ public interface MemStore extends HeapSize { */ MemStoreSnapshot snapshot(); + /** + * Creates a snapshot of the current memstore. Snapshot must be cleared by call to + * {@link #clearSnapshot(long)}. + * @param flushOpSeqId the current sequence number of the wal; to be attached to the flushed + * segment + * @return {@link MemStoreSnapshot} + */ + MemStoreSnapshot snapshot(long flushOpSeqId); + /** * Clears the current snapshot of the Memstore. * @param id * @throws UnexpectedStateException - * @see #snapshot() + * @see #snapshot(long) */ void clearSnapshot(long id) throws UnexpectedStateException; @@ -128,7 +138,7 @@ public interface MemStore extends HeapSize { * @return scanner over the memstore. This might include scanner over the snapshot when one is * present. */ - List getScanners(long readPt); + List getScanners(long readPt) throws IOException; /** * @return Total memory occupied by this MemStore. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java new file mode 100644 index 00000000000..dfcec25e1b1 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreScanner.java @@ -0,0 +1,348 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.htrace.Trace; + +/** + * This is the scanner for any MemStore implementation, derived from MemStore. + * The MemStoreScanner combines SegmentScanner from different Segments and + * uses the key-value heap and the reversed key-value heap for the aggregated key-values set. + * It is assumed that only traversing forward or backward is used (without zigzagging in between) + */ +@InterfaceAudience.Private +public class MemStoreScanner extends NonLazyKeyValueScanner { + /** + * Types of cell MemStoreScanner + */ + static public enum Type { + UNDEFINED, + COMPACT_FORWARD, + USER_SCAN_FORWARD, + USER_SCAN_BACKWARD + } + + // heap of scanners used for traversing forward + private KeyValueHeap forwardHeap; + // reversed scanners heap for traversing backward + private ReversedKeyValueHeap backwardHeap; + + // The type of the scan is defined by constructor + // or according to the first usage + private Type type = Type.UNDEFINED; + + private long readPoint; + // remember the initial version of the scanners list + List scanners; + // pointer back to the relevant MemStore + // is needed for shouldSeek() method + private AbstractMemStore backwardReferenceToMemStore; + + /** + * Constructor. 
+ * If UNDEFINED type for MemStoreScanner is provided, the forward heap is used as default! + * After constructor only one heap is going to be initialized for entire lifespan + * of the MemStoreScanner. A specific scanner can only be one directional! + * + * @param ms Pointer back to the MemStore + * @param readPoint Read point below which we can safely remove duplicate KVs + * @param type The scan type COMPACT_FORWARD should be used for compaction + */ + public MemStoreScanner(AbstractMemStore ms, long readPoint, Type type) throws IOException { + this(ms, ms.getListOfScanners(readPoint), readPoint, type); + } + + /* Constructor used only when the scan usage is unknown + and need to be defined according to the first move */ + public MemStoreScanner(AbstractMemStore ms, long readPt) throws IOException { + this(ms, readPt, Type.UNDEFINED); + } + + public MemStoreScanner(AbstractMemStore ms, List scanners, long readPoint, + Type type) throws IOException { + super(); + this.readPoint = readPoint; + this.type = type; + switch (type) { + case UNDEFINED: + case USER_SCAN_FORWARD: + case COMPACT_FORWARD: + this.forwardHeap = new KeyValueHeap(scanners, ms.getComparator()); + break; + case USER_SCAN_BACKWARD: + this.backwardHeap = new ReversedKeyValueHeap(scanners, ms.getComparator()); + break; + default: + throw new IllegalArgumentException("Unknown scanner type in MemStoreScanner"); + } + this.backwardReferenceToMemStore = ms; + this.scanners = scanners; + if (Trace.isTracing() && Trace.currentSpan() != null) { + Trace.currentSpan().addTimelineAnnotation("Creating MemStoreScanner"); + } + } + + /** + * Returns the cell from the top-most scanner without advancing the iterator. + * The backward traversal is assumed, only if specified explicitly + */ + @Override + public synchronized Cell peek() { + if (type == Type.USER_SCAN_BACKWARD) { + return backwardHeap.peek(); + } + return forwardHeap.peek(); + } + + /** + * Gets the next cell from the top-most scanner. 
Assumed forward scanning. + */ + @Override + public synchronized Cell next() throws IOException { + KeyValueHeap heap = (Type.USER_SCAN_BACKWARD == type) ? backwardHeap : forwardHeap; + + // loop over till the next suitable value + // take next value from the heap + for (Cell currentCell = heap.next(); + currentCell != null; + currentCell = heap.next()) { + + // all the logic of presenting cells is inside the internal SegmentScanners + // located inside the heap + + return currentCell; + } + return null; + } + + /** + * Set the scanner at the seek key. Assumed forward scanning. + * Must be called only once: there is no thread safety between the scanner + * and the memStore. + * + * @param cell seek value + * @return false if the key is null or if there is no data + */ + @Override + public synchronized boolean seek(Cell cell) throws IOException { + assertForward(); + + if (cell == null) { + close(); + return false; + } + + return forwardHeap.seek(cell); + } + + /** + * Move forward on the sub-lists set previously by seek. Assumed forward scanning. + * + * @param cell seek value (should be non-null) + * @return true if there is at least one KV to read, false otherwise + */ + @Override + public synchronized boolean reseek(Cell cell) throws IOException { + /* + * See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation. + * This code is executed concurrently with flush and puts, without locks. + * Two points must be known when working on this code: + * 1) It's not possible to use the 'kvTail' and 'snapshot' + * variables, as they are modified during a flush. + * 2) The ideal implementation for performance would use the sub skip list + * implicitly pointed by the iterators 'kvsetIt' and + * 'snapshotIt'. Unfortunately the Java API does not offer a method to + * get it. So we remember the last keys we iterated to and restore + * the reseeked set to at least that point. 
+ * + * TODO: The above comment copied from the original MemStoreScanner + */ + assertForward(); + return forwardHeap.reseek(cell); + } + + /** + * MemStoreScanner returns max value as sequence id because it will + * always have the latest data among all files. + */ + @Override + public synchronized long getSequenceID() { + return Long.MAX_VALUE; + } + + @Override + public synchronized void close() { + + if (forwardHeap != null) { + assert ((type == Type.USER_SCAN_FORWARD) || + (type == Type.COMPACT_FORWARD) || (type == Type.UNDEFINED)); + forwardHeap.close(); + forwardHeap = null; + if (backwardHeap != null) { + backwardHeap.close(); + backwardHeap = null; + } + } else if (backwardHeap != null) { + assert (type == Type.USER_SCAN_BACKWARD); + backwardHeap.close(); + backwardHeap = null; + } + } + + /** + * Set the scanner at the seek key. Assumed backward scanning. + * + * @param cell seek value + * @return false if the key is null or if there is no data + */ + @Override + public synchronized boolean backwardSeek(Cell cell) throws IOException { + initBackwardHeapIfNeeded(cell, false); + return backwardHeap.backwardSeek(cell); + } + + /** + * Assumed backward scanning. 
+ * + * @param cell seek value + * @return false if the key is null or if there is no data + */ + @Override + public synchronized boolean seekToPreviousRow(Cell cell) throws IOException { + initBackwardHeapIfNeeded(cell, false); + if (backwardHeap.peek() == null) { + restartBackwardHeap(cell); + } + return backwardHeap.seekToPreviousRow(cell); + } + + @Override + public synchronized boolean seekToLastRow() throws IOException { + // TODO: it looks like this is how it should be, however ReversedKeyValueHeap class doesn't + // implement seekToLastRow() method :( + // however seekToLastRow() was implemented in internal MemStoreScanner + // so I wonder whether we need to come with our own workaround, or to update + // ReversedKeyValueHeap + return initBackwardHeapIfNeeded(KeyValue.LOWESTKEY, true); + } + + /** + * Check if this memstore may contain the required keys + * @return False if the key definitely does not exist in this Memstore + */ + @Override + public synchronized boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { + + if (type == Type.COMPACT_FORWARD) { + return true; + } + + for (SegmentScanner sc : scanners) { + if (sc.shouldSeek(scan, oldestUnexpiredTS)) { + return true; + } + } + return false; + } + + // debug method + @Override + public String toString() { + StringBuffer buf = new StringBuffer(); + int i = 1; + for (SegmentScanner scanner : scanners) { + buf.append("scanner (" + i + ") " + scanner.toString() + " ||| "); + i++; + } + return buf.toString(); + } + /****************** Private methods ******************/ + /** + * Restructure the ended backward heap after rerunning a seekToPreviousRow() + * on each scanner + * @return false if given Cell does not exist in any scanner + */ + private boolean restartBackwardHeap(Cell cell) throws IOException { + boolean res = false; + for (SegmentScanner scan : scanners) { + res |= scan.seekToPreviousRow(cell); + } + this.backwardHeap = + new ReversedKeyValueHeap(scanners, 
backwardReferenceToMemStore.getComparator()); + return res; + } + + /** + * Checks whether the type of the scan suits the assumption of moving backward + */ + private boolean initBackwardHeapIfNeeded(Cell cell, boolean toLast) throws IOException { + boolean res = false; + if (toLast && (type != Type.UNDEFINED)) { + throw new IllegalStateException( + "Wrong usage of initBackwardHeapIfNeeded in parameters. The type is:" + type.toString()); + } + if (type == Type.UNDEFINED) { + // In case we started from peek, release the forward heap + // and build backward. Set the correct type. Thus this turn + // can happen only once + if ((backwardHeap == null) && (forwardHeap != null)) { + forwardHeap.close(); + forwardHeap = null; + // before building the heap seek for the relevant key on the scanners, + // for the heap to be built from the scanners correctly + for (SegmentScanner scan : scanners) { + if (toLast) { + res |= scan.seekToLastRow(); + } else { + res |= scan.backwardSeek(cell); + } + } + this.backwardHeap = + new ReversedKeyValueHeap(scanners, backwardReferenceToMemStore.getComparator()); + type = Type.USER_SCAN_BACKWARD; + } + } + + if (type == Type.USER_SCAN_FORWARD) { + throw new IllegalStateException("Traversing backward with forward scan"); + } + return res; + } + + /** + * Checks whether the type of the scan suits the assumption of moving forward + */ + private void assertForward() throws IllegalStateException { + if (type == Type.UNDEFINED) { + type = Type.USER_SCAN_FORWARD; + } + + if (type == Type.USER_SCAN_BACKWARD) { + throw new IllegalStateException("Traversing forward with backward scan"); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java index be853c5f595..28ab693a09d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreSnapshot.java @@ -34,14 +34,13 @@ public class MemStoreSnapshot { private final KeyValueScanner scanner; private final boolean tagsPresent; - public MemStoreSnapshot(long id, int cellsCount, long size, TimeRangeTracker timeRangeTracker, - KeyValueScanner scanner, boolean tagsPresent) { + public MemStoreSnapshot(long id, ImmutableSegment snapshot) { this.id = id; - this.cellsCount = cellsCount; - this.size = size; - this.timeRangeTracker = timeRangeTracker; - this.scanner = scanner; - this.tagsPresent = tagsPresent; + this.cellsCount = snapshot.getCellsCount(); + this.size = snapshot.getSize(); + this.timeRangeTracker = snapshot.getTimeRangeTracker(); + this.scanner = snapshot.getKeyValueScanner(); + this.tagsPresent = snapshot.isTagsPresent(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableCellSetSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableCellSetSegment.java new file mode 100644 index 00000000000..743416ce82d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableCellSetSegment.java @@ -0,0 +1,153 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.Iterator; +import java.util.SortedSet; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * This mutable store segment encapsulates a mutable cell set and its respective memory allocation + * buffers (MSLAB). + */ +@InterfaceAudience.Private +final class MutableCellSetSegment extends MutableSegment { + + private volatile CellSet cellSet; + private final CellComparator comparator; + + // Instantiate objects only using factory + MutableCellSetSegment(CellSet cellSet, MemStoreLAB memStoreLAB, long size, + CellComparator comparator) { + super(memStoreLAB, size); + this.cellSet = cellSet; + this.comparator = comparator; + } + + @Override + public SegmentScanner getSegmentScanner(long readPoint) { + return new MutableCellSetSegmentScanner(this, readPoint); + } + + @Override + public boolean isEmpty() { + return getCellSet().isEmpty(); + } + + @Override + public int getCellsCount() { + return getCellSet().size(); + } + + @Override + public long add(Cell cell) { + boolean succ = getCellSet().add(cell); + long s = AbstractMemStore.heapSizeChange(cell, succ); + updateMetaInfo(cell, s); + // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call. + // When we use ACL CP or Visibility CP which deals with Tags during + // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not + // parse the byte[] to identify the tags length. 
+ if(cell.getTagsLength() > 0) { + tagsPresent = true; + } + return s; + } + + @Override + public long rollback(Cell cell) { + Cell found = get(cell); + if (found != null && found.getSequenceId() == cell.getSequenceId()) { + long sz = AbstractMemStore.heapSizeChange(cell, true); + remove(cell); + incSize(-sz); + return sz; + } + return 0; + } + + @Override + public Cell getFirstAfter(Cell cell) { + SortedSet snTailSet = tailSet(cell); + if (!snTailSet.isEmpty()) { + return snTailSet.first(); + } + return null; + } + + @Override + public void dump(Log log) { + for (Cell cell: getCellSet()) { + log.debug(cell); + } + } + + @Override + public SortedSet tailSet(Cell firstCell) { + return getCellSet().tailSet(firstCell); + } + @Override + public CellSet getCellSet() { + return cellSet; + } + @Override + public CellComparator getComparator() { + return comparator; + } + + //*** Methods for MemStoreSegmentsScanner + public Cell last() { + return getCellSet().last(); + } + + public Iterator iterator() { + return getCellSet().iterator(); + } + + public SortedSet headSet(Cell firstKeyOnRow) { + return getCellSet().headSet(firstKeyOnRow); + } + + public int compare(Cell left, Cell right) { + return getComparator().compare(left, right); + } + + public int compareRows(Cell left, Cell right) { + return getComparator().compareRows(left, right); + } + + private Cell get(Cell cell) { + return getCellSet().get(cell); + } + + private boolean remove(Cell e) { + return getCellSet().remove(e); + } + + // methods for tests + @Override + Cell first() { + return this.getCellSet().first(); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableCellSetSegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableCellSetSegmentScanner.java new file mode 100644 index 00000000000..17791ff33cd --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableCellSetSegmentScanner.java @@ -0,0 +1,258 @@ 
+/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.Iterator; +import java.util.SortedSet; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * A scanner of a single cells segment {@link MutableCellSetSegment}. + */ +@InterfaceAudience.Private +class MutableCellSetSegmentScanner extends SegmentScanner { + + // the observed structure + private final MutableCellSetSegment segment; + // the highest relevant MVCC + private long readPoint; + // the current iterator that can be reinitialized by + // seek(), backwardSeek(), or reseek() + private Iterator iter; + // the pre-calculated cell to be returned by peek() + private Cell current = null; + // or next() + // A flag represents whether could stop skipping KeyValues for MVCC + // if have encountered the next row. 
Only used for reversed scan + private boolean stopSkippingKVsIfNextRow = false; + // last iterated KVs by seek (to restore the iterator state after reseek) + private Cell last = null; + + public MutableCellSetSegmentScanner(MutableCellSetSegment segment, long readPoint) { + super(); + this.segment = segment; + this.readPoint = readPoint; + iter = segment.iterator(); + // the initialization of the current is required for working with heap of SegmentScanners + current = getNext(); + //increase the reference count so the underlying structure will not be de-allocated + this.segment.incScannerCount(); + } + + /** + * Look at the next Cell in this scanner, but do not iterate the scanner + * @return the currently observed Cell + */ + @Override + public Cell peek() { // sanity check, the current should be always valid + if (current!=null && current.getSequenceId() > readPoint) { + throw new RuntimeException("current is invalid: read point is "+readPoint+", " + + "while current sequence id is " +current.getSequenceId()); + } + + return current; + } + + /** + * Return the next Cell in this scanner, iterating the scanner + * @return the next Cell or null if end of scanner + */ + @Override + public Cell next() throws IOException { + Cell oldCurrent = current; + current = getNext(); // update the currently observed Cell + return oldCurrent; + } + + /** + * Seek the scanner at or after the specified Cell. + * @param cell seek value + * @return true if scanner has values left, false if end of scanner + */ + @Override + public boolean seek(Cell cell) throws IOException { + if(cell == null) { + close(); + return false; + } + // restart the iterator from new key + iter = segment.tailSet(cell).iterator(); + // last is going to be reinitialized in the next getNext() call + last = null; + current = getNext(); + return (current != null); + } + + /** + * Reseek the scanner at or after the specified KeyValue. 
+ * This method is guaranteed to seek at or after the required key only if the + * key comes after the current position of the scanner. Should not be used + * to seek to a key which may come before the current position. + * + * @param cell seek value (should be non-null) + * @return true if scanner has values left, false if end of scanner + */ + @Override + public boolean reseek(Cell cell) throws IOException { + + /* + See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation. + This code is executed concurrently with flush and puts, without locks. + The ideal implementation for performance would use the sub skip list implicitly + pointed by the iterator. Unfortunately the Java API does not offer a method to + get it. So we remember the last keys we iterated to and restore + the reseeked set to at least that point. + */ + iter = segment.tailSet(getHighest(cell, last)).iterator(); + current = getNext(); + return (current != null); + } + + /** + * Seek the scanner at or before the row of specified Cell, it firstly + * tries to seek the scanner at or after the specified Cell, return if + * peek KeyValue of scanner has the same row with specified Cell, + * otherwise seek the scanner at the first Cell of the row which is the + * previous row of specified KeyValue + * + * @param key seek Cell + * @return true if the scanner is at the valid KeyValue, false if such Cell does not exist + */ + @Override + public boolean backwardSeek(Cell key) throws IOException { + seek(key); // seek forward then go backward + if (peek() == null || segment.compareRows(peek(), key) > 0) { + return seekToPreviousRow(key); + } + return true; + } + + /** + * Seek the scanner at the first Cell of the row which is the previous row + * of specified key + * + * @param cell seek value + * @return true if the scanner at the first valid Cell of previous row, + * false if not existing such Cell + */ + @Override + public boolean seekToPreviousRow(Cell cell) throws IOException { + 
boolean keepSeeking = false; + Cell key = cell; + + do { + Cell firstKeyOnRow = CellUtil.createFirstOnRow(key); + SortedSet cellHead = segment.headSet(firstKeyOnRow); + Cell lastCellBeforeRow = cellHead.isEmpty() ? null : cellHead.last(); + if (lastCellBeforeRow == null) { + current = null; + return false; + } + Cell firstKeyOnPreviousRow = CellUtil.createFirstOnRow(lastCellBeforeRow); + this.stopSkippingKVsIfNextRow = true; + seek(firstKeyOnPreviousRow); + this.stopSkippingKVsIfNextRow = false; + if (peek() == null + || segment.getComparator().compareRows(peek(), firstKeyOnPreviousRow) > 0) { + keepSeeking = true; + key = firstKeyOnPreviousRow; + continue; + } else { + keepSeeking = false; + } + } while (keepSeeking); + return true; + } + + /** + * Seek the scanner at the first KeyValue of last row + * + * @return true if scanner has values left, false if the underlying data is empty + */ + @Override + public boolean seekToLastRow() throws IOException { + Cell higherCell = segment.isEmpty() ? 
null : segment.last(); + if (higherCell == null) { + return false; + } + + Cell firstCellOnLastRow = CellUtil.createFirstOnRow(higherCell); + + if (seek(firstCellOnLastRow)) { + return true; + } else { + return seekToPreviousRow(higherCell); + } + } + + @Override protected Segment getSegment() { + return segment; + } + + /********************* Private Methods **********************/ + + /** + * Private internal method for iterating over the segment, + * skipping the cells with irrelevant MVCC + */ + private Cell getNext() { + Cell startKV = current; + Cell next = null; + + try { + while (iter.hasNext()) { + next = iter.next(); + if (next.getSequenceId() <= this.readPoint) { + return next; // skip irrelevant versions + } + if (stopSkippingKVsIfNextRow && // for backwardSeek() stay in the + startKV != null && // boundaries of a single row + segment.compareRows(next, startKV) > 0) { + return null; + } + } // end of while + + return null; // nothing found + } finally { + if (next != null) { + // in all cases, remember the last KV we iterated to, needed for reseek() + last = next; + } + } + } + + /** + * Private internal method that returns the higher of the two key values, or null + * if they are both null + */ + private Cell getHighest(Cell first, Cell second) { + if (first == null && second == null) { + return null; + } + if (first != null && second != null) { + int compare = segment.compare(first, second); + return (compare > 0 ? first : second); + } + return (first != null ? first : second); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java new file mode 100644 index 00000000000..fcaddb0759e --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MutableSegment.java @@ -0,0 +1,57 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.SortedSet; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; + +/** + * An abstraction of a mutable segment in memstore, specifically the active segment. 
+ */ +@InterfaceAudience.Private +public abstract class MutableSegment extends Segment { + + protected MutableSegment(MemStoreLAB memStoreLAB, long size) { + super(memStoreLAB, size); + } + + /** + * Returns a subset of the segment cell set, which starts with the given cell + * @param firstCell a cell in the segment + * @return a subset of the segment cell set, which starts with the given cell + */ + public abstract SortedSet tailSet(Cell firstCell); + + /** + * Returns the Cell comparator used by this segment + * @return the Cell comparator used by this segment + */ + public abstract CellComparator getComparator(); + + //methods for test + + /** + * Returns the first cell in the segment + * @return the first cell in the segment + */ + abstract Cell first(); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java new file mode 100644 index 00000000000..7891809912d --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Segment.java @@ -0,0 +1,218 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.util.ByteRange; + +/** + * This is an abstraction of a segment maintained in a memstore, e.g., the active + * cell set or its snapshot. + * + * This abstraction facilitates the management of the compaction pipeline and the shifts of these + * segments from active set to snapshot set in the default implementation. + */ +@InterfaceAudience.Private +public abstract class Segment { + + private volatile MemStoreLAB memStoreLAB; + private final AtomicLong size; + private final TimeRangeTracker timeRangeTracker; + protected volatile boolean tagsPresent; + + protected Segment(MemStoreLAB memStoreLAB, long size) { + this.memStoreLAB = memStoreLAB; + this.size = new AtomicLong(size); + this.timeRangeTracker = new TimeRangeTracker(); + this.tagsPresent = false; + } + + protected Segment(Segment segment) { + this.memStoreLAB = segment.getMemStoreLAB(); + this.size = new AtomicLong(segment.getSize()); + this.timeRangeTracker = segment.getTimeRangeTracker(); + this.tagsPresent = segment.isTagsPresent(); + } + + /** + * Creates the scanner that is able to scan the concrete segment + * @return a scanner for the given read point + */ + public abstract SegmentScanner getSegmentScanner(long readPoint); + + /** + * Returns whether the segment has any cells + * @return whether the segment has any cells + */ + public abstract boolean isEmpty(); + + /** + * Returns number of cells in segment + * @return number of cells in segment + */ + public abstract int getCellsCount(); + + /** + * Adds the given cell into the segment + * @return the change in the heap size + */ + public abstract 
long add(Cell cell); + + /** + * Removes the given cell from the segment + * @return the change in the heap size + */ + public abstract long rollback(Cell cell); + + /** + * Returns the first cell in the segment that has equal or greater key than the given cell + * @return the first cell in the segment that has equal or greater key than the given cell + */ + public abstract Cell getFirstAfter(Cell cell); + + /** + * Returns a set of all cells in the segment + * @return a set of all cells in the segment + */ + public abstract CellSet getCellSet(); + + /** + * Closing a segment before it is being discarded + */ + public void close() { + MemStoreLAB mslab = getMemStoreLAB(); + if(mslab != null) { + mslab.close(); + } + // do not set MSLab to null as scanners may still be reading the data here and need to decrease + // the counter when they finish + } + + /** + * If the segment has a memory allocator the cell is being cloned to this space, and returned; + * otherwise the given cell is returned + * @return either the given cell or its clone + */ + public Cell maybeCloneWithAllocator(Cell cell) { + if (getMemStoreLAB() == null) { + return cell; + } + + int len = KeyValueUtil.length(cell); + ByteRange alloc = getMemStoreLAB().allocateBytes(len); + if (alloc == null) { + // The allocation was too large, allocator decided + // not to do anything with it. 
+ return cell; + } + assert alloc.getBytes() != null; + KeyValueUtil.appendToByteArray(cell, alloc.getBytes(), alloc.getOffset()); + KeyValue newKv = new KeyValue(alloc.getBytes(), alloc.getOffset(), len); + newKv.setSequenceId(cell.getSequenceId()); + return newKv; + } + + public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) { + return (getTimeRangeTracker().includesTimeRange(scan.getTimeRange()) + && (getTimeRangeTracker().getMaximumTimestamp() >= + oldestUnexpiredTS)); + } + + public long getMinTimestamp() { + return getTimeRangeTracker().getMinimumTimestamp(); + } + + public boolean isTagsPresent() { + return tagsPresent; + } + + public void incScannerCount() { + if(getMemStoreLAB() != null) { + getMemStoreLAB().incScannerCount(); + } + } + + public void decScannerCount() { + if(getMemStoreLAB() != null) { + getMemStoreLAB().decScannerCount(); + } + } + + /** + * Setting the heap size of the segment - used to account for different class overheads + * @return this object + */ + + public Segment setSize(long size) { + this.size.set(size); + return this; + } + + /** + * Returns the heap size of the segment + * @return the heap size of the segment + */ + public long getSize() { + return size.get(); + } + + /** + * Increases the heap size counter of the segment by the given delta + */ + public void incSize(long delta) { + size.addAndGet(delta); + } + + public TimeRangeTracker getTimeRangeTracker() { + return timeRangeTracker; + } + + protected void updateMetaInfo(Cell toAdd, long s) { + getTimeRangeTracker().includeTimestamp(toAdd); + size.addAndGet(s); + } + + private MemStoreLAB getMemStoreLAB() { + return memStoreLAB; + } + + // Debug methods + /** + * Dumps all cells of the segment into the given log + */ + public abstract void dump(Log log); + + @Override + public String toString() { + String res = "Store segment of type "+this.getClass().getName()+"; "; + res += "isEmpty "+(isEmpty()?"yes":"no")+"; "; + res += "cellCount "+getCellsCount()+"; "; + res 
+= "size "+getSize()+"; "; + res += "Min ts "+getMinTimestamp()+"; "; + return res; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java new file mode 100644 index 00000000000..ccb11dfe0c5 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentFactory.java @@ -0,0 +1,89 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellComparator; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.ReflectionUtils; + +/** + * A singleton store segment factory. + * Generate concrete store segments. 
+ */ +@InterfaceAudience.Private +public final class SegmentFactory { + + static final String USEMSLAB_KEY = "hbase.hregion.memstore.mslab.enabled"; + static final boolean USEMSLAB_DEFAULT = true; + static final String MSLAB_CLASS_NAME = "hbase.regionserver.mslab.class"; + + private SegmentFactory() {} + private static SegmentFactory instance = new SegmentFactory(); + public static SegmentFactory instance() { + return instance; + } + + public ImmutableSegment createImmutableSegment(final Configuration conf, + final CellComparator comparator, long size) { + MemStoreLAB memStoreLAB = getMemStoreLAB(conf); + MutableSegment segment = generateMutableSegment(conf, comparator, memStoreLAB, size); + return createImmutableSegment(conf, segment); + } + + public ImmutableSegment createImmutableSegment(CellComparator comparator, + long size) { + MutableSegment segment = generateMutableSegment(null, comparator, null, size); + return createImmutableSegment(null, segment); + } + + public ImmutableSegment createImmutableSegment(final Configuration conf, MutableSegment segment) { + return generateImmutableSegment(conf, segment); + } + public MutableSegment createMutableSegment(final Configuration conf, + CellComparator comparator, long size) { + MemStoreLAB memStoreLAB = getMemStoreLAB(conf); + return generateMutableSegment(conf, comparator, memStoreLAB, size); + } + + //****** private methods to instantiate concrete store segments **********// + + private ImmutableSegment generateImmutableSegment(final Configuration conf, + MutableSegment segment) { + // TBD use configuration to set type of segment + return new ImmutableSegmentAdapter(segment); + } + private MutableSegment generateMutableSegment( + final Configuration conf, CellComparator comparator, MemStoreLAB memStoreLAB, long size) { + // TBD use configuration to set type of segment + CellSet set = new CellSet(comparator); + return new MutableCellSetSegment(set, memStoreLAB, size, comparator); + } + + private MemStoreLAB 
getMemStoreLAB(Configuration conf) { + MemStoreLAB memStoreLAB = null; + if (conf.getBoolean(USEMSLAB_KEY, USEMSLAB_DEFAULT)) { + String className = conf.get(MSLAB_CLASS_NAME, HeapMemStoreLAB.class.getName()); + memStoreLAB = ReflectionUtils.instantiateWithCustomCtor(className, + new Class[] { Configuration.class }, new Object[] { conf }); + } + return memStoreLAB; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java new file mode 100644 index 00000000000..8852d5cc6cc --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -0,0 +1,152 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.Scan; + +/** + * An abstraction for store segment scanner. 
+ */ +@InterfaceAudience.Private +public abstract class SegmentScanner implements KeyValueScanner { + + private long sequenceID = Long.MAX_VALUE; + + protected abstract Segment getSegment(); + + /** + * Get the sequence id associated with this KeyValueScanner. This is required + * for comparing multiple files (or memstore segments) scanners to find out + * which one has the latest data. + * + */ + @Override + public long getSequenceID() { + return sequenceID; + } + + /** + * Close the KeyValue scanner. + */ + @Override + public void close() { + getSegment().decScannerCount(); + } + + /** + * This functionality should be resolved in the higher level which is + * MemStoreScanner, currently returns true as default. Doesn't throw + * IllegalStateException in order not to change the signature of the + * overridden method + */ + @Override + public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) { + return true; + } + /** + * This scanner is working solely on the in-memory MemStore therefore this + * interface is not relevant. + */ + @Override + public boolean requestSeek(Cell c, boolean forward, boolean useBloom) + throws IOException { + + throw new IllegalStateException( + "requestSeek cannot be called on MutableCellSetSegmentScanner"); + } + + /** + * This scanner is working solely on the in-memory MemStore and doesn't work on + * store files, MutableCellSetSegmentScanner always does the seek, + * therefore always returning true. + */ + @Override + public boolean realSeekDone() { + return true; + } + + /** + * This function should be never called on scanners that always do real seek operations (i.e. most + * of the scanners and also this one). The easiest way to achieve this is to call + * {@link #realSeekDone()} first. + */ + @Override + public void enforceSeek() throws IOException { + throw new IllegalStateException( + "enforceSeek cannot be called on MutableCellSetSegmentScanner"); + } + + /** + * @return true if this is a file scanner. 
Otherwise a memory scanner is assumed. + */ + @Override + public boolean isFileScanner() { + return false; + } + + /** + * @return the next key in the index (the key to seek to the next block) + * if known, or null otherwise + * Not relevant for in-memory scanner + */ + @Override + public Cell getNextIndexedKey() { + return null; + } + + /** + * Called after a batch of rows scanned (RPC) and set to be returned to client. Any in between + * cleanup can be done here. Nothing to be done for MutableCellSetSegmentScanner. + */ + @Override + public void shipped() throws IOException { + // do nothing + } + + /** + * Set the sequence id of the scanner. + * This is used to determine an order between memory segment scanners. + * @param x a unique sequence id + */ + public void setSequenceID(long x) { + sequenceID = x; + } + + /** + * Returns whether the given scan should seek in this segment + * @return whether the given scan should seek in this segment + */ + public boolean shouldSeek(Scan scan, long oldestUnexpiredTS) { + return getSegment().shouldSeek(scan,oldestUnexpiredTS); + } + + //debug method + @Override + public String toString() { + String res = "Store segment scanner of type "+this.getClass().getName()+"; "; + res += "sequence id "+getSequenceID()+"; "; + res += getSegment().toString(); + return res; + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java index 34ba1fa4637..f4f25dd96ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlushContext.java @@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; import java.util.List; -import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.fs.Path; +import 
org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.monitoring.MonitoredTask; /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java index 4f30960d3df..5c79d7257f0 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/TestHeapSize.java @@ -19,6 +19,27 @@ package org.apache.hadoop.hbase.io; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; +import org.apache.hadoop.hbase.io.hfile.LruBlockCache; +import org.apache.hadoop.hbase.io.hfile.LruCachedBlock; +import org.apache.hadoop.hbase.regionserver.CellSet; +import org.apache.hadoop.hbase.regionserver.DefaultMemStore; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; +import org.apache.hadoop.hbase.testclassification.IOTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.ClassSize; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.RuntimeMXBean; @@ -35,27 +56,6 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantReadWriteLock; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.KeyValue; -import org.apache.hadoop.hbase.client.Delete; 
-import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.io.hfile.BlockCacheKey; -import org.apache.hadoop.hbase.io.hfile.LruCachedBlock; -import org.apache.hadoop.hbase.io.hfile.LruBlockCache; -import org.apache.hadoop.hbase.regionserver.CellSkipListSet; -import org.apache.hadoop.hbase.regionserver.DefaultMemStore; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.HStore; -import org.apache.hadoop.hbase.regionserver.TimeRangeTracker; -import org.apache.hadoop.hbase.testclassification.IOTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.ClassSize; -import org.junit.BeforeClass; -import org.junit.Test; -import org.junit.experimental.categories.Category; - import static org.junit.Assert.assertEquals; /** @@ -237,8 +237,8 @@ public class TestHeapSize { assertEquals(expected, actual); } - // CellSkipListSet - cl = CellSkipListSet.class; + // CellSet + cl = CellSet.class; expected = ClassSize.estimateBase(cl, false); actual = ClassSize.CELL_SKIPLIST_SET; if (expected != actual) { @@ -305,15 +305,16 @@ public class TestHeapSize { // DefaultMemStore Deep Overhead actual = DefaultMemStore.DEEP_OVERHEAD; expected = ClassSize.estimateBase(cl, false); - expected += ClassSize.estimateBase(AtomicLong.class, false); - expected += (2 * ClassSize.estimateBase(CellSkipListSet.class, false)); + expected += (2 * ClassSize.estimateBase(AtomicLong.class, false)); + expected += (2 * ClassSize.estimateBase(CellSet.class, false)); expected += (2 * ClassSize.estimateBase(ConcurrentSkipListMap.class, false)); expected += (2 * ClassSize.estimateBase(TimeRangeTracker.class, false)); if(expected != actual) { ClassSize.estimateBase(cl, true); ClassSize.estimateBase(AtomicLong.class, true); - ClassSize.estimateBase(CellSkipListSet.class, true); - ClassSize.estimateBase(CellSkipListSet.class, true); + 
ClassSize.estimateBase(AtomicLong.class, true); + ClassSize.estimateBase(CellSet.class, true); + ClassSize.estimateBase(CellSet.class, true); ClassSize.estimateBase(ConcurrentSkipListMap.class, true); ClassSize.estimateBase(ConcurrentSkipListMap.class, true); ClassSize.estimateBase(TimeRangeTracker.class, true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellSkipListSet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellSkipListSet.java index 684839d4196..e0cc39f0515 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellSkipListSet.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCellSkipListSet.java @@ -18,11 +18,7 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.util.Iterator; -import java.util.SortedSet; - import junit.framework.TestCase; - import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; @@ -32,10 +28,13 @@ import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.experimental.categories.Category; +import java.util.Iterator; +import java.util.SortedSet; + @Category({RegionServerTests.class, SmallTests.class}) public class TestCellSkipListSet extends TestCase { - private final CellSkipListSet csls = - new CellSkipListSet(CellComparator.COMPARATOR); + private final CellSet csls = + new CellSet(CellComparator.COMPARATOR); protected void setUp() throws Exception { super.setUp(); @@ -163,4 +162,4 @@ public class TestCellSkipListSet extends TestCase { assertTrue(Bytes.equals(head.first().getValueArray(), head.first().getValueOffset(), head.first().getValueLength(), value2, 0, value2.length)); } -} \ No newline at end of file +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index ec70740decb..5e6007dd39b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -18,17 +18,10 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.io.IOException; -import java.lang.management.ManagementFactory; -import java.lang.management.MemoryMXBean; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - +import com.google.common.base.Joiner; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; import junit.framework.TestCase; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -57,12 +50,14 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.wal.WALFactory; import org.junit.experimental.categories.Category; -import com.google.common.base.Joiner; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryMXBean; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; /** memstore test case */ @Category({RegionServerTests.class, MediumTests.class}) @@ -89,11 +84,9 @@ public class TestDefaultMemStore extends TestCase { byte [] other = Bytes.toBytes("somethingelse"); KeyValue samekey = new KeyValue(bytes, bytes, bytes, other); this.memstore.add(samekey); - Cell found = 
this.memstore.cellSet.first(); - assertEquals(1, this.memstore.cellSet.size()); - assertTrue( - Bytes.toString(found.getValueArray(), found.getValueOffset(), found.getValueLength()), - CellUtil.matchingValue(samekey, found)); + Cell found = this.memstore.getActive().first(); + assertEquals(1, this.memstore.getActive().getCellsCount()); + assertTrue(Bytes.toString(found.getValueArray()), CellUtil.matchingValue(samekey, found)); } /** @@ -108,7 +101,7 @@ public class TestDefaultMemStore extends TestCase { Configuration conf = HBaseConfiguration.create(); ScanInfo scanInfo = new ScanInfo(conf, null, 0, 1, HConstants.LATEST_TIMESTAMP, KeepDeletedCells.FALSE, 0, - this.memstore.comparator); + this.memstore.getComparator()); ScanType scanType = ScanType.USER_SCAN; StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); int count = 0; @@ -476,7 +469,7 @@ public class TestDefaultMemStore extends TestCase { for (int i = 0; i < snapshotCount; i++) { addRows(this.memstore); runSnapshot(this.memstore); - assertEquals("History not being cleared", 0, this.memstore.snapshot.size()); + assertEquals("History not being cleared", 0, this.memstore.getSnapshot().getCellsCount()); } } @@ -497,7 +490,7 @@ public class TestDefaultMemStore extends TestCase { m.add(key2); assertTrue("Expected memstore to hold 3 values, actually has " + - m.cellSet.size(), m.cellSet.size() == 3); + m.getActive().getCellsCount(), m.getActive().getCellsCount() == 3); } ////////////////////////////////////////////////////////////////////////////// @@ -529,7 +522,7 @@ public class TestDefaultMemStore extends TestCase { Configuration conf = HBaseConfiguration.create(); for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) { ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, - KeepDeletedCells.FALSE, 0, this.memstore.comparator); + KeepDeletedCells.FALSE, 0, this.memstore.getComparator()); ScanType scanType = ScanType.USER_SCAN; InternalScanner scanner = 
new StoreScanner(new Scan( Bytes.toBytes(startRowId)), scanInfo, scanType, null, @@ -570,12 +563,12 @@ public class TestDefaultMemStore extends TestCase { memstore.add(new KeyValue(row, fam ,qf3, val)); //Creating a snapshot memstore.snapshot(); - assertEquals(3, memstore.snapshot.size()); + assertEquals(3, memstore.getSnapshot().getCellsCount()); //Adding value to "new" memstore - assertEquals(0, memstore.cellSet.size()); + assertEquals(0, memstore.getActive().getCellsCount()); memstore.add(new KeyValue(row, fam ,qf4, val)); memstore.add(new KeyValue(row, fam ,qf5, val)); - assertEquals(2, memstore.cellSet.size()); + assertEquals(2, memstore.getActive().getCellsCount()); } ////////////////////////////////////////////////////////////////////////////// @@ -597,7 +590,7 @@ public class TestDefaultMemStore extends TestCase { memstore.add(put2); memstore.add(put3); - assertEquals(3, memstore.cellSet.size()); + assertEquals(3, memstore.getActive().getCellsCount()); KeyValue del2 = new KeyValue(row, fam, qf1, ts2, KeyValue.Type.Delete, val); memstore.delete(del2); @@ -608,9 +601,9 @@ public class TestDefaultMemStore extends TestCase { expected.add(put2); expected.add(put1); - assertEquals(4, memstore.cellSet.size()); + assertEquals(4, memstore.getActive().getCellsCount()); int i = 0; - for(Cell cell : memstore.cellSet) { + for(Cell cell : memstore.getActive().getCellSet()) { assertEquals(expected.get(i++), cell); } } @@ -631,7 +624,7 @@ public class TestDefaultMemStore extends TestCase { memstore.add(put2); memstore.add(put3); - assertEquals(3, memstore.cellSet.size()); + assertEquals(3, memstore.getActive().getCellsCount()); KeyValue del2 = new KeyValue(row, fam, qf1, ts2, KeyValue.Type.DeleteColumn, val); @@ -644,9 +637,9 @@ public class TestDefaultMemStore extends TestCase { expected.add(put1); - assertEquals(4, memstore.cellSet.size()); + assertEquals(4, memstore.getActive().getCellsCount()); int i = 0; - for (Cell cell: memstore.cellSet) { + for (Cell cell: 
memstore.getActive().getCellSet()) { assertEquals(expected.get(i++), cell); } } @@ -684,9 +677,9 @@ public class TestDefaultMemStore extends TestCase { - assertEquals(5, memstore.cellSet.size()); + assertEquals(5, memstore.getActive().getCellsCount()); int i = 0; - for (Cell cell: memstore.cellSet) { + for (Cell cell: memstore.getActive().getCellSet()) { assertEquals(expected.get(i++), cell); } } @@ -700,8 +693,8 @@ public class TestDefaultMemStore extends TestCase { memstore.add(new KeyValue(row, fam, qf, ts, val)); KeyValue delete = new KeyValue(row, fam, qf, ts, KeyValue.Type.Delete, val); memstore.delete(delete); - assertEquals(2, memstore.cellSet.size()); - assertEquals(delete, memstore.cellSet.first()); + assertEquals(2, memstore.getActive().getCellsCount()); + assertEquals(delete, memstore.getActive().first()); } public void testRetainsDeleteVersion() throws IOException { @@ -713,8 +706,8 @@ public class TestDefaultMemStore extends TestCase { "row1", "fam", "a", 100, KeyValue.Type.Delete, "dont-care"); memstore.delete(delete); - assertEquals(2, memstore.cellSet.size()); - assertEquals(delete, memstore.cellSet.first()); + assertEquals(2, memstore.getActive().getCellsCount()); + assertEquals(delete, memstore.getActive().first()); } public void testRetainsDeleteColumn() throws IOException { // add a put to memstore @@ -725,8 +718,8 @@ public class TestDefaultMemStore extends TestCase { KeyValue.Type.DeleteColumn, "dont-care"); memstore.delete(delete); - assertEquals(2, memstore.cellSet.size()); - assertEquals(delete, memstore.cellSet.first()); + assertEquals(2, memstore.getActive().getCellsCount()); + assertEquals(delete, memstore.getActive().first()); } public void testRetainsDeleteFamily() throws IOException { // add a put to memstore @@ -737,43 +730,8 @@ public class TestDefaultMemStore extends TestCase { KeyValue.Type.DeleteFamily, "dont-care"); memstore.delete(delete); - assertEquals(2, memstore.cellSet.size()); - assertEquals(delete, 
memstore.cellSet.first()); - } - - //////////////////////////////////// - //Test for timestamps - //////////////////////////////////// - - /** - * Test to ensure correctness when using Memstore with multiple timestamps - */ - public void testMultipleTimestamps() throws Exception { - long[] timestamps = new long[] {20,10,5,1}; - Scan scan = new Scan(); - - for (long timestamp: timestamps) - addRows(memstore,timestamp); - - byte[] fam = Bytes.toBytes("fam"); - HColumnDescriptor hcd = mock(HColumnDescriptor.class); - when(hcd.getName()).thenReturn(fam); - Store store = mock(Store.class); - when(store.getFamily()).thenReturn(hcd); - scan.setColumnFamilyTimeRange(fam, 0, 2); - assertTrue(memstore.shouldSeek(scan, store, Long.MIN_VALUE)); - - scan.setColumnFamilyTimeRange(fam, 20, 82); - assertTrue(memstore.shouldSeek(scan, store, Long.MIN_VALUE)); - - scan.setColumnFamilyTimeRange(fam, 10, 20); - assertTrue(memstore.shouldSeek(scan, store, Long.MIN_VALUE)); - - scan.setColumnFamilyTimeRange(fam, 8, 12); - assertTrue(memstore.shouldSeek(scan, store, Long.MIN_VALUE)); - - scan.setColumnFamilyTimeRange(fam, 28, 42); - assertTrue(!memstore.shouldSeek(scan, store, Long.MIN_VALUE)); + assertEquals(2, memstore.getActive().getCellsCount()); + assertEquals(delete, memstore.getActive().first()); } //////////////////////////////////// @@ -795,7 +753,7 @@ public class TestDefaultMemStore extends TestCase { */ public void testUpsertMSLAB() throws Exception { Configuration conf = HBaseConfiguration.create(); - conf.setBoolean(DefaultMemStore.USEMSLAB_KEY, true); + conf.setBoolean(SegmentFactory.USEMSLAB_KEY, true); memstore = new DefaultMemStore(conf, CellComparator.COMPARATOR); int ROW_SIZE = 2048; @@ -838,7 +796,7 @@ public class TestDefaultMemStore extends TestCase { public void testUpsertMemstoreSize() throws Exception { Configuration conf = HBaseConfiguration.create(); memstore = new DefaultMemStore(conf, CellComparator.COMPARATOR); - long oldSize = memstore.size.get(); + long 
oldSize = memstore.size(); List l = new ArrayList(); KeyValue kv1 = KeyValueTestUtil.create("r", "f", "q", 100, "v"); @@ -849,18 +807,18 @@ public class TestDefaultMemStore extends TestCase { l.add(kv1); l.add(kv2); l.add(kv3); this.memstore.upsert(l, 2);// readpoint is 2 - long newSize = this.memstore.size.get(); + long newSize = this.memstore.size(); assert(newSize > oldSize); //The kv1 should be removed. - assert(memstore.cellSet.size() == 2); + assert(memstore.getActive().getCellsCount() == 2); KeyValue kv4 = KeyValueTestUtil.create("r", "f", "q", 104, "v"); kv4.setSequenceId(1); l.clear(); l.add(kv4); this.memstore.upsert(l, 3); - assertEquals(newSize, this.memstore.size.get()); + assertEquals(newSize, this.memstore.size()); //The kv2 should be removed. - assert(memstore.cellSet.size() == 2); + assert(memstore.getActive().getCellsCount() == 2); //this.memstore = null; } @@ -1021,10 +979,11 @@ public class TestDefaultMemStore extends TestCase { private long runSnapshot(final DefaultMemStore hmc) throws UnexpectedStateException { // Save off old state. - int oldHistorySize = hmc.snapshot.size(); + int oldHistorySize = hmc.getSnapshot().getCellsCount(); MemStoreSnapshot snapshot = hmc.snapshot(); // Make some assertions about what just happened. 
- assertTrue("History size has not increased", oldHistorySize < hmc.snapshot.size()); + assertTrue("History size has not increased", oldHistorySize < hmc.getSnapshot().getCellsCount + ()); long t = memstore.timeOfOldestEdit(); assertTrue("Time of oldest edit is not Long.MAX_VALUE", t == Long.MAX_VALUE); hmc.clearSnapshot(snapshot.getId()); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java index 385048c4a7c..b237490bff5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java @@ -18,20 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import java.io.IOException; -import java.security.Key; -import java.security.SecureRandom; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.Iterator; -import java.util.List; -import java.util.NavigableSet; -import java.util.concurrent.ConcurrentSkipListSet; - -import javax.crypto.spec.SecretKeySpec; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -74,6 +60,19 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.mockito.Mockito; +import javax.crypto.spec.SecretKeySpec; +import java.io.IOException; +import java.security.Key; +import java.security.SecureRandom; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.NavigableSet; +import java.util.concurrent.ConcurrentSkipListSet; + @Category(MediumTests.class) public class TestHMobStore { public static final Log LOG = LogFactory.getLog(TestHMobStore.class); @@ -468,7 +467,7 @@ public 
class TestHMobStore { this.store.snapshot(); flushStore(store, id++); Assert.assertEquals(storeFilesSize, this.store.getStorefiles().size()); - Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).cellSet.size()); + Assert.assertEquals(0, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount()); } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 7add8a9b143..a5574d3a232 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -18,54 +18,10 @@ */ package org.apache.hadoop.hbase.regionserver; - -import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS; -import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR; -import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR; -import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY; -import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; -import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2; -import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3; -import static org.junit.Assert.assertArrayEquals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.io.IOException; -import 
java.io.InterruptedIOException; -import java.security.PrivilegedExceptionAction; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NavigableMap; -import java.util.TreeMap; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.protobuf.ByteString; import org.apache.commons.lang.RandomStringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -176,10 +132,52 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.protobuf.ByteString; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.security.PrivilegedExceptionAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import 
java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; + +import static org.apache.hadoop.hbase.HBaseTestingUtility.COLUMNS; +import static org.apache.hadoop.hbase.HBaseTestingUtility.FIRST_CHAR; +import static org.apache.hadoop.hbase.HBaseTestingUtility.LAST_CHAR; +import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY; +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1; +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam2; +import static org.apache.hadoop.hbase.HBaseTestingUtility.fam3; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; /** * Basic stand-alone testing of HRegion. No clusters! 
@@ -302,8 +300,6 @@ public class TestHRegion { HBaseTestingUtility.closeRegionAndWAL(region); } - - /* * This test is for verifying memstore snapshot size is correctly updated in case of rollback * See HBASE-10845 @@ -332,7 +328,7 @@ public class TestHRegion { Path rootDir = new Path(dir + "testMemstoreSnapshotSize"); MyFaultyFSLog faultyLog = new MyFaultyFSLog(fs, rootDir, "testMemstoreSnapshotSize", CONF); HRegion region = initHRegion(tableName, null, null, name.getMethodName(), - CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES); + CONF, false, Durability.SYNC_WAL, faultyLog, COLUMN_FAMILY_BYTES); Store store = region.getStore(COLUMN_FAMILY_BYTES); // Get some random bytes. @@ -1289,7 +1285,8 @@ public class TestHRegion { private final AtomicInteger count; private Exception e; - GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d, final AtomicInteger c) { + GetTillDoneOrException(final int i, final byte[] r, final AtomicBoolean d, + final AtomicInteger c) { super("getter." + i); this.g = new Get(r); this.done = d; @@ -2452,10 +2449,10 @@ public class TestHRegion { // This is kinda hacky, but better than nothing... 
long now = System.currentTimeMillis(); DefaultMemStore memstore = (DefaultMemStore) ((HStore) region.getStore(fam1)).memstore; - Cell firstCell = memstore.cellSet.first(); + Cell firstCell = memstore.getActive().first(); assertTrue(firstCell.getTimestamp() <= now); now = firstCell.getTimestamp(); - for (Cell cell : memstore.cellSet) { + for (Cell cell : memstore.getActive().getCellSet()) { assertTrue(cell.getTimestamp() <= now); now = cell.getTimestamp(); } @@ -2782,7 +2779,8 @@ public class TestHRegion { } catch (NotServingRegionException e) { // this is the correct exception that is expected } catch (IOException e) { - fail("Got wrong type of exception - should be a NotServingRegionException, but was an IOException: " + fail("Got wrong type of exception - should be a NotServingRegionException, " + + "but was an IOException: " + e.getMessage()); } } finally { @@ -2980,7 +2978,8 @@ public class TestHRegion { } @Test - public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws IOException { + public void testScanner_ExplicitColumns_FromMemStoreAndFiles_EnforceVersions() throws + IOException { byte[] row1 = Bytes.toBytes("row1"); byte[] fam1 = Bytes.toBytes("fam1"); byte[][] families = { fam1 }; @@ -4978,7 +4977,8 @@ public class TestHRegion { // move the file of the primary region to the archive, simulating a compaction Collection storeFiles = primaryRegion.getStore(families[0]).getStorefiles(); primaryRegion.getRegionFileSystem().removeStoreFiles(Bytes.toString(families[0]), storeFiles); - Collection storeFileInfos = primaryRegion.getRegionFileSystem().getStoreFiles(families[0]); + Collection storeFileInfos = primaryRegion.getRegionFileSystem() + .getStoreFiles(families[0]); Assert.assertTrue(storeFileInfos == null || storeFileInfos.size() == 0); verifyData(secondaryRegion, 0, 1000, cq, families); @@ -4992,7 +4992,8 @@ public class TestHRegion { } } - private void putData(int startRow, int numRows, byte[] qf, byte[]... 
families) throws IOException { + private void putData(int startRow, int numRows, byte[] qf, byte[]... families) throws + IOException { putData(this.region, startRow, numRows, qf, families); } @@ -5085,7 +5086,6 @@ public class TestHRegion { /** * Test that we get the expected flush results back - * @throws IOException */ @Test public void testFlushResult() throws IOException { @@ -5138,11 +5138,6 @@ public class TestHRegion { } /** - * @param tableName - * @param callingMethod - * @param conf - * @param families - * @throws IOException * @return A region on which you must call * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. */ @@ -5152,12 +5147,6 @@ public class TestHRegion { } /** - * @param tableName - * @param callingMethod - * @param conf - * @param isReadOnly - * @param families - * @throws IOException * @return A region on which you must call * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. */ @@ -5177,14 +5166,6 @@ public class TestHRegion { } /** - * @param tableName - * @param startKey - * @param stopKey - * @param callingMethod - * @param conf - * @param isReadOnly - * @param families - * @throws IOException * @return A region on which you must call * {@link HBaseTestingUtility#closeRegionAndWAL(HRegion)} when done. */ @@ -5676,7 +5657,8 @@ public class TestHRegion { currRow.clear(); hasNext = scanner.next(currRow); assertEquals(2, currRow.size()); - assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), currRow.get(0).getRowLength(), row4, 0, + assertTrue(Bytes.equals(currRow.get(0).getRowArray(), currRow.get(0).getRowOffset(), + currRow.get(0).getRowLength(), row4, 0, row4.length)); assertTrue(hasNext); // 2. 
scan out "row3" (2 kv) @@ -6088,7 +6070,7 @@ public class TestHRegion { public void testOpenRegionWrittenToWALForLogReplay() throws Exception { // similar to the above test but with distributed log replay final ServerName serverName = ServerName.valueOf("testOpenRegionWrittenToWALForLogReplay", - 100, 42); + 100, 42); final RegionServerServices rss = spy(TEST_UTIL.createMockRegionServerService(serverName)); HTableDescriptor htd diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java index 80333e8087d..b5e979890c9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStoreChunkPool.java @@ -18,12 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.util.List; -import java.util.Random; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.testclassification.RegionServerTests; @@ -36,6 +30,13 @@ import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; +import java.io.IOException; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + /** * Test the {@link MemStoreChunkPool} class */ @@ -47,7 +48,7 @@ public class TestMemStoreChunkPool { @BeforeClass public static void setUpBeforeClass() throws Exception { - conf.setBoolean(DefaultMemStore.USEMSLAB_KEY, true); + conf.setBoolean(SegmentFactory.USEMSLAB_KEY, true); conf.setFloat(MemStoreChunkPool.CHUNK_POOL_MAXSIZE_KEY, 0.2f); chunkPoolDisabledBeforeTest = MemStoreChunkPool.chunkPoolDisabled; MemStoreChunkPool.chunkPoolDisabled = false; @@ -116,13 
+117,13 @@ public class TestMemStoreChunkPool { // Creating a snapshot MemStoreSnapshot snapshot = memstore.snapshot(); - assertEquals(3, memstore.snapshot.size()); + assertEquals(3, memstore.getSnapshot().getCellsCount()); // Adding value to "new" memstore - assertEquals(0, memstore.cellSet.size()); + assertEquals(0, memstore.getActive().getCellsCount()); memstore.add(new KeyValue(row, fam, qf4, val)); memstore.add(new KeyValue(row, fam, qf5, val)); - assertEquals(2, memstore.cellSet.size()); + assertEquals(2, memstore.getActive().getCellsCount()); memstore.clearSnapshot(snapshot.getId()); int chunkCount = chunkPool.getPoolSize(); @@ -132,7 +133,7 @@ public class TestMemStoreChunkPool { @Test public void testPuttingBackChunksWithOpeningScanner() - throws UnexpectedStateException { + throws IOException { byte[] row = Bytes.toBytes("testrow"); byte[] fam = Bytes.toBytes("testfamily"); byte[] qf1 = Bytes.toBytes("testqualifier1"); @@ -153,13 +154,13 @@ public class TestMemStoreChunkPool { // Creating a snapshot MemStoreSnapshot snapshot = memstore.snapshot(); - assertEquals(3, memstore.snapshot.size()); + assertEquals(3, memstore.getSnapshot().getCellsCount()); // Adding value to "new" memstore - assertEquals(0, memstore.cellSet.size()); + assertEquals(0, memstore.getActive().getCellsCount()); memstore.add(new KeyValue(row, fam, qf4, val)); memstore.add(new KeyValue(row, fam, qf5, val)); - assertEquals(2, memstore.cellSet.size()); + assertEquals(2, memstore.getActive().getCellsCount()); // opening scanner before clear the snapshot List scanners = memstore.getScanners(0); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java index 354ea2d00ca..0a67ff83270 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java @@ -27,6 +27,7 @@ 
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import com.google.common.collect.Lists; import java.io.IOException; import java.lang.ref.SoftReference; import java.security.PrivilegedExceptionAction; @@ -92,8 +93,6 @@ import org.junit.experimental.categories.Category; import org.junit.rules.TestName; import org.mockito.Mockito; -import com.google.common.collect.Lists; - /** * Test class for the Store */ @@ -555,7 +554,7 @@ public class TestStore { this.store.snapshot(); flushStore(store, id++); Assert.assertEquals(storeFilessize, this.store.getStorefiles().size()); - Assert.assertEquals(0, ((DefaultMemStore)this.store.memstore).cellSet.size()); + Assert.assertEquals(0, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount()); } private void assertCheck() { @@ -600,7 +599,7 @@ public class TestStore { flushStore(store, id++); Assert.assertEquals(1, this.store.getStorefiles().size()); // from the one we inserted up there, and a new one - Assert.assertEquals(2, ((DefaultMemStore)this.store.memstore).cellSet.size()); + Assert.assertEquals(2, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount()); // how many key/values for this row are there? Get get = new Get(row); @@ -669,7 +668,7 @@ public class TestStore { } long computedSize=0; - for (Cell cell : ((DefaultMemStore)this.store.memstore).cellSet) { + for (Cell cell : ((AbstractMemStore)this.store.memstore).getActive().getCellSet()) { long kvsize = DefaultMemStore.heapSizeChange(cell, true); //System.out.println(kv + " size= " + kvsize + " kvsize= " + kv.heapSize()); computedSize += kvsize; @@ -701,7 +700,7 @@ public class TestStore { // then flush. 
flushStore(store, id++); Assert.assertEquals(1, this.store.getStorefiles().size()); - Assert.assertEquals(1, ((DefaultMemStore)this.store.memstore).cellSet.size()); + Assert.assertEquals(1, ((AbstractMemStore)this.store.memstore).getActive().getCellsCount()); // now increment again: newValue += 1;