From 0ae0edcd630aa1dcb6c47ea11fa4367ae0a5baa8 Mon Sep 17 00:00:00 2001 From: zhangduo Date: Wed, 10 May 2017 14:15:44 +0800 Subject: [PATCH] HBASE-17917 Use pread by default for all user scan --- .../hadoop/hbase/io/hfile/HFileBlock.java | 70 ++--- .../hbase/regionserver/KeyValueHeap.java | 3 + .../hbase/regionserver/KeyValueScanner.java | 7 + .../regionserver/NonLazyKeyValueScanner.java | 9 + .../regionserver/ReversedStoreScanner.java | 12 +- .../hadoop/hbase/regionserver/ScanInfo.java | 17 +- .../hbase/regionserver/SegmentScanner.java | 7 +- .../hadoop/hbase/regionserver/StoreFile.java | 14 +- .../hbase/regionserver/StoreFileInfo.java | 15 + .../hbase/regionserver/StoreFileReader.java | 3 +- .../hbase/regionserver/StoreFileScanner.java | 42 +-- .../hbase/regionserver/StoreScanner.java | 279 +++++++++++------- .../apache/hadoop/hbase/TestIOFencing.java | 79 +++-- .../hadoop/hbase/io/hfile/TestHFileBlock.java | 3 +- .../DelegatingKeyValueScanner.java | 6 + .../hbase/regionserver/MockStoreFile.java | 13 +- .../hbase/regionserver/TestBlocksScanned.java | 6 +- .../regionserver/TestCompactingMemStore.java | 2 +- .../hbase/regionserver/TestCompaction.java | 6 +- .../TestDefaultCompactSelection.java | 4 +- .../regionserver/TestDefaultMemStore.java | 7 +- .../regionserver/TestMajorCompaction.java | 6 +- .../regionserver/TestReversibleScanners.java | 5 +- .../hbase/regionserver/TestStoreScanner.java | 13 +- .../regionserver/TestSwitchToStreamRead.java | 127 ++++++++ .../TestCompactionScanQueryMatcher.java | 4 +- .../TestUserScanQueryMatcher.java | 23 +- .../hbase/util/TestCoprocessorScanPolicy.java | 21 +- 28 files changed, 541 insertions(+), 262 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java index 445dc86862f..1e86b0bf7a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hbase.io.hfile; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + import java.io.DataInputStream; import java.io.DataOutput; import java.io.DataOutputStream; @@ -24,8 +27,6 @@ import java.io.IOException; import java.io.InputStream; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicReference; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -53,9 +54,6 @@ import org.apache.hadoop.hbase.util.ChecksumType; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.io.IOUtils; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - /** * Reads {@link HFile} version 2 blocks to HFiles and via {@link Cacheable} Interface to caches. * Version 2 was introduced in hbase-0.92.0. No longer has support for version 1 blocks since @@ -1418,7 +1416,7 @@ public class HFileBlock implements Cacheable { static class FSReaderImpl implements FSReader { /** The file system stream of the underlying {@link HFile} that * does or doesn't do checksum validations in the filesystem */ - protected FSDataInputStreamWrapper streamWrapper; + private FSDataInputStreamWrapper streamWrapper; private HFileBlockDecodingContext encodedBlockDecodingCtx; @@ -1434,22 +1432,18 @@ public class HFileBlock implements Cacheable { private AtomicReference prefetchedHeader = new AtomicReference<>(new PrefetchedHeader()); /** The size of the file we are reading from, or -1 if unknown. */ - protected long fileSize; + private long fileSize; /** The size of the header */ + @VisibleForTesting protected final int hdrSize; /** The filesystem used to access data */ - protected HFileSystem hfs; + private HFileSystem hfs; - private final Lock streamLock = new ReentrantLock(); - - /** The default buffer size for our buffered streams */ - public static final int DEFAULT_BUFFER_SIZE = 1 << 20; - - protected HFileContext fileContext; + private HFileContext fileContext; // Cache the fileName - protected String pathName; + private String pathName; FSReaderImpl(FSDataInputStreamWrapper stream, long fileSize, HFileSystem hfs, Path path, HFileContext fileContext) throws IOException { @@ -1524,39 +1518,33 @@ public class HFileBlock implements Cacheable { * next header * @throws IOException */ - protected int readAtOffset(FSDataInputStream istream, byte [] dest, int destOffset, int size, + @VisibleForTesting + protected int readAtOffset(FSDataInputStream istream, byte[] dest, int destOffset, int size, boolean peekIntoNextBlock, long fileOffset, boolean pread) throws IOException { if (peekIntoNextBlock && destOffset + size + hdrSize > dest.length) { // We are asked to read the next block's header as well, but there is // not enough room in the array. - throw new IOException("Attempted to read " + size + " bytes and " + - hdrSize + " bytes of next header into a " + dest.length + - "-byte array at offset " + destOffset); + throw new IOException("Attempted to read " + size + " bytes and " + hdrSize + + " bytes of next header into a " + dest.length + "-byte array at offset " + destOffset); } - if (!pread && streamLock.tryLock()) { + if (!pread) { // Seek + read. Better for scanning. - try { - HFileUtil.seekOnMultipleSources(istream, fileOffset); + HFileUtil.seekOnMultipleSources(istream, fileOffset); + long realOffset = istream.getPos(); + if (realOffset != fileOffset) { + throw new IOException("Tried to seek to " + fileOffset + " to " + "read " + size + + " bytes, but pos=" + realOffset + " after seek"); + } - long realOffset = istream.getPos(); - if (realOffset != fileOffset) { - throw new IOException("Tried to seek to " + fileOffset + " to " - + "read " + size + " bytes, but pos=" + realOffset - + " after seek"); - } + if (!peekIntoNextBlock) { + IOUtils.readFully(istream, dest, destOffset, size); + return -1; + } - if (!peekIntoNextBlock) { - IOUtils.readFully(istream, dest, destOffset, size); - return -1; - } - - // Try to read the next block header. - if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) { - return -1; - } - } finally { - streamLock.unlock(); + // Try to read the next block header. + if (!readWithExtra(istream, dest, destOffset, size, hdrSize)) { + return -1; } } else { // Positional read. Better for random reads; or when the streamLock is already locked. @@ -1565,7 +1553,6 @@ public class HFileBlock implements Cacheable { return -1; } } - assert peekIntoNextBlock; return Bytes.toInt(dest, destOffset + size + BlockType.MAGIC_LENGTH) + hdrSize; } @@ -1719,6 +1706,7 @@ public class HFileBlock implements Cacheable { * If HBase checksum is switched off, then use HDFS checksum. * @return the HFileBlock or null if there is a HBase checksum mismatch */ + @VisibleForTesting protected HFileBlock readBlockDataInternal(FSDataInputStream is, long offset, long onDiskSizeWithHeaderL, boolean pread, boolean verifyChecksum) throws IOException { @@ -1830,7 +1818,7 @@ public class HFileBlock implements Cacheable { * If the block doesn't uses checksum, returns false. * @return True if checksum matches, else false. */ - protected boolean validateChecksum(long offset, ByteBuffer data, int hdrSize) + private boolean validateChecksum(long offset, ByteBuffer data, int hdrSize) throws IOException { // If this is an older version of the block that does not have checksums, then return false // indicating that checksum verification did not succeed. Actually, this method should never diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java index 195e8f7491d..a398ce9428c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java @@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; + import java.io.IOException; import java.util.ArrayList; import java.util.Comparator; @@ -422,6 +424,7 @@ public class KeyValueHeap extends NonReversedNonLazyKeyValueScanner return 0; } + @VisibleForTesting KeyValueScanner getCurrentForTesting() { return current; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java index a4cb2f4da74..7f716d63a86 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java @@ -22,6 +22,7 @@ import java.io.Closeable; import java.io.IOException; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Scan; @@ -134,6 +135,12 @@ public interface KeyValueScanner extends Shipper, Closeable { */ boolean isFileScanner(); + /** + * @return the file path if this is a file scanner, otherwise null. + * @see #isFileScanner() + */ + Path getFilePath(); + // Support for "Reversed Scanner" /** * Seek the scanner at or before the row of specified Cell, it firstly diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java index c01b0e6b662..8778f5d7058 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java @@ -22,6 +22,7 @@ import java.io.IOException; import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Scan; @@ -65,6 +66,14 @@ public abstract class NonLazyKeyValueScanner implements KeyValueScanner { // Not a file by default. return false; } + + + @Override + public Path getFilePath() { + // Not a file by default. + return null; + } + @Override public Cell getNextIndexedKey() { return null; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java index d71af2b794e..07f98ada8ac 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReversedStoreScanner.java @@ -123,15 +123,17 @@ class ReversedStoreScanner extends StoreScanner implements KeyValueScanner { @Override public boolean seekToPreviousRow(Cell key) throws IOException { - boolean flushed = checkFlushed(); - checkReseek(flushed); + if (checkFlushed()) { + reopenAfterFlush(); + } return this.heap.seekToPreviousRow(key); } - + @Override public boolean backwardSeek(Cell key) throws IOException { - boolean flushed = checkFlushed(); - checkReseek(flushed); + if (checkFlushed()) { + reopenAfterFlush(); + } return this.heap.backwardSeek(key); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java index 349e1661c31..2a66e55d155 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanInfo.java @@ -48,6 +48,7 @@ public class ScanInfo { private boolean usePread; private long cellsPerTimeoutCheck; private boolean parallelSeekEnabled; + private final long preadMaxBytes; private final Configuration conf; public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT @@ -58,14 +59,14 @@ public class ScanInfo { * @param conf * @param family {@link HColumnDescriptor} describing the column family * @param ttl Store's TTL (in ms) - * @param timeToPurgeDeletes duration in ms after which a delete marker can - * be purged during a major compaction. + * @param timeToPurgeDeletes duration in ms after which a delete marker can be purged during a + * major compaction. * @param comparator The store's comparator */ public ScanInfo(final Configuration conf, final HColumnDescriptor family, final long ttl, final long timeToPurgeDeletes, final CellComparator comparator) { - this(conf, family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, family - .getKeepDeletedCells(), timeToPurgeDeletes, comparator); + this(conf, family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, + family.getKeepDeletedCells(), family.getBlocksize(), timeToPurgeDeletes, comparator); } /** @@ -74,6 +75,7 @@ public class ScanInfo { * @param minVersions Store's MIN_VERSIONS setting * @param maxVersions Store's VERSIONS setting * @param ttl Store's TTL (in ms) + * @param blockSize Store's block size * @param timeToPurgeDeletes duration in ms after which a delete marker can * be purged during a major compaction. * @param keepDeletedCells Store's keepDeletedCells setting @@ -81,7 +83,7 @@ public class ScanInfo { */ public ScanInfo(final Configuration conf, final byte[] family, final int minVersions, final int maxVersions, final long ttl, final KeepDeletedCells keepDeletedCells, - final long timeToPurgeDeletes, final CellComparator comparator) { + final long blockSize, final long timeToPurgeDeletes, final CellComparator comparator) { this.family = family; this.minVersions = minVersions; this.maxVersions = maxVersions; @@ -99,6 +101,7 @@ public class ScanInfo { perHeartbeat: StoreScanner.DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK; this.parallelSeekEnabled = conf.getBoolean(StoreScanner.STORESCANNER_PARALLEL_SEEK_ENABLE, false); + this.preadMaxBytes = conf.getLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 4 * blockSize); this.conf = conf; } @@ -149,4 +152,8 @@ public class ScanInfo { public CellComparator getComparator() { return comparator; } + + long getPreadMaxBytes() { + return preadMaxBytes; + } } \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java index 272736011b7..08ded8807e0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SegmentScanner.java @@ -23,6 +23,7 @@ import java.util.Iterator; import java.util.SortedSet; import org.apache.commons.lang.NotImplementedException; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.classification.InterfaceAudience; @@ -316,6 +317,11 @@ public class SegmentScanner implements KeyValueScanner { return false; } + @Override + public Path getFilePath() { + return null; + } + /** * @return the next key in the index (the key to seek to the next block) * if known, or null otherwise @@ -396,5 +402,4 @@ public class SegmentScanner implements KeyValueScanner { } return (first != null ? first : second); } - } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java index c53fbf08f3b..91ff97a72d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java @@ -589,11 +589,17 @@ public class StoreFile { return reader; } + public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder, + boolean canOptimizeForNonNullColumn) { + return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder, + canOptimizeForNonNullColumn); + } + public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks, - boolean pread, boolean isCompaction, long readPt, long scannerOrder, - boolean canOptimizeForNonNullColumn) throws IOException { - return createStreamReader(canUseDropBehind).getStoreFileScanner( - cacheBlocks, pread, isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn); + boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) + throws IOException { + return createStreamReader(canUseDropBehind).getStoreFileScanner(cacheBlocks, false, + isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java index c4754a8b6ac..0e99c742b5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileInfo.java @@ -564,4 +564,19 @@ public class StoreFileInfo { hash = hash * 31 + ((link == null) ? 0 : link.hashCode()); return hash; } + + /** + * Return the active file name that contains the real data. + *

+ * For referenced hfile, we will return the name of the reference file as it will be used to + * construct the StoreFileReader. And for linked hfile, we will return the name of the file being + * linked. + */ + public String getActiveFileName() { + if (reference != null || link == null) { + return initialPath.getName(); + } else { + return HFileLink.getReferencedHFileName(initialPath.getName()); + } + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java index b015ea56237..ee7d132aedd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileReader.java @@ -76,7 +76,8 @@ public class StoreFileReader { // indicate that whether this StoreFileReader is shared, i.e., used for pread. If not, we will // close the internal reader when readCompleted is called. - private final boolean shared; + @VisibleForTesting + final boolean shared; private StoreFileReader(HFile.Reader reader, AtomicInteger refCount, boolean shared) { this.reader = reader; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java index aa4f897f532..42c2af29a9b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.LongAdder; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; @@ -95,25 +96,21 @@ public class StoreFileScanner implements KeyValueScanner { } /** - * Return an array of scanners corresponding to the given - * set of store files. + * Return an array of scanners corresponding to the given set of store files. */ - public static List getScannersForStoreFiles( - Collection files, - boolean cacheBlocks, - boolean usePread, long readPt) throws IOException { - return getScannersForStoreFiles(files, cacheBlocks, - usePread, false, false, readPt); + public static List getScannersForStoreFiles(Collection files, + boolean cacheBlocks, boolean usePread, long readPt) throws IOException { + return getScannersForStoreFiles(files, cacheBlocks, usePread, false, false, readPt); } /** * Return an array of scanners corresponding to the given set of store files. */ - public static List getScannersForStoreFiles( - Collection files, boolean cacheBlocks, boolean usePread, - boolean isCompaction, boolean useDropBehind, long readPt) throws IOException { - return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, - useDropBehind, null, readPt); + public static List getScannersForStoreFiles(Collection files, + boolean cacheBlocks, boolean usePread, boolean isCompaction, boolean useDropBehind, + long readPt) throws IOException { + return getScannersForStoreFiles(files, cacheBlocks, usePread, isCompaction, useDropBehind, null, + readPt); } /** @@ -126,11 +123,17 @@ public class StoreFileScanner implements KeyValueScanner { List scanners = new ArrayList<>(files.size()); List sortedFiles = new ArrayList<>(files); Collections.sort(sortedFiles, StoreFile.Comparators.SEQ_ID); + boolean canOptimizeForNonNullColumn = matcher != null ? !matcher.hasNullColumnInQuery() : false; for (int i = 0, n = sortedFiles.size(); i < n; i++) { StoreFile sf = sortedFiles.get(i); sf.initReader(); - StoreFileScanner scanner = sf.getReader().getStoreFileScanner(cacheBlocks, usePread, - isCompaction, readPt, i, matcher != null ? !matcher.hasNullColumnInQuery() : false); + StoreFileScanner scanner; + if (usePread) { + scanner = sf.getPreadScanner(cacheBlocks, readPt, i, canOptimizeForNonNullColumn); + } else { + scanner = sf.getStreamScanner(canUseDrop, cacheBlocks, isCompaction, readPt, i, + canOptimizeForNonNullColumn); + } scanners.add(scanner); } return scanners; @@ -148,8 +151,8 @@ public class StoreFileScanner implements KeyValueScanner { boolean succ = false; try { for (int i = 0, n = sortedFiles.size(); i < n; i++) { - scanners.add(sortedFiles.get(i).getStreamScanner(canUseDropBehind, false, false, true, - readPt, i, false)); + scanners.add( + sortedFiles.get(i).getStreamScanner(canUseDropBehind, false, true, readPt, i, false)); } succ = true; } finally { @@ -444,6 +447,11 @@ public class StoreFileScanner implements KeyValueScanner { return true; } + @Override + public Path getFilePath() { + return reader.getHFileReader().getPath(); + } + // Test methods static final long getSeekCount() { return seekCount.sum(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java index d39a6ee7e2a..338a68c7ee4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java @@ -24,7 +24,9 @@ import com.google.common.annotations.VisibleForTesting; import java.io.IOException; import java.io.InterruptedIOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.NavigableSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.locks.ReentrantLock; @@ -53,8 +55,12 @@ import org.apache.hadoop.hbase.regionserver.querymatcher.UserScanQueryMatcher; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** - * Scanner scans both the memstore and the Store. Coalesce KeyValue stream - * into List<KeyValue> for a single row. + * Scanner scans both the memstore and the Store. Coalesce KeyValue stream into List<KeyValue> + * for a single row. + *

+ * The implementation is not thread safe. So there will be no race between next and close. The only + * exception is updateReaders, it will be called in the memstore flush thread to indicate that there + * is a flush. */ @InterfaceAudience.Private public class StoreScanner extends NonReversedNonLazyKeyValueScanner @@ -62,36 +68,35 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner private static final Log LOG = LogFactory.getLog(StoreScanner.class); // In unit tests, the store could be null protected final Store store; - protected ScanQueryMatcher matcher; + private ScanQueryMatcher matcher; protected KeyValueHeap heap; - protected boolean cacheBlocks; + private boolean cacheBlocks; - protected long countPerRow = 0; - protected int storeLimit = -1; - protected int storeOffset = 0; + private long countPerRow = 0; + private int storeLimit = -1; + private int storeOffset = 0; // Used to indicate that the scanner has closed (see HBASE-1107) - // Doesnt need to be volatile because it's always accessed via synchronized methods - protected boolean closing = false; - protected final boolean get; - protected final boolean explicitColumnQuery; - protected final boolean useRowColBloom; + // Do not need to be volatile because it's always accessed via synchronized methods + private boolean closing = false; + private final boolean get; + private final boolean explicitColumnQuery; + private final boolean useRowColBloom; /** * A flag that enables StoreFileScanner parallel-seeking */ - protected boolean parallelSeekEnabled = false; - protected ExecutorService executor; - protected final Scan scan; - protected final NavigableSet columns; - protected final long oldestUnexpiredTS; - protected final long now; - protected final int minVersions; - protected final long maxRowSize; - protected final long cellsPerHeartbeatCheck; + private boolean parallelSeekEnabled = false; + private ExecutorService executor; + private final Scan scan; + private final long oldestUnexpiredTS; + private final long now; + private final int minVersions; + private final long maxRowSize; + private final long cellsPerHeartbeatCheck; // Collects all the KVHeap that are eagerly getting closed during the // course of a scan - protected List heapsForDelayedClose = new ArrayList<>(); + private final List heapsForDelayedClose = new ArrayList<>(); /** * The number of KVs seen by the scanner. Includes explicitly skipped KVs, but not @@ -100,14 +105,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner private long kvsScanned = 0; private Cell prevCell = null; + private final long preadMaxBytes; + private long bytesRead; + /** We don't ever expect to change this, the constant is just for clarity. */ static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true; public static final String STORESCANNER_PARALLEL_SEEK_ENABLE = "hbase.storescanner.parallel.seek.enable"; /** Used during unit testing to ensure that lazy seek does save seek ops */ - protected static boolean lazySeekEnabledGlobally = - LAZY_SEEK_ENABLED_BY_DEFAULT; + private static boolean lazySeekEnabledGlobally = LAZY_SEEK_ENABLED_BY_DEFAULT; /** * The number of cells scanned in between timeout checks. Specifying a larger value means that @@ -122,19 +129,26 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner */ public static final long DEFAULT_HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK = 10000; - // if heap == null and lastTop != null, you need to reseek given the key below - protected Cell lastTop = null; + /** + * If the read type if Scan.ReadType.DEFAULT, we will start with pread, and if the kvs we scanned + * reaches this limit, we will reopen the scanner with stream. The default value is 4 times of + * block size for this store. + */ + public static final String STORESCANNER_PREAD_MAX_BYTES = "hbase.storescanner.pread.max.bytes"; + + private final Scan.ReadType readType; // A flag whether use pread for scan - private final boolean scanUsePread; + // it maybe changed if we use Scan.ReadType.DEFAULT and we have read lots of data. + private boolean scanUsePread; // Indicates whether there was flush during the course of the scan - protected volatile boolean flushed = false; + private volatile boolean flushed = false; // generally we get one file from a flush - protected List flushedStoreFiles = new ArrayList<>(1); + private final List flushedStoreFiles = new ArrayList<>(1); // The current list of scanners - protected List currentScanners = new ArrayList<>(); + private final List currentScanners = new ArrayList<>(); // flush update lock - private ReentrantLock flushLock = new ReentrantLock(); + private final ReentrantLock flushLock = new ReentrantLock(); protected final long readPt; @@ -155,7 +169,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner int numCol = columns == null ? 0 : columns.size(); explicitColumnQuery = numCol > 0; this.scan = scan; - this.columns = columns; this.now = EnvironmentEdgeManager.currentTime(); this.oldestUnexpiredTS = scan.isRaw() ? 0L : now - scanInfo.getTtl(); this.minVersions = scanInfo.getMinVersions(); @@ -168,32 +181,31 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner this.maxRowSize = scanInfo.getTableMaxRowSize(); if (get) { + this.readType = Scan.ReadType.PREAD; this.scanUsePread = true; } else { - switch (scan.getReadType()) { - case STREAM: - this.scanUsePread = false; - break; - case PREAD: - this.scanUsePread = true; - break; - default: - this.scanUsePread = scanInfo.isUsePread(); - break; + if (scan.getReadType() == Scan.ReadType.DEFAULT) { + this.readType = scanInfo.isUsePread() ? Scan.ReadType.PREAD : Scan.ReadType.DEFAULT; + } else { + this.readType = scan.getReadType(); + } + // Always start with pread unless user specific stream. Will change to stream later if + // readType is default if the scan keeps running for a long time. + this.scanUsePread = this.readType != Scan.ReadType.STREAM; + } + this.preadMaxBytes = scanInfo.getPreadMaxBytes(); + this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck(); + // Parallel seeking is on if the config allows and more there is more than one store file. + if (this.store != null && this.store.getStorefilesCount() > 1) { + RegionServerServices rsService = ((HStore) store).getHRegion().getRegionServerServices(); + if (rsService != null && scanInfo.isParallelSeekEnabled()) { + this.parallelSeekEnabled = true; + this.executor = rsService.getExecutorService(); } } - this.cellsPerHeartbeatCheck = scanInfo.getCellsPerTimeoutCheck(); - // Parallel seeking is on if the config allows and more there is more than one store file. - if (this.store != null && this.store.getStorefilesCount() > 1) { - RegionServerServices rsService = ((HStore)store).getHRegion().getRegionServerServices(); - if (rsService != null && scanInfo.isParallelSeekEnabled()) { - this.parallelSeekEnabled = true; - this.executor = rsService.getExecutorService(); - } - } } - protected void addCurrentScanners(List scanners) { + private void addCurrentScanners(List scanners) { this.currentScanners.addAll(scanners); } @@ -360,9 +372,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner * Get a filtered list of scanners. Assumes we are not in a compaction. * @return list of scanners to seek */ - protected List getScannersNoCompaction() throws IOException { - return selectScannersFrom(store.getScanners(cacheBlocks, get, scanUsePread, false, matcher, - scan.getStartRow(), scan.getStopRow(), this.readPt)); + private List getScannersNoCompaction() throws IOException { + return selectScannersFrom( + store.getScanners(cacheBlocks, scanUsePread, false, matcher, scan.getStartRow(), + scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), this.readPt)); } /** @@ -413,7 +426,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner /** * Filters the given list of scanners using Bloom filter, time range, and * TTL. + *

+ * Will be overridden by testcase so declared as protected. */ + @VisibleForTesting protected List selectScannersFrom( final List allScanners) { boolean memOnly; @@ -451,10 +467,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @Override public Cell peek() { - if (this.heap == null) { - return this.lastTop; - } - return this.heap.peek(); + return heap != null ? heap.peek() : null; } @Override @@ -472,9 +485,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner if (this.closing) { return; } - if (withHeapClose) this.closing = true; + if (withHeapClose) { + this.closing = true; + } // Under test, we dont have a this.store - if (this.store != null) this.store.deleteChangedReaderObserver(this); + if (this.store != null) { + this.store.deleteChangedReaderObserver(this); + } if (withHeapClose) { for (KeyValueHeap h : this.heapsForDelayedClose) { h.close(); @@ -492,14 +509,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner this.heap = null; } } - this.lastTop = null; // If both are null, we are closed. } @Override public boolean seek(Cell key) throws IOException { - boolean flushed = checkFlushed(); - // reset matcher state, in case that underlying store changed - checkReseek(flushed); + if (checkFlushed()) { + reopenAfterFlush(); + } return this.heap.seek(key); } @@ -519,8 +535,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner if (scannerContext == null) { throw new IllegalArgumentException("Scanner context cannot be null"); } - boolean flushed = checkFlushed(); - if (checkReseek(flushed)) { + trySwitchToStreamRead(); + if (checkFlushed() && reopenAfterFlush()) { return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues(); } @@ -550,7 +566,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } // Clear progress away unless invoker has indicated it should be kept. - if (!scannerContext.getKeepProgress()) scannerContext.clearProgress(); + if (!scannerContext.getKeepProgress()) { + scannerContext.clearProgress(); + } // Only do a sanity-check if store and comparator are available. CellComparator comparator = store != null ? store.getComparator() : null; @@ -566,9 +584,13 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues(); } } - - if (prevCell != cell) ++kvsScanned; // Do object compare - we set prevKV from the same heap. + // Do object compare - we set prevKV from the same heap. + if (prevCell != cell) { + ++kvsScanned; + } checkScanOrder(prevCell, cell, comparator); + int cellSize = CellUtil.estimatedSerializedSizeOf(cell); + bytesRead += cellSize; prevCell = cell; ScanQueryMatcher.MatchCode qcode = matcher.match(cell); switch (qcode) { @@ -600,7 +622,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner // Update local tracking information count++; - int cellSize = CellUtil.estimatedSerializedSizeOf(cell); totalBytesRead += cellSize; // Update the progress of the scanner context @@ -636,7 +657,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner case DONE: // Optimization for Gets! If DONE, no more to get on this row, early exit! - if (this.scan.isGetScan()) { + if (get) { // Then no more to this row... exit. close(false);// Do all cleanup except heap.close() return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); @@ -807,34 +828,16 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } /** - * @param flushed indicates if there was a flush - * @return true if top of heap has changed (and KeyValueHeap has to try the - * next KV) - * @throws IOException + * @return if top of heap has changed (and KeyValueHeap has to try the next KV) */ - protected boolean checkReseek(boolean flushed) throws IOException { - if (flushed && this.lastTop != null) { - resetScannerStack(this.lastTop); - if (this.heap.peek() == null - || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) { - LOG.debug("Storescanner.peek() is changed where before = " - + this.lastTop.toString() + ",and after = " + this.heap.peek()); - this.lastTop = null; - return true; - } - this.lastTop = null; // gone! - } - // else dont need to reseek - return false; - } - - protected void resetScannerStack(Cell lastTopKey) throws IOException { + protected final boolean reopenAfterFlush() throws IOException { + Cell lastTop = heap.peek(); // When we have the scan object, should we not pass it to getScanners() to get a limited set of // scanners? We did so in the constructor and we could have done it now by storing the scan // object from the constructor - List scanners = null; + List scanners; + flushLock.lock(); try { - flushLock.lock(); scanners = selectScannersFrom(store.getScanners(flushedStoreFiles, cacheBlocks, get, scanUsePread, false, matcher, scan.getStartRow(), scan.getStopRow(), this.readPt, true)); // Clear the current set of flushed store files so that they don't get added again @@ -844,7 +847,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner } // Seek the new scanners to the last key - seekScanners(scanners, lastTopKey, false, parallelSeekEnabled); + seekScanners(scanners, lastTop, false, parallelSeekEnabled); // remove the older memstore scanner for (int i = 0; i < currentScanners.size(); i++) { if (!currentScanners.get(i).isFileScanner()) { @@ -856,6 +859,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner addCurrentScanners(scanners); // Combine all seeked scanners with a heap resetKVHeap(this.currentScanners, store.getComparator()); + resetQueryMatcher(lastTop); + if (heap.peek() == null || store.getComparator().compareRows(lastTop, this.heap.peek()) != 0) { + if (LOG.isDebugEnabled()) { + LOG.debug("Storescanner.peek() is changed where before = " + lastTop.toString() + + ",and after = " + heap.peek()); + } + return true; + } + return false; + } + + private void resetQueryMatcher(Cell lastTopKey) { // Reset the state of the Query Matcher and set to top row. // Only reset and call setRow if the row changes; avoids confusing the // query matcher if scanning intra-row. @@ -902,18 +917,73 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner @Override public boolean reseek(Cell kv) throws IOException { - boolean flushed = checkFlushed(); - // Heap will not be null, if this is called from next() which. - // If called from RegionScanner.reseek(...) make sure the scanner - // stack is reset if needed. - checkReseek(flushed); + if (checkFlushed()) { + reopenAfterFlush(); + } if (explicitColumnQuery && lazySeekEnabledGlobally) { return heap.requestSeek(kv, true, useRowColBloom); } return heap.reseek(kv); } - protected boolean checkFlushed() { + private void trySwitchToStreamRead() throws IOException { + if (readType != Scan.ReadType.DEFAULT || !scanUsePread || closing || heap.peek() == null || + bytesRead < preadMaxBytes) { + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Switch to stream read because we have already read " + bytesRead + + " bytes from this scanner"); + } + scanUsePread = false; + Cell lastTop = heap.peek(); + Map name2File = new HashMap<>(store.getStorefilesCount()); + for (StoreFile file : store.getStorefiles()) { + name2File.put(file.getFileInfo().getActiveFileName(), file); + } + List filesToReopen = new ArrayList<>(); + List memstoreScanners = new ArrayList<>(); + List fileScanners = null; + List scannersToClose = new ArrayList<>(); + boolean succ = false; + try { + for (KeyValueScanner kvs : currentScanners) { + if (!kvs.isFileScanner()) { + memstoreScanners.add(kvs); + } else { + scannersToClose.add(kvs); + if (kvs.peek() == null) { + continue; + } + filesToReopen.add(name2File.get(kvs.getFilePath().getName())); + } + } + if (filesToReopen.isEmpty()) { + return; + } + fileScanners = + store.getScanners(filesToReopen, cacheBlocks, false, false, matcher, scan.getStartRow(), + scan.includeStartRow(), scan.getStopRow(), scan.includeStopRow(), readPt, false); + seekScanners(fileScanners, lastTop, false, parallelSeekEnabled); + currentScanners.clear(); + addCurrentScanners(fileScanners); + addCurrentScanners(memstoreScanners); + resetKVHeap(currentScanners, store.getComparator()); + resetQueryMatcher(lastTop); + for (KeyValueScanner kvs : scannersToClose) { + kvs.close(); + } + succ = true; + } finally { + if (!succ && fileScanners != null) { + for (KeyValueScanner scanner : fileScanners) { + scanner.close(); + } + } + } + } + + protected final boolean checkFlushed() { // check the var without any lock. Suppose even if we see the old // value here still it is ok to continue because we will not be resetting // the heap but will continue with the referenced memstore's snapshot. For compactions @@ -922,9 +992,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner if (flushed) { // If there is a flush and the current scan is notified on the flush ensure that the // scan's heap gets reset and we do a seek on the newly flushed file. - if(!this.closing) { - this.lastTop = this.peek(); - } else { + if (this.closing) { return false; } // reset the flag @@ -983,6 +1051,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner * Used in testing. * @return all scanners in no particular order */ + @VisibleForTesting List getAllScannersForTesting() { List allScanners = new ArrayList<>(); KeyValueScanner current = heap.getCurrentForTesting(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java index f12a3346c1f..f65459fdb56 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java @@ -17,12 +17,16 @@ */ package org.apache.hadoop.hbase; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.common.collect.Lists; + import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; -import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -31,8 +35,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.CompactingMemStore; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; import org.apache.hadoop.hbase.regionserver.ConstantSizeRegionSplitPolicy; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; @@ -45,7 +47,9 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.regionserver.wal.WALUtil; import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; +import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread; @@ -53,28 +57,26 @@ import org.apache.hadoop.hbase.wal.WAL; import org.junit.Test; import org.junit.experimental.categories.Category; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - /** - * Test for the case where a regionserver going down has enough cycles to do damage to regions - * that have actually been assigned elsehwere. - * - *

If we happen to assign a region before it fully done with in its old location -- i.e. it is on two servers at the - * same time -- all can work fine until the case where the region on the dying server decides to compact or otherwise - * change the region file set. The region in its new location will then get a surprise when it tries to do something - * w/ a file removed by the region in its old location on dying server. - * - *

Making a test for this case is a little tough in that even if a file is deleted up on the namenode, - * if the file was opened before the delete, it will continue to let reads happen until something changes the - * state of cached blocks in the dfsclient that was already open (a block from the deleted file is cleaned - * from the datanode by NN). - * - *

What we will do below is do an explicit check for existence on the files listed in the region that - * has had some files removed because of a compaction. This sort of hurry's along and makes certain what is a chance - * occurance. + * Test for the case where a regionserver going down has enough cycles to do damage to regions that + * have actually been assigned elsehwere. + *

+ * If we happen to assign a region before it fully done with in its old location -- i.e. it is on + * two servers at the same time -- all can work fine until the case where the region on the dying + * server decides to compact or otherwise change the region file set. The region in its new location + * will then get a surprise when it tries to do something w/ a file removed by the region in its old + * location on dying server. + *

+ * Making a test for this case is a little tough in that even if a file is deleted up on the + * namenode, if the file was opened before the delete, it will continue to let reads happen until + * something changes the state of cached blocks in the dfsclient that was already open (a block from + * the deleted file is cleaned from the datanode by NN). + *

+ * What we will do below is do an explicit check for existence on the files listed in the region + * that has had some files removed because of a compaction. This sort of hurry's along and makes + * certain what is a chance occurance. */ -@Category({MiscTests.class, MediumTests.class}) +@Category({MiscTests.class, LargeTests.class}) public class TestIOFencing { private static final Log LOG = LogFactory.getLog(TestIOFencing.class); static { @@ -334,23 +336,38 @@ public class TestIOFencing { while (compactingRegion.compactCount == 0) { Thread.sleep(1000); } - // The server we killed stays up until the compaction that was started before it was killed completes. In logs - // you should see the old regionserver now going down. + // The server we killed stays up until the compaction that was started before it was killed + // completes. In logs you should see the old regionserver now going down. LOG.info("Compaction finished"); // If we survive the split keep going... // Now we make sure that the region isn't totally confused. Load up more rows. - TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT, FIRST_BATCH_COUNT + SECOND_BATCH_COUNT); + TEST_UTIL.loadNumericRows(table, FAMILY, FIRST_BATCH_COUNT, + FIRST_BATCH_COUNT + SECOND_BATCH_COUNT); admin.majorCompact(TABLE_NAME); startWaitTime = System.currentTimeMillis(); while (newRegion.compactCount == 0) { Thread.sleep(1000); - assertTrue("New region never compacted", System.currentTimeMillis() - startWaitTime < 180000); + assertTrue("New region never compacted", + System.currentTimeMillis() - startWaitTime < 180000); } - if(policy == MemoryCompactionPolicy.EAGER) { - assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= TEST_UTIL.countRows(table)); + int count; + for (int i = 0;; i++) { + try { + count = TEST_UTIL.countRows(table); + break; + } catch (DoNotRetryIOException e) { + // wait up to 30s + if (i >= 30 || !e.getMessage().contains("File does not exist")) { + throw e; + } + Thread.sleep(1000); + } + } + if (policy == MemoryCompactionPolicy.EAGER) { + assertTrue(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT >= count); } else { - assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, TEST_UTIL.countRows(table)); + assertEquals(FIRST_BATCH_COUNT + SECOND_BATCH_COUNT, count); } } finally { if (compactingRegion != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java index 68c4587e5a7..cb1c932cdcd 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/TestHFileBlock.java @@ -671,7 +671,8 @@ public class TestHFileBlock { while (System.currentTimeMillis() < endTime) { int blockId = rand.nextInt(NUM_TEST_BLOCKS); long offset = offsets.get(blockId); - boolean pread = rand.nextBoolean(); + // now we only support concurrent read with pread = true + boolean pread = true; boolean withOnDiskSize = rand.nextBoolean(); long expectedSize = (blockId == NUM_TEST_BLOCKS - 1 ? fileSize diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java index 51a2a978f80..403d880fd05 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/DelegatingKeyValueScanner.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; @@ -92,6 +93,11 @@ public class DelegatingKeyValueScanner implements KeyValueScanner { return delegate.isFileScanner(); } + @Override + public Path getFilePath() { + return delegate.getFilePath(); + } + @Override public boolean backwardSeek(Cell key) throws IOException { return delegate.backwardSeek(key); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java index d52c6c78a01..91b85d37dae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MockStoreFile.java @@ -144,11 +144,18 @@ public class MockStoreFile extends StoreFile { public void initReader() throws IOException { } + @Override + public StoreFileScanner getPreadScanner(boolean cacheBlocks, long readPt, long scannerOrder, + boolean canOptimizeForNonNullColumn) { + return getReader().getStoreFileScanner(cacheBlocks, true, false, readPt, scannerOrder, + canOptimizeForNonNullColumn); + } + @Override public StoreFileScanner getStreamScanner(boolean canUseDropBehind, boolean cacheBlocks, - boolean pread, boolean isCompaction, long readPt, long scannerOrder, - boolean canOptimizeForNonNullColumn) throws IOException { - return getReader().getStoreFileScanner(cacheBlocks, pread, isCompaction, readPt, scannerOrder, + boolean isCompaction, long readPt, long scannerOrder, boolean canOptimizeForNonNullColumn) + throws IOException { + return getReader().getStoreFileScanner(cacheBlocks, false, isCompaction, readPt, scannerOrder, canOptimizeForNonNullColumn); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksScanned.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksScanned.java index 497fd0373ea..c28e48ba453 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksScanned.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestBlocksScanned.java @@ -96,14 +96,14 @@ public class TestBlocksScanned extends HBaseTestCase { CacheStats stats = new CacheConfig(TEST_UTIL.getConfiguration()).getBlockCache().getStats(); long before = stats.getHitCount() + stats.getMissCount(); // Do simple test of getting one row only first. - Scan scan = new Scan(Bytes.toBytes("aaa"), Bytes.toBytes("aaz")); + Scan scan = new Scan().withStartRow(Bytes.toBytes("aaa")).withStopRow(Bytes.toBytes("aaz")) + .setReadType(Scan.ReadType.PREAD); scan.addColumn(FAMILY, COL); scan.setMaxVersions(1); InternalScanner s = r.getScanner(scan); List results = new ArrayList<>(); - while (s.next(results)) - ; + while (s.next(results)); s.close(); int expectResultSize = 'z' - 'a'; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index 9e90f3e39f9..04435dbed58 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -192,7 +192,7 @@ public class TestCompactingMemStore extends TestDefaultMemStore { Configuration conf = HBaseConfiguration.create(); for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) { ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, - KeepDeletedCells.FALSE, 0, this.memstore.getComparator()); + KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator()); ScanType scanType = ScanType.USER_SCAN; InternalScanner scanner = new StoreScanner(new Scan( Bytes.toBytes(startRowId)), scanInfo, scanType, null, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index 1bf6ea794e6..5f4c0aade37 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -212,9 +212,9 @@ public class TestCompaction { for (Store hstore: this.r.stores.values()) { HStore store = (HStore)hstore; ScanInfo old = store.getScanInfo(); - ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), - old.getMinVersions(), old.getMaxVersions(), ttl, - old.getKeepDeletedCells(), 0, old.getComparator()); + ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), old.getMinVersions(), + old.getMaxVersions(), ttl, old.getKeepDeletedCells(), HConstants.DEFAULT_BLOCKSIZE, 0, + old.getComparator()); store.setScanInfo(si); } Thread.sleep(ttl); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java index 3c41fc5efd8..584285be083 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java @@ -160,8 +160,8 @@ public class TestDefaultCompactSelection extends TestCompactionPolicy { ScanInfo oldScanInfo = store.getScanInfo(); ScanInfo newScanInfo = new ScanInfo(oldScanInfo.getConfiguration(), oldScanInfo.getFamily(), oldScanInfo.getMinVersions(), oldScanInfo.getMaxVersions(), 600, - oldScanInfo.getKeepDeletedCells(), oldScanInfo.getTimeToPurgeDeletes(), - oldScanInfo.getComparator()); + oldScanInfo.getKeepDeletedCells(), oldScanInfo.getPreadMaxBytes(), + oldScanInfo.getTimeToPurgeDeletes(), oldScanInfo.getComparator()); store.setScanInfo(newScanInfo); // Do not compact empty store file List candidates = sfCreate(0); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 3acb48bfb1c..3b15ff3aba9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -162,9 +162,8 @@ public class TestDefaultMemStore { Scan scan = new Scan(); List result = new ArrayList<>(); Configuration conf = HBaseConfiguration.create(); - ScanInfo scanInfo = - new ScanInfo(conf, null, 0, 1, HConstants.LATEST_TIMESTAMP, KeepDeletedCells.FALSE, 0, - this.memstore.getComparator()); + ScanInfo scanInfo = new ScanInfo(conf, null, 0, 1, HConstants.LATEST_TIMESTAMP, + KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator()); ScanType scanType = ScanType.USER_SCAN; StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners); int count = 0; @@ -602,7 +601,7 @@ public class TestDefaultMemStore { Configuration conf = HBaseConfiguration.create(); for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) { ScanInfo scanInfo = new ScanInfo(conf, FAMILY, 0, 1, Integer.MAX_VALUE, - KeepDeletedCells.FALSE, 0, this.memstore.getComparator()); + KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, this.memstore.getComparator()); ScanType scanType = ScanType.USER_SCAN; try (InternalScanner scanner = new StoreScanner(new Scan( Bytes.toBytes(startRowId)), scanInfo, scanType, null, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java index 9d00d3856f9..0b35f955a47 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMajorCompaction.java @@ -292,9 +292,9 @@ public class TestMajorCompaction { for (Store hstore : r.getStores()) { HStore store = ((HStore) hstore); ScanInfo old = store.getScanInfo(); - ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), - old.getMinVersions(), old.getMaxVersions(), ttl, - old.getKeepDeletedCells(), 0, old.getComparator()); + ScanInfo si = new ScanInfo(old.getConfiguration(), old.getFamily(), old.getMinVersions(), + old.getMaxVersions(), ttl, old.getKeepDeletedCells(), old.getPreadMaxBytes(), 0, + old.getComparator()); store.setScanInfo(si); } Thread.sleep(1000); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java index c1fd6a3376f..2dfdf5bac1a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestReversibleScanners.java @@ -264,8 +264,9 @@ public class TestReversibleScanners { BloomType.NONE, true); ScanType scanType = ScanType.USER_SCAN; - ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE, - Long.MAX_VALUE, KeepDeletedCells.FALSE, 0, CellComparator.COMPARATOR); + ScanInfo scanInfo = + new ScanInfo(TEST_UTIL.getConfiguration(), FAMILYNAME, 0, Integer.MAX_VALUE, Long.MAX_VALUE, + KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR); // Case 1.Test a full reversed scan Scan scan = new Scan(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index 3e2949cc645..524af34755c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -67,8 +67,8 @@ public class TestStoreScanner { private static final String CF_STR = "cf"; private static final byte [] CF = Bytes.toBytes(CF_STR); static Configuration CONF = HBaseConfiguration.create(); - private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, - Long.MAX_VALUE, KeepDeletedCells.FALSE, 0, CellComparator.COMPARATOR); + private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE, + KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR); private ScanType scanType = ScanType.USER_SCAN; /** @@ -829,8 +829,8 @@ public class TestStoreScanner { List scanners = scanFixture(kvs); Scan scan = new Scan(); scan.setMaxVersions(1); - ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE, 0, - CellComparator.COMPARATOR); + ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE, + HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR); ScanType scanType = ScanType.USER_SCAN; try (StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners)) { List results = new ArrayList<>(); @@ -902,8 +902,8 @@ public class TestStoreScanner { Scan scan = new Scan(); scan.setMaxVersions(1); // scanner with ttl equal to 500 - ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE, 0, - CellComparator.COMPARATOR); + ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, 1, 500, KeepDeletedCells.FALSE, + HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.COMPARATOR); ScanType scanType = ScanType.USER_SCAN; try (StoreScanner scanner = new StoreScanner(scan, scanInfo, scanType, null, scanners)) { @@ -968,6 +968,7 @@ public class TestStoreScanner { 0 /* minVersions */, 2 /* maxVersions */, 500 /* ttl */, KeepDeletedCells.FALSE /* keepDeletedCells */, + HConstants.DEFAULT_BLOCKSIZE /* block size */, 200, /* timeToPurgeDeletes */ CellComparator.COMPARATOR); try (StoreScanner scanner = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java new file mode 100644 index 00000000000..fb978b15224 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ RegionServerTests.class, MediumTests.class }) +public class TestSwitchToStreamRead { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static TableName TABLE_NAME = TableName.valueOf("stream"); + + private static byte[] FAMILY = Bytes.toBytes("cf"); + + private static byte[] QUAL = Bytes.toBytes("cq"); + + private static String VALUE_PREFIX; + + private static HRegion REGION; + + @BeforeClass + public static void setUp() throws IOException { + UTIL.getConfiguration().setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 2048); + StringBuilder sb = new StringBuilder(256); + for (int i = 0; i < 255; i++) { + sb.append((char) ThreadLocalRandom.current().nextInt('A', 'z' + 1)); + } + VALUE_PREFIX = sb.append("-").toString(); + REGION = UTIL.createLocalHRegion( + new HTableDescriptor(TABLE_NAME).addFamily(new HColumnDescriptor(FAMILY).setBlocksize(1024)), + null, null); + for (int i = 0; i < 900; i++) { + REGION + .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i))); + } + REGION.flush(true); + for (int i = 900; i < 1000; i++) { + REGION + .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i))); + } + } + + @AfterClass + public static void tearDown() throws IOException { + REGION.close(true); + UTIL.cleanupTestDir(); + } + + @Test + public void test() throws IOException { + try (RegionScanner scanner = REGION.getScanner(new Scan())) { + StoreScanner storeScanner = (StoreScanner) ((RegionScannerImpl) scanner) + .getStoreHeapForTesting().getCurrentForTesting(); + for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) { + if (kvs instanceof StoreFileScanner) { + StoreFileScanner sfScanner = (StoreFileScanner) kvs; + // starting from pread so we use shared reader here. + assertTrue(sfScanner.getReader().shared); + } + } + List cells = new ArrayList<>(); + for (int i = 0; i < 500; i++) { + assertTrue(scanner.next(cells)); + Result result = Result.create(cells); + assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL))); + cells.clear(); + } + for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) { + if (kvs instanceof StoreFileScanner) { + StoreFileScanner sfScanner = (StoreFileScanner) kvs; + // we should have convert to use stream read now. + assertFalse(sfScanner.getReader().shared); + } + } + for (int i = 500; i < 1000; i++) { + assertEquals(i != 999, scanner.next(cells)); + Result result = Result.create(cells); + assertEquals(VALUE_PREFIX + i, Bytes.toString(result.getValue(FAMILY, QUAL))); + cells.clear(); + } + } + // make sure all scanners are closed. + for (StoreFile sf : REGION.getStore(FAMILY).getStorefiles()) { + assertFalse(sf.isReferencedInReads()); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java index af8c27dc9d3..73c92e42b94 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestCompactionScanQueryMatcher.java @@ -73,8 +73,8 @@ public class TestCompactionScanQueryMatcher extends AbstractTestScanQueryMatcher throws IOException { long now = EnvironmentEdgeManager.currentTime(); // Set time to purge deletes to negative value to avoid it ever happening. - ScanInfo scanInfo = new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, -1L, - rowComparator); + ScanInfo scanInfo = new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, + HConstants.DEFAULT_BLOCKSIZE, -1L, rowComparator); CompactionScanQueryMatcher qm = CompactionScanQueryMatcher.create(scanInfo, ScanType.COMPACT_RETAIN_DELETES, Long.MAX_VALUE, HConstants.OLDEST_TIMESTAMP, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java index b4e4311ed09..f3cf604196f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/querymatcher/TestUserScanQueryMatcher.java @@ -54,7 +54,8 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher { long now = EnvironmentEdgeManager.currentTime(); // Do with fam2 which has a col2 qualifier. UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan, - new ScanInfo(this.conf, fam2, 10, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), + new ScanInfo(this.conf, fam2, 10, 1, ttl, KeepDeletedCells.FALSE, + HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator), get.getFamilyMap().get(fam2), now - ttl, now, null); Cell kv = new KeyValue(row1, fam2, col2, 1, data); Cell cell = CellUtil.createLastOnRowCol(kv); @@ -79,8 +80,9 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher { long now = EnvironmentEdgeManager.currentTime(); // 2,4,5 - UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan, - new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), + UserScanQueryMatcher qm = UserScanQueryMatcher.create( + scan, new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, + HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator), get.getFamilyMap().get(fam2), now - ttl, now, null); List memstore = new ArrayList<>(6); @@ -122,9 +124,9 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher { expected.add(ScanQueryMatcher.MatchCode.DONE); long now = EnvironmentEdgeManager.currentTime(); - UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan, - new ScanInfo(this.conf, fam2, 0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), null, - now - ttl, now, null); + UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan, new ScanInfo(this.conf, fam2, 0, 1, + ttl, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator), + null, now - ttl, now, null); List memstore = new ArrayList<>(6); memstore.add(new KeyValue(row1, fam2, col1, 1, data)); @@ -168,7 +170,8 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher { long now = EnvironmentEdgeManager.currentTime(); UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan, - new ScanInfo(this.conf, fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), + new ScanInfo(this.conf, fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, + HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator), get.getFamilyMap().get(fam2), now - testTTL, now, null); KeyValue[] kvs = new KeyValue[] { new KeyValue(row1, fam2, col1, now - 100, data), @@ -209,9 +212,9 @@ public class TestUserScanQueryMatcher extends AbstractTestScanQueryMatcher { ScanQueryMatcher.MatchCode.DONE }; long now = EnvironmentEdgeManager.currentTime(); - UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan, - new ScanInfo(this.conf, fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), null, - now - testTTL, now, null); + UserScanQueryMatcher qm = UserScanQueryMatcher.create(scan, new ScanInfo(this.conf, fam2, 0, 1, + testTTL, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, rowComparator), + null, now - testTTL, now, null); KeyValue[] kvs = new KeyValue[] { new KeyValue(row1, fam2, col1, now - 100, data), new KeyValue(row1, fam2, col2, now - 50, data), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java index 27e93a0ef46..720ad291db3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java @@ -245,11 +245,10 @@ public class TestCoprocessorScanPolicy { Integer newVersions = versions.get(store.getTableName()); ScanInfo oldSI = store.getScanInfo(); HColumnDescriptor family = store.getFamily(); - ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), - family.getName(), family.getMinVersions(), - newVersions == null ? family.getMaxVersions() : newVersions, + ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(), + family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions, newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), - oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); + family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); Scan scan = new Scan(); scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions); return new StoreScanner(store, scanInfo, scan, scanners, @@ -266,11 +265,10 @@ public class TestCoprocessorScanPolicy { Integer newVersions = versions.get(store.getTableName()); ScanInfo oldSI = store.getScanInfo(); HColumnDescriptor family = store.getFamily(); - ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), - family.getName(), family.getMinVersions(), - newVersions == null ? family.getMaxVersions() : newVersions, + ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(), + family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions, newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), - oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); + family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); Scan scan = new Scan(); scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions); return new StoreScanner(store, scanInfo, scan, scanners, scanType, @@ -287,11 +285,10 @@ public class TestCoprocessorScanPolicy { Integer newVersions = versions.get(store.getTableName()); ScanInfo oldSI = store.getScanInfo(); HColumnDescriptor family = store.getFamily(); - ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), - family.getName(), family.getMinVersions(), - newVersions == null ? family.getMaxVersions() : newVersions, + ScanInfo scanInfo = new ScanInfo(TEST_UTIL.getConfiguration(), family.getName(), + family.getMinVersions(), newVersions == null ? family.getMaxVersions() : newVersions, newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(), - oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); + family.getBlocksize(), oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); return new StoreScanner(store, scanInfo, scan, targetCols, readPt); } else { return s;