From 18ecb48662cfb7bded903639f066fbad0f732604 Mon Sep 17 00:00:00 2001 From: sershe Date: Wed, 6 Nov 2013 01:49:36 +0000 Subject: [PATCH] HBASE-8541 implement flush-into-stripes in stripe compactions git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1539211 13f79535-47bb-0310-9956-ffa450edef68 --- .../regionserver/DefaultStoreFileManager.java | 4 +- .../regionserver/DefaultStoreFlusher.java | 8 +- .../hadoop/hbase/regionserver/HStore.java | 24 ++- .../hbase/regionserver/StoreFileManager.java | 4 +- .../hbase/regionserver/StoreFlusher.java | 43 ++--- .../regionserver/StripeMultiFileWriter.java | 21 ++- .../hbase/regionserver/StripeStoreConfig.java | 31 +++- .../hbase/regionserver/StripeStoreEngine.java | 5 +- .../regionserver/StripeStoreFileManager.java | 109 +++++++---- .../regionserver/StripeStoreFlusher.java | 171 ++++++++++++++++++ .../compactions/StripeCompactionPolicy.java | 24 ++- .../regionserver/TestStripeCompactor.java | 25 ++- .../TestStripeStoreFileManager.java | 73 ++++---- .../TestStripeCompactionPolicy.java | 62 +++++++ 14 files changed, 464 insertions(+), 140 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java index a77f89fba2b..86649bf593d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java @@ -68,9 +68,9 @@ class DefaultStoreFileManager implements StoreFileManager { } @Override - public void insertNewFile(StoreFile sf) { + public void insertNewFiles(Collection sfs) throws IOException { ArrayList newFiles = new ArrayList(storefiles); - newFiles.add(sf); + newFiles.addAll(sfs); sortAndSetStoreFiles(newFiles); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java index 79f93799c25..a5837c29b4f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java @@ -53,13 +53,7 @@ public class DefaultStoreFlusher extends StoreFlusher { // Use a store scanner to find which rows to flush. 
long smallestReadPoint = store.getSmallestReadPoint(); - KeyValueScanner memstoreScanner = - new CollectionBackedScanner(snapshot, store.getComparator()); - InternalScanner scanner = preCreateCoprocScanner(memstoreScanner); - if (scanner == null) { - scanner = createStoreScanner(smallestReadPoint, memstoreScanner); - } - scanner = postCreateCoprocScanner(scanner); + InternalScanner scanner = createScanner(snapshot, smallestReadPoint); if (scanner == null) { return result; // NULL scanner returned from coprocessor hooks means skip normal processing } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 19be8029043..9cf8c8d1600 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -611,7 +611,7 @@ public class HStore implements Store { // Append the new storefile into the list this.lock.writeLock().lock(); try { - this.storeEngine.getStoreFileManager().insertNewFile(sf); + this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf)); } finally { // We need the lock, as long as we are updating the storeFiles // or changing the memstore. Let us release it before calling @@ -852,9 +852,7 @@ public class HStore implements Store { final List sfs, final SortedSet set) throws IOException { this.lock.writeLock().lock(); try { - for (StoreFile sf : sfs) { - this.storeEngine.getStoreFileManager().insertNewFile(sf); - } + this.storeEngine.getStoreFileManager().insertNewFiles(sfs); this.memstore.clearSnapshot(set); } finally { // We need the lock, as long as we are updating the storeFiles @@ -1122,15 +1120,15 @@ public class HStore implements Store { .append(" to execute."); LOG.info(message.toString()); if (LOG.isTraceEnabled()) { - int fileCount = storeEngine.getStoreFileManager().getStorefileCount(); - long resultSize = 0; - for (StoreFile sf : sfs) { - resultSize += sf.getReader().length(); - } - String traceMessage = "COMPACTION start,end,size out,files in,files out,store size," - + "store files [" + compactionStartTime + "," + now + "," + resultSize + "," - + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]"; - LOG.trace(traceMessage); + int fileCount = storeEngine.getStoreFileManager().getStorefileCount(); + long resultSize = 0; + for (StoreFile sf : sfs) { + resultSize += sf.getReader().length(); + } + String traceMessage = "COMPACTION start,end,size out,files in,files out,store size," + + "store files [" + compactionStartTime + "," + now + "," + resultSize + "," + + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]"; + LOG.trace(traceMessage); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java index c275dee4580..85be8ae80ad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java @@ -47,10 +47,10 @@ public interface StoreFileManager { void loadFiles(List storeFiles); /** - * Adds new file, either for from MemStore flush or bulk insert, into the structure. + * Adds new files, either for from MemStore flush or bulk insert, into the structure. * @param sf New store file. 
*/ - void insertNewFile(StoreFile sf); + void insertNewFiles(Collection sfs) throws IOException; /** * Adds compaction results into the structure. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index ba515eb56a6..149c0fe2b98 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.compactions.Compactor; +import org.apache.hadoop.hbase.util.CollectionBackedScanner; /** * Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one). @@ -77,31 +78,27 @@ abstract class StoreFlusher { writer.close(); } - /** Calls coprocessor to create a flush scanner based on memstore scanner */ - protected InternalScanner preCreateCoprocScanner( - KeyValueScanner memstoreScanner) throws IOException { - if (store.getCoprocessorHost() != null) { - return store.getCoprocessorHost().preFlushScannerOpen(store, memstoreScanner); - } - return null; - } - - /** Creates the default flush scanner based on memstore scanner */ - protected InternalScanner createStoreScanner(long smallestReadPoint, - KeyValueScanner memstoreScanner) throws IOException { - Scan scan = new Scan(); - scan.setMaxVersions(store.getScanInfo().getMaxVersions()); - return new StoreScanner(store, store.getScanInfo(), scan, - Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES, - smallestReadPoint, HConstants.OLDEST_TIMESTAMP); - } /** - * Calls coprocessor to create a scanner based on default flush scanner - * @return new or default scanner; if null, flush should not proceed. + * Creates the scanner for flushing snapshot. Also calls coprocessors. + * @return The scanner; null if coprocessor is canceling the flush. */ - protected InternalScanner postCreateCoprocScanner(InternalScanner scanner) - throws IOException { + protected InternalScanner createScanner(SortedSet snapshot, + long smallestReadPoint) throws IOException { + KeyValueScanner memstoreScanner = + new CollectionBackedScanner(snapshot, store.getComparator()); + InternalScanner scanner = null; + if (store.getCoprocessorHost() != null) { + scanner = store.getCoprocessorHost().preFlushScannerOpen(store, memstoreScanner); + } + if (scanner == null) { + Scan scan = new Scan(); + scan.setMaxVersions(store.getScanInfo().getMaxVersions()); + scanner = new StoreScanner(store, store.getScanInfo(), scan, + Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES, + smallestReadPoint, HConstants.OLDEST_TIMESTAMP); + } + assert scanner != null; if (store.getCoprocessorHost() != null) { return store.getCoprocessorHost().preFlush(store, scanner); } @@ -114,7 +111,7 @@ abstract class StoreFlusher { * @param sink Sink to write data to. Could be StoreFile.Writer. * @param smallestReadPoint Smallest read point used for the flush. * @return Bytes flushed. 
-s */ + */ protected long performFlush(InternalScanner scanner, Compactor.CellSink sink, long smallestReadPoint) throws IOException { int compactionKVMax = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java index 6c67a692bf5..588ccc06f4d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java @@ -46,6 +46,9 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink { /** Source scanner that is tracking KV count; may be null if source is not StoreScanner */ protected StoreScanner sourceScanner; + /** Whether to write stripe metadata */ + private boolean doWriteStripeMetadata = true; + public interface WriterFactory { public StoreFile.Writer createWriter() throws IOException; } @@ -63,18 +66,24 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink { this.comparator = comparator; } + public void setNoStripeMetadata() { + this.doWriteStripeMetadata = false; + } + public List commitWriters(long maxSeqId, boolean isMajor) throws IOException { assert this.existingWriters != null; commitWritersInternal(); assert this.boundaries.size() == (this.existingWriters.size() + 1); - LOG.debug("Writing out metadata for " + this.existingWriters.size() + " writers"); - + LOG.debug((this.doWriteStripeMetadata ? "W" : "Not w") + + "riting out metadata for " + this.existingWriters.size() + " writers"); List paths = new ArrayList(); for (int i = 0; i < this.existingWriters.size(); ++i) { StoreFile.Writer writer = this.existingWriters.get(i); if (writer == null) continue; // writer was skipped due to 0 KVs - writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i)); - writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1)); + if (doWriteStripeMetadata) { + writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i)); + writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1)); + } writer.appendMetadata(maxSeqId, isMajor); paths.add(writer.getPath()); writer.close(); @@ -109,8 +118,8 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink { byte[] left, byte[] row, int rowOffset, int rowLength) throws IOException { if (StripeStoreFileManager.OPEN_KEY != left && comparator.compareRows(row, rowOffset, rowLength, left, 0, left.length) < 0) { - String error = "The first row is lower than the left boundary of [" - + Bytes.toString(left) + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]"; + String error = "The first row is lower than the left boundary of [" + Bytes.toString(left) + + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]"; LOG.error(error); throw new IOException(error); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java index c502dec7e02..e74a065ce9d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java @@ -56,6 +56,9 @@ public class StripeStoreConfig { to get this count from the outset and prevent unnecessary splitting. 
*/ public static final String INITIAL_STRIPE_COUNT_KEY = "hbase.store.stripe.initialStripeCount"; + /** Whether to flush memstore to L0 files, or directly to stripes. */ + public static final String FLUSH_TO_L0_KEY = "hbase.store.stripe.compaction.flushToL0"; + /** When splitting region, the maximum size imbalance to allow in an attempt to split at a stripe boundary, so that no files go to both regions. Most users won't need to change that. */ public static final String MAX_REGION_SPLIT_IMBALANCE_KEY = @@ -70,36 +73,46 @@ public class StripeStoreConfig { private final int initialCount; private final long sizeToSplitAt; private final float splitPartCount; + private final boolean flushIntoL0; private final long splitPartSize; // derived from sizeToSplitAt and splitPartCount - private final double EPSILON = 0.001; // good enough for this, not a real epsilon. + private static final double EPSILON = 0.001; // good enough for this, not a real epsilon. public StripeStoreConfig(Configuration config, StoreConfigInformation sci) { this.level0CompactMinFiles = config.getInt(MIN_FILES_L0_KEY, 4); this.stripeCompactMinFiles = config.getInt(MIN_FILES_KEY, 3); this.stripeCompactMaxFiles = config.getInt(MAX_FILES_KEY, 10); this.maxRegionSplitImbalance = getFloat(config, MAX_REGION_SPLIT_IMBALANCE_KEY, 1.5f, true); + this.flushIntoL0 = config.getBoolean(FLUSH_TO_L0_KEY, false); - this.splitPartCount = getFloat(config, SPLIT_PARTS_KEY, 2.0f, true); + float splitPartCount = getFloat(config, SPLIT_PARTS_KEY, 2f, true); if (Math.abs(splitPartCount - 1.0) < EPSILON) { - throw new RuntimeException("Split part count cannot be 1: " + this.splitPartCount); + LOG.error("Split part count cannot be 1 (" + this.splitPartCount + "), using the default"); + splitPartCount = 2f; } - // TODO: change when no L0. + this.splitPartCount = splitPartCount; // Arbitrary default split size - 4 times the size of one L0 compaction. + // If we flush into L0 there's no split compaction, but for default value it is ok. 
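// (Illustrative arithmetic, assuming the fallback values used below: with a 128 MiB memstore
// flush size, the default level0CompactMinFiles of 4, and the default splitPartCount of 2.0,
// the default split size works out to 128 MiB * 4 * 4 * 2.0 = 4 GiB.)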
double flushSize = sci.getMemstoreFlushSize(); if (flushSize == 0) { flushSize = 128 * 1024 * 1024; } long defaultSplitSize = (long)(flushSize * getLevel0MinFiles() * 4 * splitPartCount); this.sizeToSplitAt = config.getLong(SIZE_TO_SPLIT_KEY, defaultSplitSize); - this.initialCount = config.getInt(INITIAL_STRIPE_COUNT_KEY, 1); + int initialCount = config.getInt(INITIAL_STRIPE_COUNT_KEY, 1); + if (initialCount == 0) { + LOG.error("Initial stripe count is 0, using the default"); + initialCount = 1; + } + this.initialCount = initialCount; this.splitPartSize = (long)(this.sizeToSplitAt / this.splitPartCount); } private static float getFloat( Configuration config, String key, float defaultValue, boolean moreThanOne) { float value = config.getFloat(key, defaultValue); - if (value == 0) { - LOG.warn(String.format("%s is set to 0; using default value of %f", key, defaultValue)); + if (value < EPSILON) { + LOG.warn(String.format( + "%s is set to 0 or negative; using default value of %f", key, defaultValue)); value = defaultValue; } else if ((value > 1f) != moreThanOne) { value = 1f / value; @@ -123,6 +136,10 @@ public class StripeStoreConfig { return stripeCompactMaxFiles; } + public boolean isUsingL0Flush() { + return flushIntoL0; + } + public long getSplitSize() { return sizeToSplitAt; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java index 954711eb1fd..fe54a93f937 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java @@ -38,7 +38,7 @@ import com.google.common.base.Preconditions; * The storage engine that implements the stripe-based store/compaction scheme. */ @InterfaceAudience.Private -public class StripeStoreEngine extends StoreEngine { static final Log LOG = LogFactory.getLog(StripeStoreEngine.class); private StripeStoreConfig config; @@ -58,8 +58,9 @@ public class StripeStoreEngine extends StoreEngine newFiles = new ArrayList(state.level0Files); - insertFileIntoStripe(newFiles, sf); - ensureLevel0Metadata(sf); - this.state.level0Files = ImmutableList.copyOf(newFiles); - ArrayList newAllFiles = new ArrayList(state.allFilesCached); - newAllFiles.add(sf); - this.state.allFilesCached = ImmutableList.copyOf(newAllFiles); + public void insertNewFiles(Collection sfs) throws IOException { + CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true); + cmc.mergeResults(null, sfs); + debugDumpState("Added new files"); } @Override @@ -304,7 +303,7 @@ public class StripeStoreFileManager + " files replaced by " + results.size()); // In order to be able to fail in the middle of the operation, we'll operate on lazy // copies and apply the result at the end. - CompactionResultsMergeCopy cmc = new CompactionResultsMergeCopy(); + CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false); cmc.mergeResults(compactedFiles, results); debugDumpState("Merged compaction results"); } @@ -628,11 +627,11 @@ public class StripeStoreFileManager } /** - * Non-static helper class for merging compaction results. - * Since we want to merge them atomically (more or less), it operates on lazy copies, and - * then applies copies to real lists as necessary. + * Non-static helper class for merging compaction or flush results. 
+ * Since we want to merge them atomically (more or less), it operates on lazy copies, + * then creates a new state object and puts it in place. */ - private class CompactionResultsMergeCopy { + private class CompactionOrFlushMergeCopy { private ArrayList> stripeFiles = null; private ArrayList level0Files = null; private ArrayList stripeEndRows = null; @@ -641,11 +640,13 @@ public class StripeStoreFileManager private Collection results = null; private List l0Results = new ArrayList(); + private final boolean isFlush; - public CompactionResultsMergeCopy() { + public CompactionOrFlushMergeCopy(boolean isFlush) { // Create a lazy mutable copy (other fields are so lazy they start out as nulls). this.stripeFiles = new ArrayList>( StripeStoreFileManager.this.state.stripeFiles); + this.isFlush = isFlush; } public void mergeResults(Collection compactedFiles, Collection results) @@ -654,8 +655,8 @@ public class StripeStoreFileManager this.compactedFiles = compactedFiles; this.results = results; // Do logical processing. - removeCompactedFiles(); - TreeMap newStripes = processCompactionResults(); + if (!isFlush) removeCompactedFiles(); + TreeMap newStripes = processResults(); if (newStripes != null) { processNewCandidateStripes(newStripes); } @@ -681,7 +682,7 @@ public class StripeStoreFileManager } List newAllFiles = new ArrayList(oldState.allFilesCached); - newAllFiles.removeAll(compactedFiles); + if (!isFlush) newAllFiles.removeAll(compactedFiles); newAllFiles.addAll(results); newState.allFilesCached = ImmutableList.copyOf(newAllFiles); return newState; @@ -689,12 +690,16 @@ public class StripeStoreFileManager private void updateMetadataMaps() { StripeStoreFileManager parent = StripeStoreFileManager.this; - for (StoreFile sf : this.compactedFiles) { - parent.fileStarts.remove(sf); - parent.fileEnds.remove(sf); + if (!isFlush) { + for (StoreFile sf : this.compactedFiles) { + parent.fileStarts.remove(sf); + parent.fileEnds.remove(sf); + } } - for (StoreFile sf : this.l0Results) { - parent.ensureLevel0Metadata(sf); + if (this.l0Results != null) { + for (StoreFile sf : this.l0Results) { + parent.ensureLevel0Metadata(sf); + } } } @@ -729,12 +734,14 @@ public class StripeStoreFileManager * or to the list of new candidate stripes. * @return New candidate stripes. 
*/ - private TreeMap processCompactionResults() throws IOException { + private TreeMap processResults() throws IOException { TreeMap newStripes = null; for (StoreFile sf : this.results) { byte[] startRow = startOf(sf), endRow = endOf(sf); if (isInvalid(endRow) || isInvalid(startRow)) { - LOG.warn("The newly compacted files doesn't have stripe rows set: " + sf.getPath()); + if (!isFlush) { + LOG.warn("The newly compacted file doesn't have stripes set: " + sf.getPath()); + } insertFileIntoStripe(getLevel0Copy(), sf); this.l0Results.add(sf); continue; @@ -775,7 +782,7 @@ public class StripeStoreFileManager int stripeIndex = findStripeIndexByEndRow(oldEndRow); if (stripeIndex < 0) { throw new IOException("An allegedly compacted file [" + oldFile + "] does not belong" - + " to a known stripe (end row + [" + Bytes.toString(oldEndRow) + "])"); + + " to a known stripe (end row - [" + Bytes.toString(oldEndRow) + "])"); } source = getStripeCopy(stripeIndex); } @@ -803,6 +810,8 @@ public class StripeStoreFileManager throw new IOException("Newly created stripes do not cover the entire key space."); } + boolean canAddNewStripes = true; + Collection filesForL0 = null; if (hasStripes) { // Determine which stripes will need to be removed because they conflict with new stripes. // The new boundaries should match old stripe boundaries, so we should get exact matches. @@ -815,20 +824,49 @@ public class StripeStoreFileManager } int removeTo = findStripeIndexByEndRow(lastEndRow); if (removeTo < 0) throw new IOException("Compaction is trying to add a bad range."); - // Remove old empty stripes. - int originalCount = this.stripeFiles.size(); + // See if there are files in the stripes we are trying to replace. + ArrayList conflictingFiles = new ArrayList(); for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) { - if (!this.stripeFiles.get(removeIndex).isEmpty()) { - throw new IOException("Compaction intends to create a new stripe that replaces an" - + " existing one, but the latter contains some files."); + conflictingFiles.addAll(this.stripeFiles.get(removeIndex)); + } + if (!conflictingFiles.isEmpty()) { + // This can be caused by two things - concurrent flush into stripes, or a bug. + // Unfortunately, we cannot tell them apart without looking at timing or something + // like that. We will assume we are dealing with a flush and dump it into L0. + if (isFlush) { + long newSize = StripeCompactionPolicy.getTotalFileSize(newStripes.values()); + LOG.warn("Stripes were created by a flush, but results of size " + newSize + + " cannot be added because the stripes have changed"); + canAddNewStripes = false; + filesForL0 = newStripes.values(); + } else { + long oldSize = StripeCompactionPolicy.getTotalFileSize(conflictingFiles); + LOG.info(conflictingFiles.size() + " conflicting files (likely created by a flush) " + + " of size " + oldSize + " are moved to L0 due to concurrent stripe change"); + filesForL0 = conflictingFiles; } - if (removeIndex != originalCount - 1) { - this.stripeEndRows.remove(removeIndex); + if (filesForL0 != null) { + for (StoreFile sf : filesForL0) { + insertFileIntoStripe(getLevel0Copy(), sf); + } + l0Results.addAll(filesForL0); + } + } + + if (canAddNewStripes) { + // Remove old empty stripes. 
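// (Illustrative index walk-through, assuming four existing stripes with stripeEndRows [b, c, d]
// and new stripes replacing stripes 2..3, so removeFrom = 2 and removeTo = 3: index 3 is the
// last stripe and has no stripeEndRows entry, so only its stripeFiles entry is dropped; for
// index 2 both the end row d and its file list are removed. The replacement stripes are then
// inserted starting at insertAt = removeFrom further below.)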
+ int originalCount = this.stripeFiles.size(); + for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) { + if (removeIndex != originalCount - 1) { + this.stripeEndRows.remove(removeIndex); + } + this.stripeFiles.remove(removeIndex); } - this.stripeFiles.remove(removeIndex); } } + if (!canAddNewStripes) return; // Files were already put into L0. + // Now, insert new stripes. The total ranges match, so we can insert where we removed. byte[] previousEndRow = null; int insertAt = removeFrom; @@ -838,7 +876,8 @@ public class StripeStoreFileManager assert !isOpen(previousEndRow); byte[] startRow = startOf(newStripe.getValue()); if (!rowEquals(previousEndRow, startRow)) { - throw new IOException("The new stripes produced by compaction are not contiguous"); + throw new IOException("The new stripes produced by " + + (isFlush ? "flush" : "compaction") + " are not contiguous"); } } // Add the new stripe. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java new file mode 100644 index 00000000000..cbfd07afc15 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java @@ -0,0 +1,171 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.SortedSet; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.monitoring.MonitoredTask; +import org.apache.hadoop.hbase.regionserver.StoreFile.Writer; +import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; +import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy; +import org.apache.hadoop.hbase.util.CollectionBackedScanner; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Stripe implementation of StoreFlusher. Flushes files either into L0 file w/o metadata, or + * into separate striped files, avoiding L0. 
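 * A minimal illustrative snippet (assuming default settings; the config key is defined in
 * StripeStoreConfig by this patch, everything else is standard HBase configuration code):
 * <pre>
 *   Configuration conf = HBaseConfiguration.create();
 *   // default is false, i.e. flush straight into stripes; set to true to keep the old
 *   // behaviour of writing a single L0 file (without stripe metadata) per flush
 *   conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true);
 * </pre>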
+ */ +public class StripeStoreFlusher extends StoreFlusher { + private static final Log LOG = LogFactory.getLog(StripeStoreFlusher.class); + private final Object flushLock = new Object(); + private final StripeCompactionPolicy policy; + private final StripeCompactionPolicy.StripeInformationProvider stripes; + + public StripeStoreFlusher(Configuration conf, Store store, + StripeCompactionPolicy policy, StripeStoreFileManager stripes) { + super(conf, store); + this.policy = policy; + this.stripes = stripes; + } + + @Override + public List flushSnapshot(SortedSet snapshot, long cacheFlushSeqNum, + final TimeRangeTracker tracker, AtomicLong flushedSize, MonitoredTask status) + throws IOException { + List result = null; + int kvCount = snapshot.size(); + if (kvCount == 0) return result; // don't flush if there are no entries + + long smallestReadPoint = store.getSmallestReadPoint(); + InternalScanner scanner = createScanner(snapshot, smallestReadPoint); + if (scanner == null) { + return result; // NULL scanner returned from coprocessor hooks means skip normal processing + } + + // Let policy select flush method. + StripeFlushRequest req = this.policy.selectFlush(this.stripes, kvCount); + + long flushedBytes = 0; + boolean success = false; + StripeMultiFileWriter mw = null; + try { + mw = req.createWriter(); // Writer according to the policy. + StripeMultiFileWriter.WriterFactory factory = createWriterFactory(tracker, kvCount); + StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null; + mw.init(storeScanner, factory, store.getComparator()); + + synchronized (flushLock) { + flushedBytes = performFlush(scanner, mw, smallestReadPoint); + result = mw.commitWriters(cacheFlushSeqNum, false); + success = true; + } + } finally { + if (!success && (mw != null)) { + result.clear(); + for (Path leftoverFile : mw.abortWriters()) { + try { + store.getFileSystem().delete(leftoverFile, false); + } catch (Exception e) { + LOG.error("Failed to delete a file after failed flush: " + e); + } + } + } + flushedSize.set(flushedBytes); + try { + scanner.close(); + } catch (IOException ex) { + LOG.warn("Failed to close flush scanner, ignoring", ex); + } + } + return result; + } + + private StripeMultiFileWriter.WriterFactory createWriterFactory( + final TimeRangeTracker tracker, final long kvCount) { + return new StripeMultiFileWriter.WriterFactory() { + @Override + public Writer createWriter() throws IOException { + StoreFile.Writer writer = store.createWriterInTmp( + kvCount, store.getFamily().getCompression(), false, true, true); + writer.setTimeRangeTracker(tracker); + return writer; + } + }; + } + + /** Stripe flush request wrapper that writes a non-striped file. */ + public static class StripeFlushRequest { + @VisibleForTesting + public StripeMultiFileWriter createWriter() throws IOException { + StripeMultiFileWriter writer = + new StripeMultiFileWriter.SizeMultiWriter(1, Long.MAX_VALUE, OPEN_KEY, OPEN_KEY); + writer.setNoStripeMetadata(); + return writer; + } + } + + /** Stripe flush request wrapper based on boundaries. */ + public static class BoundaryStripeFlushRequest extends StripeFlushRequest { + private final List targetBoundaries; + + /** @param targetBoundaries New files should be written with these boundaries. 
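 * (For instance, boundaries of the form [OPEN_KEY, b, d, OPEN_KEY], as returned by
 * StripeInformationProvider.getStripeBoundaries(), describe three stripes: rows before b,
 * rows in [b, d), and rows from d onward; the BoundaryMultiWriter routes each flushed cell
 * to the file of the stripe its row falls into.)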
*/ + public BoundaryStripeFlushRequest(List targetBoundaries) { + this.targetBoundaries = targetBoundaries; + } + + @Override + public StripeMultiFileWriter createWriter() throws IOException { + return new StripeMultiFileWriter.BoundaryMultiWriter(targetBoundaries, null, null); + } + } + + /** Stripe flush request wrapper based on size. */ + public static class SizeStripeFlushRequest extends StripeFlushRequest { + private final int targetCount; + private final long targetKvs; + + /** + * @param targetCount The maximum number of stripes to flush into. + * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than + * total number of kvs, all the overflow data goes into the last stripe. + */ + public SizeStripeFlushRequest(int targetCount, long targetKvs) { + this.targetCount = targetCount; + this.targetKvs = targetKvs; + } + + @Override + public StripeMultiFileWriter createWriter() throws IOException { + return new StripeMultiFileWriter.SizeMultiWriter( + this.targetCount, this.targetKvs, OPEN_KEY, OPEN_KEY); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java index 901400c5553..9b845306cfc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreUtils; import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; +import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConcatenatedLists; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -75,9 +76,24 @@ public class StripeCompactionPolicy extends CompactionPolicy { if (si.getStripeCount() > 0) { return new BoundaryStripeCompactionRequest(request, si.getStripeBoundaries()); } - int initialCount = this.config.getInitialCount(); - long targetKvs = estimateTargetKvs(request.getFiles(), initialCount).getFirst(); - return new SplitStripeCompactionRequest(request, OPEN_KEY, OPEN_KEY, targetKvs); + Pair targetKvsAndCount = estimateTargetKvs( + request.getFiles(), this.config.getInitialCount()); + return new SplitStripeCompactionRequest( + request, OPEN_KEY, OPEN_KEY, targetKvsAndCount.getSecond(), targetKvsAndCount.getFirst()); + } + + public StripeStoreFlusher.StripeFlushRequest selectFlush( + StripeInformationProvider si, int kvCount) { + if (this.config.isUsingL0Flush()) { + return new StripeStoreFlusher.StripeFlushRequest(); // L0 is used, return dumb request. + } + if (si.getStripeCount() == 0) { + // No stripes - start with the requisite count, derive KVs per stripe. + int initialCount = this.config.getInitialCount(); + return new StripeStoreFlusher.SizeStripeFlushRequest(initialCount, kvCount / initialCount); + } + // There are stripes - do according to the boundaries. 
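// (Recap of the three flush shapes this method can select, all defined in StripeStoreFlusher:
//   isUsingL0Flush()          -> StripeFlushRequest: one plain file, no stripe metadata
//   getStripeCount() == 0     -> SizeStripeFlushRequest(initialCount, kvCount / initialCount)
//   existing stripes (below)  -> BoundaryStripeFlushRequest(si.getStripeBoundaries()))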
+ return new StripeStoreFlusher.BoundaryStripeFlushRequest(si.getStripeBoundaries()); } public StripeCompactionRequest selectCompaction(StripeInformationProvider si, @@ -341,7 +357,7 @@ public class StripeCompactionPolicy extends CompactionPolicy { return totalSize; } - private static long getTotalFileSize(final Collection candidates) { + public static long getTotalFileSize(final Collection candidates) { long totalSize = 0; for (StoreFile storeFile : candidates) { totalSize += storeFile.getReader().length(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java index a71b3c5da06..43b8254088b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java @@ -125,7 +125,7 @@ public class TestStripeCompactor { StripeCompactor sc = createCompactor(writers, input); List paths = sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo); - writers.verifyKvs(output, allFiles); + writers.verifyKvs(output, allFiles, true); if (allFiles) { assertEquals(output.length, paths.size()); writers.verifyBoundaries(boundaries); @@ -162,7 +162,7 @@ public class TestStripeCompactor { List paths = sc.compact( createDummyRequest(), targetCount, targetSize, left, right, null, null); assertEquals(output.length, paths.size()); - writers.verifyKvs(output, true); + writers.verifyKvs(output, true, true); List boundaries = new ArrayList(); boundaries.add(left); for (int i = 1; i < output.length; ++i) { @@ -242,7 +242,8 @@ public class TestStripeCompactor { } // StoreFile.Writer has private ctor and is unwieldy, so this has to be convoluted. 
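// (The capture below now also implements StripeMultiFileWriter.WriterFactory, so the same
// mock writers can back both the mocked compactor here and the flush verification added in
// TestStripeCompactionPolicy.verifyFlush; the new requireMetadata flag on verifyKvs lets
// callers assert whether STRIPE_START_KEY/STRIPE_END_KEY were written.)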
- private static class StoreFileWritersCapture implements Answer { + public static class StoreFileWritersCapture implements + Answer, StripeMultiFileWriter.WriterFactory { public static class Writer { public ArrayList kvs = new ArrayList(); public TreeMap data = new TreeMap(Bytes.BYTES_COMPARATOR); @@ -251,7 +252,7 @@ public class TestStripeCompactor { private List writers = new ArrayList(); @Override - public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable { + public StoreFile.Writer createWriter() throws IOException { final Writer realWriter = new Writer(); writers.add(realWriter); StoreFile.Writer writer = mock(StoreFile.Writer.class); @@ -267,7 +268,12 @@ public class TestStripeCompactor { return writer; } - public void verifyKvs(KeyValue[][] kvss, boolean allFiles) { + @Override + public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable { + return createWriter(); + } + + public void verifyKvs(KeyValue[][] kvss, boolean allFiles, boolean requireMetadata) { if (allFiles) { assertEquals(kvss.length, writers.size()); } @@ -276,8 +282,13 @@ public class TestStripeCompactor { KeyValue[] kvs = kvss[i]; if (kvs != null) { Writer w = writers.get(i - skippedWriters); - assertNotNull(w.data.get(STRIPE_START_KEY)); - assertNotNull(w.data.get(STRIPE_END_KEY)); + if (requireMetadata) { + assertNotNull(w.data.get(STRIPE_START_KEY)); + assertNotNull(w.data.get(STRIPE_END_KEY)); + } else { + assertNull(w.data.get(STRIPE_START_KEY)); + assertNull(w.data.get(STRIPE_END_KEY)); + } assertEquals(kvs.length, w.kvs.size()); for (int j = 0; j < kvs.length; ++j) { assertEquals(kvs[j], w.kvs.get(j)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java index 4d28c78dc72..0fdf5d8201a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java @@ -83,7 +83,7 @@ public class TestStripeStoreFileManager { public void testInsertFilesIntoL0() throws Exception { StripeStoreFileManager manager = createManager(); MockStoreFile sf = createFile(); - manager.insertNewFile(sf); + manager.insertNewFiles(al(sf)); assertEquals(1, manager.getStorefileCount()); Collection filesForGet = manager.getFilesForScanOrGet(true, KEY_A, KEY_A); assertEquals(1, filesForGet.size()); @@ -99,8 +99,8 @@ public class TestStripeStoreFileManager { @Test public void testClearFiles() throws Exception { StripeStoreFileManager manager = createManager(); - manager.insertNewFile(createFile()); - manager.insertNewFile(createFile()); + manager.insertNewFiles(al(createFile())); + manager.insertNewFiles(al(createFile())); manager.addCompactionResults(al(), al(createFile(OPEN_KEY, KEY_B), createFile(KEY_B, OPEN_KEY))); assertEquals(4, manager.getStorefileCount()); @@ -120,8 +120,8 @@ public class TestStripeStoreFileManager { public void testRowKeyBefore() throws Exception { StripeStoreFileManager manager = createManager(); StoreFile l0File = createFile(), l0File2 = createFile(); - manager.insertNewFile(l0File); - manager.insertNewFile(l0File2); + manager.insertNewFiles(al(l0File)); + manager.insertNewFiles(al(l0File2)); // Get candidate files. 
Iterator sfs = manager.getCandidateFilesForRowKeyBefore(KV_B); sfs.next(); @@ -174,8 +174,8 @@ public class TestStripeStoreFileManager { // If there are no stripes, should pick midpoint from the biggest file in L0. MockStoreFile sf5 = createFile(5, 0); sf5.splitPoint = new byte[1]; - manager.insertNewFile(sf5); - manager.insertNewFile(createFile(1, 0)); + manager.insertNewFiles(al(sf5)); + manager.insertNewFiles(al(createFile(1, 0))); assertEquals(sf5.splitPoint, manager.getSplitPoint()); // Same if there's one stripe but the biggest file is still in L0. @@ -259,7 +259,7 @@ public class TestStripeStoreFileManager { // Populate one L0 file. MockStoreFile sf0 = createFile(); - manager.insertNewFile(sf0); + manager.insertNewFiles(al(sf0)); verifyGetAndScanScenario(manager, null, null, sf0); verifyGetAndScanScenario(manager, null, KEY_C, sf0); verifyGetAndScanScenario(manager, KEY_B, null, sf0); @@ -356,14 +356,11 @@ public class TestStripeStoreFileManager { } @Test - @SuppressWarnings("unchecked") public void testAddingCompactionResults() throws Exception { StripeStoreFileManager manager = createManager(); // First, add some L0 files and "compact" one with new stripe creation. - StoreFile sf_L0_0a = createFile(); - StoreFile sf_L0_0b = createFile(); - manager.insertNewFile(sf_L0_0a); - manager.insertNewFile(sf_L0_0b); + StoreFile sf_L0_0a = createFile(), sf_L0_0b = createFile(); + manager.insertNewFiles(al(sf_L0_0a, sf_L0_0b)); // Try compacting with invalid new branches (gaps, overlaps) - no effect. verifyInvalidCompactionScenario(manager, al(sf_L0_0a), al(createFile(OPEN_KEY, KEY_B))); @@ -384,7 +381,7 @@ public class TestStripeStoreFileManager { StoreFile sf_L0_1 = createFile(); StoreFile sf_i2B_1 = createFile(OPEN_KEY, KEY_B); StoreFile sf_B2C_1 = createFile(KEY_B, KEY_C); - manager.insertNewFile(sf_L0_1); + manager.insertNewFiles(al(sf_L0_1)); manager.addCompactionResults(al(sf_L0_0b, sf_L0_1), al(sf_i2B_1, sf_B2C_1)); verifyAllFiles(manager, al(sf_i2B_0, sf_B2C_0, sf_C2i_0, sf_i2B_1, sf_B2C_1)); @@ -400,27 +397,21 @@ public class TestStripeStoreFileManager { manager.addCompactionResults(al(sf_i2B_0, sf_i2B_1), al(sf_i2B_3)); verifyAllFiles(manager, al(sf_B2C_0, sf_C2i_0, sf_B2C_1, sf_i2B_3)); - // Try to rebalance two stripes, but don't take all files from them - no effect. + // Rebalance two stripes. StoreFile sf_B2D_4 = createFile(KEY_B, KEY_D); StoreFile sf_D2i_4 = createFile(KEY_D, OPEN_KEY); - ArrayList compacted3 = al(); - verifyInvalidCompactionScenario(manager, al(sf_B2C_0, sf_C2i_0), al(sf_B2D_4, sf_D2i_4)); - - // Rebalance two stripes correctly. manager.addCompactionResults(al(sf_B2C_0, sf_C2i_0, sf_B2C_1), al(sf_B2D_4, sf_D2i_4)); verifyAllFiles(manager, al(sf_i2B_3, sf_B2D_4, sf_D2i_4)); // Split the first stripe. StoreFile sf_i2A_5 = createFile(OPEN_KEY, KEY_A); StoreFile sf_A2B_5 = createFile(KEY_A, KEY_B); - ArrayList compacted4 = al(createFile(OPEN_KEY, KEY_A), createFile(KEY_A, KEY_B)); manager.addCompactionResults(al(sf_i2B_3), al(sf_i2A_5, sf_A2B_5)); verifyAllFiles(manager, al(sf_B2D_4, sf_D2i_4, sf_i2A_5, sf_A2B_5)); // Split the middle stripe. 
StoreFile sf_B2C_6 = createFile(KEY_B, KEY_C); StoreFile sf_C2D_6 = createFile(KEY_C, KEY_D); - ArrayList compacted5 = al(createFile(KEY_B, KEY_C), createFile(KEY_C, KEY_D)); manager.addCompactionResults(al(sf_B2D_4), al(sf_B2C_6, sf_C2D_6)); verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_A2B_5, sf_B2C_6, sf_C2D_6)); @@ -429,14 +420,6 @@ public class TestStripeStoreFileManager { manager.addCompactionResults(al(sf_A2B_5, sf_B2C_6), al(sf_A2C_7)); verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_C2D_6, sf_A2C_7)); - // Try various range mismatch cases in replaced and new data - no effect. - ArrayList tmp = al(sf_A2C_7, sf_C2D_6); // [A, C) - verifyInvalidCompactionScenario(manager, tmp, al(createFile(KEY_B, KEY_C))); - verifyInvalidCompactionScenario(manager, tmp, al(createFile(OPEN_KEY, KEY_D))); - verifyInvalidCompactionScenario(manager, tmp, al(createFile(KEY_A, OPEN_KEY))); - verifyInvalidCompactionScenario(manager, tmp, al(createFile(KEY_A, KEY_B))); - verifyInvalidCompactionScenario(manager, tmp, al(createFile(KEY_B, keyAfter(KEY_B)))); - // Merge lower half. StoreFile sf_i2C_8 = createFile(OPEN_KEY, KEY_C); manager.addCompactionResults(al(sf_i2A_5, sf_A2C_7), al(sf_i2C_8)); @@ -448,14 +431,40 @@ public class TestStripeStoreFileManager { verifyAllFiles(manager, al(sf_i2i_9)); } + @Test + public void testCompactionAndFlushConflict() throws Exception { + // Add file flush into stripes + StripeStoreFileManager sfm = createManager(); + assertEquals(0, sfm.getStripeCount()); + StoreFile sf_i2c = createFile(OPEN_KEY, KEY_C), sf_c2i = createFile(KEY_C, OPEN_KEY); + sfm.insertNewFiles(al(sf_i2c, sf_c2i)); + assertEquals(2, sfm.getStripeCount()); + // Now try to add conflicting flush - should throw. + StoreFile sf_i2d = createFile(OPEN_KEY, KEY_D), sf_d2i = createFile(KEY_D, OPEN_KEY); + sfm.insertNewFiles(al(sf_i2d, sf_d2i)); + assertEquals(2, sfm.getStripeCount()); + assertEquals(2, sfm.getLevel0Files().size()); + verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_d2i, sf_c2i); + // Remove these files. + sfm.addCompactionResults(al(sf_i2d, sf_d2i), al()); + assertEquals(0, sfm.getLevel0Files().size()); + // Add another file to stripe; then "rebalance" stripes w/o it - the file, which was + // presumably flushed during compaction, should go to L0. + StoreFile sf_i2c_2 = createFile(OPEN_KEY, KEY_C); + sfm.insertNewFiles(al(sf_i2c_2)); + sfm.addCompactionResults(al(sf_i2c, sf_c2i), al(sf_i2d, sf_d2i)); + assertEquals(1, sfm.getLevel0Files().size()); + verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_i2c_2); + } + @Test public void testEmptyResultsForStripes() throws Exception { // Test that we can compact L0 into a subset of stripes. 
StripeStoreFileManager manager = createManager(); StoreFile sf0a = createFile(); StoreFile sf0b = createFile(); - manager.insertNewFile(sf0a); - manager.insertNewFile(sf0b); + manager.insertNewFiles(al(sf0a)); + manager.insertNewFiles(al(sf0b)); ArrayList compacted = al(createFile(OPEN_KEY, KEY_B), createFile(KEY_B, KEY_C), createFile(KEY_C, OPEN_KEY)); manager.addCompactionResults(al(sf0a), compacted); @@ -491,7 +500,7 @@ public class TestStripeStoreFileManager { conf.setInt("hbase.hstore.blockingStoreFiles", limit); StripeStoreFileManager sfm = createManager(al(), conf); for (int i = 0; i < l0Files; ++i) { - sfm.insertNewFile(createFile()); + sfm.insertNewFiles(al(createFile())); } for (int i = 0; i < filesInStripe; ++i) { ArrayList stripe = new ArrayList(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java index af248014bc5..e3c1f97fdec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java @@ -38,16 +38,20 @@ import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.regionserver.StoreConfigInformation; import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter; import org.apache.hadoop.hbase.regionserver.StripeStoreConfig; import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager; +import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher; import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.ConcatenatedLists; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; +import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture; import org.junit.Test; import org.junit.experimental.categories.Category; import org.mockito.ArgumentMatcher; @@ -62,12 +66,51 @@ public class TestStripeCompactionPolicy { private static final byte[] KEY_C = Bytes.toBytes("ccc"); private static final byte[] KEY_D = Bytes.toBytes("ddd"); private static final byte[] KEY_E = Bytes.toBytes("eee"); + private static final KeyValue KV_A = new KeyValue(KEY_A, 0L); + private static final KeyValue KV_B = new KeyValue(KEY_B, 0L); + private static final KeyValue KV_C = new KeyValue(KEY_C, 0L); + private static final KeyValue KV_D = new KeyValue(KEY_D, 0L); + private static final KeyValue KV_E = new KeyValue(KEY_E, 0L); + private static long defaultSplitSize = 18; private static float defaultSplitCount = 1.8F; private final static int defaultInitialCount = 1; private static long defaultTtl = 1000 * 1000; + @Test + public void testNoStripesFromFlush() throws Exception { + Configuration conf = HBaseConfiguration.create(); + conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true); + StripeCompactionPolicy policy = createPolicy(conf); + StripeInformationProvider si = createStripesL0Only(0, 0); + + KeyValue[] input = new KeyValue[] { KV_A, 
KV_B, KV_C, KV_D, KV_E }; + KeyValue[][] expected = new KeyValue[][] { input }; + verifyFlush(policy, si, input, expected, null); + } + + @Test + public void testOldStripesFromFlush() throws Exception { + StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create()); + StripeInformationProvider si = createStripes(0, KEY_C, KEY_D); + + KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E }; + KeyValue[][] expected = new KeyValue[][] { new KeyValue[] { KV_B }, + new KeyValue[] { KV_C, KV_C }, new KeyValue[] { KV_D, KV_E } }; + verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, KEY_C, KEY_D, OPEN_KEY }); + } + + @Test + public void testNewStripesFromFlush() throws Exception { + StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create()); + StripeInformationProvider si = createStripesL0Only(0, 0); + KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E }; + // Starts with one stripe; unlike flush results, must have metadata + KeyValue[][] expected = new KeyValue[][] { input }; + verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, OPEN_KEY }); + } + @Test public void testSingleStripeCompaction() throws Exception { // Create a special policy that only compacts single stripes, using standard methods. @@ -424,6 +467,25 @@ public class TestStripeCompactionPolicy { dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end)); } + /** Verify arbitrary flush. */ + protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si, + KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException { + StoreFileWritersCapture writers = new StoreFileWritersCapture(); + StripeStoreFlusher.StripeFlushRequest req = policy.selectFlush(si, input.length); + StripeMultiFileWriter mw = req.createWriter(); + mw.init(null, writers, new KeyValue.KVComparator()); + for (KeyValue kv : input) { + mw.append(kv); + } + boolean hasMetadata = boundaries != null; + mw.commitWriters(0, false); + writers.verifyKvs(expected, true, hasMetadata); + if (hasMetadata) { + writers.verifyBoundaries(boundaries); + } + } + + private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) { return dropDeletes == null ? any(byte[].class) : (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class));