HBASE-17291 Remove ImmutableSegment#getKeyValueScanner (Ram)

This commit is contained in:
Ramkrishna 2017-01-09 12:06:25 +05:30
parent 97fd9051f4
commit f65a439f01
7 changed files with 247 additions and 19 deletions

View File

@ -27,14 +27,13 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import java.io.IOException;
/**
* ImmutableSegment is an abstract class that extends the API supported by a {@link Segment},
* and is not needed for a {@link MutableSegment}. Specifically, the method
* {@link ImmutableSegment#getKeyValueScanner()} builds a special scanner for the
* {@link ImmutableSegment#getSnapshotScanner()} builds a special scanner for the
* {@link MemStoreSnapshot} object.
*/
@InterfaceAudience.Private
@ -127,8 +126,8 @@ public class ImmutableSegment extends Segment {
* general segment scanner.
* @return a special scanner for the MemStoreSnapshot object
*/
public KeyValueScanner getKeyValueScanner() {
return new CollectionBackedScanner(getCellSet(), getComparator());
public KeyValueScanner getSnapshotScanner() {
return new SnapshotScanner(this);
}
@Override

View File

@ -40,7 +40,7 @@ public class MemStoreSnapshot {
this.dataSize = snapshot.keySize();
this.heapOverhead = snapshot.heapOverhead();
this.timeRangeTracker = snapshot.getTimeRangeTracker();
this.scanner = snapshot.getKeyValueScanner();
this.scanner = snapshot.getSnapshotScanner();
this.tagsPresent = snapshot.isTagsPresent();
}

View File

@ -41,14 +41,14 @@ public class SegmentScanner implements KeyValueScanner {
private static final long DEFAULT_SCANNER_ORDER = Long.MAX_VALUE;
// the observed structure
private final Segment segment;
protected final Segment segment;
// the highest relevant MVCC
private long readPoint;
// the current iterator that can be reinitialized by
// seek(), backwardSeek(), or reseek()
private Iterator<Cell> iter;
protected Iterator<Cell> iter;
// the pre-calculated cell to be returned by peek()
private Cell current = null;
protected Cell current = null;
// or next()
// A flag represents whether could stop skipping KeyValues for MVCC
// if have encountered the next row. Only used for reversed scan
@ -57,7 +57,7 @@ public class SegmentScanner implements KeyValueScanner {
private Cell last = null;
// flag to indicate if this scanner is closed
private boolean closed = false;
protected boolean closed = false;
protected SegmentScanner(Segment segment, long readPoint) {
this(segment, readPoint, DEFAULT_SCANNER_ORDER);
@ -74,7 +74,7 @@ public class SegmentScanner implements KeyValueScanner {
this.segment.incScannerCount();
iter = segment.iterator();
// the initialization of the current is required for working with heap of SegmentScanners
current = getNext();
updateCurrent();
this.scannerOrder = scannerOrder;
if (current == null) {
// nothing to fetch from this scanner
@ -108,7 +108,7 @@ public class SegmentScanner implements KeyValueScanner {
return null;
}
Cell oldCurrent = current;
current = getNext(); // update the currently observed Cell
updateCurrent(); // update the currently observed Cell
return oldCurrent;
}
@ -127,13 +127,17 @@ public class SegmentScanner implements KeyValueScanner {
return false;
}
// restart the iterator from new key
iter = segment.tailSet(cell).iterator();
iter = getIterator(cell);
// last is going to be reinitialized in the next getNext() call
last = null;
current = getNext();
updateCurrent();
return (current != null);
}
protected Iterator<Cell> getIterator(Cell cell) {
return segment.tailSet(cell).iterator();
}
/**
* Reseek the scanner at or after the specified KeyValue.
* This method is guaranteed to seek at or after the required key only if the
@ -156,8 +160,8 @@ public class SegmentScanner implements KeyValueScanner {
get it. So we remember the last keys we iterated to and restore
the reseeked set to at least that point.
*/
iter = segment.tailSet(getHighest(cell, last)).iterator();
current = getNext();
iter = getIterator(getHighest(cell, last));
updateCurrent();
return (current != null);
}
@ -355,7 +359,7 @@ public class SegmentScanner implements KeyValueScanner {
* Private internal method for iterating over the segment,
* skipping the cells with irrelevant MVCC
*/
private Cell getNext() {
protected void updateCurrent() {
Cell startKV = current;
Cell next = null;
@ -363,16 +367,18 @@ public class SegmentScanner implements KeyValueScanner {
while (iter.hasNext()) {
next = iter.next();
if (next.getSequenceId() <= this.readPoint) {
return next; // skip irrelevant versions
current = next;
return;// skip irrelevant versions
}
if (stopSkippingKVsIfNextRow && // for backwardSeek() stay in the
startKV != null && // boundaries of a single row
segment.compareRows(next, startKV) > 0) {
return null;
current = null;
return;
}
} // end of while
return null; // nothing found
current = null; // nothing found
} finally {
if (next != null) {
// in all cases, remember the last KV we iterated to, needed for reseek()

View File

@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
/**
* Scans the snapshot. Acts as a simple scanner that just iterates over all the cells
* in the segment
*/
@InterfaceAudience.Private
public class SnapshotScanner extends SegmentScanner {
public SnapshotScanner(Segment immutableSegment) {
// Snapshot scanner does not need readpoint. It should read all the cells in the
// segment
super(immutableSegment, Long.MAX_VALUE);
}
@Override
public Cell peek() { // sanity check, the current should be always valid
if (closed) {
return null;
}
return current;
}
@Override
public boolean shouldUseScanner(Scan scan, Store store, long oldestUnexpiredTS) {
return true;
}
@Override
public boolean backwardSeek(Cell key) throws IOException {
throw new NotImplementedException(
"backwardSeek must not be called on a " + "non-reversed scanner");
}
@Override
public boolean seekToPreviousRow(Cell key) throws IOException {
throw new NotImplementedException(
"seekToPreviousRow must not be called on a " + "non-reversed scanner");
}
@Override
public boolean seekToLastRow() throws IOException {
throw new NotImplementedException(
"seekToLastRow must not be called on a " + "non-reversed scanner");
}
@Override
protected Iterator<Cell> getIterator(Cell cell) {
return segment.iterator();
}
@Override
protected void updateCurrent() {
if (iter.hasNext()) {
current = iter.next();
} else {
current = null;
}
}
@Override
public boolean seek(Cell seekCell) {
// restart iterator
iter = getIterator(seekCell);
return reseek(seekCell);
}
@Override
public boolean reseek(Cell seekCell) {
while (iter.hasNext()) {
Cell next = iter.next();
int ret = segment.getComparator().compare(next, seekCell);
if (ret >= 0) {
current = next;
return true;
}
}
return false;
}
}

View File

@ -393,6 +393,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
memstore.add(new KeyValue(row, fam, qf4, val), null);
memstore.add(new KeyValue(row, fam, qf5, val), null);
assertEquals(2, memstore.getActive().getCellsCount());
// close the scanner
snapshot.getScanner().close();
memstore.clearSnapshot(snapshot.getId());
int chunkCount = chunkPool.getPoolSize();
@ -433,6 +435,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
List<KeyValueScanner> scanners = memstore.getScanners(0);
// Shouldn't putting back the chunks to pool,since some scanners are opening
// based on their data
// close the scanner
snapshot.getScanner().close();
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() == 0);
@ -460,6 +464,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
}
// Since no opening scanner, the chunks of snapshot should be put back to
// pool
// close the scanner
snapshot.getScanner().close();
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() > 0);
}
@ -527,6 +533,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
// Creating another snapshot
MemStoreSnapshot snapshot = memstore.snapshot();
// close the scanner
snapshot.getScanner().close();
memstore.clearSnapshot(snapshot.getId());
snapshot = memstore.snapshot();
@ -541,6 +549,8 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
}
// Since no opening scanner, the chunks of snapshot should be put back to
// pool
// close the scanner
snapshot.getScanner().close();
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() > 0);
}

View File

@ -387,6 +387,108 @@ public class TestCompactingToCellArrayMapMemStore extends TestCompactingMemStore
}
}
@Override
@Test
public void testPuttingBackChunksWithOpeningScanner() throws IOException {
byte[] row = Bytes.toBytes("testrow");
byte[] fam = Bytes.toBytes("testfamily");
byte[] qf1 = Bytes.toBytes("testqualifier1");
byte[] qf2 = Bytes.toBytes("testqualifier2");
byte[] qf3 = Bytes.toBytes("testqualifier3");
byte[] qf4 = Bytes.toBytes("testqualifier4");
byte[] qf5 = Bytes.toBytes("testqualifier5");
byte[] qf6 = Bytes.toBytes("testqualifier6");
byte[] qf7 = Bytes.toBytes("testqualifier7");
byte[] val = Bytes.toBytes("testval");
// Setting up memstore
memstore.add(new KeyValue(row, fam, qf1, val), null);
memstore.add(new KeyValue(row, fam, qf2, val), null);
memstore.add(new KeyValue(row, fam, qf3, val), null);
// Creating a snapshot
MemStoreSnapshot snapshot = memstore.snapshot();
assertEquals(3, memstore.getSnapshot().getCellsCount());
// Adding value to "new" memstore
assertEquals(0, memstore.getActive().getCellsCount());
memstore.add(new KeyValue(row, fam, qf4, val), null);
memstore.add(new KeyValue(row, fam, qf5, val), null);
assertEquals(2, memstore.getActive().getCellsCount());
// opening scanner before clear the snapshot
List<KeyValueScanner> scanners = memstore.getScanners(0);
// Shouldn't putting back the chunks to pool,since some scanners are opening
// based on their data
// close the scanner
snapshot.getScanner().close();
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() == 0);
// Chunks will be put back to pool after close scanners;
for (KeyValueScanner scanner : scanners) {
scanner.close();
}
assertTrue(chunkPool.getPoolSize() > 0);
// clear chunks
chunkPool.clearChunks();
// Creating another snapshot
snapshot = memstore.snapshot();
// Adding more value
memstore.add(new KeyValue(row, fam, qf6, val), null);
memstore.add(new KeyValue(row, fam, qf7, val), null);
// opening scanners
scanners = memstore.getScanners(0);
// close scanners before clear the snapshot
for (KeyValueScanner scanner : scanners) {
scanner.close();
}
// Since no opening scanner, the chunks of snapshot should be put back to
// pool
// close the scanner
snapshot.getScanner().close();
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() > 0);
}
@Test
public void testPuttingBackChunksAfterFlushing() throws IOException {
byte[] row = Bytes.toBytes("testrow");
byte[] fam = Bytes.toBytes("testfamily");
byte[] qf1 = Bytes.toBytes("testqualifier1");
byte[] qf2 = Bytes.toBytes("testqualifier2");
byte[] qf3 = Bytes.toBytes("testqualifier3");
byte[] qf4 = Bytes.toBytes("testqualifier4");
byte[] qf5 = Bytes.toBytes("testqualifier5");
byte[] val = Bytes.toBytes("testval");
// Setting up memstore
memstore.add(new KeyValue(row, fam, qf1, val), null);
memstore.add(new KeyValue(row, fam, qf2, val), null);
memstore.add(new KeyValue(row, fam, qf3, val), null);
// Creating a snapshot
MemStoreSnapshot snapshot = memstore.snapshot();
assertEquals(3, memstore.getSnapshot().getCellsCount());
// Adding value to "new" memstore
assertEquals(0, memstore.getActive().getCellsCount());
memstore.add(new KeyValue(row, fam, qf4, val), null);
memstore.add(new KeyValue(row, fam, qf5, val), null);
assertEquals(2, memstore.getActive().getCellsCount());
// close the scanner
snapshot.getScanner().close();
memstore.clearSnapshot(snapshot.getId());
int chunkCount = chunkPool.getPoolSize();
assertTrue(chunkCount > 0);
}
private long addRowsByKeys(final AbstractMemStore hmc, String[] keys) {
byte[] fam = Bytes.toBytes("testfamily");
byte[] qf = Bytes.toBytes("testqualifier");

View File

@ -135,6 +135,8 @@ public class TestMemStoreChunkPool {
memstore.add(new KeyValue(row, fam, qf4, val), null);
memstore.add(new KeyValue(row, fam, qf5, val), null);
assertEquals(2, memstore.getActive().getCellsCount());
// close the scanner - this is how the snapshot will be used
snapshot.getScanner().close();
memstore.clearSnapshot(snapshot.getId());
int chunkCount = chunkPool.getPoolSize();
@ -177,6 +179,8 @@ public class TestMemStoreChunkPool {
List<KeyValueScanner> scanners = memstore.getScanners(0);
// Shouldn't putting back the chunks to pool,since some scanners are opening
// based on their data
// close the snapshot scanner
snapshot.getScanner().close();
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() == 0);
@ -203,6 +207,8 @@ public class TestMemStoreChunkPool {
}
// Since no opening scanner, the chunks of snapshot should be put back to
// pool
// close the snapshot scanner
snapshot.getScanner().close();
memstore.clearSnapshot(snapshot.getId());
assertTrue(chunkPool.getPoolSize() > 0);
}