HBASE-7967 implement compactor for stripe compactions

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1536570 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
sershe 2013-10-29 00:40:22 +00:00
parent 9023d46908
commit 60ce11220f
7 changed files with 855 additions and 20 deletions

View File

@ -81,6 +81,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
* KVs skipped via seeking to next row/column. TODO: estimate them? * KVs skipped via seeking to next row/column. TODO: estimate them?
*/ */
private long kvsScanned = 0; private long kvsScanned = 0;
private KeyValue prevKV = null;
/** We don't ever expect to change this, the constant is just for clarity. */ /** We don't ever expect to change this, the constant is just for clarity. */
static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true; static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
@ -411,7 +412,6 @@ public class StoreScanner extends NonLazyKeyValueScanner
} }
KeyValue kv; KeyValue kv;
KeyValue prevKV = null;
// Only do a sanity-check if store and comparator are available. // Only do a sanity-check if store and comparator are available.
KeyValue.KVComparator comparator = KeyValue.KVComparator comparator =
@ -419,7 +419,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
int count = 0; int count = 0;
LOOP: while((kv = this.heap.peek()) != null) { LOOP: while((kv = this.heap.peek()) != null) {
++kvsScanned; if (prevKV != kv) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
// Check that the heap gives us KVs in an increasing order. // Check that the heap gives us KVs in an increasing order.
assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 : assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 :
"Key " + prevKV + " followed by a " + "smaller key " + kv + " in cf " + store; "Key " + prevKV + " followed by a " + "smaller key " + kv + " in cf " + store;

View File

@ -0,0 +1,380 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Base class for cell sink that separates the provided cells into multiple files.
*/
public abstract class StripeMultiFileWriter implements Compactor.CellSink {
private static final Log LOG = LogFactory.getLog(StripeMultiFileWriter.class);
/** Factory that is used to produce single StoreFile.Writer-s */
protected WriterFactory writerFactory;
protected KVComparator comparator;
protected List<StoreFile.Writer> existingWriters;
protected List<byte[]> boundaries;
/** Source scanner that is tracking KV count; may be null if source is not StoreScanner */
protected StoreScanner sourceScanner;
public interface WriterFactory {
public StoreFile.Writer createWriter() throws IOException;
}
/**
* Initializes multi-writer before usage.
* @param sourceScanner Optional store scanner to obtain the information about read progress.
* @param factory Factory used to produce individual file writers.
* @param comparator Comparator used to compare rows.
*/
public void init(StoreScanner sourceScanner, WriterFactory factory, KVComparator comparator)
throws IOException {
this.writerFactory = factory;
this.sourceScanner = sourceScanner;
this.comparator = comparator;
}
public List<Path> commitWriters(long maxSeqId, boolean isMajor) throws IOException {
assert this.existingWriters != null;
commitWritersInternal();
assert this.boundaries.size() == (this.existingWriters.size() + 1);
LOG.debug("Writing out metadata for " + this.existingWriters.size() + " writers");
List<Path> paths = new ArrayList<Path>();
for (int i = 0; i < this.existingWriters.size(); ++i) {
StoreFile.Writer writer = this.existingWriters.get(i);
if (writer == null) continue; // writer was skipped due to 0 KVs
writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i));
writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1));
writer.appendMetadata(maxSeqId, isMajor);
paths.add(writer.getPath());
writer.close();
}
this.existingWriters = null;
return paths;
}
public List<Path> abortWriters() {
assert this.existingWriters != null;
List<Path> paths = new ArrayList<Path>();
for (StoreFile.Writer writer : this.existingWriters) {
try {
paths.add(writer.getPath());
writer.close();
} catch (Exception ex) {
LOG.error("Failed to close the writer after an unfinished compaction.", ex);
}
}
this.existingWriters = null;
return paths;
}
/**
* Subclasses can call this method to make sure the first KV is within multi-writer range.
* @param left The left boundary of the writer.
* @param row The row to check.
* @param rowOffset Offset for row.
* @param rowLength Length for row.
*/
protected void sanityCheckLeft(
byte[] left, byte[] row, int rowOffset, int rowLength) throws IOException {
if (StripeStoreFileManager.OPEN_KEY != left &&
comparator.compareRows(row, rowOffset, rowLength, left, 0, left.length) < 0) {
String error = "The first row is lower than the left boundary of ["
+ Bytes.toString(left) + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
LOG.error(error);
throw new IOException(error);
}
}
/**
* Subclasses can call this method to make sure the last KV is within multi-writer range.
* @param right The right boundary of the writer.
* @param row The row to check.
* @param rowOffset Offset for row.
* @param rowLength Length for row.
*/
protected void sanityCheckRight(
byte[] right, byte[] row, int rowOffset, int rowLength) throws IOException {
if (StripeStoreFileManager.OPEN_KEY != right &&
comparator.compareRows(row, rowOffset, rowLength, right, 0, right.length) >= 0) {
String error = "The last row is higher or equal than the right boundary of ["
+ Bytes.toString(right) + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
LOG.error(error);
throw new IOException(error);
}
}
/**
* Subclasses override this method to be called at the end of a successful sequence of
* append; all appends are processed before this method is called.
*/
protected abstract void commitWritersInternal() throws IOException;
/**
* MultiWriter that separates the cells based on fixed row-key boundaries.
* All the KVs between each pair of neighboring boundaries from the list supplied to ctor
* will end up in one file, and separate from all other such pairs.
*/
public static class BoundaryMultiWriter extends StripeMultiFileWriter {
private StoreFile.Writer currentWriter;
private byte[] currentWriterEndKey;
private KeyValue lastKv;
private long kvsInCurrentWriter = 0;
private int majorRangeFromIndex = -1, majorRangeToIndex = -1;
private boolean hasAnyWriter = false;
/**
* @param targetBoundaries The boundaries on which writers/files are separated.
* @param majorRangeFrom Major range is the range for which at least one file should be
* written (because all files are included in compaction).
* majorRangeFrom is the left boundary.
* @param majorRangeTo The right boundary of majorRange (see majorRangeFrom).
*/
public BoundaryMultiWriter(List<byte[]> targetBoundaries,
byte[] majorRangeFrom, byte[] majorRangeTo) throws IOException {
super();
this.boundaries = targetBoundaries;
this.existingWriters = new ArrayList<StoreFile.Writer>(this.boundaries.size() - 1);
// "major" range (range for which all files are included) boundaries, if any,
// must match some target boundaries, let's find them.
assert (majorRangeFrom == null) == (majorRangeTo == null);
if (majorRangeFrom != null) {
majorRangeFromIndex = (majorRangeFrom == StripeStoreFileManager.OPEN_KEY) ? 0
: Collections.binarySearch(this.boundaries, majorRangeFrom, Bytes.BYTES_COMPARATOR);
majorRangeToIndex = (majorRangeTo == StripeStoreFileManager.OPEN_KEY) ? boundaries.size()
: Collections.binarySearch(this.boundaries, majorRangeTo, Bytes.BYTES_COMPARATOR);
if (this.majorRangeFromIndex < 0 || this.majorRangeToIndex < 0) {
throw new IOException("Major range does not match writer boundaries: [" +
Bytes.toString(majorRangeFrom) + "] [" + Bytes.toString(majorRangeTo) + "]; from "
+ majorRangeFromIndex + " to " + majorRangeToIndex);
}
}
}
@Override
public void append(KeyValue kv) throws IOException {
if (currentWriter == null && existingWriters.isEmpty()) {
// First append ever, do a sanity check.
sanityCheckLeft(this.boundaries.get(0),
kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
}
prepareWriterFor(kv);
currentWriter.append(kv);
lastKv = kv; // for the sanity check
++kvsInCurrentWriter;
}
private boolean isKvAfterCurrentWriter(KeyValue kv) {
return ((currentWriterEndKey != StripeStoreFileManager.OPEN_KEY) &&
(comparator.compareRows(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
currentWriterEndKey, 0, currentWriterEndKey.length) >= 0));
}
@Override
protected void commitWritersInternal() throws IOException {
stopUsingCurrentWriter();
while (existingWriters.size() < boundaries.size() - 1) {
createEmptyWriter();
}
if (lastKv != null) {
sanityCheckRight(boundaries.get(boundaries.size() - 1),
lastKv.getRowArray(), lastKv.getRowOffset(), lastKv.getRowLength());
}
}
private void prepareWriterFor(KeyValue kv) throws IOException {
if (currentWriter != null && !isKvAfterCurrentWriter(kv)) return; // Use same writer.
stopUsingCurrentWriter();
// See if KV will be past the writer we are about to create; need to add another one.
while (isKvAfterCurrentWriter(kv)) {
checkCanCreateWriter();
createEmptyWriter();
}
checkCanCreateWriter();
hasAnyWriter = true;
currentWriter = writerFactory.createWriter();
existingWriters.add(currentWriter);
}
/**
* Called if there are no cells for some stripe.
* We need to have something in the writer list for this stripe, so that writer-boundary
* list indices correspond to each other. We can insert null in the writer list for that
* purpose, except in the following cases where we actually need a file:
* 1) If we are in range for which we are compacting all the files, we need to create an
* empty file to preserve stripe metadata.
* 2) If we have not produced any file at all for this compactions, and this is the
* last chance (the last stripe), we need to preserve last seqNum (see also HBASE-6059).
*/
private void createEmptyWriter() throws IOException {
int index = existingWriters.size();
boolean isInMajorRange = (index >= majorRangeFromIndex) && (index < majorRangeToIndex);
// Stripe boundary count = stripe count + 1, so last stripe index is (#boundaries minus 2)
boolean isLastWriter = !hasAnyWriter && (index == (boundaries.size() - 2));
boolean needEmptyFile = isInMajorRange || isLastWriter;
existingWriters.add(needEmptyFile ? writerFactory.createWriter() : null);
hasAnyWriter |= needEmptyFile;
currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
? null : boundaries.get(existingWriters.size() + 1);
}
private void checkCanCreateWriter() throws IOException {
int maxWriterCount = boundaries.size() - 1;
assert existingWriters.size() <= maxWriterCount;
if (existingWriters.size() >= maxWriterCount) {
throw new IOException("Cannot create any more writers (created " + existingWriters.size()
+ " out of " + maxWriterCount + " - row might be out of range of all valid writers");
}
}
private void stopUsingCurrentWriter() {
if (currentWriter != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping to use a writer after [" + Bytes.toString(currentWriterEndKey)
+ "] row; wrote out " + kvsInCurrentWriter + " kvs");
}
kvsInCurrentWriter = 0;
}
currentWriter = null;
currentWriterEndKey = (existingWriters.size() + 1 == boundaries.size())
? null : boundaries.get(existingWriters.size() + 1);
}
}
/**
* MultiWriter that separates the cells based on target cell number per file and file count.
* New file is started every time the target number of KVs is reached, unless the fixed
* count of writers has already been created (in that case all the remaining KVs go into
* the last writer).
*/
public static class SizeMultiWriter extends StripeMultiFileWriter {
private int targetCount;
private long targetKvs;
private byte[] left;
private byte[] right;
private KeyValue lastKv;
private StoreFile.Writer currentWriter;
protected byte[] lastRowInCurrentWriter = null;
private long kvsInCurrentWriter = 0;
private long kvsSeen = 0;
private long kvsSeenInPrevious = 0;
/**
* @param targetCount The maximum count of writers that can be created.
* @param targetKvs The number of KVs to read from source before starting each new writer.
* @param left The left boundary of the first writer.
* @param right The right boundary of the last writer.
*/
public SizeMultiWriter(int targetCount, long targetKvs, byte[] left, byte[] right) {
super();
this.targetCount = targetCount;
this.targetKvs = targetKvs;
this.left = left;
this.right = right;
int preallocate = Math.min(this.targetCount, 64);
this.existingWriters = new ArrayList<StoreFile.Writer>(preallocate);
this.boundaries = new ArrayList<byte[]>(preallocate + 1);
}
@Override
public void append(KeyValue kv) throws IOException {
// If we are waiting for opportunity to close and we started writing different row,
// discard the writer and stop waiting.
boolean doCreateWriter = false;
if (currentWriter == null) {
// First append ever, do a sanity check.
sanityCheckLeft(left, kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
doCreateWriter = true;
} else if (lastRowInCurrentWriter != null
&& !comparator.matchingRows(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
lastRowInCurrentWriter, 0, lastRowInCurrentWriter.length)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping to use a writer after [" + Bytes.toString(lastRowInCurrentWriter)
+ "] row; wrote out " + kvsInCurrentWriter + " kvs");
}
lastRowInCurrentWriter = null;
kvsInCurrentWriter = 0;
kvsSeenInPrevious += kvsSeen;
doCreateWriter = true;
}
if (doCreateWriter) {
byte[] boundary = existingWriters.isEmpty() ? left : kv.getRow(); // make a copy
if (LOG.isDebugEnabled()) {
LOG.debug("Creating new writer starting at [" + Bytes.toString(boundary) + "]");
}
currentWriter = writerFactory.createWriter();
boundaries.add(boundary);
existingWriters.add(currentWriter);
}
currentWriter.append(kv);
lastKv = kv; // for the sanity check
++kvsInCurrentWriter;
kvsSeen = kvsInCurrentWriter;
if (this.sourceScanner != null) {
kvsSeen = Math.max(kvsSeen,
this.sourceScanner.getEstimatedNumberOfKvsScanned() - kvsSeenInPrevious);
}
// If we are not already waiting for opportunity to close, start waiting if we can
// create any more writers and if the current one is too big.
if (lastRowInCurrentWriter == null
&& existingWriters.size() < targetCount
&& kvsSeen >= targetKvs) {
lastRowInCurrentWriter = kv.getRow(); // make a copy
if (LOG.isDebugEnabled()) {
LOG.debug("Preparing to start a new writer after [" + Bytes.toString(
lastRowInCurrentWriter) + "] row; observed " + kvsSeen + " kvs and wrote out "
+ kvsInCurrentWriter + " kvs");
}
}
}
@Override
protected void commitWritersInternal() throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Stopping with " + kvsInCurrentWriter + " kvs in last writer" +
((this.sourceScanner == null) ? "" : ("; observed estimated "
+ this.sourceScanner.getEstimatedNumberOfKvsScanned() + " KVs total")));
}
if (lastKv != null) {
sanityCheckRight(
right, lastKv.getRowArray(), lastKv.getRowOffset(), lastKv.getRowLength());
}
this.boundaries.add(right);
}
}
}

