HBASE-15389 Write out multiple files when compaction

zhangduo 2016-03-25 15:33:20 +08:00
parent 4b5b8e01d0
commit d5bc56c266
16 changed files with 1285 additions and 606 deletions

View File

@ -0,0 +1,120 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor.CellSink;
/**
* Base class for cell sink that separates the provided cells into multiple files.
*/
@InterfaceAudience.Private
public abstract class AbstractMultiFileWriter implements CellSink {
private static final Log LOG = LogFactory.getLog(AbstractMultiFileWriter.class);
/** Factory that is used to produce individual StoreFile.Writer instances. */
protected WriterFactory writerFactory;
/** Source scanner that is tracking KV count; may be null if source is not StoreScanner */
protected StoreScanner sourceScanner;
public interface WriterFactory {
public StoreFile.Writer createWriter() throws IOException;
}
/**
* Initializes multi-writer before usage.
* @param sourceScanner Optional store scanner to obtain the information about read progress.
* @param factory Factory used to produce individual file writers.
*/
public void init(StoreScanner sourceScanner, WriterFactory factory) {
this.writerFactory = factory;
this.sourceScanner = sourceScanner;
}
/**
* Commit all writers.
* <p>
* Notice that here we use the same <code>maxSeqId</code> for all output files since we have not
* found an easy way to obtain enough sequence ids for different output files in some corner
* cases. See comments in HBASE-15400 for more details.
*/
public List<Path> commitWriters(long maxSeqId, boolean majorCompaction) throws IOException {
preCommitWriters();
Collection<StoreFile.Writer> writers = this.writers();
if (LOG.isDebugEnabled()) {
LOG.debug("Commit " + writers.size() + " writers, maxSeqId=" + maxSeqId
+ ", majorCompaction=" + majorCompaction);
}
List<Path> paths = new ArrayList<Path>();
for (Writer writer : writers) {
if (writer == null) {
continue;
}
writer.appendMetadata(maxSeqId, majorCompaction);
preCloseWriter(writer);
paths.add(writer.getPath());
writer.close();
}
return paths;
}
/**
* Close all writers without throwing any exceptions. This is usually called when the compaction fails.
*/
public List<Path> abortWriters() {
List<Path> paths = new ArrayList<Path>();
for (StoreFile.Writer writer : writers()) {
try {
if (writer != null) {
paths.add(writer.getPath());
writer.close();
}
} catch (Exception ex) {
LOG.error("Failed to close the writer after an unfinished compaction.", ex);
}
}
return paths;
}
protected abstract Collection<StoreFile.Writer> writers();
/**
* Subclasses override this method to be called at the end of a successful sequence of appends;
* all appends are processed before this method is called.
*/
protected void preCommitWriters() throws IOException {
}
/**
* Subclasses override this method to be called before we close the given writer. Usually you can
* append extra metadata to the writer.
*/
protected void preCloseWriter(StoreFile.Writer writer) throws IOException {
}
}
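
For reference, a minimal caller sketch (not part of this patch) showing the intended lifecycle: init the writer with an optional StoreScanner and a WriterFactory, append cells, then commit all outputs on success or abort them on failure. Only the AbstractMultiFileWriter API above is real; the wrapper class and method below are hypothetical.

import java.io.IOException;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter.WriterFactory;
import org.apache.hadoop.hbase.regionserver.StoreScanner;

public class MultiFileWriterUsageSketch {
  // Drive any AbstractMultiFileWriter: initialize, append, then commit the
  // outputs, or abort them (leftover paths are deleted by the caller) on failure.
  static List<Path> writeAll(AbstractMultiFileWriter writer, StoreScanner sourceScanner,
      WriterFactory factory, Iterable<Cell> cells, long maxSeqId, boolean majorCompaction)
      throws IOException {
    writer.init(sourceScanner, factory); // sourceScanner may be null
    boolean finished = false;
    try {
      for (Cell cell : cells) {
        writer.append(cell); // the subclass routes each cell to one of its per-file writers
      }
      finished = true;
    } finally {
      if (!finished) {
        writer.abortWriters(); // close everything without throwing
      }
    }
    // The same maxSeqId is written to every output file (see HBASE-15400).
    return writer.commitWriters(maxSeqId, majorCompaction);
  }
}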

View File

@ -0,0 +1,83 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
/**
* Cell sink that separates the provided cells into multiple files for date tiered
* compaction.
*/
@InterfaceAudience.Private
public class DateTieredMultiFileWriter extends AbstractMultiFileWriter {
private final NavigableMap<Long, StoreFile.Writer> lowerBoundary2Writer
= new TreeMap<Long, StoreFile.Writer>();
private final boolean needEmptyFile;
/**
* @param needEmptyFile whether we need to create an empty store file if we have not written
* out anything.
*/
public DateTieredMultiFileWriter(List<Long> lowerBoundaries, boolean needEmptyFile) {
for (Long lowerBoundary : lowerBoundaries) {
lowerBoundary2Writer.put(lowerBoundary, null);
}
this.needEmptyFile = needEmptyFile;
}
@Override
public void append(Cell cell) throws IOException {
Map.Entry<Long, StoreFile.Writer> entry = lowerBoundary2Writer.floorEntry(cell.getTimestamp());
StoreFile.Writer writer = entry.getValue();
if (writer == null) {
writer = writerFactory.createWriter();
lowerBoundary2Writer.put(entry.getKey(), writer);
}
writer.append(cell);
}
@Override
protected Collection<Writer> writers() {
return lowerBoundary2Writer.values();
}
@Override
protected void preCommitWriters() throws IOException {
if (!needEmptyFile) {
return;
}
for (StoreFile.Writer writer : lowerBoundary2Writer.values()) {
if (writer != null) {
return;
}
}
// we haven't written out any data, create an empty file to retain metadata
lowerBoundary2Writer.put(lowerBoundary2Writer.firstKey(), writerFactory.createWriter());
}
}
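
As a standalone illustration of the routing in append() above, the sketch below uses only the JDK and made-up boundary values: TreeMap.floorEntry picks the window whose lower boundary is the greatest one not exceeding the cell timestamp, so every timestamp maps to exactly one output file.

import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

public class WindowRoutingSketch {
  public static void main(String[] args) {
    // Illustrative lower boundaries; the real values come from the compaction policy.
    List<Long> lowerBoundaries = Arrays.asList(Long.MIN_VALUE, 1000L, 2000L, 3000L);
    NavigableMap<Long, String> lowerBoundary2File = new TreeMap<Long, String>();
    for (Long lowerBoundary : lowerBoundaries) {
      lowerBoundary2File.put(lowerBoundary, "file-for-window-starting-at-" + lowerBoundary);
    }
    // floorEntry mirrors what append() does with lowerBoundary2Writer.
    for (long ts : new long[] { 500L, 1500L, 2999L, 4000L }) {
      System.out.println(ts + " -> " + lowerBoundary2File.floorEntry(ts).getValue());
    }
  }
}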

View File

@ -251,6 +251,13 @@ public class StoreFile {
this.cfBloomType = other.cfBloomType;
}
/**
* Clone a StoreFile for opening private reader.
*/
public StoreFile cloneForReader() {
return new StoreFile(this);
}
/**
* @return the StoreFile object associated to this StoreFile.
* null if the StoreFile is not a reference.
@ -285,7 +292,7 @@ public class StoreFile {
* @return True if this is HFile.
*/
public boolean isHFile() {
return this.fileInfo.isHFile(this.fileInfo.getPath());
return StoreFileInfo.isHFile(this.fileInfo.getPath());
}
/**

View File

@ -19,51 +19,35 @@ package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Base class for cell sink that separates the provided cells into multiple files.
* Base class for cell sink that separates the provided cells into multiple files for stripe
* compaction.
*/
@InterfaceAudience.Private
public abstract class StripeMultiFileWriter implements Compactor.CellSink {
public abstract class StripeMultiFileWriter extends AbstractMultiFileWriter {
private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class);
/** Factory that is used to produce single StoreFile.Writer-s */
protected WriterFactory writerFactory;
protected KVComparator comparator;
protected final KVComparator comparator;
protected List<StoreFile.Writer> existingWriters;
protected List<byte[]> boundaries;
/** Source scanner that is tracking KV count; may be null if source is not StoreScanner */
protected StoreScanner sourceScanner;
/** Whether to write stripe metadata */
private boolean doWriteStripeMetadata = true;
public interface WriterFactory {
public StoreFile.Writer createWriter() throws IOException;
}
/**
* Initializes multi-writer before usage.
* @param sourceScanner Optional store scanner to obtain the information about read progress.
* @param factory Factory used to produce individual file writers.
* @param comparator Comparator used to compare rows.
*/
public void init(StoreScanner sourceScanner, WriterFactory factory, KVComparator comparator)
throws IOException {
this.writerFactory = factory;
this.sourceScanner = sourceScanner;
public StripeMultiFileWriter(KVComparator comparator) {
this.comparator = comparator;
}
@ -71,41 +55,35 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
this.doWriteStripeMetadata = false;
}
public List<Path> commitWriters(long maxSeqId, boolean isMajor) throws IOException {
assert this.existingWriters != null;
commitWritersInternal();
assert this.boundaries.size() == (this.existingWriters.size() + 1);
LOG.debug((this.doWriteStripeMetadata ? "W" : "Not w")
+ "riting out metadata for " + this.existingWriters.size() + " writers");
List<Path> paths = new ArrayList<Path>();
for (int i = 0; i < this.existingWriters.size(); ++i) {
StoreFile.Writer writer = this.existingWriters.get(i);
if (writer == null) continue; // writer was skipped due to 0 KVs
if (doWriteStripeMetadata) {
writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i));
writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1));
}
writer.appendMetadata(maxSeqId, isMajor);
paths.add(writer.getPath());
writer.close();
}
this.existingWriters = null;
return paths;
@Override
protected Collection<Writer> writers() {
return existingWriters;
}
public List<Path> abortWriters() {
protected abstract void preCommitWritersInternal() throws IOException;
@Override
protected final void preCommitWriters() throws IOException {
// do some sanity check here.
assert this.existingWriters != null;
List<Path> paths = new ArrayList<Path>();
for (StoreFile.Writer writer : this.existingWriters) {
try {
paths.add(writer.getPath());
writer.close();
} catch (Exception ex) {
LOG.error("Failed to close the writer after an unfinished compaction.", ex);
preCommitWritersInternal();
assert this.boundaries.size() == (this.existingWriters.size() + 1);
}
@Override
protected void preCloseWriter(Writer writer) throws IOException {
if (doWriteStripeMetadata) {
if (LOG.isDebugEnabled()) {
LOG.debug("Write stripe metadata for " + writer.getPath().toString());
}
int index = existingWriters.indexOf(writer);
writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, boundaries.get(index));
writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, boundaries.get(index + 1));
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip writing stripe metadata for " + writer.getPath().toString());
}
}
this.existingWriters = null;
return paths;
}
/**
@ -115,12 +93,13 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
* @param rowOffset Offset for row.
* @param rowLength Length for row.
*/
protected void sanityCheckLeft(
byte[] left, byte[] row, int rowOffset, int rowLength) throws IOException {
if (StripeStoreFileManager.OPEN_KEY != left &&
comparator.compareRows(row, rowOffset, rowLength, left, 0, left.length) < 0) {
String error = "The first row is lower than the left boundary of [" + Bytes.toString(left)
+ "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
protected void sanityCheckLeft(byte[] left, byte[] row, int rowOffset, int rowLength)
throws IOException {
if (StripeStoreFileManager.OPEN_KEY != left
&& comparator.compareRows(row, rowOffset, rowLength, left, 0, left.length) < 0) {
String error =
"The first row is lower than the left boundary of [" + Bytes.toString(left) + "]: ["
+ Bytes.toString(row, rowOffset, rowLength) + "]";
LOG.error(error);
throw new IOException(error);
}
@ -133,27 +112,22 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
* @param rowOffset Offset for row.
* @param rowLength Length for row.
*/
protected void sanityCheckRight(
byte[] right, byte[] row, int rowOffset, int rowLength) throws IOException {
if (StripeStoreFileManager.OPEN_KEY != right &&
comparator.compareRows(row, rowOffset, rowLength, right, 0, right.length) >= 0) {
String error = "The last row is higher or equal than the right boundary of ["
+ Bytes.toString(right) + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
protected void sanityCheckRight(byte[] right, byte[] row, int rowOffset, int rowLength)
throws IOException {
if (StripeStoreFileManager.OPEN_KEY != right
&& comparator.compareRows(row, rowOffset, rowLength, right, 0, right.length) >= 0) {
String error =
"The last row is higher or equal than the right boundary of [" + Bytes.toString(right)
+ "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
LOG.error(error);
throw new IOException(error);
}
}
/**
* Subclasses override this method to be called at the end of a successful sequence of
* append; all appends are processed before this method is called.
*/
protected abstract void commitWritersInternal() throws IOException;
/**
* MultiWriter that separates the cells based on fixed row-key boundaries.
* All the KVs between each pair of neighboring boundaries from the list supplied to ctor
* will end up in one file, and separate from all other such pairs.
* MultiWriter that separates the cells based on fixed row-key boundaries. All the KVs between
* each pair of neighboring boundaries from the list supplied to ctor will end up in one file, and
* separate from all other such pairs.
*/
public static class BoundaryMultiWriter extends StripeMultiFileWriter {
private StoreFile.Writer currentWriter;
@ -166,27 +140,28 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
/**
* @param targetBoundaries The boundaries on which writers/files are separated.
* @param majorRangeFrom Major range is the range for which at least one file should be
* written (because all files are included in compaction).
* majorRangeFrom is the left boundary.
* @param majorRangeFrom Major range is the range for which at least one file should be written
* (because all files are included in compaction). majorRangeFrom is the left boundary.
* @param majorRangeTo The right boundary of majorRange (see majorRangeFrom).
*/
public BoundaryMultiWriter(List<byte[]> targetBoundaries,
public BoundaryMultiWriter(KVComparator comparator, List<byte[]> targetBoundaries,
byte[] majorRangeFrom, byte[] majorRangeTo) throws IOException {
super();
super(comparator);
this.boundaries = targetBoundaries;
this.existingWriters = new ArrayList<StoreFile.Writer>(this.boundaries.size() - 1);
// "major" range (range for which all files are included) boundaries, if any,
// must match some target boundaries, let's find them.
assert (majorRangeFrom == null) == (majorRangeTo == null);
assert (majorRangeFrom == null) == (majorRangeTo == null);
if (majorRangeFrom != null) {
majorRangeFromIndex = (majorRangeFrom == StripeStoreFileManager.OPEN_KEY) ? 0
: Collections.binarySearch(this.boundaries, majorRangeFrom, Bytes.BYTES_COMPARATOR);
majorRangeToIndex = (majorRangeTo == StripeStoreFileManager.OPEN_KEY) ? boundaries.size()
: Collections.binarySearch(this.boundaries, majorRangeTo, Bytes.BYTES_COMPARATOR);
majorRangeFromIndex =
(majorRangeFrom == StripeStoreFileManager.OPEN_KEY) ? 0 : Collections.binarySearch(
this.boundaries, majorRangeFrom, Bytes.BYTES_COMPARATOR);
majorRangeToIndex =
(majorRangeTo == StripeStoreFileManager.OPEN_KEY) ? boundaries.size() : Collections
.binarySearch(this.boundaries, majorRangeTo, Bytes.BYTES_COMPARATOR);
if (this.majorRangeFromIndex < 0 || this.majorRangeToIndex < 0) {
throw new IOException("Major range does not match writer boundaries: [" +
Bytes.toString(majorRangeFrom) + "] [" + Bytes.toString(majorRangeTo) + "]; from "
throw new IOException("Major range does not match writer boundaries: ["
+ Bytes.toString(majorRangeFrom) + "] [" + Bytes.toString(majorRangeTo) + "]; from "
+ majorRangeFromIndex + " to " + majorRangeToIndex);
}
}
@ -196,8 +171,8 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
public void append(Cell cell) throws IOException {
if (currentWriter == null && existingWriters.isEmpty()) {
// First append ever, do a sanity check.
sanityCheckLeft(this.boundaries.get(0),
cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
sanityCheckLeft(this.boundaries.get(0), cell.getRowArray(), cell.getRowOffset(),
cell.getRowLength());
}
prepareWriterFor(cell);
currentWriter.append(cell);
@ -206,20 +181,20 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
}
private boolean isCellAfterCurrentWriter(Cell cell) {
return ((currentWriterEndKey != StripeStoreFileManager.OPEN_KEY) &&
(comparator.compareRows(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
currentWriterEndKey, 0, currentWriterEndKey.length) >= 0));
return ((currentWriterEndKey != StripeStoreFileManager.OPEN_KEY) && (comparator.compareRows(
cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(), currentWriterEndKey, 0,
currentWriterEndKey.length) >= 0));
}
@Override
protected void commitWritersInternal() throws IOException {
protected void preCommitWritersInternal() throws IOException {
stopUsingCurrentWriter();
while (existingWriters.size() < boundaries.size() - 1) {
createEmptyWriter();
}
if (lastCell != null) {
sanityCheckRight(boundaries.get(boundaries.size() - 1),
lastCell.getRowArray(), lastCell.getRowOffset(), lastCell.getRowLength());
sanityCheckRight(boundaries.get(boundaries.size() - 1), lastCell.getRowArray(),
lastCell.getRowOffset(), lastCell.getRowLength());
}
}
@ -239,14 +214,13 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
}
/**
* Called if there are no cells for some stripe.
* We need to have something in the writer list for this stripe, so that writer-boundary
* list indices correspond to each other. We can insert null in the writer list for that
* purpose, except in the following cases where we actually need a file:
* 1) If we are in range for which we are compacting all the files, we need to create an
* empty file to preserve stripe metadata.
* 2) If we have not produced any file at all for this compactions, and this is the
* last chance (the last stripe), we need to preserve last seqNum (see also HBASE-6059).
* Called if there are no cells for some stripe. We need to have something in the writer list
* for this stripe, so that writer-boundary list indices correspond to each other. We can insert
* null in the writer list for that purpose, except in the following cases where we actually
* need a file: 1) If we are in range for which we are compacting all the files, we need to
* create an empty file to preserve stripe metadata. 2) If we have not produced any file at all
for this compaction, and this is the last chance (the last stripe), we need to preserve the last
* seqNum (see also HBASE-6059).
*/
private void createEmptyWriter() throws IOException {
int index = existingWriters.size();
@ -256,12 +230,13 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
boolean needEmptyFile = isInMajorRange || isLastWriter;
existingWriters.add(needEmptyFile ? writerFactory.createWriter() : null);
hasAnyWriter |= needEmptyFile;
currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
? null : boundaries.get(existingWriters.size() + 1);
currentWriterEndKey =
(existingWriters.size() + 1 == boundaries.size()) ? null : boundaries.get(existingWriters
.size() + 1);
}
private void checkCanCreateWriter() throws IOException {
int maxWriterCount = boundaries.size() - 1;
int maxWriterCount = boundaries.size() - 1;
assert existingWriters.size() <= maxWriterCount;
if (existingWriters.size() >= maxWriterCount) {
throw new IOException("Cannot create any more writers (created " + existingWriters.size()
@ -278,16 +253,16 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
cellsInCurrentWriter = 0;
}
currentWriter = null;
currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
? null : boundaries.get(existingWriters.size() + 1);
currentWriterEndKey =
(existingWriters.size() + 1 == boundaries.size()) ? null : boundaries.get(existingWriters
.size() + 1);
}
}
/**
* MultiWriter that separates the cells based on target cell number per file and file count.
* New file is started every time the target number of KVs is reached, unless the fixed
* count of writers has already been created (in that case all the remaining KVs go into
* the last writer).
* MultiWriter that separates the cells based on target cell number per file and file count. New
* file is started every time the target number of KVs is reached, unless the fixed count of
* writers has already been created (in that case all the remaining KVs go into the last writer).
*/
public static class SizeMultiWriter extends StripeMultiFileWriter {
private int targetCount;
@ -308,8 +283,9 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
* @param left The left boundary of the first writer.
* @param right The right boundary of the last writer.
*/
public SizeMultiWriter(int targetCount, long targetKvs, byte[] left, byte[] right) {
super();
public SizeMultiWriter(KVComparator comparator, int targetCount, long targetKvs, byte[] left,
byte[] right) {
super(comparator);
this.targetCount = targetCount;
this.targetCells = targetKvs;
this.left = left;
@ -330,10 +306,10 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
doCreateWriter = true;
} else if (lastRowInCurrentWriter != null
&& !comparator.matchingRows(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
lastRowInCurrentWriter, 0, lastRowInCurrentWriter.length)) {
lastRowInCurrentWriter, 0, lastRowInCurrentWriter.length)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping to use a writer after [" + Bytes.toString(lastRowInCurrentWriter)
+ "] row; wrote out " + cellsInCurrentWriter + " kvs");
+ "] row; wrote out " + cellsInCurrentWriter + " kvs");
}
lastRowInCurrentWriter = null;
cellsInCurrentWriter = 0;
@ -355,34 +331,36 @@ public abstract class StripeMultiFileWriter implements Compactor.CellSink {
++cellsInCurrentWriter;
cellsSeen = cellsInCurrentWriter;
if (this.sourceScanner != null) {
cellsSeen = Math.max(cellsSeen,
this.sourceScanner.getEstimatedNumberOfKvsScanned() - cellsSeenInPrevious);
cellsSeen =
Math.max(cellsSeen, this.sourceScanner.getEstimatedNumberOfKvsScanned()
- cellsSeenInPrevious);
}
// If we are not already waiting for opportunity to close, start waiting if we can
// create any more writers and if the current one is too big.
if (lastRowInCurrentWriter == null
&& existingWriters.size() < targetCount
if (lastRowInCurrentWriter == null && existingWriters.size() < targetCount
&& cellsSeen >= targetCells) {
lastRowInCurrentWriter = cell.getRow(); // make a copy
if (LOG.isDebugEnabled()) {
LOG.debug("Preparing to start a new writer after [" + Bytes.toString(
lastRowInCurrentWriter) + "] row; observed " + cellsSeen + " kvs and wrote out "
+ cellsInCurrentWriter + " kvs");
LOG.debug("Preparing to start a new writer after ["
+ Bytes.toString(lastRowInCurrentWriter) + "] row; observed " + cellsSeen
+ " kvs and wrote out " + cellsInCurrentWriter + " kvs");
}
}
}
@Override
protected void commitWritersInternal() throws IOException {
protected void preCommitWritersInternal() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping with " + cellsInCurrentWriter + " kvs in last writer" +
((this.sourceScanner == null) ? "" : ("; observed estimated "
LOG.debug("Stopping with "
+ cellsInCurrentWriter
+ " kvs in last writer"
+ ((this.sourceScanner == null) ? "" : ("; observed estimated "
+ this.sourceScanner.getEstimatedNumberOfKvsScanned() + " KVs total")));
}
if (lastCell != null) {
sanityCheckRight(
right, lastCell.getRowArray(), lastCell.getRowOffset(), lastCell.getRowLength());
sanityCheckRight(right, lastCell.getRowArray(), lastCell.getRowOffset(),
lastCell.getRowLength());
}
// When expired stripes were going to be merged into one, and if no writer was created during

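For illustration only (made-up row keys and counts, not part of this patch), constructing the two concrete writers now requires the comparator up front instead of passing it to init():

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.util.Bytes;

public class StripeWriterConstructionSketch {
  // One output file per pair of neighboring boundaries; real callers usually
  // start and end the list with StripeStoreFileManager.OPEN_KEY.
  static StripeMultiFileWriter boundaryWriter(KVComparator comparator) throws IOException {
    List<byte[]> boundaries =
        Arrays.asList(Bytes.toBytes("bbb"), Bytes.toBytes("ccc"), Bytes.toBytes("ddd"));
    return new StripeMultiFileWriter.BoundaryMultiWriter(comparator, boundaries, null, null);
  }

  // At most 3 output files, starting a new one roughly every 1000 cells,
  // covering the key range between the given left and right boundaries.
  static StripeMultiFileWriter sizeWriter(KVComparator comparator) {
    return new StripeMultiFileWriter.SizeMultiWriter(comparator, 3, 1000L,
        Bytes.toBytes("aaa"), Bytes.toBytes("zzz"));
  }
}
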
View File

@ -24,19 +24,19 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import com.google.common.annotations.VisibleForTesting;
/**
* Stripe implementation of StoreFlusher. Flushes files either into L0 file w/o metadata, or
* into separate striped files, avoiding L0.
@ -69,7 +69,8 @@ public class StripeStoreFlusher extends StoreFlusher {
}
// Let policy select flush method.
StripeFlushRequest req = this.policy.selectFlush(this.stripes, cellsCount);
StripeFlushRequest req = this.policy.selectFlush(store.getComparator(), this.stripes,
cellsCount);
boolean success = false;
StripeMultiFileWriter mw = null;
@ -78,7 +79,7 @@ public class StripeStoreFlusher extends StoreFlusher {
StripeMultiFileWriter.WriterFactory factory = createWriterFactory(
snapshot.getTimeRangeTracker(), cellsCount);
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
mw.init(storeScanner, factory, store.getComparator());
mw.init(storeScanner, factory);
synchronized (flushLock) {
performFlush(scanner, mw, smallestReadPoint, throughputController);
@ -123,10 +124,17 @@ public class StripeStoreFlusher extends StoreFlusher {
/** Stripe flush request wrapper that writes a non-striped file. */
public static class StripeFlushRequest {
protected final KVComparator comparator;
public StripeFlushRequest(KVComparator comparator) {
this.comparator = comparator;
}
@VisibleForTesting
public StripeMultiFileWriter createWriter() throws IOException {
StripeMultiFileWriter writer =
new StripeMultiFileWriter.SizeMultiWriter(1, Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(comparator, 1,
Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
writer.setNoStripeMetadata();
return writer;
}
@ -137,13 +145,15 @@ public class StripeStoreFlusher extends StoreFlusher {
private final List<byte[]> targetBoundaries;
/** @param targetBoundaries New files should be written with these boundaries. */
public BoundaryStripeFlushRequest(List<byte[]> targetBoundaries) {
public BoundaryStripeFlushRequest(KVComparator comparator, List<byte[]> targetBoundaries) {
super(comparator);
this.targetBoundaries = targetBoundaries;
}
@Override
public StripeMultiFileWriter createWriter() throws IOException {
return new StripeMultiFileWriter.BoundaryMultiWriter(targetBoundaries, null, null);
return new StripeMultiFileWriter.BoundaryMultiWriter(comparator, targetBoundaries, null,
null);
}
}
@ -157,15 +167,16 @@ public class StripeStoreFlusher extends StoreFlusher {
* @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
* total number of kvs, all the overflow data goes into the last stripe.
*/
public SizeStripeFlushRequest(int targetCount, long targetKvs) {
public SizeStripeFlushRequest(KVComparator comparator, int targetCount, long targetKvs) {
super(comparator);
this.targetCount = targetCount;
this.targetKvs = targetKvs;
}
@Override
public StripeMultiFileWriter createWriter() throws IOException {
return new StripeMultiFileWriter.SizeMultiWriter(
this.targetCount, this.targetKvs, OPEN_KEY, OPEN_KEY);
return new StripeMultiFileWriter.SizeMultiWriter(comparator, this.targetCount, this.targetKvs,
OPEN_KEY, OPEN_KEY);
}
}
}

View File

@ -0,0 +1,161 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.AbstractMultiFileWriter.WriterFactory;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import com.google.common.io.Closeables;
/**
* Base class for implementing a Compactor which will generate multiple output files after
* compaction.
*/
@InterfaceAudience.Private
public abstract class AbstractMultiOutputCompactor<T extends AbstractMultiFileWriter>
extends Compactor {
private static final Log LOG = LogFactory.getLog(AbstractMultiOutputCompactor.class);
public AbstractMultiOutputCompactor(Configuration conf, Store store) {
super(conf, store);
}
protected interface InternalScannerFactory {
ScanType getScanType(CompactionRequest request);
InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
FileDetails fd, long smallestReadPoint) throws IOException;
}
protected List<Path> compact(T writer, final CompactionRequest request,
InternalScannerFactory scannerFactory, ThroughputController throughputController, User user)
throws IOException {
final FileDetails fd = getFileDetails(request.getFiles(), request.isAllFiles());
this.progress = new CompactionProgress(fd.maxKeyCount);
// Find the smallest read point across all the Scanners.
long smallestReadPoint = getSmallestReadPoint();
List<StoreFileScanner> scanners;
Collection<StoreFile> readersToClose;
if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
// clone all StoreFiles, so we'll do the compaction on an independent copy of StoreFiles,
// HFiles, and their readers
readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
for (StoreFile f : request.getFiles()) {
readersToClose.add(f.cloneForReader());
}
scanners = createFileScanners(readersToClose, smallestReadPoint,
store.throttleCompaction(request.getSize()));
} else {
readersToClose = Collections.emptyList();
scanners = createFileScanners(request.getFiles(), smallestReadPoint,
store.throttleCompaction(request.getSize()));
}
InternalScanner scanner = null;
boolean finished = false;
try {
/* Include deletes, unless we are doing a major compaction */
ScanType scanType = scannerFactory.getScanType(request);
scanner = preCreateCoprocScanner(request, scanType, fd.earliestPutTs, scanners);
if (scanner == null) {
scanner = scannerFactory.createScanner(scanners, scanType, fd, smallestReadPoint);
}
scanner = postCreateCoprocScanner(request, scanType, scanner, user);
if (scanner == null) {
// NULL scanner returned from coprocessor hooks means skip normal processing.
return new ArrayList<Path>();
}
boolean cleanSeqId = false;
if (fd.minSeqIdToKeep > 0) {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
// Create the writer factory for compactions.
final boolean needMvcc = fd.maxMVCCReadpoint >= 0;
WriterFactory writerFactory = new WriterFactory() {
@Override
public Writer createWriter() throws IOException {
return store.createWriterInTmp(fd.maxKeyCount, compactionCompression, true, needMvcc,
fd.maxTagsLength > 0, store.throttleCompaction(request.getSize()));
}
};
// Prepare multi-writer, and perform the compaction using scanner and writer.
// It is ok here if storeScanner is null.
StoreScanner storeScanner =
(scanner instanceof StoreScanner) ? (StoreScanner) scanner : null;
writer.init(storeScanner, writerFactory);
finished = performCompaction(scanner, writer, smallestReadPoint, cleanSeqId,
throughputController);
if (!finished) {
throw new InterruptedIOException("Aborting compaction of store " + store + " in region "
+ store.getRegionInfo().getRegionNameAsString() + " because it was interrupted.");
}
} finally {
Closeables.close(scanner, true);
for (StoreFile f : readersToClose) {
try {
f.closeReader(true);
} catch (IOException e) {
LOG.warn("Exception closing " + f, e);
}
}
if (!finished) {
FileSystem fs = store.getFileSystem();
for (Path leftoverFile : writer.abortWriters()) {
try {
fs.delete(leftoverFile, false);
} catch (IOException e) {
LOG.error("Failed to delete the leftover file " + leftoverFile
+ " after an unfinished compaction.",
e);
}
}
}
}
assert finished : "We should have exited the method on all error paths";
return commitMultiWriter(writer, fd, request);
}
protected abstract List<Path> commitMultiWriter(T writer, FileDetails fd,
CompactionRequest request) throws IOException;
}
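
One knob in the method above deserves a note: unless hbase.regionserver.compaction.private.readers is set to false, the compactor clones every input StoreFile (via cloneForReader) and reads through private readers that are closed in the finally block. A minimal sketch of reading that flag (the wrapper class is hypothetical):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class PrivateReadersConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Defaults to true, matching the conf.getBoolean call in compact() above.
    boolean privateReaders =
        conf.getBoolean("hbase.regionserver.compaction.private.readers", true);
    System.out.println("compaction uses private readers: " + privateReaders);
  }
}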

View File

@ -60,14 +60,14 @@ import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
public abstract class Compactor {
private static final Log LOG = LogFactory.getLog(Compactor.class);
protected CompactionProgress progress;
protected Configuration conf;
protected Store store;
protected final Configuration conf;
protected final Store store;
protected final int compactionKVMax;
protected final Compression.Algorithm compactionCompression;
private int compactionKVMax;
protected Compression.Algorithm compactionCompression;
/** specify how many days to keep MVCC values during major compaction **/
protected int keepSeqIdPeriod;
protected final int keepSeqIdPeriod;
//TODO: depending on Store is not good but, realistically, all compactors currently do.
Compactor(final Configuration conf, final Store store) {

View File

@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.DateTieredMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
/**
* This compactor will generate StoreFiles for different time ranges.
*/
@InterfaceAudience.Private
public class DateTieredCompactor extends AbstractMultiOutputCompactor<DateTieredMultiFileWriter> {
private static final Log LOG = LogFactory.getLog(DateTieredCompactor.class);
public DateTieredCompactor(Configuration conf, Store store) {
super(conf, store);
}
private boolean needEmptyFile(CompactionRequest request) {
// if we are going to compact the last N files, then we need to emit an empty file to retain the
// maxSeqId if we haven't written out anything.
return StoreFile.getMaxSequenceIdInList(request.getFiles()) == store.getMaxSequenceId();
}
public List<Path> compact(final CompactionRequest request, List<Long> lowerBoundaries,
ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing compaction with " + lowerBoundaries.size()
+ " windows, lower boundaries: " + lowerBoundaries);
}
DateTieredMultiFileWriter writer =
new DateTieredMultiFileWriter(lowerBoundaries, needEmptyFile(request));
return compact(writer, request, new InternalScannerFactory() {
@Override
public ScanType getScanType(CompactionRequest request) {
return request.isAllFiles() ? ScanType.COMPACT_DROP_DELETES
: ScanType.COMPACT_RETAIN_DELETES;
}
@Override
public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
FileDetails fd, long smallestReadPoint) throws IOException {
return DateTieredCompactor.this.createScanner(store, scanners, scanType, smallestReadPoint,
fd.earliestPutTs);
}
}, throughputController, user);
}
@Override
protected List<Path> commitMultiWriter(DateTieredMultiFileWriter writer, FileDetails fd,
CompactionRequest request) throws IOException {
return writer.commitWriters(fd.maxSeqId, request.isAllFiles());
}
}
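
A hedged driver sketch for the new compactor (the boundary values and the run() wrapper are made up; in HBase the date tiered compaction policy computes the window boundaries and the store engine builds the request):

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.DateTieredCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;

public class DateTieredCompactionSketch {
  static List<Path> run(Configuration conf, Store store, CompactionRequest request,
      ThroughputController throughputController, User user) throws IOException {
    // One lower boundary per output window; Long.MIN_VALUE catches everything older.
    List<Long> lowerBoundaries = Arrays.asList(Long.MIN_VALUE, 1456000000000L, 1458000000000L);
    DateTieredCompactor compactor = new DateTieredCompactor(conf, store);
    return compactor.compact(request, lowerBoundaries, throughputController, user);
  }
}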

View File

@ -65,10 +65,10 @@ public class DefaultCompactor extends Compactor {
Collection<StoreFile> readersToClose;
if (this.conf.getBoolean("hbase.regionserver.compaction.private.readers", true)) {
// clone all StoreFiles, so we'll do the compaction on an independent copy of StoreFiles,
// HFileFiles, and their readers
// HFiles, and their readers
readersToClose = new ArrayList<StoreFile>(request.getFiles().size());
for (StoreFile f : request.getFiles()) {
readersToClose.add(new StoreFile(f));
readersToClose.add(f.cloneForReader());
}
scanners = createFileScanners(readersToClose, smallestReadPoint,
store.throttleCompaction(request.getSize()));

View File

@ -25,11 +25,14 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import com.google.common.collect.ImmutableList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
@ -42,8 +45,6 @@ import org.apache.hadoop.hbase.util.ConcatenatedLists;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Pair;
import com.google.common.collect.ImmutableList;
/**
* Stripe store implementation of compaction policy.
*/
@ -84,18 +85,20 @@ public class StripeCompactionPolicy extends CompactionPolicy {
request, OPEN_KEY, OPEN_KEY, targetKvsAndCount.getSecond(), targetKvsAndCount.getFirst());
}
public StripeStoreFlusher.StripeFlushRequest selectFlush(
public StripeStoreFlusher.StripeFlushRequest selectFlush(KVComparator comparator,
StripeInformationProvider si, int kvCount) {
if (this.config.isUsingL0Flush()) {
return new StripeStoreFlusher.StripeFlushRequest(); // L0 is used, return dumb request.
// L0 is used, return dumb request.
return new StripeStoreFlusher.StripeFlushRequest(comparator);
}
if (si.getStripeCount() == 0) {
// No stripes - start with the requisite count, derive KVs per stripe.
int initialCount = this.config.getInitialCount();
return new StripeStoreFlusher.SizeStripeFlushRequest(initialCount, kvCount / initialCount);
return new StripeStoreFlusher.SizeStripeFlushRequest(comparator, initialCount,
kvCount / initialCount);
}
// There are stripes - do according to the boundaries.
return new StripeStoreFlusher.BoundaryStripeFlushRequest(si.getStripeBoundaries());
return new StripeStoreFlusher.BoundaryStripeFlushRequest(comparator, si.getStripeBoundaries());
}
public StripeCompactionRequest selectCompaction(StripeInformationProvider si,

View File

@ -18,50 +18,65 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;
/**
* This is the placeholder for stripe compactor. The implementation,
* as well as the proper javadoc, will be added in HBASE-7967.
* This is the placeholder for stripe compactor. The implementation, as well as the proper javadoc,
* will be added in HBASE-7967.
*/
@InterfaceAudience.Private
public class StripeCompactor extends Compactor {
public class StripeCompactor extends AbstractMultiOutputCompactor<StripeMultiFileWriter> {
private static final Log LOG = LogFactory.getLog(StripeCompactor.class);
public StripeCompactor(Configuration conf, Store store) {
super(conf, store);
}
public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
byte[] majorRangeFromRow, byte[] majorRangeToRow,
ThroughputController throughputController) throws IOException {
return compact(request, targetBoundaries, majorRangeFromRow, majorRangeToRow,
throughputController, null);
private final class StripeInternalScannerFactory implements InternalScannerFactory {
private final byte[] majorRangeFromRow;
private final byte[] majorRangeToRow;
public StripeInternalScannerFactory(byte[] majorRangeFromRow, byte[] majorRangeToRow) {
this.majorRangeFromRow = majorRangeFromRow;
this.majorRangeToRow = majorRangeToRow;
}
@Override
public ScanType getScanType(CompactionRequest request) {
// If majorRangeFromRow and majorRangeToRow are not null, then we will not use the return
// value to create InternalScanner. See the createScanner method below. The return value is
// also used when calling coprocessor hooks.
return ScanType.COMPACT_RETAIN_DELETES;
}
@Override
public InternalScanner createScanner(List<StoreFileScanner> scanners, ScanType scanType,
FileDetails fd, long smallestReadPoint) throws IOException {
return (majorRangeFromRow == null) ? StripeCompactor.this.createScanner(store, scanners,
scanType, smallestReadPoint, fd.earliestPutTs) : StripeCompactor.this.createScanner(store,
scanners, smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);
}
}
public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
byte[] majorRangeFromRow, byte[] majorRangeToRow,
ThroughputController throughputController, User user) throws IOException {
byte[] majorRangeFromRow, byte[] majorRangeToRow, ThroughputController throughputController,
User user) throws IOException {
if (LOG.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:");
@ -70,115 +85,32 @@ public class StripeCompactor extends Compactor {
}
LOG.debug(sb.toString());
}
StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
targetBoundaries, majorRangeFromRow, majorRangeToRow);
return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
throughputController, user);
}
public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
ThroughputController throughputController) throws IOException {
return compact(request, targetCount, targetSize, left, right, majorRangeFromRow,
majorRangeToRow, throughputController, null);
StripeMultiFileWriter writer =
new StripeMultiFileWriter.BoundaryMultiWriter(store.getComparator(), targetBoundaries,
majorRangeFromRow, majorRangeToRow);
return compact(writer, request, new StripeInternalScannerFactory(majorRangeFromRow,
majorRangeToRow), throughputController, user);
}
public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow,
ThroughputController throughputController, User user) throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing compaction with " + targetSize
+ " target file size, no more than " + targetCount + " files, in ["
+ Bytes.toString(left) + "] [" + Bytes.toString(right) + "] range");
LOG.debug("Executing compaction with " + targetSize + " target file size, no more than "
+ targetCount + " files, in [" + Bytes.toString(left) + "] [" + Bytes.toString(right)
+ "] range");
}
StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
targetCount, targetSize, left, right);
return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow,
throughputController, user);
StripeMultiFileWriter writer =
new StripeMultiFileWriter.SizeMultiWriter(store.getComparator(), targetCount, targetSize,
left, right);
return compact(writer, request, new StripeInternalScannerFactory(majorRangeFromRow,
majorRangeToRow), throughputController, user);
}
private List<Path> compactInternal(StripeMultiFileWriter mw, final CompactionRequest request,
byte[] majorRangeFromRow, byte[] majorRangeToRow,
ThroughputController throughputController, User user) throws IOException {
final Collection<StoreFile> filesToCompact = request.getFiles();
final FileDetails fd = getFileDetails(filesToCompact, request.isMajor());
this.progress = new CompactionProgress(fd.maxKeyCount);
long smallestReadPoint = getSmallestReadPoint();
List<StoreFileScanner> scanners = createFileScanners(filesToCompact,
smallestReadPoint, store.throttleCompaction(request.getSize()));
boolean finished = false;
InternalScanner scanner = null;
boolean cleanSeqId = false;
try {
// Get scanner to use.
ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES;
scanner = preCreateCoprocScanner(request, coprocScanType, fd.earliestPutTs, scanners, user);
if (scanner == null) {
scanner = (majorRangeFromRow == null)
? createScanner(store, scanners,
ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, fd.earliestPutTs)
: createScanner(store, scanners,
smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);
}
scanner = postCreateCoprocScanner(request, coprocScanType, scanner, user);
if (scanner == null) {
// NULL scanner returned from coprocessor hooks means skip normal processing.
return new ArrayList<Path>();
}
// Create the writer factory for compactions.
if(fd.minSeqIdToKeep > 0) {
smallestReadPoint = Math.min(fd.minSeqIdToKeep, smallestReadPoint);
cleanSeqId = true;
}
final boolean needMvcc = fd.maxMVCCReadpoint > 0;
final Compression.Algorithm compression = store.getFamily().getCompactionCompression();
StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() {
@Override
public Writer createWriter() throws IOException {
return store.createWriterInTmp(
fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0,
store.throttleCompaction(request.getSize()));
}
};
// Prepare multi-writer, and perform the compaction using scanner and writer.
// It is ok here if storeScanner is null.
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
mw.init(storeScanner, factory, store.getComparator());
finished =
performCompaction(scanner, mw, smallestReadPoint, cleanSeqId, throughputController);
if (!finished) {
throw new InterruptedIOException( "Aborting compaction of store " + store +
" in region " + store.getRegionInfo().getRegionNameAsString() +
" because it was interrupted.");
}
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (Throwable t) {
// Don't fail the compaction if this fails.
LOG.error("Failed to close scanner after compaction.", t);
}
}
if (!finished) {
for (Path leftoverFile : mw.abortWriters()) {
try {
store.getFileSystem().delete(leftoverFile, false);
} catch (Exception ex) {
LOG.error("Failed to delete the leftover file after an unfinished compaction.", ex);
}
}
}
}
assert finished : "We should have exited the method on all error paths";
List<Path> newFiles = mw.commitWriters(fd.maxSeqId, request.isMajor());
@Override
protected List<Path> commitMultiWriter(StripeMultiFileWriter writer, FileDetails fd,
CompactionRequest request) throws IOException {
List<Path> newFiles = writer.commitWriters(fd.maxSeqId, request.isMajor());
assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
return newFiles;
}
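
To make the two remaining entry points easier to compare, here is an illustrative sketch (made-up row keys, counts and sizes; the wrapper class is hypothetical) of the boundary-based and size-based compactions, both of which now delegate to AbstractMultiOutputCompactor.compact():

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.Bytes;

public class StripeCompactionSketch {
  // Boundary-based: one output file per pair of neighboring target boundaries.
  static List<Path> byBoundaries(Configuration conf, Store store, CompactionRequest request,
      ThroughputController tc, User user) throws IOException {
    List<byte[]> boundaries = Arrays.asList(Bytes.toBytes("bbb"), Bytes.toBytes("ddd"));
    return new StripeCompactor(conf, store).compact(request, boundaries, null, null, tc, user);
  }

  // Size-based: at most 3 files, a new one roughly every 1000000 cells,
  // covering the key range between the given left and right boundaries.
  static List<Path> bySize(Configuration conf, Store store, CompactionRequest request,
      ThroughputController tc, User user) throws IOException {
    return new StripeCompactor(conf, store).compact(request, 3, 1000000L, Bytes.toBytes("aaa"),
        Bytes.toBytes("zzz"), null, null, tc, user);
  }
}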

View File

@ -1,323 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_END_KEY;
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_START_KEY;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@Category(SmallTests.class)
public class TestStripeCompactor {
private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
private static final byte[] KEY_B = Bytes.toBytes("bbb");
private static final byte[] KEY_C = Bytes.toBytes("ccc");
private static final byte[] KEY_D = Bytes.toBytes("ddd");
private static final KeyValue KV_A = kvAfter(Bytes.toBytes("aaa"));
private static final KeyValue KV_B = kvAfter(KEY_B);
private static final KeyValue KV_C = kvAfter(KEY_C);
private static final KeyValue KV_D = kvAfter(KEY_D);
private static KeyValue kvAfter(byte[] key) {
return new KeyValue(Arrays.copyOf(key, key.length + 1), 0L);
}
private static <T> T[] a(T... a) {
return a;
}
private static KeyValue[] e() {
return TestStripeCompactor.<KeyValue>a();
}
@Test
public void testBoundaryCompactions() throws Exception {
// General verification
verifyBoundaryCompaction(a(KV_A, KV_A, KV_B, KV_B, KV_C, KV_D),
a(OPEN_KEY, KEY_B, KEY_D, OPEN_KEY), a(a(KV_A, KV_A), a(KV_B, KV_B, KV_C), a(KV_D)));
verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_C, KEY_D), a(a(KV_B), a(KV_C)));
verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_D), new KeyValue[][] { a(KV_B, KV_C) });
}
@Test
public void testBoundaryCompactionEmptyFiles() throws Exception {
// No empty file if there're already files.
verifyBoundaryCompaction(
a(KV_B), a(KEY_B, KEY_C, KEY_D, OPEN_KEY), a(a(KV_B), null, null), null, null, false);
verifyBoundaryCompaction(a(KV_A, KV_C),
a(OPEN_KEY, KEY_B, KEY_C, KEY_D), a(a(KV_A), null, a(KV_C)), null, null, false);
// But should be created if there are no file.
verifyBoundaryCompaction(
e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, null, e()), null, null, false);
// In major range if there's major range.
verifyBoundaryCompaction(
e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, e(), null), KEY_B, KEY_C, false);
verifyBoundaryCompaction(
e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(e(), e(), null), OPEN_KEY, KEY_C, false);
// Major range should have files regardless of KVs.
verifyBoundaryCompaction(a(KV_A), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
a(a(KV_A), e(), e(), null), KEY_B, KEY_D, false);
verifyBoundaryCompaction(a(KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
a(null, null, a(KV_C), e()), KEY_C, OPEN_KEY, false);
}
public static void verifyBoundaryCompaction(
KeyValue[] input, byte[][] boundaries, KeyValue[][] output) throws Exception {
verifyBoundaryCompaction(input, boundaries, output, null, null, true);
}
public static void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries,
KeyValue[][] output, byte[] majorFrom, byte[] majorTo, boolean allFiles)
throws Exception {
StoreFileWritersCapture writers = new StoreFileWritersCapture();
StripeCompactor sc = createCompactor(writers, input);
List<Path> paths =
sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo,
NoLimitThroughputController.INSTANCE);
writers.verifyKvs(output, allFiles, true);
if (allFiles) {
assertEquals(output.length, paths.size());
writers.verifyBoundaries(boundaries);
}
}
@Test
public void testSizeCompactions() throws Exception {
// General verification with different sizes.
verifySizeCompaction(a(KV_A, KV_A, KV_B, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
a(a(KV_A, KV_A), a(KV_B, KV_C), a(KV_D)));
verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 4, 1, OPEN_KEY, OPEN_KEY,
a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)));
verifySizeCompaction(a(KV_B, KV_C), 2, 1, KEY_B, KEY_D, a(a(KV_B), a(KV_C)));
// Verify row boundaries are preserved.
verifySizeCompaction(a(KV_A, KV_A, KV_A, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
a(a(KV_A, KV_A, KV_A), a(KV_C, KV_D)));
verifySizeCompaction(a(KV_A, KV_B, KV_B, KV_C), 3, 1, OPEN_KEY, OPEN_KEY,
a(a(KV_A), a(KV_B, KV_B), a(KV_C)));
// Too much data, count limits the number of files.
verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 2, 1, OPEN_KEY, OPEN_KEY,
a(a(KV_A), a(KV_B, KV_C, KV_D)));
verifySizeCompaction(a(KV_A, KV_B, KV_C), 1, Long.MAX_VALUE, OPEN_KEY, KEY_D,
new KeyValue[][] { a(KV_A, KV_B, KV_C) });
// Too little data/large count, no extra files.
verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), Integer.MAX_VALUE, 2, OPEN_KEY, OPEN_KEY,
a(a(KV_A, KV_B), a(KV_C, KV_D)));
}
public static void verifySizeCompaction(KeyValue[] input, int targetCount, long targetSize,
byte[] left, byte[] right, KeyValue[][] output) throws Exception {
StoreFileWritersCapture writers = new StoreFileWritersCapture();
StripeCompactor sc = createCompactor(writers, input);
List<Path> paths =
sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null, null,
NoLimitThroughputController.INSTANCE);
assertEquals(output.length, paths.size());
writers.verifyKvs(output, true, true);
List<byte[]> boundaries = new ArrayList<byte[]>();
boundaries.add(left);
for (int i = 1; i < output.length; ++i) {
boundaries.add(output[i][0].getRow());
}
boundaries.add(right);
writers.verifyBoundaries(boundaries.toArray(new byte[][] {}));
}
private static StripeCompactor createCompactor(
StoreFileWritersCapture writers, KeyValue[] input) throws Exception {
Configuration conf = HBaseConfiguration.create();
final Scanner scanner = new Scanner(input);
    // Create a store mock that is satisfactory for the compactor.
HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS);
ScanInfo si = new ScanInfo(conf, col, Long.MAX_VALUE, 0, new KVComparator());
Store store = mock(Store.class);
when(store.getFamily()).thenReturn(col);
when(store.getScanInfo()).thenReturn(si);
when(store.areWritesEnabled()).thenReturn(true);
when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class),
anyBoolean(), anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
when(store.getComparator()).thenReturn(new KVComparator());
return new StripeCompactor(conf, store) {
@Override
protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow) throws IOException {
return scanner;
}
@Override
protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
return scanner;
}
};
}
private static CompactionRequest createDummyRequest() throws Exception {
// "Files" are totally unused, it's Scanner class below that gives compactor fake KVs.
// But compaction depends on everything under the sun, so stub everything with dummies.
StoreFile sf = mock(StoreFile.class);
StoreFile.Reader r = mock(StoreFile.Reader.class);
when(r.length()).thenReturn(1L);
when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong()))
.thenReturn(mock(StoreFileScanner.class));
when(sf.getReader()).thenReturn(r);
when(sf.createReader()).thenReturn(r);
when(sf.createReader(anyBoolean())).thenReturn(r);
return new CompactionRequest(Arrays.asList(sf));
}
private static class Scanner implements InternalScanner {
private final ArrayList<KeyValue> kvs;
public Scanner(KeyValue... kvs) {
this.kvs = new ArrayList<KeyValue>(Arrays.asList(kvs));
}
@Override
public boolean next(List<Cell> results) throws IOException {
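      // Hand out one KV per call; the return value signals whether more KVs remain.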
if (kvs.isEmpty()) return false;
results.add(kvs.remove(0));
return !kvs.isEmpty();
}
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext)
throws IOException {
return next(result);
}
@Override
public void close() throws IOException {}
}
  // StoreFile.Writer has a private ctor and is unwieldy, so this has to be convoluted.
public static class StoreFileWritersCapture implements
Answer<StoreFile.Writer>, StripeMultiFileWriter.WriterFactory {
public static class Writer {
public ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
public TreeMap<byte[], byte[]> data = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
}
private List<Writer> writers = new ArrayList<Writer>();
@Override
public StoreFile.Writer createWriter() throws IOException {
final Writer realWriter = new Writer();
writers.add(realWriter);
StoreFile.Writer writer = mock(StoreFile.Writer.class);
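      // Redirect append() and appendFileInfo() on the mock into the capturing realWriter
      // so the test can assert on what was written.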
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) {
return realWriter.kvs.add((KeyValue)invocation.getArguments()[0]);
}}).when(writer).append(any(KeyValue.class));
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
return realWriter.data.put((byte[])args[0], (byte[])args[1]);
}}).when(writer).appendFileInfo(any(byte[].class), any(byte[].class));
return writer;
}
@Override
public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable {
return createWriter();
}
public void verifyKvs(KeyValue[][] kvss, boolean allFiles, boolean requireMetadata) {
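      // A null entry in kvss means no output file was expected for that stripe,
      // so the corresponding writer is skipped.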
if (allFiles) {
assertEquals(kvss.length, writers.size());
}
int skippedWriters = 0;
for (int i = 0; i < kvss.length; ++i) {
KeyValue[] kvs = kvss[i];
if (kvs != null) {
Writer w = writers.get(i - skippedWriters);
if (requireMetadata) {
assertNotNull(w.data.get(STRIPE_START_KEY));
assertNotNull(w.data.get(STRIPE_END_KEY));
} else {
assertNull(w.data.get(STRIPE_START_KEY));
assertNull(w.data.get(STRIPE_END_KEY));
}
assertEquals(kvs.length, w.kvs.size());
for (int j = 0; j < kvs.length; ++j) {
assertEquals(kvs[j], w.kvs.get(j));
}
} else {
assertFalse(allFiles);
++skippedWriters;
}
}
}
public void verifyBoundaries(byte[][] boundaries) {
assertEquals(boundaries.length - 1, writers.size());
for (int i = 0; i < writers.size(); ++i) {
assertArrayEquals("i = " + i, boundaries[i], writers.get(i).data.get(STRIPE_START_KEY));
assertArrayEquals("i = " + i, boundaries[i + 1], writers.get(i).data.get(STRIPE_END_KEY));
}
}
}
}

View File

@ -0,0 +1,212 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_END_KEY;
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_START_KEY;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TreeMap;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.util.Bytes;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestCompactor {
public static StoreFile createDummyStoreFile(long maxSequenceId) throws Exception {
// "Files" are totally unused, it's Scanner class below that gives compactor fake KVs.
// But compaction depends on everything under the sun, so stub everything with dummies.
StoreFile sf = mock(StoreFile.class);
StoreFile.Reader r = mock(StoreFile.Reader.class);
when(r.length()).thenReturn(1L);
when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong()))
.thenReturn(mock(StoreFileScanner.class));
when(sf.getReader()).thenReturn(r);
when(sf.createReader()).thenReturn(r);
when(sf.createReader(anyBoolean())).thenReturn(r);
when(sf.cloneForReader()).thenReturn(sf);
when(sf.getMaxSequenceId()).thenReturn(maxSequenceId);
return sf;
}
public static CompactionRequest createDummyRequest() throws Exception {
return new CompactionRequest(Arrays.asList(createDummyStoreFile(1L)));
}
  // StoreFile.Writer has a private ctor and is unwieldy, so this has to be convoluted.
public static class StoreFileWritersCapture
implements Answer<StoreFile.Writer>, StripeMultiFileWriter.WriterFactory {
public static class Writer {
public ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
public TreeMap<byte[], byte[]> data = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
public boolean hasMetadata;
}
private List<Writer> writers = new ArrayList<Writer>();
@Override
public StoreFile.Writer createWriter() throws IOException {
final Writer realWriter = new Writer();
writers.add(realWriter);
StoreFile.Writer writer = mock(StoreFile.Writer.class);
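      // Redirect append() and appendFileInfo() on the mock into the capturing realWriter
      // so tests can assert on what was written.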
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) {
return realWriter.kvs.add((KeyValue) invocation.getArguments()[0]);
}
}).when(writer).append(any(KeyValue.class));
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
return realWriter.data.put((byte[]) args[0], (byte[]) args[1]);
}
}).when(writer).appendFileInfo(any(byte[].class), any(byte[].class));
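      // Record appendMetadata() calls so tests can check that metadata was written,
      // and return a dummy path so commitWriters can collect output paths.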
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
realWriter.hasMetadata = true;
return null;
}
}).when(writer).appendMetadata(any(long.class), any(boolean.class));
doAnswer(new Answer<Path>() {
@Override
public Path answer(InvocationOnMock invocation) throws Throwable {
return new Path("foo");
}
}).when(writer).getPath();
return writer;
}
@Override
public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable {
return createWriter();
}
public void verifyKvs(KeyValue[][] kvss, boolean allFiles, boolean requireMetadata) {
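      // A null entry in kvss means no output file was expected for that stripe,
      // so the corresponding writer is skipped.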
if (allFiles) {
assertEquals(kvss.length, writers.size());
}
int skippedWriters = 0;
for (int i = 0; i < kvss.length; ++i) {
KeyValue[] kvs = kvss[i];
if (kvs != null) {
Writer w = writers.get(i - skippedWriters);
if (requireMetadata) {
assertNotNull(w.data.get(STRIPE_START_KEY));
assertNotNull(w.data.get(STRIPE_END_KEY));
} else {
assertNull(w.data.get(STRIPE_START_KEY));
assertNull(w.data.get(STRIPE_END_KEY));
}
assertEquals(kvs.length, w.kvs.size());
for (int j = 0; j < kvs.length; ++j) {
assertEquals(kvs[j], w.kvs.get(j));
}
} else {
assertFalse(allFiles);
++skippedWriters;
}
}
}
public void verifyBoundaries(byte[][] boundaries) {
assertEquals(boundaries.length - 1, writers.size());
for (int i = 0; i < writers.size(); ++i) {
assertArrayEquals("i = " + i, boundaries[i], writers.get(i).data.get(STRIPE_START_KEY));
assertArrayEquals("i = " + i, boundaries[i + 1], writers.get(i).data.get(STRIPE_END_KEY));
}
}
public void verifyKvs(KeyValue[][] kvss, boolean allFiles, List<Long> boundaries) {
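      // Here the boundaries are timestamp window edges: each KV in window i must have a
      // timestamp in [boundaries.get(i), boundaries.get(i + 1)).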
if (allFiles) {
assertEquals(kvss.length, writers.size());
}
int skippedWriters = 0;
for (int i = 0; i < kvss.length; ++i) {
KeyValue[] kvs = kvss[i];
if (kvs != null) {
Writer w = writers.get(i - skippedWriters);
assertEquals(kvs.length, w.kvs.size());
for (int j = 0; j < kvs.length; ++j) {
assertTrue(kvs[j].getTimestamp() >= boundaries.get(i));
assertTrue(kvs[j].getTimestamp() < boundaries.get(i + 1));
assertEquals(kvs[j], w.kvs.get(j));
}
} else {
assertFalse(allFiles);
++skippedWriters;
}
}
}
public List<Writer> getWriters() {
return writers;
}
}
public static class Scanner implements InternalScanner {
private final ArrayList<KeyValue> kvs;
public Scanner(KeyValue... kvs) {
this.kvs = new ArrayList<KeyValue>(Arrays.asList(kvs));
}
@Override
public boolean next(List<Cell> results) throws IOException {
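      // Hand out one KV per call; the return value signals whether more KVs remain.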
if (kvs.isEmpty()) return false;
results.add(kvs.remove(0));
return !kvs.isEmpty();
}
@Override
public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException {
return next(result);
}
@Override
public void close() throws IOException {
}
}
}

View File

@ -0,0 +1,169 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyStoreFile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, SmallTests.class })
public class TestDateTieredCompactor {
private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
private static final KeyValue KV_A = new KeyValue(Bytes.toBytes("aaa"), 100L);
private static final KeyValue KV_B = new KeyValue(Bytes.toBytes("bbb"), 200L);
private static final KeyValue KV_C = new KeyValue(Bytes.toBytes("ccc"), 300L);
private static final KeyValue KV_D = new KeyValue(Bytes.toBytes("ddd"), 400L);
@Parameters(name = "{index}: usePrivateReaders={0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[] { true }, new Object[] { false });
}
@Parameter
public boolean usePrivateReaders;
private DateTieredCompactor createCompactor(StoreFileWritersCapture writers,
final KeyValue[] input, List<StoreFile> storefiles) throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
final Scanner scanner = new Scanner(input);
    // Create a store mock that is satisfactory for the compactor.
HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS);
ScanInfo si = new ScanInfo(conf, col, Long.MAX_VALUE, 0, new KVComparator());
final Store store = mock(Store.class);
when(store.getStorefiles()).thenReturn(storefiles);
when(store.getFamily()).thenReturn(col);
when(store.getScanInfo()).thenReturn(si);
when(store.areWritesEnabled()).thenReturn(true);
when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
when(store.getComparator()).thenReturn(new KVComparator());
long maxSequenceId = StoreFile.getMaxSequenceIdInList(storefiles);
when(store.getMaxSequenceId()).thenReturn(maxSequenceId);
return new DateTieredCompactor(conf, store) {
@Override
protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow) throws IOException {
return scanner;
}
@Override
protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
return scanner;
}
};
}
private void verify(KeyValue[] input, List<Long> boundaries, KeyValue[][] output,
boolean allFiles) throws Exception {
StoreFileWritersCapture writers = new StoreFileWritersCapture();
StoreFile sf1 = createDummyStoreFile(1L);
StoreFile sf2 = createDummyStoreFile(2L);
DateTieredCompactor dtc = createCompactor(writers, input, Arrays.asList(sf1, sf2));
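    // compact() is passed only the lower bound of each window; the full boundary list,
    // including the final upper bound, is used below to check KV timestamps.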
List<Path> paths = dtc.compact(new CompactionRequest(Arrays.asList(sf1)),
boundaries.subList(0, boundaries.size() - 1), NoLimitThroughputController.INSTANCE, null);
writers.verifyKvs(output, allFiles, boundaries);
if (allFiles) {
assertEquals(output.length, paths.size());
}
}
@SuppressWarnings("unchecked")
private static <T> T[] a(T... a) {
return a;
}
@Test
public void test() throws Exception {
verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(100L, 200L, 300L, 400L, 500L),
a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)), true);
verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, 200L, Long.MAX_VALUE),
a(a(KV_A), a(KV_B, KV_C, KV_D)), false);
verify(a(KV_A, KV_B, KV_C, KV_D), Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
new KeyValue[][] { a(KV_A, KV_B, KV_C, KV_D) }, false);
}
@Test
public void testEmptyOutputFile() throws Exception {
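    // Even with no input KVs, the compaction should emit a single (empty) output file
    // with metadata appended.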
StoreFileWritersCapture writers = new StoreFileWritersCapture();
CompactionRequest request = createDummyRequest();
DateTieredCompactor dtc = createCompactor(writers, new KeyValue[0],
new ArrayList<StoreFile>(request.getFiles()));
List<Path> paths = dtc.compact(request, Arrays.asList(Long.MIN_VALUE, Long.MAX_VALUE),
NoLimitThroughputController.INSTANCE, null);
assertEquals(1, paths.size());
List<StoreFileWritersCapture.Writer> dummyWriters = writers.getWriters();
assertEquals(1, dummyWriters.size());
StoreFileWritersCapture.Writer dummyWriter = dummyWriters.get(0);
assertTrue(dummyWriter.kvs.isEmpty());
assertTrue(dummyWriter.hasMetadata);
}
}

View File

@ -43,6 +43,9 @@ import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
@ -50,6 +53,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.BloomType;
@ -64,8 +68,8 @@ import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.SmallTests;
@ -75,11 +79,13 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
import org.mockito.ArgumentMatcher;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@RunWith(Parameterized.class)
@Category(SmallTests.class)
public class TestStripeCompactionPolicy {
private static final byte[] KEY_A = Bytes.toBytes("aaa");
@ -99,6 +105,13 @@ public class TestStripeCompactionPolicy {
private final static int defaultInitialCount = 1;
private static long defaultTtl = 1000 * 1000;
@Parameters(name = "{index}: usePrivateReaders={0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[] { true }, new Object[] { false });
}
@Parameter
public boolean usePrivateReaders;
@Test
public void testNoStripesFromFlush() throws Exception {
Configuration conf = HBaseConfiguration.create();
@ -388,6 +401,7 @@ public class TestStripeCompactionPolicy {
}
}
@SuppressWarnings("unchecked")
private static StripeCompactionPolicy.StripeInformationProvider createStripesWithFiles(
List<StoreFile>... stripeFiles) throws Exception {
return createStripesWithFiles(createBoundaries(stripeFiles.length),
@ -564,9 +578,10 @@ public class TestStripeCompactionPolicy {
protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si,
KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException {
StoreFileWritersCapture writers = new StoreFileWritersCapture();
StripeStoreFlusher.StripeFlushRequest req = policy.selectFlush(si, input.length);
StripeStoreFlusher.StripeFlushRequest req = policy.selectFlush(new KVComparator(), si,
input.length);
StripeMultiFileWriter mw = req.createWriter();
mw.init(null, writers, new KeyValue.KVComparator());
mw.init(null, writers);
for (KeyValue kv : input) {
mw.append(kv);
}
@ -728,6 +743,7 @@ public class TestStripeCompactionPolicy {
when(sf.getReader()).thenReturn(r);
when(sf.createReader(anyBoolean())).thenReturn(r);
when(sf.createReader()).thenReturn(r);
when(sf.cloneForReader()).thenReturn(sf);
return sf;
}
@ -740,7 +756,7 @@ public class TestStripeCompactionPolicy {
when(sf.getMetadataValue(StripeStoreFileManager.STRIPE_END_KEY)).thenReturn(endKey);
}
private static StripeCompactor createCompactor() throws Exception {
private StripeCompactor createCompactor() throws Exception {
HColumnDescriptor col = new HColumnDescriptor(Bytes.toBytes("foo"));
StoreFileWritersCapture writers = new StoreFileWritersCapture();
Store store = mock(Store.class);
@ -753,6 +769,7 @@ public class TestStripeCompactionPolicy {
anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
Configuration conf = HBaseConfiguration.create();
conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
final Scanner scanner = new Scanner();
return new StripeCompactor(conf, store) {
@Override

View File

@ -0,0 +1,223 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
import static org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.createDummyRequest;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.Scanner;
import org.apache.hadoop.hbase.regionserver.compactions.TestCompactor.StoreFileWritersCapture;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
@Category({ RegionServerTests.class, SmallTests.class })
public class TestStripeCompactor {
private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
private static final byte[] KEY_B = Bytes.toBytes("bbb");
private static final byte[] KEY_C = Bytes.toBytes("ccc");
private static final byte[] KEY_D = Bytes.toBytes("ddd");
private static final KeyValue KV_A = kvAfter(Bytes.toBytes("aaa"));
private static final KeyValue KV_B = kvAfter(KEY_B);
private static final KeyValue KV_C = kvAfter(KEY_C);
private static final KeyValue KV_D = kvAfter(KEY_D);
@Parameters(name = "{index}: usePrivateReaders={0}")
public static Iterable<Object[]> data() {
return Arrays.asList(new Object[] { true }, new Object[] { false });
}
@Parameter
public boolean usePrivateReaders;
private static KeyValue kvAfter(byte[] key) {
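    // Append a trailing zero byte so the row sorts just after the given boundary key.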
return new KeyValue(Arrays.copyOf(key, key.length + 1), 0L);
}
@SuppressWarnings("unchecked")
private static <T> T[] a(T... a) {
return a;
}
private static KeyValue[] e() {
return TestStripeCompactor.<KeyValue> a();
}
@Test
public void testBoundaryCompactions() throws Exception {
// General verification
verifyBoundaryCompaction(a(KV_A, KV_A, KV_B, KV_B, KV_C, KV_D),
a(OPEN_KEY, KEY_B, KEY_D, OPEN_KEY), a(a(KV_A, KV_A), a(KV_B, KV_B, KV_C), a(KV_D)));
verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_C, KEY_D), a(a(KV_B), a(KV_C)));
verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_D), new KeyValue[][] { a(KV_B, KV_C) });
}
@Test
public void testBoundaryCompactionEmptyFiles() throws Exception {
    // No empty file should be created if there are already files.
verifyBoundaryCompaction(a(KV_B), a(KEY_B, KEY_C, KEY_D, OPEN_KEY), a(a(KV_B), null, null),
null, null, false);
verifyBoundaryCompaction(a(KV_A, KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D),
a(a(KV_A), null, a(KV_C)), null, null, false);
    // But one should be created if there are no files at all.
verifyBoundaryCompaction(e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, null, e()), null,
null, false);
    // The empty file is created in the major range when one is specified.
verifyBoundaryCompaction(e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, e(), null), KEY_B,
KEY_C, false);
verifyBoundaryCompaction(e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(e(), e(), null), OPEN_KEY,
KEY_C, false);
// Major range should have files regardless of KVs.
verifyBoundaryCompaction(a(KV_A), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
a(a(KV_A), e(), e(), null), KEY_B, KEY_D, false);
verifyBoundaryCompaction(a(KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
a(null, null, a(KV_C), e()), KEY_C, OPEN_KEY, false);
}
private void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries, KeyValue[][] output)
throws Exception {
verifyBoundaryCompaction(input, boundaries, output, null, null, true);
}
private void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries, KeyValue[][] output,
byte[] majorFrom, byte[] majorTo, boolean allFiles) throws Exception {
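    // majorFrom/majorTo delimit the major-compacted key range; allFiles means every
    // boundary stripe is expected to produce an output file.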
StoreFileWritersCapture writers = new StoreFileWritersCapture();
StripeCompactor sc = createCompactor(writers, input);
List<Path> paths = sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom,
majorTo, NoLimitThroughputController.INSTANCE, null);
writers.verifyKvs(output, allFiles, true);
if (allFiles) {
assertEquals(output.length, paths.size());
writers.verifyBoundaries(boundaries);
}
}
@Test
public void testSizeCompactions() throws Exception {
// General verification with different sizes.
verifySizeCompaction(a(KV_A, KV_A, KV_B, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
a(a(KV_A, KV_A), a(KV_B, KV_C), a(KV_D)));
verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 4, 1, OPEN_KEY, OPEN_KEY,
a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)));
verifySizeCompaction(a(KV_B, KV_C), 2, 1, KEY_B, KEY_D, a(a(KV_B), a(KV_C)));
// Verify row boundaries are preserved.
verifySizeCompaction(a(KV_A, KV_A, KV_A, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
a(a(KV_A, KV_A, KV_A), a(KV_C, KV_D)));
verifySizeCompaction(a(KV_A, KV_B, KV_B, KV_C), 3, 1, OPEN_KEY, OPEN_KEY,
a(a(KV_A), a(KV_B, KV_B), a(KV_C)));
// Too much data, count limits the number of files.
verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 2, 1, OPEN_KEY, OPEN_KEY,
a(a(KV_A), a(KV_B, KV_C, KV_D)));
verifySizeCompaction(a(KV_A, KV_B, KV_C), 1, Long.MAX_VALUE, OPEN_KEY, KEY_D,
new KeyValue[][] { a(KV_A, KV_B, KV_C) });
// Too little data/large count, no extra files.
verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), Integer.MAX_VALUE, 2, OPEN_KEY, OPEN_KEY,
a(a(KV_A, KV_B), a(KV_C, KV_D)));
}
private void verifySizeCompaction(KeyValue[] input, int targetCount, long targetSize, byte[] left,
byte[] right, KeyValue[][] output) throws Exception {
StoreFileWritersCapture writers = new StoreFileWritersCapture();
StripeCompactor sc = createCompactor(writers, input);
List<Path> paths = sc.compact(createDummyRequest(), targetCount, targetSize, left, right, null,
null, NoLimitThroughputController.INSTANCE, null);
assertEquals(output.length, paths.size());
writers.verifyKvs(output, true, true);
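    // Expected stripe boundaries: the requested left edge, the first row of each
    // subsequent output file, then the right edge.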
List<byte[]> boundaries = new ArrayList<byte[]>();
boundaries.add(left);
for (int i = 1; i < output.length; ++i) {
boundaries.add(CellUtil.cloneRow(output[i][0]));
}
boundaries.add(right);
writers.verifyBoundaries(boundaries.toArray(new byte[][] {}));
}
private StripeCompactor createCompactor(StoreFileWritersCapture writers, KeyValue[] input)
throws Exception {
Configuration conf = HBaseConfiguration.create();
conf.setBoolean("hbase.regionserver.compaction.private.readers", usePrivateReaders);
final Scanner scanner = new Scanner(input);
    // Create a store mock that is satisfactory for the compactor.
HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS);
ScanInfo si = new ScanInfo(conf, col, Long.MAX_VALUE, 0, new KVComparator());
Store store = mock(Store.class);
when(store.getFamily()).thenReturn(col);
when(store.getScanInfo()).thenReturn(si);
when(store.areWritesEnabled()).thenReturn(true);
when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class), anyBoolean(),
anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
when(store.getComparator()).thenReturn(new KVComparator());
return new StripeCompactor(conf, store) {
@Override
protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow) throws IOException {
return scanner;
}
@Override
protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
return scanner;
}
};
}
}