HBASE-15359 Simplifying segment hierarchy

Signed-off-by: stack <stack@apache.org>
This commit is contained in:
eshcar 2016-03-01 10:25:39 +02:00 committed by stack
parent 3fa77a1845
commit dc44943666
10 changed files with 371 additions and 631 deletions

View File

@ -226,6 +226,23 @@ public abstract class AbstractMemStore implements MemStore {
return getSnapshot().getSize();
}
/**
 * Removes a key from the active segment of the memstore. Only cells that have
 * the same key and the same memstoreTS are removed. It is ok to not update
 * timeRangeTracker in this call. This method could be optimized by using
 * tailMap/iterator, but since it is called rarely (only for error recovery),
 * those optimizations are left for the future.
 *
 * @param cell the cell to roll back in the active segment
 */
@Override
public void rollback(Cell cell) {
  // The active segment reports a non-zero size when it actually dropped the cell.
  final boolean removed = active.rollback(cell) != 0;
  if (removed) {
    setOldestEditTimeToNow();
  }
}
@Override
public String toString() {
StringBuffer buf = new StringBuffer();
@ -241,23 +258,6 @@ public abstract class AbstractMemStore implements MemStore {
return buf.toString();
}
/**
 * Rolls the given cell back in the snapshot segment by delegating to
 * {@code snapshot.rollback(cell)}. The value returned by the snapshot is ignored.
 *
 * @param cell the cell to remove from the snapshot
 */
protected void rollbackInSnapshot(Cell cell) {
// If the key is in the snapshot, delete it. We should not update
// this.size, because that tracks the size of only the memstore and
// not the snapshot. The flush of this snapshot to disk has not
// yet started because Store.flush() waits for all rwcc transactions to
// commit before starting the flush to disk.
snapshot.rollback(cell);
}
/**
 * Rolls the given cell back in the active segment and, when the segment
 * reports a non-zero size for the removal, resets the oldest edit time to now.
 *
 * @param cell the cell to remove from the active segment
 */
protected void rollbackInActive(Cell cell) {
  // If the key is in the memstore, delete it.
  final boolean removed = active.rollback(cell) != 0;
  if (removed) {
    setOldestEditTimeToNow();
  }
}
/**
 * @return the configuration object held in the {@code conf} field of this memstore
 */
protected Configuration getConfiguration() {
return conf;
}

View File

@ -117,20 +117,6 @@ public class DefaultMemStore extends AbstractMemStore {
return list;
}
/**
 * Removes a key from the memstore. Only cells that have the same key and the
 * same memstoreTS are removed. It is ok to not update timeRangeTracker
 * in this call. It is possible that we can optimize this method by using
 * tailMap/iterator, but since this method is called rarely (only for
 * error recovery), we can leave those optimizations for the future.
 * <p>
 * The cell is rolled back first in the snapshot and then in the active segment.
 *
 * @param cell the cell to roll back
 */
@Override
public void rollback(Cell cell) {
rollbackInSnapshot(cell);
rollbackInActive(cell);
}
/**
* @param cell Find the row that comes after this one. If null, we return the
* first.

View File

@ -18,55 +18,29 @@
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.CollectionBackedScanner;
/**
* ImmutableSegment is an abstract class that extends the API supported by a {@link Segment},
* and is not needed for a {@link MutableSegment}. Specifically, the method
* {@link ImmutableSegment#getKeyValueScanner()} builds a special scanner for the
* {@link MemStoreSnapshot} object.
* In addition, this class overrides methods that are not likely to be supported by an immutable
* segment, e.g. {@link Segment#rollback(Cell)} and {@link Segment#getCellSet()}, which
* can be very inefficient.
*/
@InterfaceAudience.Private
public abstract class ImmutableSegment extends Segment {
public class ImmutableSegment extends Segment {
public ImmutableSegment(Segment segment) {
protected ImmutableSegment(Segment segment) {
super(segment);
}
/**
 * Removes the given cell from this segment.
 * By default an immutable store segment cannot roll back, so this implementation
 * removes nothing and always returns 0.
 * It may be invoked by tests in specific cases where it is known to be supported,
 * see {@link ImmutableSegmentAdapter}.
 *
 * @param cell the cell to roll back (ignored here)
 * @return always 0
 */
@Override
public long rollback(Cell cell) {
return 0;
}
/**
 * Returns a set of all the cells in the segment.
 * The implementation of this method might be very inefficient for some immutable segments
 * that do not maintain a cell set. Therefore by default this method is not supported.
 * It may be invoked by tests in specific cases where it is known to be supported,
 * see {@link ImmutableSegmentAdapter}.
 *
 * @return never returns normally in this implementation
 * @throws NotImplementedException always
 */
@Override
public CellSet getCellSet() {
throw new NotImplementedException("Immutable Segment does not support this operation by " +
"default");
}
/**
* Builds a special scanner for the MemStoreSnapshot object that may be different than the
* Builds a special scanner for the MemStoreSnapshot object that is different than the
* general segment scanner.
* @return a special scanner for the MemStoreSnapshot object
*/
public abstract KeyValueScanner getKeyValueScanner();
public KeyValueScanner getKeyValueScanner() {
  // Scan the segment's cell set, ordered by the segment's comparator.
  final CellSet cells = getCellSet();
  return new CollectionBackedScanner(cells, getComparator());
}
}

View File

@ -1,107 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.CollectionBackedScanner;
/**
 * Adapts a mutable segment so that it can be used as an immutable segment.
 * This is used when a mutable segment is moved to being a snapshot or pushed into a compaction
 * pipeline, that consists only of immutable segments.
 * The compaction may generate a different type of immutable segment.
 * <p>
 * Every method overridden here forwards to the wrapped {@link MutableSegment}.
 */
@InterfaceAudience.Private
public class ImmutableSegmentAdapter extends ImmutableSegment {

  /** The wrapped mutable segment that all calls are forwarded to. */
  private final MutableSegment delegate;

  /**
   * @param segment the mutable segment to wrap
   */
  public ImmutableSegmentAdapter(MutableSegment segment) {
    super(segment);
    this.delegate = segment;
  }

  // ---- scanners ----

  /** Builds a collection-backed scanner over the wrapped segment's cell set and comparator. */
  @Override
  public KeyValueScanner getKeyValueScanner() {
    return new CollectionBackedScanner(delegate.getCellSet(), delegate.getComparator());
  }

  /** Forwards to the wrapped segment's scanner factory for the given read point. */
  @Override
  public SegmentScanner getSegmentScanner(long readPoint) {
    return delegate.getSegmentScanner(readPoint);
  }

  // ---- size and content queries ----

  @Override
  public boolean isEmpty() {
    return delegate.isEmpty();
  }

  @Override
  public int getCellsCount() {
    return delegate.getCellsCount();
  }

  @Override
  public long getSize() {
    return delegate.getSize();
  }

  /** Sets the size on the wrapped segment and returns this adapter, not the wrapped segment. */
  @Override
  public Segment setSize(long size) {
    delegate.setSize(size);
    return this;
  }

  @Override
  public CellSet getCellSet() {
    return delegate.getCellSet();
  }

  @Override
  public Cell getFirstAfter(Cell cell) {
    return delegate.getFirstAfter(cell);
  }

  // ---- cell operations ----

  @Override
  public long add(Cell cell) {
    return delegate.add(cell);
  }

  @Override
  public long rollback(Cell cell) {
    return delegate.rollback(cell);
  }

  @Override
  public Cell maybeCloneWithAllocator(Cell cell) {
    return delegate.maybeCloneWithAllocator(cell);
  }

  // ---- lifecycle and debugging ----

  @Override
  public void close() {
    delegate.close();
  }

  @Override
  public void dump(Log log) {
    delegate.dump(log);
  }
}

View File

@ -1,153 +0,0 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.Iterator;
import java.util.SortedSet;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
 * A mutable store segment that keeps its cells in a {@link CellSet} and carries the
 * respective memory allocation buffers (MSLAB).
 */
@InterfaceAudience.Private
final class MutableCellSetSegment extends MutableSegment {

  private volatile CellSet cellSet;
  private final CellComparator comparator;

  // Instantiate objects only using factory
  MutableCellSetSegment(CellSet cellSet, MemStoreLAB memStoreLAB, long size,
      CellComparator comparator) {
    super(memStoreLAB, size);
    this.cellSet = cellSet;
    this.comparator = comparator;
  }

  /** Creates a scanner over this segment for the given read point. */
  @Override
  public SegmentScanner getSegmentScanner(long readPoint) {
    return new MutableCellSetSegmentScanner(this, readPoint);
  }

  @Override
  public boolean isEmpty() {
    return getCellSet().isEmpty();
  }

  @Override
  public int getCellsCount() {
    return getCellSet().size();
  }

  /**
   * Adds the cell to the cell set, updates the segment meta information with the resulting
   * heap-size change and records whether tags have been seen.
   *
   * @return the heap-size change computed by {@code AbstractMemStore.heapSizeChange}
   */
  @Override
  public long add(Cell cell) {
    final boolean inserted = getCellSet().add(cell);
    final long heapDelta = AbstractMemStore.heapSizeChange(cell, inserted);
    updateMetaInfo(cell, heapDelta);
    // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
    // When we use ACL CP or Visibility CP which deals with Tags during
    // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
    // parse the byte[] to identify the tags length.
    final boolean hasTags = cell.getTagsLength() > 0;
    if (hasTags) {
      tagsPresent = true;
    }
    return heapDelta;
  }

  /**
   * Removes the cell when the cell set holds an entry for it with the same sequence id.
   *
   * @return the size that was subtracted from the segment, or 0 when nothing was removed
   */
  @Override
  public long rollback(Cell cell) {
    final Cell existing = getCellSet().get(cell);
    if (existing == null || existing.getSequenceId() != cell.getSequenceId()) {
      return 0;
    }
    final long released = AbstractMemStore.heapSizeChange(cell, true);
    getCellSet().remove(cell);
    incSize(-released);
    return released;
  }

  /**
   * @return the first element of {@code tailSet(cell)}, or null when that tail set is empty
   */
  @Override
  public Cell getFirstAfter(Cell cell) {
    final SortedSet<Cell> tail = tailSet(cell);
    return tail.isEmpty() ? null : tail.first();
  }

  /** Writes every cell of the segment to the given log at debug level. */
  @Override
  public void dump(Log log) {
    for (Iterator<Cell> it = getCellSet().iterator(); it.hasNext();) {
      log.debug(it.next());
    }
  }

  @Override
  public SortedSet<Cell> tailSet(Cell firstCell) {
    return getCellSet().tailSet(firstCell);
  }

  @Override
  public CellSet getCellSet() {
    return cellSet;
  }

  @Override
  public CellComparator getComparator() {
    return comparator;
  }

  //*** Methods for MemStoreSegmentsScanner

  public Cell last() {
    return getCellSet().last();
  }

  public Iterator<Cell> iterator() {
    return getCellSet().iterator();
  }

  public SortedSet<Cell> headSet(Cell firstKeyOnRow) {
    return getCellSet().headSet(firstKeyOnRow);
  }

  public int compare(Cell left, Cell right) {
    return getComparator().compare(left, right);
  }

  public int compareRows(Cell left, Cell right) {
    return getComparator().compareRows(left, right);
  }

  // methods for tests

  @Override
  Cell first() {
    return this.getCellSet().first();
  }
}

View File

@ -1,258 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Iterator;
import java.util.SortedSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
 * A scanner of a single cells segment {@link MutableCellSetSegment}.
 * <p>
 * The scanner keeps one pre-fetched cell ({@code current}) that {@link #peek()} returns and
 * {@link #next()} hands out, and it only exposes cells whose sequence id is not greater than
 * the read point given at construction.
 */
@InterfaceAudience.Private
class MutableCellSetSegmentScanner extends SegmentScanner {
// the observed structure
private final MutableCellSetSegment segment;
// the highest relevant MVCC
private long readPoint;
// the current iterator that can be reinitialized by
// seek(), backwardSeek(), or reseek()
private Iterator<Cell> iter;
// the pre-calculated cell to be returned by peek() or next()
private Cell current = null;
// A flag represents whether could stop skipping KeyValues for MVCC
// if have encountered the next row. Only used for reversed scan
private boolean stopSkippingKVsIfNextRow = false;
// last iterated KVs by seek (to restore the iterator state after reseek)
private Cell last = null;
/**
 * Creates a scanner positioned on the first cell of the segment that is visible at the
 * given read point, and increments the segment's scanner count.
 *
 * @param segment the segment to scan
 * @param readPoint the highest sequence id this scanner may return
 */
public MutableCellSetSegmentScanner(MutableCellSetSegment segment, long readPoint) {
super();
this.segment = segment;
this.readPoint = readPoint;
iter = segment.iterator();
// the initialization of the current is required for working with heap of SegmentScanners
current = getNext();
//increase the reference count so the underlying structure will not be de-allocated
this.segment.incScannerCount();
}
/**
 * Look at the next Cell in this scanner, but do not iterate the scanner
 * @return the currently observed Cell
 * @throws RuntimeException if the current cell has a sequence id above the read point
 */
@Override
public Cell peek() { // sanity check, the current should be always valid
if (current!=null && current.getSequenceId() > readPoint) {
throw new RuntimeException("current is invalid: read point is "+readPoint+", " +
"while current sequence id is " +current.getSequenceId());
}
return current;
}
/**
 * Return the next Cell in this scanner, iterating the scanner
 * @return the next Cell or null if end of scanner
 */
@Override
public Cell next() throws IOException {
Cell oldCurrent = current;
current = getNext(); // update the currently observed Cell
return oldCurrent;
}
/**
 * Seek the scanner at or after the specified Cell.
 * A null cell closes the scanner and returns false.
 * @param cell seek value
 * @return true if scanner has values left, false if end of scanner
 */
@Override
public boolean seek(Cell cell) throws IOException {
if(cell == null) {
close();
return false;
}
// restart the iterator from new key
iter = segment.tailSet(cell).iterator();
// last is going to be reinitialized in the next getNext() call
last = null;
current = getNext();
return (current != null);
}
/**
 * Reseek the scanner at or after the specified KeyValue.
 * This method is guaranteed to seek at or after the required key only if the
 * key comes after the current position of the scanner. Should not be used
 * to seek to a key which may come before the current position.
 *
 * @param cell seek value (should be non-null)
 * @return true if scanner has values left, false if end of scanner
 */
@Override
public boolean reseek(Cell cell) throws IOException {
/*
See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
This code is executed concurrently with flush and puts, without locks.
The ideal implementation for performance would use the sub skip list implicitly
pointed by the iterator. Unfortunately the Java API does not offer a method to
get it. So we remember the last keys we iterated to and restore
the reseeked set to at least that point.
*/
iter = segment.tailSet(getHighest(cell, last)).iterator();
current = getNext();
return (current != null);
}
/**
 * Seek the scanner at or before the row of specified Cell, it firstly
 * tries to seek the scanner at or after the specified Cell, return if
 * peek KeyValue of scanner has the same row with specified Cell,
 * otherwise seek the scanner at the first Cell of the row which is the
 * previous row of specified KeyValue
 *
 * @param key seek Cell
 * @return true if the scanner is at the valid KeyValue, false if such Cell does not exist
 */
@Override
public boolean backwardSeek(Cell key) throws IOException {
seek(key); // seek forward then go backward
if (peek() == null || segment.compareRows(peek(), key) > 0) {
return seekToPreviousRow(key);
}
return true;
}
/**
 * Seek the scanner at the first Cell of the row which is the previous row
 * of specified key
 *
 * @param cell seek value
 * @return true if the scanner at the first valid Cell of previous row,
 * false if not existing such Cell
 */
@Override
public boolean seekToPreviousRow(Cell cell) throws IOException {
boolean keepSeeking = false;
Cell key = cell;
do {
Cell firstKeyOnRow = CellUtil.createFirstOnRow(key);
SortedSet<Cell> cellHead = segment.headSet(firstKeyOnRow);
Cell lastCellBeforeRow = cellHead.isEmpty() ? null : cellHead.last();
if (lastCellBeforeRow == null) {
// nothing precedes this row in the segment
current = null;
return false;
}
Cell firstKeyOnPreviousRow = CellUtil.createFirstOnRow(lastCellBeforeRow);
// the flag is raised only for the duration of this seek, see getNext()
this.stopSkippingKVsIfNextRow = true;
seek(firstKeyOnPreviousRow);
this.stopSkippingKVsIfNextRow = false;
if (peek() == null
|| segment.getComparator().compareRows(peek(), firstKeyOnPreviousRow) > 0) {
// no visible cell on that row: retry from the row before it
keepSeeking = true;
key = firstKeyOnPreviousRow;
continue;
} else {
keepSeeking = false;
}
} while (keepSeeking);
return true;
}
/**
 * Seek the scanner at the first KeyValue of last row
 *
 * @return true if scanner has values left, false if the underlying data is empty
 */
@Override
public boolean seekToLastRow() throws IOException {
Cell higherCell = segment.isEmpty() ? null : segment.last();
if (higherCell == null) {
return false;
}
Cell firstCellOnLastRow = CellUtil.createFirstOnRow(higherCell);
if (seek(firstCellOnLastRow)) {
return true;
} else {
// the forward seek found no visible cell on the last row: fall back to earlier rows
return seekToPreviousRow(higherCell);
}
}
/**
 * @return the segment this scanner was constructed with
 */
@Override protected Segment getSegment() {
return segment;
}
/********************* Private Methods **********************/
/**
 * Private internal method for iterating over the segment,
 * skipping the cells with irrelevant MVCC.
 * <p>
 * Returns the first cell from the iterator whose sequence id is not greater than the read
 * point. When {@code stopSkippingKVsIfNextRow} is set and a skipped cell belongs to a row
 * after the row of the cell that was current on entry, the search stops and null is returned.
 * Whatever the outcome, the last cell taken from the iterator is remembered in {@code last}.
 *
 * @return the next visible cell, or null if none was found
 */
private Cell getNext() {
Cell startKV = current;
Cell next = null;
try {
while (iter.hasNext()) {
next = iter.next();
if (next.getSequenceId() <= this.readPoint) {
return next; // visible at this read point; newer versions were skipped
}
if (stopSkippingKVsIfNextRow && // for backwardSeek() stay in the
startKV != null && // boundaries of a single row
segment.compareRows(next, startKV) > 0) {
return null;
}
} // end of while
return null; // nothing found
} finally {
if (next != null) {
// in all cases, remember the last KV we iterated to, needed for reseek()
last = next;
}
}
}
/**
 * Private internal method that returns the higher of the two key values, or null
 * if they are both null. When exactly one argument is null the other one is returned;
 * when the two compare as equal, {@code second} is returned.
 */
private Cell getHighest(Cell first, Cell second) {
if (first == null && second == null) {
return null;
}
if (first != null && second != null) {
int compare = segment.compare(first, second);
return (compare > 0 ? first : second);
}
return (first != null ? first : second);
}
}

View File

@ -18,34 +18,43 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.SortedSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* An abstraction of a mutable segment in memstore, specifically the active segment.
* A mutable segment in memstore, specifically the active segment.
*/
@InterfaceAudience.Private
public abstract class MutableSegment extends Segment {
public class MutableSegment extends Segment {
protected MutableSegment(MemStoreLAB memStoreLAB, long size) {
super(memStoreLAB, size);
protected MutableSegment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB,
long size) {
super(cellSet, comparator, memStoreLAB, size);
}
/**
* Returns a subset of the segment cell set, which starts with the given cell
* @param firstCell a cell in the segment
* @return a subset of the segment cell set, which starts with the given cell
* Adds the given cell into the segment
* @return the change in the heap size
*/
public abstract SortedSet<Cell> tailSet(Cell firstCell);
public long add(Cell cell) {
// Delegates to internalAdd and returns the value it reports.
return internalAdd(cell);
}
/**
* Returns the Cell comparator used by this segment
* @return the Cell comparator used by this segment
* Removes the given cell from the segment
* @return the change in the heap size
*/
public abstract CellComparator getComparator();
public long rollback(Cell cell) {
  // Only roll back when the cell set holds an entry for this cell with the same sequence id.
  final Cell existing = getCellSet().get(cell);
  if (existing == null || existing.getSequenceId() != cell.getSequenceId()) {
    return 0;
  }
  final long released = AbstractMemStore.heapSizeChange(cell, true);
  getCellSet().remove(cell);
  incSize(-released);
  return released;
}
//methods for test
@ -53,5 +62,7 @@ public abstract class MutableSegment extends Segment {
* Returns the first cell in the segment
* @return the first cell in the segment
*/
abstract Cell first();
Cell first() {
// Delegates to the cell set's first().
return this.getCellSet().first();
}
}

View File

@ -18,10 +18,13 @@
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.Iterator;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -38,12 +41,17 @@ import org.apache.hadoop.hbase.util.ByteRange;
@InterfaceAudience.Private
public abstract class Segment {
private volatile CellSet cellSet;
private final CellComparator comparator;
private volatile MemStoreLAB memStoreLAB;
private final AtomicLong size;
private final TimeRangeTracker timeRangeTracker;
protected volatile boolean tagsPresent;
protected Segment(MemStoreLAB memStoreLAB, long size) {
protected Segment(CellSet cellSet, CellComparator comparator, MemStoreLAB memStoreLAB, long
size) {
this.cellSet = cellSet;
this.comparator = comparator;
this.memStoreLAB = memStoreLAB;
this.size = new AtomicLong(size);
this.timeRangeTracker = new TimeRangeTracker();
@ -51,6 +59,8 @@ public abstract class Segment {
}
protected Segment(Segment segment) {
this.cellSet = segment.getCellSet();
this.comparator = segment.getComparator();
this.memStoreLAB = segment.getMemStoreLAB();
this.size = new AtomicLong(segment.getSize());
this.timeRangeTracker = segment.getTimeRangeTracker();
@ -58,46 +68,40 @@ public abstract class Segment {
}
/**
* Creates the scanner that is able to scan the concrete segment
* Creates the scanner for the given read point
* @return a scanner for the given read point
*/
public abstract SegmentScanner getSegmentScanner(long readPoint);
public SegmentScanner getSegmentScanner(long readPoint) {
// A new scanner over this segment is created on every call.
return new SegmentScanner(this, readPoint);
}
/**
* Returns whether the segment has any cells
* @return whether the segment has any cells
*/
public abstract boolean isEmpty();
public boolean isEmpty() {
// Delegates to the backing cell set.
return getCellSet().isEmpty();
}
/**
* Returns number of cells in segment
* @return number of cells in segment
*/
public abstract int getCellsCount();
/**
* Adds the given cell into the segment
* @return the change in the heap size
*/
public abstract long add(Cell cell);
/**
* Removes the given cell from the segment
* @return the change in the heap size
*/
public abstract long rollback(Cell cell);
public int getCellsCount() {
// The count is the size reported by the backing cell set.
return getCellSet().size();
}
/**
* Returns the first cell in the segment that has equal or greater key than the given cell
* @return the first cell in the segment that has equal or greater key than the given cell
*/
public abstract Cell getFirstAfter(Cell cell);
/**
* Returns a set of all cells in the segment
* @return a set of all cells in the segment
*/
public abstract CellSet getCellSet();
public Cell getFirstAfter(Cell cell) {
  // An empty tail set means no cell at or after the given one: report null.
  final SortedSet<Cell> tail = tailSet(cell);
  return tail.isEmpty() ? null : tail.first();
}
/**
* Closing a segment before it is being discarded
@ -190,9 +194,69 @@ public abstract class Segment {
return timeRangeTracker;
}
//*** Methods for SegmentsScanner
/**
 * @return the result of {@code last()} on the backing cell set
 */
public Cell last() {
return getCellSet().last();
}
/**
 * @return an iterator obtained from the backing cell set
 */
public Iterator<Cell> iterator() {
return getCellSet().iterator();
}
/**
 * @param firstKeyOnRow the bound passed to the cell set's {@code headSet}
 * @return the head set of the backing cell set for the given bound
 */
public SortedSet<Cell> headSet(Cell firstKeyOnRow) {
return getCellSet().headSet(firstKeyOnRow);
}
/**
 * Compares two cells with this segment's comparator.
 * @return the result of {@code getComparator().compare(left, right)}
 */
public int compare(Cell left, Cell right) {
return getComparator().compare(left, right);
}
/**
 * Compares the rows of two cells with this segment's comparator.
 * @return the result of {@code getComparator().compareRows(left, right)}
 */
public int compareRows(Cell left, Cell right) {
return getComparator().compareRows(left, right);
}
/**
 * Returns the set of all cells in the segment.
 * @return the cell set held in the {@code cellSet} field (the same instance, not a copy)
 */
protected CellSet getCellSet() {
return cellSet;
}
/**
 * Returns the Cell comparator used by this segment.
 * @return the comparator held in the {@code comparator} field
 */
protected CellComparator getComparator() {
return comparator;
}
/**
 * Adds the cell to the backing cell set and updates the segment meta information with
 * the resulting heap-size change.
 *
 * @param cell the cell to add
 * @return the heap-size change computed by {@code AbstractMemStore.heapSizeChange}
 */
protected long internalAdd(Cell cell) {
  final boolean inserted = getCellSet().add(cell);
  final long heapDelta = AbstractMemStore.heapSizeChange(cell, inserted);
  updateMetaInfo(cell, heapDelta);
  return heapDelta;
}
/**
 * Records an added cell in the segment bookkeeping: its timestamp goes into the time range
 * tracker, {@code s} is added to the segment size, and the tags flag is raised when the
 * cell reports a positive tags length.
 *
 * @param toAdd the cell that was added
 * @param s the size change to add to the segment size
 */
protected void updateMetaInfo(Cell toAdd, long s) {
  getTimeRangeTracker().includeTimestamp(toAdd);
  size.addAndGet(s);
  // In no tags case this NoTagsKeyValue.getTagsLength() is a cheap call.
  // When we use ACL CP or Visibility CP which deals with Tags during
  // mutation, the TagRewriteCell.getTagsLength() is a cheaper call. We do not
  // parse the byte[] to identify the tags length.
  final boolean hasTags = toAdd.getTagsLength() > 0;
  if (hasTags) {
    tagsPresent = true;
  }
}
/**
 * Returns a subset of the segment cell set, which starts with the given cell.
 * Delegates to {@code tailSet} of the backing cell set.
 * @param firstCell a cell in the segment
 * @return a subset of the segment cell set, which starts with the given cell
 */
protected SortedSet<Cell> tailSet(Cell firstCell) {
return getCellSet().tailSet(firstCell);
}
private MemStoreLAB getMemStoreLAB() {
@ -203,7 +267,11 @@ public abstract class Segment {
/**
* Dumps all cells of the segment into the given log
*/
public abstract void dump(Log log);
void dump(Log log) {
  // Every cell of the backing cell set is logged at debug level, in iteration order.
  final Iterator<Cell> cells = getCellSet().iterator();
  while (cells.hasNext()) {
    log.debug(cells.next());
  }
}
@Override
public String toString() {

View File

@ -54,7 +54,7 @@ public final class SegmentFactory {
}
public ImmutableSegment createImmutableSegment(final Configuration conf, MutableSegment segment) {
return generateImmutableSegment(conf, segment);
return new ImmutableSegment(segment);
}
public MutableSegment createMutableSegment(final Configuration conf,
CellComparator comparator, long size) {
@ -64,16 +64,11 @@ public final class SegmentFactory {
//****** private methods to instantiate concrete store segments **********//
private ImmutableSegment generateImmutableSegment(final Configuration conf,
MutableSegment segment) {
// TBD use configuration to set type of segment
return new ImmutableSegmentAdapter(segment);
}
private MutableSegment generateMutableSegment(
final Configuration conf, CellComparator comparator, MemStoreLAB memStoreLAB, long size) {
// TBD use configuration to set type of segment
CellSet set = new CellSet(comparator);
return new MutableCellSetSegment(set, memStoreLAB, size, comparator);
return new MutableSegment(set, comparator, memStoreLAB, size);
}
private MemStoreLAB getMemStoreLAB(Configuration conf) {

View File

@ -19,20 +19,193 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Iterator;
import java.util.SortedSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
/**
* An abstraction for store segment scanner.
* A scanner of a single memstore segment.
*/
@InterfaceAudience.Private
public abstract class SegmentScanner implements KeyValueScanner {
public class SegmentScanner implements KeyValueScanner {
private long sequenceID = Long.MAX_VALUE;
protected abstract Segment getSegment();
// the observed structure
private final Segment segment;
// the highest relevant MVCC
private long readPoint;
// the current iterator that can be reinitialized by
// seek(), backwardSeek(), or reseek()
private Iterator<Cell> iter;
// the pre-calculated cell to be returned by peek()
private Cell current = null;
// or next()
// A flag represents whether could stop skipping KeyValues for MVCC
// if have encountered the next row. Only used for reversed scan
private boolean stopSkippingKVsIfNextRow = false;
// last iterated KVs by seek (to restore the iterator state after reseek)
private Cell last = null;
/**
 * Creates a scanner over the given segment. The scanner is positioned on the first cell
 * returned by {@code getNext()}, and the segment's scanner count is incremented.
 *
 * @param segment the segment to scan
 * @param readPoint the highest sequence id this scanner may return
 */
protected SegmentScanner(Segment segment, long readPoint) {
this.segment = segment;
this.readPoint = readPoint;
// the iterator must exist before getNext() is called below
iter = segment.iterator();
// the initialization of the current is required for working with heap of SegmentScanners
current = getNext();
//increase the reference count so the underlying structure will not be de-allocated
this.segment.incScannerCount();
}
/**
 * Returns the cell the scanner is currently positioned on without advancing.
 *
 * @return the currently observed Cell, or null when there is none
 * @throws RuntimeException if the observed cell has a sequence id above the read point
 */
@Override
public Cell peek() {
  final Cell observed = current;
  if (observed == null) {
    return null;
  }
  // sanity check, the current should be always valid
  if (observed.getSequenceId() > readPoint) {
    throw new RuntimeException("current is invalid: read point is "+readPoint+", " +
        "while current sequence id is " +observed.getSequenceId());
  }
  return observed;
}
/**
 * Hands out the currently observed Cell and advances the scanner to the following one.
 *
 * @return the Cell that was current before the call, or null if end of scanner
 */
@Override
public Cell next() throws IOException {
  final Cell handedOut = current;
  // pre-fetch the cell that the following peek()/next() will observe
  current = getNext();
  return handedOut;
}
/**
 * Positions the scanner at or after the specified Cell.
 * A null cell closes the scanner and reports false.
 *
 * @param cell seek value
 * @return true if scanner has values left, false if end of scanner
 */
@Override
public boolean seek(Cell cell) throws IOException {
  if (cell == null) {
    close();
    return false;
  }
  // restart the iterator from the new key
  final SortedSet<Cell> tail = segment.tailSet(cell);
  iter = tail.iterator();
  // last is going to be reinitialized in the next getNext() call
  last = null;
  current = getNext();
  return current != null;
}
/**
 * Repositions the scanner at or after the specified KeyValue.
 * This method is guaranteed to seek at or after the required key only if the
 * key comes after the current position of the scanner. Should not be used
 * to seek to a key which may come before the current position.
 *
 * @param cell seek value (should be non-null)
 * @return true if scanner has values left, false if end of scanner
 */
@Override
public boolean reseek(Cell cell) throws IOException {
  /*
  See HBASE-4195 & HBASE-3855 & HBASE-6591 for the background on this implementation.
  This code is executed concurrently with flush and puts, without locks.
  The ideal implementation for performance would use the sub skip list implicitly
  pointed by the iterator. Unfortunately the Java API does not offer a method to
  get it. So we remember the last keys we iterated to and restore
  the reseeked set to at least that point.
  */
  final Cell resumeFrom = getHighest(cell, last);
  iter = segment.tailSet(resumeFrom).iterator();
  current = getNext();
  return current != null;
}
/**
 * Positions the scanner at or before the row of the specified Cell. It first seeks
 * forward to the Cell; if the scanner then observes a cell whose row does not come after
 * the row of the specified Cell, it stays there. Otherwise it moves to the first Cell of
 * the previous row.
 *
 * @param key seek Cell
 * @return true if the scanner is at the valid KeyValue, false if such Cell does not exist
 */
@Override
public boolean backwardSeek(Cell key) throws IOException {
  // seek forward then go backward
  seek(key);
  final boolean onOrBeforeRow = peek() != null && segment.compareRows(peek(), key) <= 0;
  if (onOrBeforeRow) {
    return true;
  }
  return seekToPreviousRow(key);
}
/**
 * Positions the scanner at the first Cell of the row that precedes the row of the
 * specified key. Rows on which the seek finds no cell are stepped over by repeating
 * the search from the row before them.
 *
 * @param cell seek value
 * @return true if the scanner at the first valid Cell of previous row,
 * false if not existing such Cell
 */
@Override
public boolean seekToPreviousRow(Cell cell) throws IOException {
  Cell key = cell;
  while (true) {
    Cell firstKeyOnRow = CellUtil.createFirstOnRow(key);
    SortedSet<Cell> cellHead = segment.headSet(firstKeyOnRow);
    Cell lastCellBeforeRow = null;
    if (!cellHead.isEmpty()) {
      lastCellBeforeRow = cellHead.last();
    }
    if (lastCellBeforeRow == null) {
      // nothing precedes this row in the segment
      current = null;
      return false;
    }
    Cell firstKeyOnPreviousRow = CellUtil.createFirstOnRow(lastCellBeforeRow);
    // the flag is raised only for the duration of this seek
    this.stopSkippingKVsIfNextRow = true;
    seek(firstKeyOnPreviousRow);
    this.stopSkippingKVsIfNextRow = false;
    boolean landedOnRow = peek() != null
        && segment.getComparator().compareRows(peek(), firstKeyOnPreviousRow) <= 0;
    if (landedOnRow) {
      return true;
    }
    // no cell observed on that row: retry from the row before it
    key = firstKeyOnPreviousRow;
  }
}
/**
 * Positions the scanner at the first KeyValue of the last row.
 *
 * @return true if scanner has values left, false if the underlying data is empty
 */
@Override
public boolean seekToLastRow() throws IOException {
  if (segment.isEmpty()) {
    return false;
  }
  final Cell higherCell = segment.last();
  if (higherCell == null) {
    return false;
  }
  final Cell firstCellOnLastRow = CellUtil.createFirstOnRow(higherCell);
  // fall back to earlier rows only when the forward seek finds nothing
  return seek(firstCellOnLastRow) || seekToPreviousRow(higherCell);
}
/**
* Get the sequence id associated with this KeyValueScanner. This is required
@ -140,6 +313,10 @@ public abstract class SegmentScanner implements KeyValueScanner {
return getSegment().shouldSeek(scan,oldestUnexpiredTS);
}
/**
 * @return the segment this scanner was constructed with
 */
protected Segment getSegment(){
return segment;
}
//debug method
@Override
public String toString() {
@ -149,4 +326,51 @@ public abstract class SegmentScanner implements KeyValueScanner {
return res;
}
/********************* Private Methods **********************/
/**
 * Private internal method for iterating over the segment,
 * skipping the cells with irrelevant MVCC.
 * <p>
 * Returns the first cell from the iterator whose sequence id is not greater than the read
 * point. When {@code stopSkippingKVsIfNextRow} is set and a skipped cell belongs to a row
 * after the row of the cell that was current on entry, the search stops and null is returned.
 * Whatever the outcome, the last cell taken from the iterator is remembered in {@code last}.
 *
 * @return the next visible cell, or null if none was found
 */
private Cell getNext() {
Cell startKV = current;
Cell next = null;
try {
while (iter.hasNext()) {
next = iter.next();
if (next.getSequenceId() <= this.readPoint) {
return next; // visible at this read point; newer versions were skipped
}
if (stopSkippingKVsIfNextRow && // for backwardSeek() stay in the
startKV != null && // boundaries of a single row
segment.compareRows(next, startKV) > 0) {
return null;
}
} // end of while
return null; // nothing found
} finally {
if (next != null) {
// in all cases, remember the last KV we iterated to, needed for reseek()
last = next;
}
}
}
/**
 * Picks the higher of the two key values according to the segment's comparison.
 * When exactly one argument is null the other one is returned, when both are null the
 * result is null, and when the two compare as equal {@code second} is returned.
 */
private Cell getHighest(Cell first, Cell second) {
  if (first == null) {
    // covers the both-null case too, since second is then null
    return second;
  }
  if (second == null) {
    return first;
  }
  return segment.compare(first, second) > 0 ? first : second;
}
}