HBASE-8541 implement flush-into-stripes in stripe compactions

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1539211 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
sershe 2013-11-06 01:49:36 +00:00
parent 940f4a43c0
commit 18ecb48662
14 changed files with 464 additions and 140 deletions

View File

@ -68,9 +68,9 @@ class DefaultStoreFileManager implements StoreFileManager {
}
@Override
public void insertNewFile(StoreFile sf) {
public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
newFiles.add(sf);
newFiles.addAll(sfs);
sortAndSetStoreFiles(newFiles);
}

View File

@ -53,13 +53,7 @@ public class DefaultStoreFlusher extends StoreFlusher {
// Use a store scanner to find which rows to flush.
long smallestReadPoint = store.getSmallestReadPoint();
KeyValueScanner memstoreScanner =
new CollectionBackedScanner(snapshot, store.getComparator());
InternalScanner scanner = preCreateCoprocScanner(memstoreScanner);
if (scanner == null) {
scanner = createStoreScanner(smallestReadPoint, memstoreScanner);
}
scanner = postCreateCoprocScanner(scanner);
InternalScanner scanner = createScanner(snapshot, smallestReadPoint);
if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
}

View File

@ -611,7 +611,7 @@ public class HStore implements Store {
// Append the new storefile into the list
this.lock.writeLock().lock();
try {
this.storeEngine.getStoreFileManager().insertNewFile(sf);
this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
} finally {
// We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
@ -852,9 +852,7 @@ public class HStore implements Store {
final List<StoreFile> sfs, final SortedSet<KeyValue> set) throws IOException {
this.lock.writeLock().lock();
try {
for (StoreFile sf : sfs) {
this.storeEngine.getStoreFileManager().insertNewFile(sf);
}
this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
this.memstore.clearSnapshot(set);
} finally {
// We need the lock, as long as we are updating the storeFiles
@ -1122,15 +1120,15 @@ public class HStore implements Store {
.append(" to execute.");
LOG.info(message.toString());
if (LOG.isTraceEnabled()) {
int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
long resultSize = 0;
for (StoreFile sf : sfs) {
resultSize += sf.getReader().length();
}
String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
+ "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
+ cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
LOG.trace(traceMessage);
int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
long resultSize = 0;
for (StoreFile sf : sfs) {
resultSize += sf.getReader().length();
}
String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
+ "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
+ cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
LOG.trace(traceMessage);
}
}

View File

@ -47,10 +47,10 @@ public interface StoreFileManager {
void loadFiles(List<StoreFile> storeFiles);
/**
* Adds new file, either for from MemStore flush or bulk insert, into the structure.
* Adds new files, either for from MemStore flush or bulk insert, into the structure.
* @param sf New store file.
*/
void insertNewFile(StoreFile sf);
void insertNewFiles(Collection<StoreFile> sfs) throws IOException;
/**
* Adds compaction results into the structure.

View File

@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.util.CollectionBackedScanner;
/**
* Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one).
@ -77,31 +78,27 @@ abstract class StoreFlusher {
writer.close();
}
/** Calls coprocessor to create a flush scanner based on memstore scanner */
protected InternalScanner preCreateCoprocScanner(
KeyValueScanner memstoreScanner) throws IOException {
if (store.getCoprocessorHost() != null) {
return store.getCoprocessorHost().preFlushScannerOpen(store, memstoreScanner);
}
return null;
}
/** Creates the default flush scanner based on memstore scanner */
protected InternalScanner createStoreScanner(long smallestReadPoint,
KeyValueScanner memstoreScanner) throws IOException {
Scan scan = new Scan();
scan.setMaxVersions(store.getScanInfo().getMaxVersions());
return new StoreScanner(store, store.getScanInfo(), scan,
Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
}
/**
* Calls coprocessor to create a scanner based on default flush scanner
* @return new or default scanner; if null, flush should not proceed.
* Creates the scanner for flushing snapshot. Also calls coprocessors.
* @return The scanner; null if coprocessor is canceling the flush.
*/
protected InternalScanner postCreateCoprocScanner(InternalScanner scanner)
throws IOException {
protected InternalScanner createScanner(SortedSet<KeyValue> snapshot,
long smallestReadPoint) throws IOException {
KeyValueScanner memstoreScanner =
new CollectionBackedScanner(snapshot, store.getComparator());
InternalScanner scanner = null;
if (store.getCoprocessorHost() != null) {
scanner = store.getCoprocessorHost().preFlushScannerOpen(store, memstoreScanner);
}
if (scanner == null) {
Scan scan = new Scan();
scan.setMaxVersions(store.getScanInfo().getMaxVersions());
scanner = new StoreScanner(store, store.getScanInfo(), scan,
Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
}
assert scanner != null;
if (store.getCoprocessorHost() != null) {
return store.getCoprocessorHost().preFlush(store, scanner);
}
@ -114,7 +111,7 @@ abstract class StoreFlusher {
* @param sink Sink to write data to. Could be StoreFile.Writer.
* @param smallestReadPoint Smallest read point used for the flush.
* @return Bytes flushed.
s */
*/
protected long performFlush(InternalScanner scanner,
Compactor.CellSink sink, long smallestReadPoint) throws IOException {
int compactionKVMax =

View File

@ -46,6 +46,9 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
/** Source scanner that is tracking KV count; may be null if source is not StoreScanner */
protected StoreScanner sourceScanner;
/** Whether to write stripe metadata */
private boolean doWriteStripeMetadata = true;
public interface WriterFactory {
public StoreFile.Writer createWriter() throws IOException;
}
@ -63,18 +66,24 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
this.comparator = comparator;
}
public void setNoStripeMetadata() {
this.doWriteStripeMetadata = false;
}
public List<Path> commitWriters(long maxSeqId, boolean isMajor) throws IOException {
assert this.existingWriters != null;
commitWritersInternal();
assert this.boundaries.size() == (this.existingWriters.size() + 1);
LOG.debug("Writing out metadata for " + this.existingWriters.size() + " writers");
LOG.debug((this.doWriteStripeMetadata ? "W" : "Not w")
+ "riting out metadata for " + this.existingWriters.size() + " writers");
List<Path> paths = new ArrayList<Path>();
for (int i = 0; i < this.existingWriters.size(); ++i) {
StoreFile.Writer writer = this.existingWriters.get(i);
if (writer == null) continue; // writer was skipped due to 0 KVs
writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i));
writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1));
if (doWriteStripeMetadata) {
writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i));
writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1));
}
writer.appendMetadata(maxSeqId, isMajor);
paths.add(writer.getPath());
writer.close();
@ -109,8 +118,8 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
byte[] left, byte[] row, int rowOffset, int rowLength) throws IOException {
if (StripeStoreFileManager.OPEN_KEY != left &&
comparator.compareRows(row, rowOffset, rowLength, left, 0, left.length) < 0) {
String error = "The first row is lower than the left boundary of ["
+ Bytes.toString(left) + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
String error = "The first row is lower than the left boundary of [" + Bytes.toString(left)
+ "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
LOG.error(error);
throw new IOException(error);
}

View File

@ -56,6 +56,9 @@ public class StripeStoreConfig {
to get this count from the outset and prevent unnecessary splitting. */
public static final String INITIAL_STRIPE_COUNT_KEY = "hbase.store.stripe.initialStripeCount";
/** Whether to flush memstore to L0 files, or directly to stripes. */
public static final String FLUSH_TO_L0_KEY = "hbase.store.stripe.compaction.flushToL0";
/** When splitting region, the maximum size imbalance to allow in an attempt to split at a
stripe boundary, so that no files go to both regions. Most users won't need to change that. */
public static final String MAX_REGION_SPLIT_IMBALANCE_KEY =
@ -70,36 +73,46 @@ public class StripeStoreConfig {
private final int initialCount;
private final long sizeToSplitAt;
private final float splitPartCount;
private final boolean flushIntoL0;
private final long splitPartSize; // derived from sizeToSplitAt and splitPartCount
private final double EPSILON = 0.001; // good enough for this, not a real epsilon.
private static final double EPSILON = 0.001; // good enough for this, not a real epsilon.
public StripeStoreConfig(Configuration config, StoreConfigInformation sci) {
this.level0CompactMinFiles = config.getInt(MIN_FILES_L0_KEY, 4);
this.stripeCompactMinFiles = config.getInt(MIN_FILES_KEY, 3);
this.stripeCompactMaxFiles = config.getInt(MAX_FILES_KEY, 10);
this.maxRegionSplitImbalance = getFloat(config, MAX_REGION_SPLIT_IMBALANCE_KEY, 1.5f, true);
this.flushIntoL0 = config.getBoolean(FLUSH_TO_L0_KEY, false);
this.splitPartCount = getFloat(config, SPLIT_PARTS_KEY, 2.0f, true);
float splitPartCount = getFloat(config, SPLIT_PARTS_KEY, 2f, true);
if (Math.abs(splitPartCount - 1.0) < EPSILON) {
throw new RuntimeException("Split part count cannot be 1: " + this.splitPartCount);
LOG.error("Split part count cannot be 1 (" + this.splitPartCount + "), using the default");
splitPartCount = 2f;
}
// TODO: change when no L0.
this.splitPartCount = splitPartCount;
// Arbitrary default split size - 4 times the size of one L0 compaction.
// If we flush into L0 there's no split compaction, but for default value it is ok.
double flushSize = sci.getMemstoreFlushSize();
if (flushSize == 0) {
flushSize = 128 * 1024 * 1024;
}
long defaultSplitSize = (long)(flushSize * getLevel0MinFiles() * 4 * splitPartCount);
this.sizeToSplitAt = config.getLong(SIZE_TO_SPLIT_KEY, defaultSplitSize);
this.initialCount = config.getInt(INITIAL_STRIPE_COUNT_KEY, 1);
int initialCount = config.getInt(INITIAL_STRIPE_COUNT_KEY, 1);
if (initialCount == 0) {
LOG.error("Initial stripe count is 0, using the default");
initialCount = 1;
}
this.initialCount = initialCount;
this.splitPartSize = (long)(this.sizeToSplitAt / this.splitPartCount);
}
private static float getFloat(
Configuration config, String key, float defaultValue, boolean moreThanOne) {
float value = config.getFloat(key, defaultValue);
if (value == 0) {
LOG.warn(String.format("%s is set to 0; using default value of %f", key, defaultValue));
if (value < EPSILON) {
LOG.warn(String.format(
"%s is set to 0 or negative; using default value of %f", key, defaultValue));
value = defaultValue;
} else if ((value > 1f) != moreThanOne) {
value = 1f / value;
@ -123,6 +136,10 @@ public class StripeStoreConfig {
return stripeCompactMaxFiles;
}
public boolean isUsingL0Flush() {
return flushIntoL0;
}
public long getSplitSize() {
return sizeToSplitAt;
}

View File

@ -38,7 +38,7 @@ import com.google.common.base.Preconditions;
* The storage engine that implements the stripe-based store/compaction scheme.
*/
@InterfaceAudience.Private
public class StripeStoreEngine extends StoreEngine<DefaultStoreFlusher,
public class StripeStoreEngine extends StoreEngine<StripeStoreFlusher,
StripeCompactionPolicy, StripeCompactor, StripeStoreFileManager> {
static final Log LOG = LogFactory.getLog(StripeStoreEngine.class);
private StripeStoreConfig config;
@ -58,8 +58,9 @@ public class StripeStoreEngine extends StoreEngine<DefaultStoreFlusher,
Configuration conf, Store store, KVComparator comparator) throws IOException {
this.config = new StripeStoreConfig(conf, store);
this.compactionPolicy = new StripeCompactionPolicy(conf, store, config);
this.storeFlusher = new DefaultStoreFlusher(conf, store);
this.storeFileManager = new StripeStoreFileManager(comparator, conf, this.config);
this.storeFlusher = new StripeStoreFlusher(
conf, store, this.compactionPolicy, this.storeFileManager);
this.compactor = new StripeCompactor(conf, store);
}

View File

@ -24,8 +24,10 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
@ -41,6 +43,8 @@ import org.apache.hadoop.hbase.util.ConcatenatedLists;
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
/**
* Stripe implementation of StoreFileManager.
@ -136,15 +140,10 @@ public class StripeStoreFileManager
}
@Override
public void insertNewFile(StoreFile sf) {
LOG.debug("New level 0 file: " + sf);
ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(state.level0Files);
insertFileIntoStripe(newFiles, sf);
ensureLevel0Metadata(sf);
this.state.level0Files = ImmutableList.copyOf(newFiles);
ArrayList<StoreFile> newAllFiles = new ArrayList<StoreFile>(state.allFilesCached);
newAllFiles.add(sf);
this.state.allFilesCached = ImmutableList.copyOf(newAllFiles);
public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
cmc.mergeResults(null, sfs);
debugDumpState("Added new files");
}
@Override
@ -304,7 +303,7 @@ public class StripeStoreFileManager
+ " files replaced by " + results.size());
// In order to be able to fail in the middle of the operation, we'll operate on lazy
// copies and apply the result at the end.
CompactionResultsMergeCopy cmc = new CompactionResultsMergeCopy();
CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
cmc.mergeResults(compactedFiles, results);
debugDumpState("Merged compaction results");
}
@ -628,11 +627,11 @@ public class StripeStoreFileManager
}
/**
* Non-static helper class for merging compaction results.
* Since we want to merge them atomically (more or less), it operates on lazy copies, and
* then applies copies to real lists as necessary.
* Non-static helper class for merging compaction or flush results.
* Since we want to merge them atomically (more or less), it operates on lazy copies,
* then creates a new state object and puts it in place.
*/
private class CompactionResultsMergeCopy {
private class CompactionOrFlushMergeCopy {
private ArrayList<List<StoreFile>> stripeFiles = null;
private ArrayList<StoreFile> level0Files = null;
private ArrayList<byte[]> stripeEndRows = null;
@ -641,11 +640,13 @@ public class StripeStoreFileManager
private Collection<StoreFile> results = null;
private List<StoreFile> l0Results = new ArrayList<StoreFile>();
private final boolean isFlush;
public CompactionResultsMergeCopy() {
public CompactionOrFlushMergeCopy(boolean isFlush) {
// Create a lazy mutable copy (other fields are so lazy they start out as nulls).
this.stripeFiles = new ArrayList<List<StoreFile>>(
StripeStoreFileManager.this.state.stripeFiles);
this.isFlush = isFlush;
}
public void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results)
@ -654,8 +655,8 @@ public class StripeStoreFileManager
this.compactedFiles = compactedFiles;
this.results = results;
// Do logical processing.
removeCompactedFiles();
TreeMap<byte[], StoreFile> newStripes = processCompactionResults();
if (!isFlush) removeCompactedFiles();
TreeMap<byte[], StoreFile> newStripes = processResults();
if (newStripes != null) {
processNewCandidateStripes(newStripes);
}
@ -681,7 +682,7 @@ public class StripeStoreFileManager
}
List<StoreFile> newAllFiles = new ArrayList<StoreFile>(oldState.allFilesCached);
newAllFiles.removeAll(compactedFiles);
if (!isFlush) newAllFiles.removeAll(compactedFiles);
newAllFiles.addAll(results);
newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
return newState;
@ -689,12 +690,16 @@ public class StripeStoreFileManager
private void updateMetadataMaps() {
StripeStoreFileManager parent = StripeStoreFileManager.this;
for (StoreFile sf : this.compactedFiles) {
parent.fileStarts.remove(sf);
parent.fileEnds.remove(sf);
if (!isFlush) {
for (StoreFile sf : this.compactedFiles) {
parent.fileStarts.remove(sf);
parent.fileEnds.remove(sf);
}
}
for (StoreFile sf : this.l0Results) {
parent.ensureLevel0Metadata(sf);
if (this.l0Results != null) {
for (StoreFile sf : this.l0Results) {
parent.ensureLevel0Metadata(sf);
}
}
}
@ -729,12 +734,14 @@ public class StripeStoreFileManager
* or to the list of new candidate stripes.
* @return New candidate stripes.
*/
private TreeMap<byte[], StoreFile> processCompactionResults() throws IOException {
private TreeMap<byte[], StoreFile> processResults() throws IOException {
TreeMap<byte[], StoreFile> newStripes = null;
for (StoreFile sf : this.results) {
byte[] startRow = startOf(sf), endRow = endOf(sf);
if (isInvalid(endRow) || isInvalid(startRow)) {
LOG.warn("The newly compacted files doesn't have stripe rows set: " + sf.getPath());
if (!isFlush) {
LOG.warn("The newly compacted file doesn't have stripes set: " + sf.getPath());
}
insertFileIntoStripe(getLevel0Copy(), sf);
this.l0Results.add(sf);
continue;
@ -775,7 +782,7 @@ public class StripeStoreFileManager
int stripeIndex = findStripeIndexByEndRow(oldEndRow);
if (stripeIndex < 0) {
throw new IOException("An allegedly compacted file [" + oldFile + "] does not belong"
+ " to a known stripe (end row + [" + Bytes.toString(oldEndRow) + "])");
+ " to a known stripe (end row - [" + Bytes.toString(oldEndRow) + "])");
}
source = getStripeCopy(stripeIndex);
}
@ -803,6 +810,8 @@ public class StripeStoreFileManager
throw new IOException("Newly created stripes do not cover the entire key space.");
}
boolean canAddNewStripes = true;
Collection<StoreFile> filesForL0 = null;
if (hasStripes) {
// Determine which stripes will need to be removed because they conflict with new stripes.
// The new boundaries should match old stripe boundaries, so we should get exact matches.
@ -815,20 +824,49 @@ public class StripeStoreFileManager
}
int removeTo = findStripeIndexByEndRow(lastEndRow);
if (removeTo < 0) throw new IOException("Compaction is trying to add a bad range.");
// Remove old empty stripes.
int originalCount = this.stripeFiles.size();
// See if there are files in the stripes we are trying to replace.
ArrayList<StoreFile> conflictingFiles = new ArrayList<StoreFile>();
for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
if (!this.stripeFiles.get(removeIndex).isEmpty()) {
throw new IOException("Compaction intends to create a new stripe that replaces an"
+ " existing one, but the latter contains some files.");
conflictingFiles.addAll(this.stripeFiles.get(removeIndex));
}
if (!conflictingFiles.isEmpty()) {
// This can be caused by two things - concurrent flush into stripes, or a bug.
// Unfortunately, we cannot tell them apart without looking at timing or something
// like that. We will assume we are dealing with a flush and dump it into L0.
if (isFlush) {
long newSize = StripeCompactionPolicy.getTotalFileSize(newStripes.values());
LOG.warn("Stripes were created by a flush, but results of size " + newSize
+ " cannot be added because the stripes have changed");
canAddNewStripes = false;
filesForL0 = newStripes.values();
} else {
long oldSize = StripeCompactionPolicy.getTotalFileSize(conflictingFiles);
LOG.info(conflictingFiles.size() + " conflicting files (likely created by a flush) "
+ " of size " + oldSize + " are moved to L0 due to concurrent stripe change");
filesForL0 = conflictingFiles;
}
if (removeIndex != originalCount - 1) {
this.stripeEndRows.remove(removeIndex);
if (filesForL0 != null) {
for (StoreFile sf : filesForL0) {
insertFileIntoStripe(getLevel0Copy(), sf);
}
l0Results.addAll(filesForL0);
}
}
if (canAddNewStripes) {
// Remove old empty stripes.
int originalCount = this.stripeFiles.size();
for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
if (removeIndex != originalCount - 1) {
this.stripeEndRows.remove(removeIndex);
}
this.stripeFiles.remove(removeIndex);
}
this.stripeFiles.remove(removeIndex);
}
}
if (!canAddNewStripes) return; // Files were already put into L0.
// Now, insert new stripes. The total ranges match, so we can insert where we removed.
byte[] previousEndRow = null;
int insertAt = removeFrom;
@ -838,7 +876,8 @@ public class StripeStoreFileManager
assert !isOpen(previousEndRow);
byte[] startRow = startOf(newStripe.getValue());
if (!rowEquals(previousEndRow, startRow)) {
throw new IOException("The new stripes produced by compaction are not contiguous");
throw new IOException("The new stripes produced by "
+ (isFlush ? "flush" : "compaction") + " are not contiguous");
}
}
// Add the new stripe.

View File

@ -0,0 +1,171 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.util.CollectionBackedScanner;
import com.google.common.annotations.VisibleForTesting;
/**
* Stripe implementation of StoreFlusher. Flushes files either into L0 file w/o metadata, or
* into separate striped files, avoiding L0.
*/
public class StripeStoreFlusher extends StoreFlusher {
private static final Log LOG = LogFactory.getLog(StripeStoreFlusher.class);
private final Object flushLock = new Object();
private final StripeCompactionPolicy policy;
private final StripeCompactionPolicy.StripeInformationProvider stripes;
public StripeStoreFlusher(Configuration conf, Store store,
StripeCompactionPolicy policy, StripeStoreFileManager stripes) {
super(conf, store);
this.policy = policy;
this.stripes = stripes;
}
@Override
public List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushSeqNum,
final TimeRangeTracker tracker, AtomicLong flushedSize, MonitoredTask status)
throws IOException {
List<Path> result = null;
int kvCount = snapshot.size();
if (kvCount == 0) return result; // don't flush if there are no entries
long smallestReadPoint = store.getSmallestReadPoint();
InternalScanner scanner = createScanner(snapshot, smallestReadPoint);
if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
}
// Let policy select flush method.
StripeFlushRequest req = this.policy.selectFlush(this.stripes, kvCount);
long flushedBytes = 0;
boolean success = false;
StripeMultiFileWriter mw = null;
try {
mw = req.createWriter(); // Writer according to the policy.
StripeMultiFileWriter.WriterFactory factory = createWriterFactory(tracker, kvCount);
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
mw.init(storeScanner, factory, store.getComparator());
synchronized (flushLock) {
flushedBytes = performFlush(scanner, mw, smallestReadPoint);
result = mw.commitWriters(cacheFlushSeqNum, false);
success = true;
}
} finally {
if (!success && (mw != null)) {
result.clear();
for (Path leftoverFile : mw.abortWriters()) {
try {
store.getFileSystem().delete(leftoverFile, false);
} catch (Exception e) {
LOG.error("Failed to delete a file after failed flush: " + e);
}
}
}
flushedSize.set(flushedBytes);
try {
scanner.close();
} catch (IOException ex) {
LOG.warn("Failed to close flush scanner, ignoring", ex);
}
}
return result;
}
private StripeMultiFileWriter.WriterFactory createWriterFactory(
final TimeRangeTracker tracker, final long kvCount) {
return new StripeMultiFileWriter.WriterFactory() {
@Override
public Writer createWriter() throws IOException {
StoreFile.Writer writer = store.createWriterInTmp(
kvCount, store.getFamily().getCompression(), false, true, true);
writer.setTimeRangeTracker(tracker);
return writer;
}
};
}
/** Stripe flush request wrapper that writes a non-striped file. */
public static class StripeFlushRequest {
@VisibleForTesting
public StripeMultiFileWriter createWriter() throws IOException {
StripeMultiFileWriter writer =
new StripeMultiFileWriter.SizeMultiWriter(1, Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
writer.setNoStripeMetadata();
return writer;
}
}
/** Stripe flush request wrapper based on boundaries. */
public static class BoundaryStripeFlushRequest extends StripeFlushRequest {
private final List<byte[]> targetBoundaries;
/** @param targetBoundaries New files should be written with these boundaries. */
public BoundaryStripeFlushRequest(List<byte[]> targetBoundaries) {
this.targetBoundaries = targetBoundaries;
}
@Override
public StripeMultiFileWriter createWriter() throws IOException {
return new StripeMultiFileWriter.BoundaryMultiWriter(targetBoundaries, null, null);
}
}
/** Stripe flush request wrapper based on size. */
public static class SizeStripeFlushRequest extends StripeFlushRequest {
private final int targetCount;
private final long targetKvs;
/**
* @param targetCount The maximum number of stripes to flush into.
* @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
* total number of kvs, all the overflow data goes into the last stripe.
*/
public SizeStripeFlushRequest(int targetCount, long targetKvs) {
this.targetCount = targetCount;
this.targetKvs = targetKvs;
}
@Override
public StripeMultiFileWriter createWriter() throws IOException {
return new StripeMultiFileWriter.SizeMultiWriter(
this.targetCount, this.targetKvs, OPEN_KEY, OPEN_KEY);
}
}
}

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcatenatedLists;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -75,9 +76,24 @@ public class StripeCompactionPolicy extends CompactionPolicy {
if (si.getStripeCount() > 0) {
return new BoundaryStripeCompactionRequest(request, si.getStripeBoundaries());
}
int initialCount = this.config.getInitialCount();
long targetKvs = estimateTargetKvs(request.getFiles(), initialCount).getFirst();
return new SplitStripeCompactionRequest(request, OPEN_KEY, OPEN_KEY, targetKvs);
Pair<Long, Integer> targetKvsAndCount = estimateTargetKvs(
request.getFiles(), this.config.getInitialCount());
return new SplitStripeCompactionRequest(
request, OPEN_KEY, OPEN_KEY, targetKvsAndCount.getSecond(), targetKvsAndCount.getFirst());
}
public StripeStoreFlusher.StripeFlushRequest selectFlush(
StripeInformationProvider si, int kvCount) {
if (this.config.isUsingL0Flush()) {
return new StripeStoreFlusher.StripeFlushRequest(); // L0 is used, return dumb request.
}
if (si.getStripeCount() == 0) {
// No stripes - start with the requisite count, derive KVs per stripe.
int initialCount = this.config.getInitialCount();
return new StripeStoreFlusher.SizeStripeFlushRequest(initialCount, kvCount / initialCount);
}
// There are stripes - do according to the boundaries.
return new StripeStoreFlusher.BoundaryStripeFlushRequest(si.getStripeBoundaries());
}
public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
@ -341,7 +357,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
return totalSize;
}
private static long getTotalFileSize(final Collection<StoreFile> candidates) {
public static long getTotalFileSize(final Collection<StoreFile> candidates) {
long totalSize = 0;
for (StoreFile storeFile : candidates) {
totalSize += storeFile.getReader().length();

View File

@ -125,7 +125,7 @@ public class TestStripeCompactor {
StripeCompactor sc = createCompactor(writers, input);
List<Path> paths =
sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo);
writers.verifyKvs(output, allFiles);
writers.verifyKvs(output, allFiles, true);
if (allFiles) {
assertEquals(output.length, paths.size());
writers.verifyBoundaries(boundaries);
@ -162,7 +162,7 @@ public class TestStripeCompactor {
List<Path> paths = sc.compact(
createDummyRequest(), targetCount, targetSize, left, right, null, null);
assertEquals(output.length, paths.size());
writers.verifyKvs(output, true);
writers.verifyKvs(output, true, true);
List<byte[]> boundaries = new ArrayList<byte[]>();
boundaries.add(left);
for (int i = 1; i < output.length; ++i) {
@ -242,7 +242,8 @@ public class TestStripeCompactor {
}
// StoreFile.Writer has private ctor and is unwieldy, so this has to be convoluted.
private static class StoreFileWritersCapture implements Answer<StoreFile.Writer> {
public static class StoreFileWritersCapture implements
Answer<StoreFile.Writer>, StripeMultiFileWriter.WriterFactory {
public static class Writer {
public ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
public TreeMap<byte[], byte[]> data = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
@ -251,7 +252,7 @@ public class TestStripeCompactor {
private List<Writer> writers = new ArrayList<Writer>();
@Override
public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable {
public StoreFile.Writer createWriter() throws IOException {
final Writer realWriter = new Writer();
writers.add(realWriter);
StoreFile.Writer writer = mock(StoreFile.Writer.class);
@ -267,7 +268,12 @@ public class TestStripeCompactor {
return writer;
}
public void verifyKvs(KeyValue[][] kvss, boolean allFiles) {
@Override
public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable {
return createWriter();
}
public void verifyKvs(KeyValue[][] kvss, boolean allFiles, boolean requireMetadata) {
if (allFiles) {
assertEquals(kvss.length, writers.size());
}
@ -276,8 +282,13 @@ public class TestStripeCompactor {
KeyValue[] kvs = kvss[i];
if (kvs != null) {
Writer w = writers.get(i - skippedWriters);
assertNotNull(w.data.get(STRIPE_START_KEY));
assertNotNull(w.data.get(STRIPE_END_KEY));
if (requireMetadata) {
assertNotNull(w.data.get(STRIPE_START_KEY));
assertNotNull(w.data.get(STRIPE_END_KEY));
} else {
assertNull(w.data.get(STRIPE_START_KEY));
assertNull(w.data.get(STRIPE_END_KEY));
}
assertEquals(kvs.length, w.kvs.size());
for (int j = 0; j < kvs.length; ++j) {
assertEquals(kvs[j], w.kvs.get(j));

View File

@ -83,7 +83,7 @@ public class TestStripeStoreFileManager {
public void testInsertFilesIntoL0() throws Exception {
StripeStoreFileManager manager = createManager();
MockStoreFile sf = createFile();
manager.insertNewFile(sf);
manager.insertNewFiles(al(sf));
assertEquals(1, manager.getStorefileCount());
Collection<StoreFile> filesForGet = manager.getFilesForScanOrGet(true, KEY_A, KEY_A);
assertEquals(1, filesForGet.size());
@ -99,8 +99,8 @@ public class TestStripeStoreFileManager {
@Test
public void testClearFiles() throws Exception {
StripeStoreFileManager manager = createManager();
manager.insertNewFile(createFile());
manager.insertNewFile(createFile());
manager.insertNewFiles(al(createFile()));
manager.insertNewFiles(al(createFile()));
manager.addCompactionResults(al(), al(createFile(OPEN_KEY, KEY_B),
createFile(KEY_B, OPEN_KEY)));
assertEquals(4, manager.getStorefileCount());
@ -120,8 +120,8 @@ public class TestStripeStoreFileManager {
public void testRowKeyBefore() throws Exception {
StripeStoreFileManager manager = createManager();
StoreFile l0File = createFile(), l0File2 = createFile();
manager.insertNewFile(l0File);
manager.insertNewFile(l0File2);
manager.insertNewFiles(al(l0File));
manager.insertNewFiles(al(l0File2));
// Get candidate files.
Iterator<StoreFile> sfs = manager.getCandidateFilesForRowKeyBefore(KV_B);
sfs.next();
@ -174,8 +174,8 @@ public class TestStripeStoreFileManager {
// If there are no stripes, should pick midpoint from the biggest file in L0.
MockStoreFile sf5 = createFile(5, 0);
sf5.splitPoint = new byte[1];
manager.insertNewFile(sf5);
manager.insertNewFile(createFile(1, 0));
manager.insertNewFiles(al(sf5));
manager.insertNewFiles(al(createFile(1, 0)));
assertEquals(sf5.splitPoint, manager.getSplitPoint());
// Same if there's one stripe but the biggest file is still in L0.
@ -259,7 +259,7 @@ public class TestStripeStoreFileManager {
// Populate one L0 file.
MockStoreFile sf0 = createFile();
manager.insertNewFile(sf0);
manager.insertNewFiles(al(sf0));
verifyGetAndScanScenario(manager, null, null, sf0);
verifyGetAndScanScenario(manager, null, KEY_C, sf0);
verifyGetAndScanScenario(manager, KEY_B, null, sf0);
@ -356,14 +356,11 @@ public class TestStripeStoreFileManager {
}
@Test
@SuppressWarnings("unchecked")
public void testAddingCompactionResults() throws Exception {
StripeStoreFileManager manager = createManager();
// First, add some L0 files and "compact" one with new stripe creation.
StoreFile sf_L0_0a = createFile();
StoreFile sf_L0_0b = createFile();
manager.insertNewFile(sf_L0_0a);
manager.insertNewFile(sf_L0_0b);
StoreFile sf_L0_0a = createFile(), sf_L0_0b = createFile();
manager.insertNewFiles(al(sf_L0_0a, sf_L0_0b));
// Try compacting with invalid new branches (gaps, overlaps) - no effect.
verifyInvalidCompactionScenario(manager, al(sf_L0_0a), al(createFile(OPEN_KEY, KEY_B)));
@ -384,7 +381,7 @@ public class TestStripeStoreFileManager {
StoreFile sf_L0_1 = createFile();
StoreFile sf_i2B_1 = createFile(OPEN_KEY, KEY_B);
StoreFile sf_B2C_1 = createFile(KEY_B, KEY_C);
manager.insertNewFile(sf_L0_1);
manager.insertNewFiles(al(sf_L0_1));
manager.addCompactionResults(al(sf_L0_0b, sf_L0_1), al(sf_i2B_1, sf_B2C_1));
verifyAllFiles(manager, al(sf_i2B_0, sf_B2C_0, sf_C2i_0, sf_i2B_1, sf_B2C_1));
@ -400,27 +397,21 @@ public class TestStripeStoreFileManager {
manager.addCompactionResults(al(sf_i2B_0, sf_i2B_1), al(sf_i2B_3));
verifyAllFiles(manager, al(sf_B2C_0, sf_C2i_0, sf_B2C_1, sf_i2B_3));
// Try to rebalance two stripes, but don't take all files from them - no effect.
// Rebalance two stripes.
StoreFile sf_B2D_4 = createFile(KEY_B, KEY_D);
StoreFile sf_D2i_4 = createFile(KEY_D, OPEN_KEY);
ArrayList<StoreFile> compacted3 = al();
verifyInvalidCompactionScenario(manager, al(sf_B2C_0, sf_C2i_0), al(sf_B2D_4, sf_D2i_4));
// Rebalance two stripes correctly.
manager.addCompactionResults(al(sf_B2C_0, sf_C2i_0, sf_B2C_1), al(sf_B2D_4, sf_D2i_4));
verifyAllFiles(manager, al(sf_i2B_3, sf_B2D_4, sf_D2i_4));
// Split the first stripe.
StoreFile sf_i2A_5 = createFile(OPEN_KEY, KEY_A);
StoreFile sf_A2B_5 = createFile(KEY_A, KEY_B);
ArrayList<StoreFile> compacted4 = al(createFile(OPEN_KEY, KEY_A), createFile(KEY_A, KEY_B));
manager.addCompactionResults(al(sf_i2B_3), al(sf_i2A_5, sf_A2B_5));
verifyAllFiles(manager, al(sf_B2D_4, sf_D2i_4, sf_i2A_5, sf_A2B_5));
// Split the middle stripe.
StoreFile sf_B2C_6 = createFile(KEY_B, KEY_C);
StoreFile sf_C2D_6 = createFile(KEY_C, KEY_D);
ArrayList<StoreFile> compacted5 = al(createFile(KEY_B, KEY_C), createFile(KEY_C, KEY_D));
manager.addCompactionResults(al(sf_B2D_4), al(sf_B2C_6, sf_C2D_6));
verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_A2B_5, sf_B2C_6, sf_C2D_6));
@ -429,14 +420,6 @@ public class TestStripeStoreFileManager {
manager.addCompactionResults(al(sf_A2B_5, sf_B2C_6), al(sf_A2C_7));
verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_C2D_6, sf_A2C_7));
// Try various range mismatch cases in replaced and new data - no effect.
ArrayList<StoreFile> tmp = al(sf_A2C_7, sf_C2D_6); // [A, C)
verifyInvalidCompactionScenario(manager, tmp, al(createFile(KEY_B, KEY_C)));
verifyInvalidCompactionScenario(manager, tmp, al(createFile(OPEN_KEY, KEY_D)));
verifyInvalidCompactionScenario(manager, tmp, al(createFile(KEY_A, OPEN_KEY)));
verifyInvalidCompactionScenario(manager, tmp, al(createFile(KEY_A, KEY_B)));
verifyInvalidCompactionScenario(manager, tmp, al(createFile(KEY_B, keyAfter(KEY_B))));
// Merge lower half.
StoreFile sf_i2C_8 = createFile(OPEN_KEY, KEY_C);
manager.addCompactionResults(al(sf_i2A_5, sf_A2C_7), al(sf_i2C_8));
@ -448,14 +431,40 @@ public class TestStripeStoreFileManager {
verifyAllFiles(manager, al(sf_i2i_9));
}
@Test
public void testCompactionAndFlushConflict() throws Exception {
// Add file flush into stripes
StripeStoreFileManager sfm = createManager();
assertEquals(0, sfm.getStripeCount());
StoreFile sf_i2c = createFile(OPEN_KEY, KEY_C), sf_c2i = createFile(KEY_C, OPEN_KEY);
sfm.insertNewFiles(al(sf_i2c, sf_c2i));
assertEquals(2, sfm.getStripeCount());
// Now try to add conflicting flush - should throw.
StoreFile sf_i2d = createFile(OPEN_KEY, KEY_D), sf_d2i = createFile(KEY_D, OPEN_KEY);
sfm.insertNewFiles(al(sf_i2d, sf_d2i));
assertEquals(2, sfm.getStripeCount());
assertEquals(2, sfm.getLevel0Files().size());
verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_d2i, sf_c2i);
// Remove these files.
sfm.addCompactionResults(al(sf_i2d, sf_d2i), al());
assertEquals(0, sfm.getLevel0Files().size());
// Add another file to stripe; then "rebalance" stripes w/o it - the file, which was
// presumably flushed during compaction, should go to L0.
StoreFile sf_i2c_2 = createFile(OPEN_KEY, KEY_C);
sfm.insertNewFiles(al(sf_i2c_2));
sfm.addCompactionResults(al(sf_i2c, sf_c2i), al(sf_i2d, sf_d2i));
assertEquals(1, sfm.getLevel0Files().size());
verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_i2c_2);
}
@Test
public void testEmptyResultsForStripes() throws Exception {
// Test that we can compact L0 into a subset of stripes.
StripeStoreFileManager manager = createManager();
StoreFile sf0a = createFile();
StoreFile sf0b = createFile();
manager.insertNewFile(sf0a);
manager.insertNewFile(sf0b);
manager.insertNewFiles(al(sf0a));
manager.insertNewFiles(al(sf0b));
ArrayList<StoreFile> compacted = al(createFile(OPEN_KEY, KEY_B),
createFile(KEY_B, KEY_C), createFile(KEY_C, OPEN_KEY));
manager.addCompactionResults(al(sf0a), compacted);
@ -491,7 +500,7 @@ public class TestStripeStoreFileManager {
conf.setInt("hbase.hstore.blockingStoreFiles", limit);
StripeStoreFileManager sfm = createManager(al(), conf);
for (int i = 0; i < l0Files; ++i) {
sfm.insertNewFile(createFile());
sfm.insertNewFiles(al(createFile()));
}
for (int i = 0; i < filesInStripe; ++i) {
ArrayList<StoreFile> stripe = new ArrayList<StoreFile>();

View File

@ -38,16 +38,20 @@ import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcatenatedLists;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.ArgumentMatcher;
@ -62,12 +66,51 @@ public class TestStripeCompactionPolicy {
private static final byte[] KEY_C = Bytes.toBytes("ccc");
private static final byte[] KEY_D = Bytes.toBytes("ddd");
private static final byte[] KEY_E = Bytes.toBytes("eee");
private static final KeyValue KV_A = new KeyValue(KEY_A, 0L);
private static final KeyValue KV_B = new KeyValue(KEY_B, 0L);
private static final KeyValue KV_C = new KeyValue(KEY_C, 0L);
private static final KeyValue KV_D = new KeyValue(KEY_D, 0L);
private static final KeyValue KV_E = new KeyValue(KEY_E, 0L);
private static long defaultSplitSize = 18;
private static float defaultSplitCount = 1.8F;
private final static int defaultInitialCount = 1;
private static long defaultTtl = 1000 * 1000;
@Test
public void testNoStripesFromFlush() throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true);
StripeCompactionPolicy policy = createPolicy(conf);
StripeInformationProvider si = createStripesL0Only(0, 0);
KeyValue[] input = new KeyValue[] { KV_A, KV_B, KV_C, KV_D, KV_E };
KeyValue[][] expected = new KeyValue[][] { input };
verifyFlush(policy, si, input, expected, null);
}
@Test
public void testOldStripesFromFlush() throws Exception {
StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
StripeInformationProvider si = createStripes(0, KEY_C, KEY_D);
KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
KeyValue[][] expected = new KeyValue[][] { new KeyValue[] { KV_B },
new KeyValue[] { KV_C, KV_C }, new KeyValue[] { KV_D, KV_E } };
verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, KEY_C, KEY_D, OPEN_KEY });
}
@Test
public void testNewStripesFromFlush() throws Exception {
StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
StripeInformationProvider si = createStripesL0Only(0, 0);
KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
// Starts with one stripe; unlike flush results, must have metadata
KeyValue[][] expected = new KeyValue[][] { input };
verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, OPEN_KEY });
}
@Test
public void testSingleStripeCompaction() throws Exception {
// Create a special policy that only compacts single stripes, using standard methods.
@ -424,6 +467,25 @@ public class TestStripeCompactionPolicy {
dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end));
}
/** Verify arbitrary flush. */
protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si,
KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException {
StoreFileWritersCapture writers = new StoreFileWritersCapture();
StripeStoreFlusher.StripeFlushRequest req = policy.selectFlush(si, input.length);
StripeMultiFileWriter mw = req.createWriter();
mw.init(null, writers, new KeyValue.KVComparator());
for (KeyValue kv : input) {
mw.append(kv);
}
boolean hasMetadata = boundaries != null;
mw.commitWriters(0, false);
writers.verifyKvs(expected, true, hasMetadata);
if (hasMetadata) {
writers.verifyBoundaries(boundaries);
}
}
private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) {
return dropDeletes == null ? any(byte[].class)
: (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class));