HBASE-17917 Use pread by default for all user scan

zhangduo 2017-05-10 14:15:44 +08:00
parent c5cc81d8e3
commit 0ae0edcd63
28 changed files with 541 additions and 262 deletions


@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.io.hfile;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
@ -24,8 +27,6 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -53,9 +54,6 @@ import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.io.IOUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
/**
* Reads {@link HFile} version 2 blocks to HFiles and via {@link Cacheable} Interface to caches.
* Version 2 was introduced in hbase-0.92.0. No longer has support for version 1 blocks since
@ -1418,7 +1416,7 @@ public class HFileBlock implements Cacheable {
static class FSReaderImpl implements FSReader {
/** The file system stream of the underlying {@link HFile} that
* does or doesn't do checksum validations in the filesystem */
protected FSDataInputStreamWrapper streamWrapper;
private FSDataInputStreamWrapper streamWrapper;
private HFileBlockDecodingContext encodedBlockDecodingCtx;
@ -1434,22 +1432,18 @@ public class HFileBlock implements Cacheable {
private AtomicReference<PrefetchedHeader> prefetchedHeader = new AtomicReference<>(new PrefetchedHeader());
/** The size of the file we are reading from, or -1 if unknown. */
protected long fileSize;
private long fileSize;
/** The size of the header */
@VisibleForTesting
protected final int hdrSize;
/** The filesystem used to access data */
protected HFileSystem hfs;
private HFileSystem hfs;
private final Lock streamLock = new ReentrantLock();
/** The default buffer size for our buffered streams */
public static final int DEFAULT_BUFFER_SIZE = 1 << 20;
protected HFileContext fileContext;
private HFileContext fileContext;
// Cache the fileName
protected String pathName;
private String pathName;
FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path,
HFileContext fileContext) throws IOException {
@ -1524,39 +1518,33 @@ public class HFileBlock implements Cacheable {
* next header
* @throws IOException
*/
protected int readAtOffset(FSDataInputStream istream, byte [] dest, int destOffset, int size,
@VisibleForTesting
protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size,
boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException {
if (peekIntoNextBlock && destOffset + size + hdrSize > dest.length) {
// We are asked to read the next block's header as well, but there is
// not enough room in the array.
throw new IOException("Attempted to read " + size + " bytes and " +
hdrSize + " bytes of next header into a " + dest.length +
"-byte array at offset " + destOffset);
throw new IOException("Attempted to read " + size + " bytes and " + hdrSize +
" bytes of next header into a " + dest.length + "-byte array at offset " + destOffset);
}
if (!pread && streamLock.tryLock()) {
if (!pread) {
// Seek + read. Better for scanning.
try {
HFileUtil.seekOnMultipleSources(istream, fileOffset);
HFileUtil.seekOnMultipleSources(istream, fileOffset);
long realOffset = istream.getPos();
if (realOffset != fileOffset) {
throw new IOException("Tried to seek to " + fileOffset + " to " + "read " + size +
" bytes, but pos=" + realOffset + " after seek");
}
long realOffset = istream.getPos();
if (realOffset != fileOffset) {
throw new IOException("Tried to seek to " + fileOffset + " to "
+ "read " + size + " bytes, but pos=" + realOffset
+ " after seek");
}
if (!peekIntoNextBlock) {
IOUtils.readFully(istream, dest, destOffset, size);
return -1;
}
if (!peekIntoNextBlock) {
IOUtils.readFully(istream, dest, destOffset, size);
return -1;
}
// Try to read the next block header.
if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) {
return -1;
}
} finally {
streamLock.unlock();
// Try to read the next block header.
if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) {
return -1;
}
} else {
// Positional read. Better for random reads; or when the streamLock is already locked.
@ -1565,7 +1553,6 @@ public class HFileBlock implements Cacheable {
return -1;
}
}
assert peekIntoNextBlock;
return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize;
}
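
The two branches above are the heart of the pread-versus-stream trade-off this patch is built on: seek + read moves the position of the single shared stream (so only one reader can use it at a time) but benefits from sequential readahead, while a positional read leaves the stream position untouched and is safe for many readers sharing one open stream. A minimal sketch of the two modes against a plain FSDataInputStream, illustrative only and not code from this patch:

import java.io.IOException;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.io.IOUtils;

class ReadModeSketch {
  static void readAt(FSDataInputStream in, long fileOffset, byte[] dest, int size,
      boolean pread) throws IOException {
    if (pread) {
      // Positional read: does not move the stream position, so concurrent readers can
      // share a single open stream. Suits gets and other random reads.
      in.readFully(fileOffset, dest, 0, size);
    } else {
      // Seek + read: moves the shared stream position, so only one reader at a time,
      // but long sequential scans benefit from readahead on the underlying stream.
      in.seek(fileOffset);
      IOUtils.readFully(in, dest, 0, size);
    }
  }
}
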
@ -1719,6 +1706,7 @@ public class HFileBlock implements Cacheable {
* If HBase checksum is switched off, then use HDFS checksum.
* @return the HFileBlock or null if there is a HBase checksum mismatch
*/
@VisibleForTesting
protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset,
long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum)
throws IOException {
@ -1830,7 +1818,7 @@ public class HFileBlock implements Cacheable {
* If the block doesn't use checksums, returns false.
* @return True if checksum matches, else false.
*/
protected boolean validateChecksum(long offset, ByteBuffer data, int hdrSize)
private boolean validateChecksum(long offset, ByteBuffer data, int hdrSize)
throws IOException {
// If this is an older version of the block that does not have checksums, then return false
// indicating that checksum verification did not succeed. Actually, this method should never


@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.regionserver;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
@ -422,6 +424,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner
return 0;
}
@VisibleForTesting
KeyValueScanner getCurrentForTesting() {
return current;
}


@ -22,6 +22,7 @@ import java.io.Closeable;
import java.io.IOException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
@ -134,6 +135,12 @@ public interface KeyValueScanner extends Shipper, Closeable {
*/
boolean isFileScanner();
/**
* @return the file path if this is a file scanner, otherwise null.
* @see #isFileScanner()
*/
Path getFilePath();
// Support for "Reversed Scanner"
/**
* Seek the scanner at or before the row of specified Cell, it firstly


@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Scan;
@ -65,6 +66,14 @@ public abstract class NonLazyKeyValueScanner implements KeyValueScanner {
// Not a file by default.
return false;
}
@Override
public Path getFilePath() {
// Not a file by default.
return null;
}
@Override
public Cell getNextIndexedKey() {
return null;


@ -123,15 +123,17 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner {
@Override
public boolean seekToPreviousRow(Cell key) throws IOException {
boolean flushed = checkFlushed();
checkReseek(flushed);
if (checkFlushed()) {
reopenAfterFlush();
}
return this.heap.seekToPreviousRow(key);
}
@Override
public boolean backwardSeek(Cell key) throws IOException {
boolean flushed = checkFlushed();
checkReseek(flushed);
if (checkFlushed()) {
reopenAfterFlush();
}
return this.heap.backwardSeek(key);
}
}


@ -48,6 +48,7 @@ public class ScanInfo {
private boolean usePread;
private long cellsPerTimeoutCheck;
private boolean parallelSeekEnabled;
private final long preadMaxBytes;
private final Configuration conf;
public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
@ -58,14 +59,14 @@ public class ScanInfo {
* @param conf
* @param family {@link HColumnDescriptor} describing the column family
* @param ttl Store's TTL (in ms)
* @param timeToPurgeDeletes duration in ms after which a delete marker can
* be purged during a major compaction.
* @param timeToPurgeDeletes duration in ms after which a delete marker can be purged during a
* major compaction.
* @param comparator The store's comparator
*/
public ScanInfo(final Configuration conf, final HColumnDescriptor family, final long ttl,
final long timeToPurgeDeletes, final CellComparator comparator) {
this(conf, family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, family
.getKeepDeletedCells(), timeToPurgeDeletes, comparator);
this(conf, family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl,
family.getKeepDeletedCells(), family.getBlocksize(), timeToPurgeDeletes, comparator);
}
/**
@ -74,6 +75,7 @@ public class ScanInfo {
* @param minVersions Store's MIN_VERSIONS setting
* @param maxVersions Store's VERSIONS setting
* @param ttl Store's TTL (in ms)
* @param blockSize Store's block size
* @param timeToPurgeDeletes duration in ms after which a delete marker can
* be purged during a major compaction.
* @param keepDeletedCells Store's keepDeletedCells setting
@ -81,7 +83,7 @@ public class ScanInfo {
*/
public ScanInfo(final Configuration conf, final byte[] family, final int minVersions,
final int maxVersions, final long ttl, final KeepDeletedCells keepDeletedCells,
final long timeToPurgeDeletes, final CellComparator comparator) {
final long blockSize, final long timeToPurgeDeletes, final CellComparator comparator) {
this.family = family;
this.minVersions = minVersions;
this.maxVersions = maxVersions;
@ -99,6 +101,7 @@ public class ScanInfo {
perHeartbeat: StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK;
this.parallelSeekEnabled =
conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false);
this.preadMaxBytes = conf.getLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 4 * blockSize);
this.conf = conf;
}
@ -149,4 +152,8 @@ public class ScanInfo {
public CellComparator getComparator() {
return comparator;
}
long getPreadMaxBytes() {
return preadMaxBytes;
}
}
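
The new preadMaxBytes value is what later lets StoreScanner switch a long-running scan with the default read type from pread to stream. It is taken from hbase.storescanner.pread.max.bytes and defaults to four times the store's block size. A hedged configuration sketch; the 1 MB value is only an illustration, not a recommendation:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class PreadMaxBytesExample {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Have DEFAULT-read-type scans fall back to stream read after roughly 1 MB per store
    // scanner, instead of the default of 4 * the column family block size.
    conf.setLong("hbase.storescanner.pread.max.bytes", 1024L * 1024L);
  }
}
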


@ -23,6 +23,7 @@ import java.util.Iterator;
import java.util.SortedSet;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@ -316,6 +317,11 @@ public class SegmentScanner implements KeyValueScanner {
return false;
}
@Override
public Path getFilePath() {
return null;
}
/**
* @return the next key in the index (the key to seek to the next block)
* if known, or null otherwise
@ -396,5 +402,4 @@ public class SegmentScanner implements KeyValueScanner {
}
return (first != null ? first : second);
}
}


@ -589,11 +589,17 @@ public class StoreFile {
return reader;
}
public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder,
boolean canOptimizeForNonNullColumn) {
return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder,
canOptimizeForNonNullColumn);
}
public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
boolean pread, boolean isCompaction, long readPt, long scannerOrder,
boolean canOptimizeForNonNullColumn) throws IOException {
return createStreamReader(canUseDropBehind).getStoreFileScanner(
cacheBlocks, pread, isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn);
boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
throws IOException {
return createStreamReader(canUseDropBehind).getStoreFileScanner(cacheBlocks, false,
isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn);
}
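
These two entry points replace the old pread flag on the caller side: getPreadScanner() returns a scanner backed by the shared, pread-based reader, while getStreamScanner() builds a dedicated stream reader that lives only as long as the scanner. A hedged usage sketch mirroring the branch added to StoreFileScanner.getScannersForStoreFiles() below; the boolean arguments are illustrative placeholders:

// Assumes sf is a StoreFile whose reader has already been initialized via sf.initReader().
StoreFileScanner openScanner(StoreFile sf, boolean usePread, long readPt, long scannerOrder)
    throws IOException {
  if (usePread) {
    // Shared reader with positional reads: cheap to hand out, suits gets and short scans.
    return sf.getPreadScanner(true, readPt, scannerOrder, false);
  }
  // Dedicated stream reader: opened per scanner and closed with it, suits long scans.
  return sf.getStreamScanner(false, true, false, readPt, scannerOrder, false);
}
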
/**


@ -564,4 +564,19 @@ public class StoreFileInfo {
hash = hash * 31 + ((link == null) ? 0 : link.hashCode());
return hash;
}
/**
* Return the active file name that contains the real data.
* <p>
* For a referenced hfile, we will return the name of the reference file, as it will be used to
* construct the StoreFileReader. For a linked hfile, we will return the name of the file being
* linked.
*/
public String getActiveFileName() {
if (reference != null || link == null) {
return initialPath.getName();
} else {
return HFileLink.getReferencedHFileName(initialPath.getName());
}
}
}


@ -76,7 +76,8 @@ public class StoreFileReader {
// Indicates whether this StoreFileReader is shared, i.e., used for pread. If not, we will
// close the internal reader when readCompleted is called.
private final boolean shared;
@VisibleForTesting
final boolean shared;
private StoreFileReader(HFile.Reader reader, AtomicInteger refCount, boolean shared) {
this.reader = reader;


@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.CellUtil;
@ -95,25 +96,21 @@ public class StoreFileScanner implements KeyValueScanner {
}
/**
* Return an array of scanners corresponding to the given
* set of store files.
* Return an array of scanners corresponding to the given set of store files.
*/
public static List<StoreFileScanner> getScannersForStoreFiles(
Collection<StoreFile> files,
boolean cacheBlocks,
boolean usePread, long readPt) throws IOException {
return getScannersForStoreFiles(files, cacheBlocks,
usePread, false, false, readPt);
public static List<StoreFileScanner> getScannersForStoreFiles(Collection<StoreFile> files,
boolean cacheBlocks, boolean usePread, long readPt) throws IOException {
return getScannersForStoreFiles(files, cacheBlocks, usePread, false, false, readPt);
}
/**
* Return an array of scanners corresponding to the given set of store files.
*/
public static List<StoreFileScanner> getScannersForStoreFiles(
Collection<StoreFile> files, boolean cacheBlocks, boolean usePread,
boolean isCompaction, boolean useDropBehind, long readPt) throws IOException {
return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction,
useDropBehind, null, readPt);
public static List<StoreFileScanner> getScannersForStoreFiles(Collection<StoreFile> files,
boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean useDropBehind,
long readPt) throws IOException {
return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, useDropBehind, null,
readPt);
}
/**
@ -126,11 +123,17 @@ public class StoreFileScanner implements KeyValueScanner {
List<StoreFileScanner> scanners = new ArrayList<>(files.size());
List<StoreFile> sortedFiles = new ArrayList<>(files);
Collections.sort(sortedFiles, StoreFile.Comparators.SEQ_ID);
boolean canOptimizeForNonNullColumn = matcher != null ? !matcher.hasNullColumnInQuery() : false;
for (int i = 0, n = sortedFiles.size(); i < n; i++) {
StoreFile sf = sortedFiles.get(i);
sf.initReader();
StoreFileScanner scanner = sf.getReader().getStoreFileScanner(cacheBlocks, usePread,
isCompaction, readPt, i, matcher != null ? !matcher.hasNullColumnInQuery() : false);
StoreFileScanner scanner;
if (usePread) {
scanner = sf.getPreadScanner(cacheBlocks, readPt, i, canOptimizeForNonNullColumn);
} else {
scanner = sf.getStreamScanner(canUseDrop, cacheBlocks, isCompaction, readPt, i,
canOptimizeForNonNullColumn);
}
scanners.add(scanner);
}
return scanners;
@ -148,8 +151,8 @@ public class StoreFileScanner implements KeyValueScanner {
boolean succ = false;
try {
for (int i = 0, n = sortedFiles.size(); i < n; i++) {
scanners.add(sortedFiles.get(i).getStreamScanner(canUseDropBehind, false, false, true,
readPt, i, false));
scanners.add(
sortedFiles.get(i).getStreamScanner(canUseDropBehind, false, true, readPt, i, false));
}
succ = true;
} finally {
@ -444,6 +447,11 @@ public class StoreFileScanner implements KeyValueScanner {
return true;
}
@Override
public Path getFilePath() {
return reader.getHFileReader().getPath();
}
// Test methods
static final long getSeekCount() {
return seekCount.sum();


@ -24,7 +24,9 @@ import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantLock;
@ -53,8 +55,12 @@ import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
/**
* Scanner scans both the memstore and the Store. Coalesce KeyValue stream
* into List&lt;KeyValue&gt; for a single row.
* Scanner scans both the memstore and the Store. Coalesce KeyValue stream into List&lt;KeyValue&gt;
* for a single row.
* <p>
* The implementation is not thread safe, so next and close will never race with each other. The
* only exception is updateReaders, which is called from the memstore flush thread to indicate
* that a flush has happened.
*/
@InterfaceAudience.Private
public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@ -62,36 +68,35 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
private static final Log LOG = LogFactory.getLog(StoreScanner.class);
// In unit tests, the store could be null
protected final Store store;
protected ScanQueryMatcher matcher;
private ScanQueryMatcher matcher;
protected KeyValueHeap heap;
protected boolean cacheBlocks;
private boolean cacheBlocks;
protected long countPerRow = 0;
protected int storeLimit = -1;
protected int storeOffset = 0;
private long countPerRow = 0;
private int storeLimit = -1;
private int storeOffset = 0;
// Used to indicate that the scanner has closed (see HBASE-1107)
// Doesnt need to be volatile because it's always accessed via synchronized methods
protected boolean closing = false;
protected final boolean get;
protected final boolean explicitColumnQuery;
protected final boolean useRowColBloom;
// Do not need to be volatile because it's always accessed via synchronized methods
private boolean closing = false;
private final boolean get;
private final boolean explicitColumnQuery;
private final boolean useRowColBloom;
/**
* A flag that enables StoreFileScanner parallel-seeking
*/
protected boolean parallelSeekEnabled = false;
protected ExecutorService executor;
protected final Scan scan;
protected final NavigableSet<byte[]> columns;
protected final long oldestUnexpiredTS;
protected final long now;
protected final int minVersions;
protected final long maxRowSize;
protected final long cellsPerHeartbeatCheck;
private boolean parallelSeekEnabled = false;
private ExecutorService executor;
private final Scan scan;
private final long oldestUnexpiredTS;
private final long now;
private final int minVersions;
private final long maxRowSize;
private final long cellsPerHeartbeatCheck;
// Collects all the KVHeap that are eagerly getting closed during the
// course of a scan
protected List<KeyValueHeap> heapsForDelayedClose = new ArrayList<>();
private final List<KeyValueHeap> heapsForDelayedClose = new ArrayList<>();
/**
* The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not
@ -100,14 +105,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
private long kvsScanned = 0;
private Cell prevCell = null;
private final long preadMaxBytes;
private long bytesRead;
/** We don't ever expect to change this, the constant is just for clarity. */
static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
public static final String STORESCANNER_PARALLEL_SEEK_ENABLE =
"hbase.storescanner.parallel.seek.enable";
/** Used during unit testing to ensure that lazy seek does save seek ops */
protected static boolean lazySeekEnabledGlobally =
LAZY_SEEK_ENABLED_BY_DEFAULT;
private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT;
/**
* The number of cells scanned in between timeout checks. Specifying a larger value means that
@ -122,19 +129,26 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
*/
public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000;
// if heap == null and lastTop != null, you need to reseek given the key below
protected Cell lastTop = null;
/**
* If the read type is Scan.ReadType.DEFAULT, we will start with pread, and once the bytes we
* have read reach this limit, we will reopen the scanner with stream. The default value is 4
* times the block size of this store.
*/
public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes";
private final Scan.ReadType readType;
// A flag whether use pread for scan
private final boolean scanUsePread;
// It may be changed if we use Scan.ReadType.DEFAULT and we have read lots of data.
private boolean scanUsePread;
// Indicates whether there was flush during the course of the scan
protected volatile boolean flushed = false;
private volatile boolean flushed = false;
// generally we get one file from a flush
protected List<StoreFile> flushedStoreFiles = new ArrayList<>(1);
private final List<StoreFile> flushedStoreFiles = new ArrayList<>(1);
// The current list of scanners
protected List<KeyValueScanner> currentScanners = new ArrayList<>();
private final List<KeyValueScanner> currentScanners = new ArrayList<>();
// flush update lock
private ReentrantLock flushLock = new ReentrantLock();
private final ReentrantLock flushLock = new ReentrantLock();
protected final long readPt;
@ -155,7 +169,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
int numCol = columns == null ? 0 : columns.size();
explicitColumnQuery = numCol > 0;
this.scan = scan;
this.columns = columns;
this.now = EnvironmentEdgeManager.currentTime();
this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl();
this.minVersions = scanInfo.getMinVersions();
@ -168,32 +181,31 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.maxRowSize = scanInfo.getTableMaxRowSize();
if (get) {
this.readType = Scan.ReadType.PREAD;
this.scanUsePread = true;
} else {
switch (scan.getReadType()) {
case STREAM:
this.scanUsePread = false;
break;
case PREAD:
this.scanUsePread = true;
break;
default:
this.scanUsePread = scanInfo.isUsePread();
break;
if (scan.getReadType() == Scan.ReadType.DEFAULT) {
this.readType = scanInfo.isUsePread() ? Scan.ReadType.PREAD : Scan.ReadType.DEFAULT;
} else {
this.readType = scan.getReadType();
}
// Always start with pread unless the user specifies stream. Will change to stream later if
// readType is default and the scan keeps running for a long time.
this.scanUsePread = this.readType != Scan.ReadType.STREAM;
}
this.preadMaxBytes = scanInfo.getPreadMaxBytes();
this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
// Parallel seeking is on if the config allows and there is more than one store file.
if (this.store != null && this.store.getStorefilesCount() > 1) {
RegionServerServices rsService = ((HStore) store).getHRegion().getRegionServerServices();
if (rsService != null && scanInfo.isParallelSeekEnabled()) {
this.parallelSeekEnabled = true;
this.executor = rsService.getExecutorService();
}
}
this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck();
// Parallel seeking is on if the config allows and there is more than one store file.
if (this.store != null && this.store.getStorefilesCount() > 1) {
RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices();
if (rsService != null && scanInfo.isParallelSeekEnabled()) {
this.parallelSeekEnabled = true;
this.executor = rsService.getExecutorService();
}
}
}
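
Net effect of the constructor logic above: a Get or an explicit PREAD scan always uses pread, an explicit STREAM scan always uses stream, and a DEFAULT scan starts with pread and, unless scanInfo.isUsePread() forces pread, is later flipped to stream by trySwitchToStreamRead() once bytesRead passes preadMaxBytes. Clients that know their access pattern can still pin the mode up front; a small hedged sketch of the client side:

import org.apache.hadoop.hbase.client.Scan;

// Inside some client method:
// Long sequential read: ask for stream mode from the start.
Scan longScan = new Scan();
longScan.setReadType(Scan.ReadType.STREAM);

// Short, point-like read: stay on pread for the whole scan.
Scan shortScan = new Scan();
shortScan.setReadType(Scan.ReadType.PREAD);
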
protected void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
private void addCurrentScanners(List<? extends KeyValueScanner> scanners) {
this.currentScanners.addAll(scanners);
}
@ -360,9 +372,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
* Get a filtered list of scanners. Assumes we are not in a compaction.
* @return list of scanners to seek
*/
protected List<KeyValueScanner> getScannersNoCompaction() throws IOException {
return selectScannersFrom(store.getScanners(cacheBlocks, get, scanUsePread, false, matcher,
scan.getStartRow(), scan.getStopRow(), this.readPt));
private List<KeyValueScanner> getScannersNoCompaction() throws IOException {
return selectScannersFrom(
store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(),
scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt));
}
/**
@ -413,7 +426,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
/**
* Filters the given list of scanners using Bloom filter, time range, and
* TTL.
* <p>
* Will be overridden by testcase so declared as protected.
*/
@VisibleForTesting
protected List<KeyValueScanner> selectScannersFrom(
final List<? extends KeyValueScanner> allScanners) {
boolean memOnly;
@ -451,10 +467,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public Cell peek() {
if (this.heap == null) {
return this.lastTop;
}
return this.heap.peek();
return heap != null ? heap.peek() : null;
}
@Override
@ -472,9 +485,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
if (this.closing) {
return;
}
if (withHeapClose) this.closing = true;
if (withHeapClose) {
this.closing = true;
}
// Under test, we don't have a this.store
if (this.store != null) this.store.deleteChangedReaderObserver(this);
if (this.store != null) {
this.store.deleteChangedReaderObserver(this);
}
if (withHeapClose) {
for (KeyValueHeap h : this.heapsForDelayedClose) {
h.close();
@ -492,14 +509,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this.heap = null;
}
}
this.lastTop = null; // If both are null, we are closed.
}
@Override
public boolean seek(Cell key) throws IOException {
boolean flushed = checkFlushed();
// reset matcher state, in case that underlying store changed
checkReseek(flushed);
if (checkFlushed()) {
reopenAfterFlush();
}
return this.heap.seek(key);
}
@ -519,8 +535,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
if (scannerContext == null) {
throw new IllegalArgumentException("Scanner context cannot be null");
}
boolean flushed = checkFlushed();
if (checkReseek(flushed)) {
trySwitchToStreamRead();
if (checkFlushed() && reopenAfterFlush()) {
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
}
@ -550,7 +566,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
// Clear progress away unless invoker has indicated it should be kept.
if (!scannerContext.getKeepProgress()) scannerContext.clearProgress();
if (!scannerContext.getKeepProgress()) {
scannerContext.clearProgress();
}
// Only do a sanity-check if store and comparator are available.
CellComparator comparator = store != null ? store.getComparator() : null;
@ -566,9 +584,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
}
}
if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
// Do object compare - we set prevKV from the same heap.
if (prevCell != cell) {
++kvsScanned;
}
checkScanOrder(prevCell, cell, comparator);
int cellSize = CellUtil.estimatedSerializedSizeOf(cell);
bytesRead += cellSize;
prevCell = cell;
ScanQueryMatcher.MatchCode qcode = matcher.match(cell);
switch (qcode) {
@ -600,7 +622,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
// Update local tracking information
count++;
int cellSize = CellUtil.estimatedSerializedSizeOf(cell);
totalBytesRead += cellSize;
// Update the progress of the scanner context
@ -636,7 +657,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
case DONE:
// Optimization for Gets! If DONE, no more to get on this row, early exit!
if (this.scan.isGetScan()) {
if (get) {
// Then no more to this row... exit.
close(false);// Do all cleanup except heap.close()
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
@ -807,34 +828,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
/**
* @param flushed indicates if there was a flush
* @return true if top of heap has changed (and KeyValueHeap has to try the
* next KV)
* @throws IOException
* @return true if top of heap has changed (and KeyValueHeap has to try the next KV)
*/
protected boolean checkReseek(boolean flushed) throws IOException {
if (flushed && this.lastTop != null) {
resetScannerStack(this.lastTop);
if (this.heap.peek() == null
|| store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
LOG.debug("Storescanner.peek() is changed where before = "
+ this.lastTop.toString() + ",and after = " + this.heap.peek());
this.lastTop = null;
return true;
}
this.lastTop = null; // gone!
}
// else dont need to reseek
return false;
}
protected void resetScannerStack(Cell lastTopKey) throws IOException {
protected final boolean reopenAfterFlush() throws IOException {
Cell lastTop = heap.peek();
// When we have the scan object, should we not pass it to getScanners() to get a limited set of
// scanners? We did so in the constructor and we could have done it now by storing the scan
// object from the constructor
List<KeyValueScanner> scanners = null;
List<KeyValueScanner> scanners;
flushLock.lock();
try {
flushLock.lock();
scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get,
scanUsePread, false, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true));
// Clear the current set of flushed store files so that they don't get added again
@ -844,7 +847,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
// Seek the new scanners to the last key
seekScanners(scanners, lastTopKey, false, parallelSeekEnabled);
seekScanners(scanners, lastTop, false, parallelSeekEnabled);
// remove the older memstore scanner
for (int i = 0; i < currentScanners.size(); i++) {
if (!currentScanners.get(i).isFileScanner()) {
@ -856,6 +859,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
addCurrentScanners(scanners);
// Combine all seeked scanners with a heap
resetKVHeap(this.currentScanners, store.getComparator());
resetQueryMatcher(lastTop);
if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) {
if (LOG.isDebugEnabled()) {
LOG.debug("Storescanner.peek() is changed where before = " + lastTop.toString() +
",and after = " + heap.peek());
}
return true;
}
return false;
}
private void resetQueryMatcher(Cell lastTopKey) {
// Reset the state of the Query Matcher and set to top row.
// Only reset and call setRow if the row changes; avoids confusing the
// query matcher if scanning intra-row.
@ -902,18 +917,73 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
@Override
public boolean reseek(Cell kv) throws IOException {
boolean flushed = checkFlushed();
// Heap will not be null, if this is called from next() which.
// If called from RegionScanner.reseek(...) make sure the scanner
// stack is reset if needed.
checkReseek(flushed);
if (checkFlushed()) {
reopenAfterFlush();
}
if (explicitColumnQuery && lazySeekEnabledGlobally) {
return heap.requestSeek(kv, true, useRowColBloom);
}
return heap.reseek(kv);
}
protected boolean checkFlushed() {
private void trySwitchToStreamRead() throws IOException {
if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null ||
bytesRead < preadMaxBytes) {
return;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Switch to stream read because we have already read " + bytesRead +
" bytes from this scanner");
}
scanUsePread = false;
Cell lastTop = heap.peek();
Map<String, StoreFile> name2File = new HashMap<>(store.getStorefilesCount());
for (StoreFile file : store.getStorefiles()) {
name2File.put(file.getFileInfo().getActiveFileName(), file);
}
List<StoreFile> filesToReopen = new ArrayList<>();
List<KeyValueScanner> memstoreScanners = new ArrayList<>();
List<KeyValueScanner> fileScanners = null;
List<KeyValueScanner> scannersToClose = new ArrayList<>();
boolean succ = false;
try {
for (KeyValueScanner kvs : currentScanners) {
if (!kvs.isFileScanner()) {
memstoreScanners.add(kvs);
} else {
scannersToClose.add(kvs);
if (kvs.peek() == null) {
continue;
}
filesToReopen.add(name2File.get(kvs.getFilePath().getName()));
}
}
if (filesToReopen.isEmpty()) {
return;
}
fileScanners =
store.getScanners(filesToReopen, cacheBlocks, false, false, matcher, scan.getStartRow(),
scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), readPt, false);
seekScanners(fileScanners, lastTop, false, parallelSeekEnabled);
currentScanners.clear();
addCurrentScanners(fileScanners);
addCurrentScanners(memstoreScanners);
resetKVHeap(currentScanners, store.getComparator());
resetQueryMatcher(lastTop);
for (KeyValueScanner kvs : scannersToClose) {
kvs.close();
}
succ = true;
} finally {
if (!succ && fileScanners != null) {
for (KeyValueScanner scanner : fileScanners) {
scanner.close();
}
}
}
}
protected final boolean checkFlushed() {
// check the var without any lock. Suppose even if we see the old
// value here still it is ok to continue because we will not be resetting
// the heap but will continue with the referenced memstore's snapshot. For compactions
@ -922,9 +992,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
if (flushed) {
// If there is a flush and the current scan is notified on the flush ensure that the
// scan's heap gets reset and we do a seek on the newly flushed file.
if(!this.closing) {
this.lastTop = this.peek();
} else {
if (this.closing) {
return false;
}
// reset the flag
@ -983,6 +1051,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
* Used in testing.
* @return all scanners in no particular order
*/
@VisibleForTesting
List<KeyValueScanner> getAllScannersForTesting() {
List<KeyValueScanner> allScanners = new ArrayList<>();
KeyValueScanner current = heap.getCurrentForTesting();


@ -17,12 +17,16 @@
*/
package org.apache.hadoop.hbase;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import com.google.common.collect.Lists;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@ -31,8 +35,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
@ -45,7 +47,9 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.regionserver.wal.WALUtil;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
@ -53,28 +57,26 @@ import org.apache.hadoop.hbase.wal.WAL;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Test for the case where a regionserver going down has enough cycles to do damage to regions
* that have actually been assigned elsehwere.
*
* <p>If we happen to assign a region before it fully done with in its old location -- i.e. it is on two servers at the
* same time -- all can work fine until the case where the region on the dying server decides to compact or otherwise
* change the region file set. The region in its new location will then get a surprise when it tries to do something
* w/ a file removed by the region in its old location on dying server.
*
* <p>Making a test for this case is a little tough in that even if a file is deleted up on the namenode,
* if the file was opened before the delete, it will continue to let reads happen until something changes the
* state of cached blocks in the dfsclient that was already open (a block from the deleted file is cleaned
* from the datanode by NN).
*
* <p>What we will do below is do an explicit check for existence on the files listed in the region that
* has had some files removed because of a compaction. This sort of hurry's along and makes certain what is a chance
* occurance.
Test for the case where a regionserver going down has enough cycles to do damage to regions that
have actually been assigned elsewhere.
<p>
If we happen to assign a region before it is fully done with its old location -- i.e. it is on
two servers at the same time -- all can work fine until the case where the region on the dying
server decides to compact or otherwise change the region file set. The region in its new location
will then get a surprise when it tries to do something w/ a file removed by the region in its old
location on the dying server.
<p>
Making a test for this case is a little tough in that even if a file is deleted up on the
namenode, if the file was opened before the delete, it will continue to let reads happen until
something changes the state of cached blocks in the dfsclient that was already open (a block from
the deleted file is cleaned from the datanode by the NN).
<p>
What we will do below is an explicit check for existence on the files listed in the region
that has had some files removed because of a compaction. This sort of hurries along and makes
certain what would otherwise be a chance occurrence.
*/
@Category({MiscTests.class, MediumTests.class})
@Category({MiscTests.class, LargeTests.class})
public class TestIOFencing {
private static final Log LOG = LogFactory.getLog(TestIOFencing.class);
static {
@ -334,23 +336,38 @@ public class TestIOFencing {
while (compactingRegion.compactCount == 0) {
Thread.sleep(1000);
}
// The server we killed stays up until the compaction that was started before it was killed completes. In logs
// you should see the old regionserver now going down.
// The server we killed stays up until the compaction that was started before it was killed
// completes. In logs you should see the old regionserver now going down.
LOG.info("Compaction finished");
// If we survive the split keep going...
// Now we make sure that the region isn't totally confused. Load up more rows.
TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT, FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT,
FIRST_BATCH_COUNT + SECOND_BATCH_COUNT);
admin.majorCompact(TABLE_NAME);
startWaitTime = System.currentTimeMillis();
while (newRegion.compactCount == 0) {
Thread.sleep(1000);
assertTrue("New region never compacted", System.currentTimeMillis() - startWaitTime < 180000);
assertTrue("New region never compacted",
System.currentTimeMillis() - startWaitTime < 180000);
}
if(policy == MemoryCompactionPolicy.EAGER) {
assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= TEST_UTIL.countRows(table));
int count;
for (int i = 0;; i++) {
try {
count = TEST_UTIL.countRows(table);
break;
} catch (DoNotRetryIOException e) {
// wait up to 30s
if (i >= 30 || !e.getMessage().contains("File does not exist")) {
throw e;
}
Thread.sleep(1000);
}
}
if (policy == MemoryCompactionPolicy.EAGER) {
assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= count);
} else {
assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, TEST_UTIL.countRows(table));
assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, count);
}
} finally {
if (compactingRegion != null) {


@ -671,7 +671,8 @@ public class TestHFileBlock {
while (System.currentTimeMillis() < endTime) {
int blockId = rand.nextInt(NUM_TEST_BLOCKS);
long offset = offsets.get(blockId);
boolean pread = rand.nextBoolean();
// now we only support concurrent read with pread = true
boolean pread = true;
boolean withOnDiskSize = rand.nextBoolean();
long expectedSize =
(blockId == NUM_TEST_BLOCKS - 1 ? fileSize


@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
@ -92,6 +93,11 @@ public class DelegatingKeyValueScanner implements KeyValueScanner {
return delegate.isFileScanner();
}
@Override
public Path getFilePath() {
return delegate.getFilePath();
}
@Override
public boolean backwardSeek(Cell key) throws IOException {
return delegate.backwardSeek(key);


@ -144,11 +144,18 @@ public class MockStoreFile extends StoreFile {
public void initReader() throws IOException {
}
@Override
public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder,
boolean canOptimizeForNonNullColumn) {
return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder,
canOptimizeForNonNullColumn);
}
@Override
public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks,
boolean pread, boolean isCompaction, long readPt, long scannerOrder,
boolean canOptimizeForNonNullColumn) throws IOException {
return getReader().getStoreFileScanner(cacheBlocks, pread, isCompaction, readPt, scannerOrder,
boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn)
throws IOException {
return getReader().getStoreFileScanner(cacheBlocks, false, isCompaction, readPt, scannerOrder,
canOptimizeForNonNullColumn);
}


@ -96,14 +96,14 @@ public class TestBlocksScanned extends HBaseTestCase {
CacheStats stats = new CacheConfig(TEST_UTIL.getConfiguration()).getBlockCache().getStats();
long before = stats.getHitCount() + stats.getMissCount();
// Do simple test of getting one row only first.
Scan scan = new Scan(Bytes.toBytes("aaa"), Bytes.toBytes("aaz"));
Scan scan = new Scan().withStartRow(Bytes.toBytes("aaa")).withStopRow(Bytes.toBytes("aaz"))
.setReadType(Scan.ReadType.PREAD);
scan.addColumn(FAMILY, COL);
scan.setMaxVersions(1);
InternalScanner s = r.getScanner(scan);
List<Cell> results = new ArrayList<>();
while (s.next(results))
;
while (s.next(results));
s.close();
int expectResultSize = 'z' - 'a';


@ -192,7 +192,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore {
Configuration conf = HBaseConfiguration.create();
for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
KeepDeletedCells.FALSE, 0, this.memstore.getComparator());
KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator());
ScanType scanType = ScanType.USER_SCAN;
InternalScanner scanner = new StoreScanner(new Scan(
Bytes.toBytes(startRowId)), scanInfo, scanType, null,


@ -212,9 +212,9 @@ public class TestCompaction {
for (Store hstore: this.r.stores.values()) {
HStore store = (HStore)hstore;
ScanInfo old = store.getScanInfo();
ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(),
old.getMinVersions(), old.getMaxVersions(), ttl,
old.getKeepDeletedCells(), 0, old.getComparator());
ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), old.getMinVersions(),
old.getMaxVersions(), ttl, old.getKeepDeletedCells(), HConstants.DEFAULT_BLOCKSIZE, 0,
old.getComparator());
store.setScanInfo(si);
}
Thread.sleep(ttl);


@ -160,8 +160,8 @@ public class TestDefaultCompactSelection extends TestCompactionPolicy {
ScanInfo oldScanInfo = store.getScanInfo();
ScanInfo newScanInfo = new ScanInfo(oldScanInfo.getConfiguration(), oldScanInfo.getFamily(),
oldScanInfo.getMinVersions(), oldScanInfo.getMaxVersions(), 600,
oldScanInfo.getKeepDeletedCells(), oldScanInfo.getTimeToPurgeDeletes(),
oldScanInfo.getComparator());
oldScanInfo.getKeepDeletedCells(), oldScanInfo.getPreadMaxBytes(),
oldScanInfo.getTimeToPurgeDeletes(), oldScanInfo.getComparator());
store.setScanInfo(newScanInfo);
// Do not compact empty store file
List<StoreFile> candidates = sfCreate(0);


@ -162,9 +162,8 @@ public class TestDefaultMemStore {
Scan scan = new Scan();
List<Cell> result = new ArrayList<>();
Configuration conf = HBaseConfiguration.create();
ScanInfo scanInfo =
new ScanInfo(conf, null, 0, 1, HConstants.LATEST_TIMESTAMP, KeepDeletedCells.FALSE, 0,
this.memstore.getComparator());
ScanInfo scanInfo = new ScanInfo(conf, null, 0, 1, HConstants.LATEST_TIMESTAMP,
KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator());
ScanType scanType = ScanType.USER_SCAN;
StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
int count = 0;
@ -602,7 +601,7 @@ public class TestDefaultMemStore {
Configuration conf = HBaseConfiguration.create();
for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE,
KeepDeletedCells.FALSE, 0, this.memstore.getComparator());
KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator());
ScanType scanType = ScanType.USER_SCAN;
try (InternalScanner scanner = new StoreScanner(new Scan(
Bytes.toBytes(startRowId)), scanInfo, scanType, null,


@ -292,9 +292,9 @@ public class TestMajorCompaction {
for (Store hstore : r.getStores()) {
HStore store = ((HStore) hstore);
ScanInfo old = store.getScanInfo();
ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(),
old.getMinVersions(), old.getMaxVersions(), ttl,
old.getKeepDeletedCells(), 0, old.getComparator());
ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), old.getMinVersions(),
old.getMaxVersions(), ttl, old.getKeepDeletedCells(), old.getPreadMaxBytes(), 0,
old.getComparator());
store.setScanInfo(si);
}
Thread.sleep(1000);


@ -264,8 +264,9 @@ public class TestReversibleScanners {
BloomType.NONE, true);
ScanType scanType = ScanType.USER_SCAN;
ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE,
Long.MAX_VALUE, KeepDeletedCells.FALSE, 0, CellComparator.COMPARATOR);
ScanInfo scanInfo =
new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR);
// Case 1.Test a full reversed scan
Scan scan = new Scan();


@ -67,8 +67,8 @@ public class TestStoreScanner {
private static final String CF_STR = "cf";
private static final byte [] CF = Bytes.toBytes(CF_STR);
static Configuration CONF = HBaseConfiguration.create();
private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE,
Long.MAX_VALUE, KeepDeletedCells.FALSE, 0, CellComparator.COMPARATOR);
private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE,
KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR);
private ScanType scanType = ScanType.USER_SCAN;
/**
@ -829,8 +829,8 @@ public class TestStoreScanner {
List<KeyValueScanner> scanners = scanFixture(kvs);
Scan scan = new Scan();
scan.setMaxVersions(1);
ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE, 0,
CellComparator.COMPARATOR);
ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE,
HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR);
ScanType scanType = ScanType.USER_SCAN;
try (StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners)) {
List<Cell> results = new ArrayList<>();
@ -902,8 +902,8 @@ public class TestStoreScanner {
Scan scan = new Scan();
scan.setMaxVersions(1);
// scanner with ttl equal to 500
ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE, 0,
CellComparator.COMPARATOR);
ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE,
HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR);
ScanType scanType = ScanType.USER_SCAN;
try (StoreScanner scanner =
new StoreScanner(scan, scanInfo, scanType, null, scanners)) {
@ -968,6 +968,7 @@ public class TestStoreScanner {
0 /* minVersions */,
2 /* maxVersions */, 500 /* ttl */,
KeepDeletedCells.FALSE /* keepDeletedCells */,
HConstants.DEFAULT_BLOCKSIZE /* block size */,
200, /* timeToPurgeDeletes */
CellComparator.COMPARATOR);
try (StoreScanner scanner =


@ -0,0 +1,127 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ RegionServerTests.class, MediumTests.class })
public class TestSwitchToStreamRead {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static TableName TABLE_NAME = TableName.valueOf("stream");
private static byte[] FAMILY = Bytes.toBytes("cf");
private static byte[] QUAL = Bytes.toBytes("cq");
private static String VALUE_PREFIX;
private static HRegion REGION;
@BeforeClass
public static void setUp() throws IOException {
UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048);
StringBuilder sb = new StringBuilder(256);
for (int i = 0; i < 255; i++) {
sb.append((char) ThreadLocalRandom.current().nextInt('A', 'z' + 1));
}
VALUE_PREFIX = sb.append("-").toString();
REGION = UTIL.createLocalHRegion(
new HTableDescriptor(TABLE_NAME).addFamily(new HColumnDescriptor(FAMILY).setBlocksize(1024)),
null, null);
for (int i = 0; i < 900; i++) {
REGION
.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
}
REGION.flush(true);
for (int i = 900; i < 1000; i++) {
REGION
.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
}
}
@AfterClass
public static void tearDown() throws IOException {
REGION.close(true);
UTIL.cleanupTestDir();
}
@Test
public void test() throws IOException {
try (RegionScanner scanner = REGION.getScanner(new Scan())) {
StoreScanner storeScanner = (StoreScanner) ((RegionScannerImpl) scanner)
.getStoreHeapForTesting().getCurrentForTesting();
for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
if (kvs instanceof StoreFileScanner) {
StoreFileScanner sfScanner = (StoreFileScanner) kvs;
// starting from pread so we use shared reader here.
assertTrue(sfScanner.getReader().shared);
}
}
List<Cell> cells = new ArrayList<>();
for (int i = 0; i < 500; i++) {
assertTrue(scanner.next(cells));
Result result = Result.create(cells);
assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
cells.clear();
}
for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
if (kvs instanceof StoreFileScanner) {
StoreFileScanner sfScanner = (StoreFileScanner) kvs;
// we should have converted to stream read by now.
assertFalse(sfScanner.getReader().shared);
}
}
for (int i = 500; i < 1000; i++) {
assertEquals(i != 999, scanner.next(cells));
Result result = Result.create(cells);
assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL)));
cells.clear();
}
}
// make sure all scanners are closed.
for (StoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
assertFalse(sf.isReferencedInReads());
}
}
}


@ -73,8 +73,8 @@ public class TestCompactionScanQueryMatcher extends AbstractTestScanQueryMatcher
throws IOException {
long now = EnvironmentEdgeManager.currentTime();
// Set time to purge deletes to negative value to avoid it ever happening.
ScanInfo scanInfo = new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, -1L,
rowComparator);
ScanInfo scanInfo = new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE,
HConstants.DEFAULT_BLOCKSIZE, -1L, rowComparator);
CompactionScanQueryMatcher qm = CompactionScanQueryMatcher.create(scanInfo,
ScanType.COMPACT_RETAIN_DELETES, Long.MAX_VALUE, HConstants.OLDEST_TIMESTAMP,


@ -54,7 +54,8 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher {
long now = EnvironmentEdgeManager.currentTime();
// Do with fam2 which has a col2 qualifier.
UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan,
new ScanInfo(this.conf, fam2, 10, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator),
new ScanInfo(this.conf, fam2, 10, 1, ttl, KeepDeletedCells.FALSE,
HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator),
get.getFamilyMap().get(fam2), now - ttl, now, null);
Cell kv = new KeyValue(row1, fam2, col2, 1, data);
Cell cell = CellUtil.createLastOnRowCol(kv);
@ -79,8 +80,9 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher {
long now = EnvironmentEdgeManager.currentTime();
// 2,4,5
UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan,
new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator),
UserScanQueryMatcher qm = UserScanQueryMatcher.create(
scan, new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE,
HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator),
get.getFamilyMap().get(fam2), now - ttl, now, null);
List<KeyValue> memstore = new ArrayList<>(6);
@ -122,9 +124,9 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher {
expected.add(ScanQueryMatcher.MatchCode.DONE);
long now = EnvironmentEdgeManager.currentTime();
UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan,
new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), null,
now - ttl, now, null);
UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan, new ScanInfo(this.conf, fam2, 0, 1,
ttl, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator),
null, now - ttl, now, null);
List<KeyValue> memstore = new ArrayList<>(6);
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@ -168,7 +170,8 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher {
long now = EnvironmentEdgeManager.currentTime();
UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan,
new ScanInfo(this.conf, fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator),
new ScanInfo(this.conf, fam2, 0, 1, testTTL, KeepDeletedCells.FALSE,
HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator),
get.getFamilyMap().get(fam2), now - testTTL, now, null);
KeyValue[] kvs = new KeyValue[] { new KeyValue(row1, fam2, col1, now - 100, data),
@ -209,9 +212,9 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher {
ScanQueryMatcher.MatchCode.DONE };
long now = EnvironmentEdgeManager.currentTime();
UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan,
new ScanInfo(this.conf, fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), null,
now - testTTL, now, null);
UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan, new ScanInfo(this.conf, fam2, 0, 1,
testTTL, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator),
null, now - testTTL, now, null);
KeyValue[] kvs = new KeyValue[] { new KeyValue(row1, fam2, col1, now - 100, data),
new KeyValue(row1, fam2, col2, now - 50, data),


@ -245,11 +245,10 @@ public class TestCoprocessorScanPolicy {
Integer newVersions = versions.get(store.getTableName());
ScanInfo oldSI = store.getScanInfo();
HColumnDescriptor family = store.getFamily();
ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
family.getName(), family.getMinVersions(),
newVersions == null ? family.getMaxVersions() : newVersions,
ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(),
family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions,
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
Scan scan = new Scan();
scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
return new StoreScanner(store, scanInfo, scan, scanners,
@ -266,11 +265,10 @@ public class TestCoprocessorScanPolicy {
Integer newVersions = versions.get(store.getTableName());
ScanInfo oldSI = store.getScanInfo();
HColumnDescriptor family = store.getFamily();
ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
family.getName(), family.getMinVersions(),
newVersions == null ? family.getMaxVersions() : newVersions,
ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(),
family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions,
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
Scan scan = new Scan();
scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
return new StoreScanner(store, scanInfo, scan, scanners, scanType,
@ -287,11 +285,10 @@ public class TestCoprocessorScanPolicy {
Integer newVersions = versions.get(store.getTableName());
ScanInfo oldSI = store.getScanInfo();
HColumnDescriptor family = store.getFamily();
ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(),
family.getName(), family.getMinVersions(),
newVersions == null ? family.getMaxVersions() : newVersions,
ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(),
family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions,
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
return new StoreScanner(store, scanInfo, scan, targetCols, readPt);
} else {
return s;