View File

@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.CellOutputStream;
import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo; import org.apache.hadoop.hbase.io.hfile.HFile.FileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileWriterV2; import org.apache.hadoop.hbase.io.hfile.HFileWriterV2;
@ -97,6 +96,12 @@ public abstract class Compactor {
public int maxTagsLength = 0; public int maxTagsLength = 0;
} }
/**
* Extracts some details about the files to compact that are commonly needed by compactors.
* @param filesToCompact Files.
* @param calculatePutTs Whether earliest put TS is needed.
* @return The result.
*/
protected FileDetails getFileDetails( protected FileDetails getFileDetails(
Collection<StoreFile> filesToCompact, boolean calculatePutTs) throws IOException { Collection<StoreFile> filesToCompact, boolean calculatePutTs) throws IOException {
FileDetails fd = new FileDetails(); FileDetails fd = new FileDetails();
@ -151,6 +156,11 @@ public abstract class Compactor {
return fd; return fd;
} }
/**
* Creates file scanners for compaction.
* @param filesToCompact Files.
* @return Scanners.
*/
protected List<StoreFileScanner> createFileScanners( protected List<StoreFileScanner> createFileScanners(
final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException { final Collection<StoreFile> filesToCompact, long smallestReadPoint) throws IOException {
return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true, return StoreFileScanner.getScannersForStoreFiles(filesToCompact, false, false, true,
@ -161,6 +171,14 @@ public abstract class Compactor {
return store.getSmallestReadPoint(); return store.getSmallestReadPoint();
} }
/**
* Calls coprocessor, if any, to create compaction scanner - before normal scanner creation.
* @param request Compaction request.
* @param scanType Scan type.
* @param earliestPutTs Earliest put ts.
* @param scanners File scanners for compaction files.
* @return Scanner override by coprocessor; null if not overriding.
*/
protected InternalScanner preCreateCoprocScanner(final CompactionRequest request, protected InternalScanner preCreateCoprocScanner(final CompactionRequest request,
ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException { ScanType scanType, long earliestPutTs, List<StoreFileScanner> scanners) throws IOException {
if (store.getCoprocessorHost() == null) return null; if (store.getCoprocessorHost() == null) return null;
@ -168,13 +186,27 @@ public abstract class Compactor {
.preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request); .preCompactScannerOpen(store, scanners, scanType, earliestPutTs, request);
} }
protected InternalScanner postCreateCoprocScanner(final CompactionRequest request, /**
* Calls coprocessor, if any, to create scanners - after normal scanner creation.
* @param request Compaction request.
* @param scanType Scan type.
* @param scanner The default scanner created for compaction.
* @return Scanner scanner to use (usually the default); null if compaction should not proceed.
*/
protected InternalScanner postCreateCoprocScanner(final CompactionRequest request,
ScanType scanType, InternalScanner scanner) throws IOException { ScanType scanType, InternalScanner scanner) throws IOException {
if (store.getCoprocessorHost() == null) return scanner; if (store.getCoprocessorHost() == null) return scanner;
return store.getCoprocessorHost().preCompact(store, scanner, scanType, request); return store.getCoprocessorHost().preCompact(store, scanner, scanType, request);
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
/**
* Performs the compaction.
* @param scanner Where to read from.
* @param writer Where to write to.
* @param smallestReadPoint Smallest read point.
* @return Whether compaction ended; false if it was interrupted for some reason.
*/
protected boolean performCompaction(InternalScanner scanner, protected boolean performCompaction(InternalScanner scanner,
CellSink writer, long smallestReadPoint) throws IOException { CellSink writer, long smallestReadPoint) throws IOException {
int bytesWritten = 0; int bytesWritten = 0;
@ -213,12 +245,8 @@ public abstract class Compactor {
return true; return true;
} }
protected void abortWriter(final StoreFile.Writer writer) throws IOException {
writer.close();
store.getFileSystem().delete(writer.getPath(), false);
}
/** /**
* @param store store
* @param scanners Store file scanners. * @param scanners Store file scanners.
* @param scanType Scan type. * @param scanType Scan type.
* @param smallestReadPoint Smallest MVCC read point. * @param smallestReadPoint Smallest MVCC read point.
@ -232,4 +260,20 @@ public abstract class Compactor {
return new StoreScanner(store, store.getScanInfo(), scan, scanners, return new StoreScanner(store, store.getScanInfo(), scan, scanners,
scanType, smallestReadPoint, earliestPutTs); scanType, smallestReadPoint, earliestPutTs);
} }
/**
* @param scanners Store file scanners.
* @param scanType Scan type.
* @param smallestReadPoint Smallest MVCC read point.
* @param earliestPutTs Earliest put across all files.
* @return A compaction scanner.
*/
protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow) throws IOException {
Scan scan = new Scan();
scan.setMaxVersions(store.getFamily().getMaxVersions());
return new StoreScanner(store, store.getScanInfo(), scan, scanners, smallestReadPoint,
earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
}
} }

View File

@ -75,7 +75,8 @@ public class DefaultCompactor extends Compactor {
fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0); fd.maxMVCCReadpoint >= smallestReadPoint, fd.maxTagsLength > 0);
boolean finished = performCompaction(scanner, writer, smallestReadPoint); boolean finished = performCompaction(scanner, writer, smallestReadPoint);
if (!finished) { if (!finished) {
abortWriter(writer); writer.close();
store.getFileSystem().delete(writer.getPath(), false);
writer = null; writer = null;
throw new InterruptedIOException( "Aborting compaction of store " + store + throw new InterruptedIOException( "Aborting compaction of store " + store +
" in region " + store.getRegionInfo().getRegionNameAsString() + " in region " + store.getRegionInfo().getRegionNameAsString() +

View File

@ -380,7 +380,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
* @param compactor Compactor. * @param compactor Compactor.
* @return result of compact(...) * @return result of compact(...)
*/ */
public abstract List<Path> execute(StripeCompactor compactor); public abstract List<Path> execute(StripeCompactor compactor) throws IOException;
public StripeCompactionRequest(CompactionRequest request) { public StripeCompactionRequest(CompactionRequest request) {
this.request = request; this.request = request;
@ -431,7 +431,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
} }
@Override @Override
public List<Path> execute(StripeCompactor compactor) { public List<Path> execute(StripeCompactor compactor) throws IOException {
return compactor.compact( return compactor.compact(
this.request, this.targetBoundaries, this.majorRangeFromRow, this.majorRangeToRow); this.request, this.targetBoundaries, this.majorRangeFromRow, this.majorRangeToRow);
} }
@ -481,7 +481,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
} }
@Override @Override
public List<Path> execute(StripeCompactor compactor) { public List<Path> execute(StripeCompactor compactor) throws IOException {
return compactor.compact(this.request, this.targetCount, this.targetKvs, return compactor.compact(this.request, this.targetCount, this.targetKvs,
this.startRow, this.endRow, this.majorRangeFromRow, this.majorRangeToRow); this.startRow, this.endRow, this.majorRangeFromRow, this.majorRangeToRow);
} }

View File

@ -17,30 +17,140 @@
*/ */
package org.apache.hadoop.hbase.regionserver.compactions; package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List; import java.util.List;
import org.apache.commons.lang.NotImplementedException; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
import org.apache.hadoop.hbase.util.Bytes;
/** /**
* This is the placeholder for stripe compactor. The implementation, * This is the placeholder for stripe compactor. The implementation,
* as well as the proper javadoc, will be added in HBASE-7967. * as well as the proper javadoc, will be added in HBASE-7967.
*/ */
public class StripeCompactor extends Compactor { public class StripeCompactor extends Compactor {
private static final Log LOG = LogFactory.getLog(StripeCompactor.class);
public StripeCompactor(Configuration conf, final Store store) { public StripeCompactor(Configuration conf, Store store) {
super(conf, store); super(conf, store);
} }
public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries, public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
byte[] dropDeletesFromRow, byte[] dropDeletesToRow) { byte[] majorRangeFromRow, byte[] majorRangeToRow) throws IOException {
throw new NotImplementedException(); if (LOG.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
sb.append("Executing compaction with " + targetBoundaries.size() + " boundaries:");
for (byte[] tb : targetBoundaries) {
sb.append(" [").append(Bytes.toString(tb)).append("]");
}
LOG.debug(sb.toString());
}
StripeMultiFileWriter writer = new StripeMultiFileWriter.BoundaryMultiWriter(
targetBoundaries, majorRangeFromRow, majorRangeToRow);
return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow);
} }
public List<Path> compact(CompactionRequest request, int targetCount, long targetSize, public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
byte[] left, byte[] right, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) { byte[] left, byte[] right, byte[] majorRangeFromRow, byte[] majorRangeToRow)
throw new NotImplementedException(); throws IOException {
if (LOG.isDebugEnabled()) {
LOG.debug("Executing compaction with " + targetSize
+ " target file size, no more than " + targetCount + " files, in ["
+ Bytes.toString(left) + "] [" + Bytes.toString(right) + "] range");
}
StripeMultiFileWriter writer = new StripeMultiFileWriter.SizeMultiWriter(
targetCount, targetSize, left, right);
return compactInternal(writer, request, majorRangeFromRow, majorRangeToRow);
}
private List<Path> compactInternal(StripeMultiFileWriter mw, CompactionRequest request,
byte[] majorRangeFromRow, byte[] majorRangeToRow) throws IOException {
final Collection<StoreFile> filesToCompact = request.getFiles();
final FileDetails fd = getFileDetails(filesToCompact, request.isMajor());
this.progress = new CompactionProgress(fd.maxKeyCount);
long smallestReadPoint = getSmallestReadPoint();
List<StoreFileScanner> scanners = createFileScanners(filesToCompact, smallestReadPoint);
boolean finished = false;
InternalScanner scanner = null;
try {
// Get scanner to use.
ScanType coprocScanType = ScanType.COMPACT_RETAIN_DELETES;
scanner = preCreateCoprocScanner(request, coprocScanType, fd.earliestPutTs, scanners);
if (scanner == null) {
scanner = (majorRangeFromRow == null)
? createScanner(store, scanners,
ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, fd.earliestPutTs)
: createScanner(store, scanners,
smallestReadPoint, fd.earliestPutTs, majorRangeFromRow, majorRangeToRow);
}
scanner = postCreateCoprocScanner(request, coprocScanType, scanner);
if (scanner == null) {
// NULL scanner returned from coprocessor hooks means skip normal processing.
return new ArrayList<Path>();
}
// Create the writer factory for compactions.
final boolean needMvcc = fd.maxMVCCReadpoint >= smallestReadPoint;
final Compression.Algorithm compression = store.getFamily().getCompactionCompression();
StripeMultiFileWriter.WriterFactory factory = new StripeMultiFileWriter.WriterFactory() {
@Override
public Writer createWriter() throws IOException {
return store.createWriterInTmp(
fd.maxKeyCount, compression, true, needMvcc, fd.maxTagsLength > 0);
}
};
// Prepare multi-writer, and perform the compaction using scanner and writer.
// It is ok here if storeScanner is null.
StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
mw.init(storeScanner, factory, store.getComparator());
finished = performCompaction(scanner, mw, smallestReadPoint);
if (!finished) {
throw new InterruptedIOException( "Aborting compaction of store " + store +
" in region " + store.getRegionInfo().getRegionNameAsString() +
" because it was interrupted.");
}
} finally {
if (scanner != null) {
try {
scanner.close();
} catch (Throwable t) {
// Don't fail the compaction if this fails.
LOG.error("Failed to close scanner after compaction.", t);
}
}
if (!finished) {
for (Path leftoverFile : mw.abortWriters()) {
try {
store.getFileSystem().delete(leftoverFile, false);
} catch (Exception ex) {
LOG.error("Failed to delete the leftover file after an unfinished compaction.", ex);
}
}
}
}
assert finished : "We should have exited the method on all error paths";
List<Path> newFiles = mw.commitWriters(fd.maxSeqId, request.isMajor());
assert !newFiles.isEmpty() : "Should have produced an empty file to preserve metadata.";
return newFiles;
} }
} }

View File

@ -0,0 +1,300 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_START_KEY;
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.STRIPE_END_KEY;
import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.ListIterator;
import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@Category(SmallTests.class)
public class TestStripeCompactor {
private static final byte[] NAME_OF_THINGS = Bytes.toBytes("foo");
private static final TableName TABLE_NAME = TableName.valueOf(NAME_OF_THINGS, NAME_OF_THINGS);
private static final byte[] KEY_B = Bytes.toBytes("bbb");
private static final byte[] KEY_C = Bytes.toBytes("ccc");
private static final byte[] KEY_D = Bytes.toBytes("ddd");
private static final KeyValue KV_A = kvAfter(Bytes.toBytes("aaa"));
private static final KeyValue KV_B = kvAfter(KEY_B);
private static final KeyValue KV_C = kvAfter(KEY_C);
private static final KeyValue KV_D = kvAfter(KEY_D);
private static KeyValue kvAfter(byte[] key) {
return new KeyValue(Arrays.copyOf(key, key.length + 1), 0L);
}
private static <T> T[] a(T... a) {
return a;
}
private static KeyValue[] e() {
return TestStripeCompactor.<KeyValue>a();
}
@Test
public void testBoundaryCompactions() throws Exception {
// General verification
verifyBoundaryCompaction(a(KV_A, KV_A, KV_B, KV_B, KV_C, KV_D),
a(OPEN_KEY, KEY_B, KEY_D, OPEN_KEY), a(a(KV_A, KV_A), a(KV_B, KV_B, KV_C), a(KV_D)));
verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_C, KEY_D), a(a(KV_B), a(KV_C)));
verifyBoundaryCompaction(a(KV_B, KV_C), a(KEY_B, KEY_D), new KeyValue[][] { a(KV_B, KV_C) });
}
@Test
public void testBoundaryCompactionEmptyFiles() throws Exception {
// No empty file if there're already files.
verifyBoundaryCompaction(
a(KV_B), a(KEY_B, KEY_C, KEY_D, OPEN_KEY), a(a(KV_B), null, null), null, null, false);
verifyBoundaryCompaction(a(KV_A, KV_C),
a(OPEN_KEY, KEY_B, KEY_C, KEY_D), a(a(KV_A), null, a(KV_C)), null, null, false);
// But should be created if there are no file.
verifyBoundaryCompaction(
e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, null, e()), null, null, false);
// In major range if there's major range.
verifyBoundaryCompaction(
e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(null, e(), null), KEY_B, KEY_C, false);
verifyBoundaryCompaction(
e(), a(OPEN_KEY, KEY_B, KEY_C, OPEN_KEY), a(e(), e(), null), OPEN_KEY, KEY_C, false);
// Major range should have files regardless of KVs.
verifyBoundaryCompaction(a(KV_A), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
a(a(KV_A), e(), e(), null), KEY_B, KEY_D, false);
verifyBoundaryCompaction(a(KV_C), a(OPEN_KEY, KEY_B, KEY_C, KEY_D, OPEN_KEY),
a(null, null, a(KV_C), e()), KEY_C, OPEN_KEY, false);
}
public static void verifyBoundaryCompaction(
KeyValue[] input, byte[][] boundaries, KeyValue[][] output) throws Exception {
verifyBoundaryCompaction(input, boundaries, output, null, null, true);
}
public static void verifyBoundaryCompaction(KeyValue[] input, byte[][] boundaries,
KeyValue[][] output, byte[] majorFrom, byte[] majorTo, boolean allFiles)
throws Exception {
StoreFileWritersCapture writers = new StoreFileWritersCapture();
StripeCompactor sc = createCompactor(writers, input);
List<Path> paths =
sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo);
writers.verifyKvs(output, allFiles);
if (allFiles) {
assertEquals(output.length, paths.size());
writers.verifyBoundaries(boundaries);
}
}
@Test
public void testSizeCompactions() throws Exception {
// General verification with different sizes.
verifySizeCompaction(a(KV_A, KV_A, KV_B, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
a(a(KV_A, KV_A), a(KV_B, KV_C), a(KV_D)));
verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 4, 1, OPEN_KEY, OPEN_KEY,
a(a(KV_A), a(KV_B), a(KV_C), a(KV_D)));
verifySizeCompaction(a(KV_B, KV_C), 2, 1, KEY_B, KEY_D, a(a(KV_B), a(KV_C)));
// Verify row boundaries are preserved.
verifySizeCompaction(a(KV_A, KV_A, KV_A, KV_C, KV_D), 3, 2, OPEN_KEY, OPEN_KEY,
a(a(KV_A, KV_A, KV_A), a(KV_C, KV_D)));
verifySizeCompaction(a(KV_A, KV_B, KV_B, KV_C), 3, 1, OPEN_KEY, OPEN_KEY,
a(a(KV_A), a(KV_B, KV_B), a(KV_C)));
// Too much data, count limits the number of files.
verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), 2, 1, OPEN_KEY, OPEN_KEY,
a(a(KV_A), a(KV_B, KV_C, KV_D)));
verifySizeCompaction(a(KV_A, KV_B, KV_C), 1, Long.MAX_VALUE, OPEN_KEY, KEY_D,
new KeyValue[][] { a(KV_A, KV_B, KV_C) });
// Too little data/large count, no extra files.
verifySizeCompaction(a(KV_A, KV_B, KV_C, KV_D), Integer.MAX_VALUE, 2, OPEN_KEY, OPEN_KEY,
a(a(KV_A, KV_B), a(KV_C, KV_D)));
}
public static void verifySizeCompaction(KeyValue[] input, int targetCount, long targetSize,
byte[] left, byte[] right, KeyValue[][] output) throws Exception {
StoreFileWritersCapture writers = new StoreFileWritersCapture();
StripeCompactor sc = createCompactor(writers, input);
List<Path> paths = sc.compact(
createDummyRequest(), targetCount, targetSize, left, right, null, null);
assertEquals(output.length, paths.size());
writers.verifyKvs(output, true);
List<byte[]> boundaries = new ArrayList<byte[]>();
boundaries.add(left);
for (int i = 1; i < output.length; ++i) {
boundaries.add(output[i][0].getRow());
}
boundaries.add(right);
writers.verifyBoundaries(boundaries.toArray(new byte[][] {}));
}
private static StripeCompactor createCompactor(
StoreFileWritersCapture writers, KeyValue[] input) throws Exception {
Configuration conf = HBaseConfiguration.create();
final Scanner scanner = new Scanner(input);
// Create store mock that is satisfactory for compactor.
HColumnDescriptor col = new HColumnDescriptor(NAME_OF_THINGS);
ScanInfo si = new ScanInfo(col, Long.MAX_VALUE, 0, new KVComparator());
Store store = mock(Store.class);
when(store.getFamily()).thenReturn(col);
when(store.getScanInfo()).thenReturn(si);
when(store.areWritesEnabled()).thenReturn(true);
when(store.getFileSystem()).thenReturn(mock(FileSystem.class));
when(store.getRegionInfo()).thenReturn(new HRegionInfo(TABLE_NAME));
when(store.createWriterInTmp(anyLong(), any(Compression.Algorithm.class),
anyBoolean(), anyBoolean(), anyBoolean())).thenAnswer(writers);
when(store.getComparator()).thenReturn(new KVComparator());
return new StripeCompactor(conf, store) {
@Override
protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
long smallestReadPoint, long earliestPutTs, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow) throws IOException {
return scanner;
}
@Override
protected InternalScanner createScanner(Store store, List<StoreFileScanner> scanners,
ScanType scanType, long smallestReadPoint, long earliestPutTs) throws IOException {
return scanner;
}
};
}
private static CompactionRequest createDummyRequest() throws Exception {
// "Files" are totally unused, it's Scanner class below that gives compactor fake KVs.
// But compaction depends on everything under the sun, so stub everything with dummies.
StoreFile sf = mock(StoreFile.class);
StoreFile.Reader r = mock(StoreFile.Reader.class);
when(r.length()).thenReturn(1L);
when(r.getBloomFilterType()).thenReturn(BloomType.NONE);
when(r.getHFileReader()).thenReturn(mock(HFile.Reader.class));
when(r.getStoreFileScanner(anyBoolean(), anyBoolean(), anyBoolean(), anyLong()))
.thenReturn(mock(StoreFileScanner.class));
when(sf.getReader()).thenReturn(r);
when(sf.createReader()).thenReturn(r);
return new CompactionRequest(Arrays.asList(sf));
}
private static class Scanner implements InternalScanner {
private final ArrayList<KeyValue> kvs;
public Scanner(KeyValue... kvs) {
this.kvs = new ArrayList<KeyValue>(Arrays.asList(kvs));
}
@Override
public boolean next(List<Cell> results) throws IOException {
if (kvs.isEmpty()) return false;
results.add(kvs.remove(0));
return !kvs.isEmpty();
}
@Override
public boolean next(List<Cell> result, int limit) throws IOException {
return next(result);
}
@Override
public void close() throws IOException {}
}
// StoreFile.Writer has private ctor and is unwieldy, so this has to be convoluted.
private static class StoreFileWritersCapture implements Answer<StoreFile.Writer> {
public static class Writer {
public ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
public TreeMap<byte[], byte[]> data = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
}
private List<Writer> writers = new ArrayList<Writer>();
@Override
public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable {
final Writer realWriter = new Writer();
writers.add(realWriter);
StoreFile.Writer writer = mock(StoreFile.Writer.class);
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) {
return realWriter.kvs.add((KeyValue)invocation.getArguments()[0]);
}}).when(writer).append(any(KeyValue.class));
doAnswer(new Answer<Object>() {
public Object answer(InvocationOnMock invocation) {
Object[] args = invocation.getArguments();
return realWriter.data.put((byte[])args[0], (byte[])args[1]);
}}).when(writer).appendFileInfo(any(byte[].class), any(byte[].class));
return writer;
}
public void verifyKvs(KeyValue[][] kvss, boolean allFiles) {
if (allFiles) {
assertEquals(kvss.length, writers.size());
}
int skippedWriters = 0;
for (int i = 0; i < kvss.length; ++i) {
KeyValue[] kvs = kvss[i];
if (kvs != null) {
Writer w = writers.get(i - skippedWriters);
assertNotNull(w.data.get(STRIPE_START_KEY));
assertNotNull(w.data.get(STRIPE_END_KEY));
assertEquals(kvs.length, w.kvs.size());
for (int j = 0; j < kvs.length; ++j) {
assertEquals(kvs[j], w.kvs.get(j));
}
} else {
assertFalse(allFiles);
++skippedWriters;
}
}
}
public void verifyBoundaries(byte[][] boundaries) {
assertEquals(boundaries.length - 1, writers.size());
for (int i = 0; i < writers.size(); ++i) {
assertArrayEquals("i = " + i, boundaries[i], writers.get(i).data.get(STRIPE_START_KEY));
assertArrayEquals("i = " + i, boundaries[i + 1], writers.get(i).data.get(STRIPE_END_KEY));
}
}
}
}