HBASE-3797 StoreFile Level Compaction Locking

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1101676 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Nicolas Spiegelberg 2011-05-10 23:20:45 +00:00
parent 3b993d8c4d
commit 1f4eb71478
11 changed files with 390 additions and 789 deletions

View File

@ -20,6 +20,8 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
@ -28,6 +30,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
@ -42,13 +45,14 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
private final HRegionServer server;
private final Configuration conf;
private final PriorityCompactionQueue compactionQueue =
new PriorityCompactionQueue();
protected final BlockingQueue<CompactionRequest> compactionQueue =
new PriorityBlockingQueue<CompactionRequest>();
/* The default priority for user-specified compaction requests.
* The user gets top priority unless we have blocking compactions. (Pri <= 0)
*/
public static final int PRIORITY_USER = 1;
public static final int NO_PRIORITY = Integer.MIN_VALUE;
/**
* Splitting should not take place if the total number of regions exceed this.
@ -74,21 +78,36 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
while (!this.server.isStopped()) {
CompactionRequest compactionRequest = null;
HRegion r = null;
boolean completed = false;
try {
compactionRequest = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
if (compactionRequest != null) {
r = compactionRequest.getHRegion();
lock.lock();
try {
// look for a split first
if(!this.server.isStopped()) {
// Don't interrupt us while we are working
r = compactionRequest.getHRegion();
byte [] midKey = r.compactStore(compactionRequest.getStore());
if (r.getLastCompactInfo() != null) { // compaction aborted?
this.server.getMetrics().addCompaction(r.getLastCompactInfo());
// don't split regions that are blocking
if (shouldSplitRegion() && r.getCompactPriority() >= PRIORITY_USER) {
byte[] midkey = compactionRequest.getStore().checkSplit();
if (midkey != null) {
split(r, midkey);
continue;
}
}
if (shouldSplitRegion() && midKey != null &&
!this.server.isStopped()) {
split(r, midKey);
}
// now test for compaction
if(!this.server.isStopped()) {
long startTime = EnvironmentEdgeManager.currentTimeMillis();
completed = r.compact(compactionRequest);
long now = EnvironmentEdgeManager.currentTimeMillis();
LOG.info(((completed) ? "completed" : "aborted")
+ " compaction: " + compactionRequest + ", duration="
+ StringUtils.formatTimeDiff(now, startTime));
if (completed) { // compaction aborted?
this.server.getMetrics().
addCompaction(now - startTime, compactionRequest.getSize());
}
}
} finally {
@ -98,19 +117,26 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
} catch (InterruptedException ex) {
continue;
} catch (IOException ex) {
LOG.error("Compaction/Split failed for region " +
r.getRegionNameAsString(),
LOG.error("Compaction/Split failed " + compactionRequest,
RemoteExceptionHandler.checkIOException(ex));
if (!server.checkFileSystem()) {
break;
}
} catch (Exception ex) {
LOG.error("Compaction failed" +
(r != null ? (" for region " + r.getRegionNameAsString()) : ""),
ex);
LOG.error("Compaction failed " + compactionRequest, ex);
if (!server.checkFileSystem()) {
break;
}
} finally {
if (compactionRequest != null) {
Store s = compactionRequest.getStore();
s.finishRequest(compactionRequest);
// degenerate case: blocked regions require recursive enqueues
if (s.getCompactPriority() < PRIORITY_USER && completed) {
requestCompaction(r, s, "Recursive enqueue");
}
}
compactionRequest = null;
}
}
compactionQueue.clear();
@ -120,19 +146,19 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
public synchronized void requestCompaction(final HRegion r,
final String why) {
for(Store s : r.getStores().values()) {
requestCompaction(r, s, false, why, s.getCompactPriority());
requestCompaction(r, s, why, NO_PRIORITY);
}
}
public synchronized void requestCompaction(final HRegion r,
final String why, int p) {
requestCompaction(r, false, why, p);
public synchronized void requestCompaction(final HRegion r, final Store s,
final String why) {
requestCompaction(r, s, why, NO_PRIORITY);
}
public synchronized void requestCompaction(final HRegion r,
final boolean force, final String why, int p) {
public synchronized void requestCompaction(final HRegion r, final String why,
int p) {
for(Store s : r.getStores().values()) {
requestCompaction(r, s, force, why, p);
requestCompaction(r, s, why, p);
}
}
@ -140,24 +166,28 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
* @param r HRegion store belongs to
* @param force Whether next compaction should be major
* @param why Why compaction requested -- used in debug messages
* @param priority override the default priority (NO_PRIORITY == decide)
*/
public synchronized void requestCompaction(final HRegion r, final Store s,
final boolean force, final String why, int priority) {
final String why, int priority) {
if (this.server.isStopped()) {
return;
}
// tell the region to major-compact (and don't downgrade it)
if (force) {
s.setForceMajorCompaction(force);
}
CompactionRequest compactionRequest = new CompactionRequest(r, s, priority);
if (compactionQueue.add(compactionRequest) && LOG.isDebugEnabled()) {
LOG.debug("Compaction " + (force? "(major) ": "") +
"requested for region " + r.getRegionNameAsString() +
"/" + r.getRegionInfo().getEncodedName() +
", store " + s +
(why != null && !why.isEmpty()? " because " + why: "") +
"; priority=" + priority + ", compaction queue size=" + compactionQueue.size());
CompactionRequest cr = s.requestCompaction();
if (cr != null) {
if (priority != NO_PRIORITY) {
cr.setPriority(priority);
}
boolean addedToQueue = compactionQueue.add(cr);
if (!addedToQueue) {
LOG.error("Could not add request to compaction queue: " + cr);
s.finishRequest(cr);
} else if (LOG.isDebugEnabled()) {
LOG.debug("Compaction requested: " + cr
+ (why != null && !why.isEmpty() ? "; Because: " + why : "")
+ "; Priority: " + priority + "; Compaction queue size: "
+ compactionQueue.size());
}
}
}

View File

@ -26,10 +26,27 @@ public interface CompactionRequestor {
*/
public void requestCompaction(final HRegion r, final String why);
/**
* @param r Region to compact
* @param s Store within region to compact
* @param why Why compaction was requested -- used in debug messages
*/
public void requestCompaction(final HRegion r, final Store s, final String why);
/**
* @param r Region to compact
* @param why Why compaction was requested -- used in debug messages
* @param pri Priority of this compaction. minHeap. <=0 is critical
*/
public void requestCompaction(final HRegion r, final String why, int pri);
/**
* @param r Region to compact
* @param s Store within region to compact
* @param why Why compaction was requested -- used in debug messages
* @param pri Priority of this compaction. minHeap. <=0 is critical
*/
public void requestCompaction(final HRegion r, final Store s,
final String why, int pri);
}

View File

@ -85,6 +85,7 @@ import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
import org.apache.hadoop.hbase.ipc.HBaseRPC;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -101,6 +102,7 @@ import org.apache.hadoop.util.StringUtils;
import org.cliffc.high_scale_lib.Counter;
import com.google.common.base.Preconditions;
import com.google.common.collect.ClassToInstanceMap;
import com.google.common.collect.Lists;
import com.google.common.collect.MutableClassToInstanceMap;
@ -206,8 +208,8 @@ public class HRegion implements HeapSize { // , Writable{
volatile boolean flushing = false;
// Set when a flush has been requested.
volatile boolean flushRequested = false;
// Set while a compaction is running.
volatile boolean compacting = false;
// Number of compactions running.
volatile int compacting = 0;
// Gets set in close. If set, cannot compact or flush again.
volatile boolean writesEnabled = true;
// Set if region is read-only
@ -395,7 +397,7 @@ public class HRegion implements HeapSize { // , Writable{
this.writestate.setReadOnly(this.regionInfo.getTableDesc().isReadOnly());
this.writestate.compacting = false;
this.writestate.compacting = 0;
this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
// Use maximum of log sequenceid or that which was found in stores
// (particularly if no recovered edits, seqid will be -1).
@ -606,12 +608,10 @@ public class HRegion implements HeapSize { // , Writable{
writestate.writesEnabled = false;
wasFlushing = writestate.flushing;
LOG.debug("Closing " + this + ": disabling compactions & flushes");
while (writestate.compacting || writestate.flushing) {
LOG.debug("waiting for" +
(writestate.compacting ? " compaction" : "") +
(writestate.flushing ?
(writestate.compacting ? "," : "") + " cache flush" :
"") + " to complete for region " + this);
while (writestate.compacting > 0 || writestate.flushing) {
LOG.debug("waiting for " + writestate.compacting + " compactions" +
(writestate.flushing ? " & cache flush" : "") +
" to complete for region " + this);
try {
writestate.wait();
} catch (InterruptedException iex) {
@ -734,11 +734,6 @@ public class HRegion implements HeapSize { // , Writable{
return this.fs;
}
/** @return info about the last compaction <time, size> */
public Pair<Long,Long> getLastCompactInfo() {
return this.lastCompactInfo;
}
/** @return the last time the region was flushed */
public long getLastFlushTime() {
return this.lastFlushTime;
@ -794,9 +789,9 @@ public class HRegion implements HeapSize { // , Writable{
return new Path(getRegionDir(), ".tmp");
}
void setForceMajorCompaction(final boolean b) {
void triggerMajorCompaction() {
for (Store h: stores.values()) {
h.setForceMajorCompaction(b);
h.triggerMajorCompaction();
}
}
@ -817,7 +812,9 @@ public class HRegion implements HeapSize { // , Writable{
*/
byte [] compactStores(final boolean majorCompaction)
throws IOException {
this.setForceMajorCompaction(majorCompaction);
if (majorCompaction) {
this.triggerMajorCompaction();
}
return compactStores();
}
@ -826,13 +823,21 @@ public class HRegion implements HeapSize { // , Writable{
* to be split.
*/
public byte[] compactStores() throws IOException {
byte[] splitRow = null;
for(Store s : getStores().values()) {
if(splitRow == null) {
splitRow = compactStore(s);
CompactionRequest cr = s.requestCompaction();
if(cr != null) {
try {
compact(cr);
} finally {
s.finishRequest(cr);
}
}
byte[] splitRow = s.checkSplit();
if (splitRow != null) {
return splitRow;
}
}
return splitRow;
return null;
}
/*
@ -846,93 +851,76 @@ public class HRegion implements HeapSize { // , Writable{
* conflicts with a region split, and that cannot happen because the region
* server does them sequentially and not in parallel.
*
* @return split row if split is needed
* @param cr Compaction details, obtained by requestCompaction()
* @return whether the compaction completed
* @throws IOException e
*/
public byte [] compactStore(Store store) throws IOException {
if (this.closing.get()) {
LOG.debug("Skipping compaction on " + this + " because closing");
return null;
public boolean compact(CompactionRequest cr)
throws IOException {
if (cr == null) {
return false;
}
if (this.closing.get() || this.closed.get()) {
LOG.debug("Skipping compaction on " + this + " because closing/closed");
return false;
}
Preconditions.checkArgument(cr.getHRegion().equals(this));
lock.readLock().lock();
this.lastCompactInfo = null;
byte [] splitRow = null;
MonitoredTask status = TaskMonitor.get().createStatus(
"Compacting stores in " + this);
"Compacting " + cr.getStore() + " in " + this);
try {
if (this.closed.get()) {
LOG.debug("Skipping compaction on " + this + " because closed");
return null;
}
if (this.closed.get()) {
return splitRow;
return false;
}
if (coprocessorHost != null) {
status.setStatus("Running coprocessor preCompact hooks");
coprocessorHost.preCompact(false);
}
boolean decr = true;
try {
synchronized (writestate) {
if (!writestate.compacting && writestate.writesEnabled) {
writestate.compacting = true;
if (writestate.writesEnabled) {
++writestate.compacting;
} else {
String msg = "NOT compacting region " + this +
": compacting=" + writestate.compacting + ", writesEnabled=" +
writestate.writesEnabled;
String msg = "NOT compacting region " + this + ". Writes disabled.";
LOG.info(msg);
status.abort(msg);
return splitRow;
decr = false;
return false;
}
}
LOG.info("Starting compaction on region " + this);
long startTime = EnvironmentEdgeManager.currentTimeMillis();
doRegionCompactionPrep();
long lastCompactSize = 0;
boolean completed = false;
try {
status.setStatus("Compacting store " + store);
final Store.StoreSize ss = store.compact();
lastCompactSize += store.getLastCompactSize();
if (ss != null) {
splitRow = ss.getSplitRow();
}
completed = true;
status.setStatus("Compacting store " + cr.getStore());
cr.getStore().compact(cr);
} catch (InterruptedIOException iioe) {
LOG.info("compaction interrupted by user: ", iioe);
} finally {
long now = EnvironmentEdgeManager.currentTimeMillis();
LOG.info(((completed) ? "completed" : "aborted")
+ " compaction on region " + this
+ " after " + StringUtils.formatTimeDiff(now, startTime));
if (completed) {
this.lastCompactInfo =
new Pair<Long,Long>((now - startTime) / 1000, lastCompactSize);
status.setStatus("Compaction complete: " +
StringUtils.humanReadableInt(lastCompactSize) + " in " +
(now - startTime) + "ms");
}
String msg = "compaction interrupted by user";
LOG.info(msg, iioe);
status.abort(msg);
return false;
}
} finally {
synchronized (writestate) {
writestate.compacting = false;
writestate.notifyAll();
if (decr) {
synchronized (writestate) {
--writestate.compacting;
if (writestate.compacting <= 0) {
writestate.notifyAll();
}
}
}
}
if (coprocessorHost != null) {
status.setStatus("Running coprocessor post-compact hooks");
coprocessorHost.postCompact(splitRow != null);
coprocessorHost.postCompact(false);
}
status.markComplete("Compaction complete");
return true;
} finally {
status.cleanup();
lock.readLock().unlock();
}
if (splitRow != null) {
assert splitPoint == null || Bytes.equals(splitRow, splitPoint);
this.splitPoint = null; // clear the split point (if set)
}
return splitRow;
}
/**
@ -3708,6 +3696,10 @@ public class HRegion implements HeapSize { // , Writable{
}
}
void clearSplit_TESTS_ONLY() {
this.splitRequest = false;
}
/**
* Give the region a chance to prepare before it is split.
*/
@ -3731,9 +3723,9 @@ public class HRegion implements HeapSize { // , Writable{
* store files
* @return true if any store has too many store files
*/
public boolean hasTooManyStoreFiles() {
public boolean needsCompaction() {
for(Store store : stores.values()) {
if(store.hasTooManyStoreFiles()) {
if(store.needsCompaction()) {
return true;
}
}

View File

@ -1046,14 +1046,18 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
@Override
protected void chore() {
for (HRegion r : this.instance.onlineRegions.values()) {
try {
if (r != null && r.isMajorCompaction()) {
// Queue a compaction. Will recognize if major is needed.
this.instance.compactSplitThread.requestCompaction(r, getName()
+ " requests major compaction");
if (r == null)
continue;
for (Store s : r.getStores().values()) {
try {
if (s.isMajorCompaction()) {
// Queue a compaction. Will recognize if major is needed.
this.instance.compactSplitThread.requestCompaction(r, s,
getName() + " requests major compaction");
}
} catch (IOException e) {
LOG.warn("Failed major compaction check on " + r, e);
}
} catch (IOException e) {
LOG.warn("Failed major compaction check on " + r, e);
}
}
}
@ -1346,10 +1350,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
final boolean daughter)
throws KeeperException, IOException {
// Do checks to see if we need to compact (references or too many files)
if (r.hasReferences() || r.hasTooManyStoreFiles()) {
getCompactionRequester().requestCompaction(r,
r.hasReferences()? "Region has references on open" :
"Region has too many store files");
for (Store s : r.getStores().values()) {
if (s.hasReferences() || s.needsCompaction()) {
getCompactionRequester().requestCompaction(r, s, "Opening Region");
}
}
// Add to online regions if all above was successful.
@ -2346,7 +2350,10 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
public void compactRegion(HRegionInfo regionInfo, boolean major)
throws NotServingRegionException, IOException {
HRegion region = getRegion(regionInfo.getRegionName());
compactSplitThread.requestCompaction(region, major, "User-triggered "
if (major) {
region.triggerMajorCompaction();
}
compactSplitThread.requestCompaction(region, "User-triggered "
+ (major ? "major " : "") + "compaction",
CompactSplitThread.PRIORITY_USER);
}

View File

@ -1,299 +0,0 @@
/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Pair;
/**
* This class delegates to the BlockingQueue but wraps all Stores in
* compaction requests that hold the priority and the date requested.
*
* Implementation Note: With an elevation time of -1 there is the potential for
* starvation of the lower priority compaction requests as long as there is a
* constant stream of high priority requests.
*/
public class PriorityCompactionQueue implements BlockingQueue<CompactionRequest> {
static final Log LOG = LogFactory.getLog(PriorityCompactionQueue.class);
/** The actual blocking queue we delegate to */
protected final BlockingQueue<CompactionRequest> queue =
new PriorityBlockingQueue<CompactionRequest>();
/** Hash map of the Stores contained within the Compaction Queue */
private final HashMap<Pair<HRegion, Store>, CompactionRequest> storesInQueue =
new HashMap<Pair<HRegion, Store>, CompactionRequest>();
/** Creates a new PriorityCompactionQueue with no priority elevation time */
public PriorityCompactionQueue() {
LOG.debug("Create PriorityCompactionQueue");
}
protected Pair<HRegion, Store> toPair(CompactionRequest cr) {
return Pair.newPair(cr.getHRegion(), cr.getStore());
}
/** If the store is not already in the queue it will add it and return a
* new compaction request object. If it is already present in the queue
* then it will return null.
* @param p If null it will use the default priority
* @return returns a compaction request if it isn't already in the queue
*/
protected CompactionRequest addToCompactionQueue(CompactionRequest newRequest) {
CompactionRequest queuedRequest = null;
synchronized (storesInQueue) {
queuedRequest = storesInQueue.get(toPair(newRequest));
if (queuedRequest == null ||
newRequest.getPriority() < queuedRequest.getPriority()) {
String reason = "";
if (queuedRequest != null) {
if (newRequest.getPriority() < queuedRequest.getPriority()) {
reason = "Reason : priority changed from " +
queuedRequest.getPriority() + " to " +
newRequest.getPriority() + ". ";
}
}
LOG.debug("Inserting store in queue. " + reason + newRequest);
storesInQueue.put(toPair(newRequest), newRequest);
} else {
LOG.debug("Store already in queue, skipping. Queued: " + queuedRequest +
", requested: " + newRequest);
newRequest = null; // It is already present so don't add it
}
}
if (newRequest != null && queuedRequest != null) {
// Remove the lower priority request
queue.remove(queuedRequest);
}
return newRequest;
}
/** Removes the request from the stores in queue
* @param remove
*/
protected CompactionRequest removeFromQueue(CompactionRequest c) {
if (c == null) return null;
synchronized (storesInQueue) {
CompactionRequest cr = storesInQueue.remove(toPair(c));
if (cr != null && !cr.equals(c))
{
//Because we don't synchronize across both this.regionsInQueue and this.queue
//a rare race condition exists where a higher priority compaction request replaces
//the lower priority request in this.regionsInQueue but the lower priority request
//is taken off this.queue before the higher can be added to this.queue.
//So if we didn't remove what we were expecting we put it back on.
storesInQueue.put(toPair(cr), cr);
}
if (cr == null) {
LOG.warn("Removed a compaction request it couldn't find in storesInQueue: " +
"region = " + c.getHRegion() + ", store = " + c.getStore());
}
return cr;
}
}
@Override
public boolean add(CompactionRequest e) {
CompactionRequest request = this.addToCompactionQueue(e);
if (request != null) {
boolean result = queue.add(request);
return result;
} else {
return false;
}
}
@Override
public boolean offer(CompactionRequest e) {
CompactionRequest request = this.addToCompactionQueue(e);
return (request != null)? queue.offer(request): false;
}
@Override
public void put(CompactionRequest e) throws InterruptedException {
CompactionRequest request = this.addToCompactionQueue(e);
if (request != null) {
queue.put(request);
}
}
@Override
public boolean offer(CompactionRequest e, long timeout, TimeUnit unit)
throws InterruptedException {
CompactionRequest request = this.addToCompactionQueue(e);
return (request != null)? queue.offer(request, timeout, unit): false;
}
@Override
public CompactionRequest take() throws InterruptedException {
CompactionRequest cr = queue.take();
if (cr != null) {
removeFromQueue(cr);
return cr;
}
return null;
}
@Override
public CompactionRequest poll(long timeout, TimeUnit unit) throws InterruptedException {
CompactionRequest cr = queue.poll(timeout, unit);
if (cr != null) {
removeFromQueue(cr);
return cr;
}
return null;
}
@Override
public boolean remove(Object o) {
if (o instanceof CompactionRequest) {
CompactionRequest cr = removeFromQueue((CompactionRequest) o);
if (cr != null) {
return queue.remove(cr);
}
}
return false;
}
@Override
public CompactionRequest remove() {
CompactionRequest cr = queue.remove();
if (cr != null) {
removeFromQueue(cr);
return cr;
}
return null;
}
@Override
public CompactionRequest poll() {
CompactionRequest cr = queue.poll();
if (cr != null) {
removeFromQueue(cr);
return cr;
}
return null;
}
@Override
public int remainingCapacity() {
return queue.remainingCapacity();
}
@Override
public boolean contains(Object r) {
if (r instanceof CompactionRequest) {
synchronized (storesInQueue) {
return storesInQueue.containsKey(toPair((CompactionRequest) r));
}
} else if (r instanceof CompactionRequest) {
return queue.contains(r);
}
return false;
}
@Override
public CompactionRequest element() {
CompactionRequest cr = queue.element();
return (cr != null)? cr: null;
}
@Override
public CompactionRequest peek() {
CompactionRequest cr = queue.peek();
return (cr != null)? cr: null;
}
@Override
public int size() {
return queue.size();
}
@Override
public boolean isEmpty() {
return queue.isEmpty();
}
@Override
public void clear() {
storesInQueue.clear();
queue.clear();
}
// Unimplemented methods, collection methods
@Override
public Iterator<CompactionRequest> iterator() {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public Object[] toArray() {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public <T> T[] toArray(T[] a) {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public boolean containsAll(Collection<?> c) {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public boolean addAll(Collection<? extends CompactionRequest> c) {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public boolean removeAll(Collection<?> c) {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public boolean retainAll(Collection<?> c) {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public int drainTo(Collection<? super CompactionRequest> c) {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public int drainTo(Collection<? super CompactionRequest> c, int maxElements) {
throw new UnsupportedOperationException("Not supported.");
}
}

View File

@ -24,8 +24,10 @@ import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -48,16 +50,20 @@ import org.apache.hadoop.hbase.io.hfile.BlockCache;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
/**
* A Store holds a column family in a Region. Its a memstore and a set of zero
@ -102,7 +108,7 @@ public class Store implements HeapSize {
// With float, java will downcast your long to float for comparisons (bad)
private double compactRatio;
private long lastCompactSize = 0;
private volatile boolean forceMajor = false;
volatile boolean forceMajor = false;
/* how many bytes to write between status checks */
static int closeCheckInterval = 0;
private final long desiredMaxFileSize;
@ -119,12 +125,12 @@ public class Store implements HeapSize {
*/
private ImmutableList<StoreFile> storefiles = null;
List<StoreFile> filesCompacting = Lists.newArrayList();
// All access must be synchronized.
private final CopyOnWriteArraySet<ChangedReadersObserver> changedReaderObservers =
new CopyOnWriteArraySet<ChangedReadersObserver>();
private final Object compactLock = new Object();
private final int blocksize;
private final boolean blockcache;
/** Compression algorithm for flush files and minor compaction */
@ -569,7 +575,7 @@ public class Store implements HeapSize {
// Tell listeners of the change in readers.
notifyChangedReadersObservers();
return this.storefiles.size() >= this.minFilesToCompact;
return needsCompaction();
} finally {
this.lock.writeLock().unlock();
}
@ -620,99 +626,108 @@ public class Store implements HeapSize {
* <p>We don't want to hold the structureLock for the whole time, as a compact()
* can be lengthy and we want to allow cache-flushes during this period.
*
* @return row to split around if a split is needed, null otherwise
* @param CompactionRequest
* compaction details obtained from requestCompaction()
* @throws IOException
*/
StoreSize compact() throws IOException {
boolean forceSplit = this.region.shouldForceSplit();
synchronized (compactLock) {
this.lastCompactSize = 0; // reset first in case compaction is aborted
void compact(CompactionRequest cr) throws IOException {
if (cr == null || cr.getFiles().isEmpty()) {
return;
}
Preconditions.checkArgument(cr.getStore().toString()
.equals(this.toString()));
// sanity checks
for (StoreFile sf : this.storefiles) {
if (sf.getPath() == null || sf.getReader() == null) {
boolean np = sf.getPath() == null;
LOG.debug("StoreFile " + sf + " has null " + (np ? "Path":"Reader"));
return null;
}
}
List<StoreFile> filesToCompact = cr.getFiles();
// if the user wants to force a split, skip compaction unless necessary
boolean references = hasReferences(this.storefiles);
if (forceSplit && !this.forceMajor && !references) {
return checkSplit(forceSplit);
}
synchronized (filesCompacting) {
// sanity check: we're compacting files that this store knows about
// TODO: change this to LOG.error() after more debugging
Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
}
Collection<StoreFile> filesToCompact
= compactSelection(this.storefiles, this.forceMajor);
// Max-sequenceID is the last key in the files we're compacting
long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
// empty == do not compact
if (filesToCompact.isEmpty()) {
// but do see if we need to split before returning
return checkSplit(forceSplit);
}
// Ready to go. Have list of files to compact.
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
+ this.storeNameStr + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
+ " into " + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
+ StringUtils.humanReadableInt(cr.getSize()));
// sum size of all files included in compaction
long totalSize = 0;
for (StoreFile sf : filesToCompact) {
totalSize += sf.getReader().length();
}
this.lastCompactSize = totalSize;
// major compaction iff all StoreFiles are included
boolean majorcompaction
= (filesToCompact.size() == this.storefiles.size());
if (majorcompaction) {
this.forceMajor = false;
}
// Max-sequenceID is the last key in the files we're compacting
long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
// Ready to go. Have list of files to compact.
LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in cf=" +
this.storeNameStr +
(hasReferences(filesToCompact)? ", hasReferences=true,": " ") + " into " +
region.getTmpDir() + ", seqid=" + maxId +
", totalSize=" + StringUtils.humanReadableInt(totalSize));
StoreFile.Writer writer
= compactStore(filesToCompact, majorcompaction, maxId);
StoreFile sf = null;
try {
StoreFile.Writer writer = compactStore(filesToCompact, cr.isMajor(),
maxId);
// Move the compaction into place.
StoreFile sf = completeCompaction(filesToCompact, writer);
if (LOG.isInfoEnabled()) {
LOG.info("Completed" + (majorcompaction? " major ": " ") +
"compaction of " + filesToCompact.size() +
" file(s), new file=" + (sf == null? "none": sf.toString()) +
", size=" + (sf == null? "none": StringUtils.humanReadableInt(sf.getReader().length())) +
"; total size for store is " + StringUtils.humanReadableInt(storeSize));
sf = completeCompaction(filesToCompact, writer);
} finally {
synchronized (filesCompacting) {
filesCompacting.removeAll(filesToCompact);
}
}
return checkSplit(forceSplit);
LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
+ filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
+ "; new storefile name=" + (sf == null ? "none" : sf.toString())
+ ", size=" + (sf == null ? "none" :
StringUtils.humanReadableInt(sf.getReader().length()))
+ "; total size for store is "
+ StringUtils.humanReadableInt(storeSize));
}
/*
* Compact the most recent N files. Essentially a hook for testing.
*/
protected void compactRecent(int N) throws IOException {
synchronized(compactLock) {
List<StoreFile> filesToCompact = this.storefiles;
int count = filesToCompact.size();
if (N > count) {
throw new RuntimeException("Not enough files");
List<StoreFile> filesToCompact;
long maxId;
boolean isMajor;
this.lock.readLock().lock();
try {
synchronized (filesCompacting) {
filesToCompact = Lists.newArrayList(storefiles);
if (!filesCompacting.isEmpty()) {
// exclude all files older than the newest file we're currently
// compacting. this allows us to preserve contiguity (HBASE-2856)
StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
int idx = filesToCompact.indexOf(last);
Preconditions.checkArgument(idx != -1);
filesToCompact = filesToCompact.subList(idx+1, filesToCompact.size());
}
int count = filesToCompact.size();
if (N > count) {
throw new RuntimeException("Not enough files");
}
filesToCompact = filesToCompact.subList(count - N, count);
maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
isMajor = (filesToCompact.size() == storefiles.size());
filesCompacting.addAll(filesToCompact);
Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
}
} finally {
this.lock.readLock().unlock();
}
filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(count-N, count));
long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
boolean majorcompaction = (N == count);
// Ready to go. Have list of files to compact.
StoreFile.Writer writer
= compactStore(filesToCompact, majorcompaction, maxId);
try {
// Ready to go. Have list of files to compact.
StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId);
// Move the compaction into place.
StoreFile sf = completeCompaction(filesToCompact, writer);
} finally {
synchronized (filesCompacting) {
filesCompacting.removeAll(filesToCompact);
}
}
}
boolean hasReferences() {
return hasReferences(this.storefiles);
}
/*
* @param files
* @return True if any of the files in <code>files</code> are References.
@ -835,6 +850,69 @@ public class Store implements HeapSize {
return ret;
}
public CompactionRequest requestCompaction() {
// don't even select for compaction if writes are disabled
if (!this.region.areWritesEnabled()) {
return null;
}
CompactionRequest ret = null;
this.lock.readLock().lock();
try {
synchronized (filesCompacting) {
// candidates = all storefiles not already in compaction queue
List<StoreFile> candidates = Lists.newArrayList(storefiles);
if (!filesCompacting.isEmpty()) {
// exclude all files older than the newest file we're currently
// compacting. this allows us to preserve contiguity (HBASE-2856)
StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
int idx = candidates.indexOf(last);
Preconditions.checkArgument(idx != -1);
candidates = candidates.subList(idx + 1, candidates.size());
}
List<StoreFile> filesToCompact = compactSelection(candidates);
// no files to compact
if (filesToCompact.isEmpty()) {
return null;
}
// basic sanity check: do not try to compact the same StoreFile twice.
if (!Collections.disjoint(filesCompacting, filesToCompact)) {
// TODO: change this from an IAE to LOG.error after sufficient testing
Preconditions.checkArgument(false, "%s overlaps with %s",
filesToCompact, filesCompacting);
}
filesCompacting.addAll(filesToCompact);
Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
// major compaction iff all StoreFiles are included
boolean isMajor = (filesToCompact.size() == this.storefiles.size());
if (isMajor) {
// since we're enqueuing a major, update the compaction wait interval
this.forceMajor = false;
this.majorCompactionTime = getNextMajorCompactTime();
}
// everything went better than expected. create a compaction request
int pri = getCompactPriority();
ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
}
} catch (IOException ex) {
LOG.error("Compaction Request failed for region " + region + ", store "
+ this, RemoteExceptionHandler.checkIOException(ex));
} finally {
this.lock.readLock().unlock();
}
return ret;
}
public void finishRequest(CompactionRequest cr) {
synchronized (filesCompacting) {
filesCompacting.removeAll(cr.getFiles());
}
}
/**
* Algorithm to choose which files to compact
*
@ -851,12 +929,13 @@ public class Store implements HeapSize {
* max files to compact at once (avoids OOM)
*
* @param candidates candidate files, ordered from oldest to newest
* @param majorcompaction whether to force a major compaction
* @return subset copy of candidate list that meets compaction criteria
* @throws IOException
*/
List<StoreFile> compactSelection(List<StoreFile> candidates,
boolean forcemajor) throws IOException {
List<StoreFile> compactSelection(List<StoreFile> candidates)
throws IOException {
// ASSUMPTION!!! filesCompacting is locked when calling this function
/* normal skew:
*
* older ----> newer
@ -870,6 +949,7 @@ public class Store implements HeapSize {
*/
List<StoreFile> filesToCompact = new ArrayList<StoreFile>(candidates);
boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
if (!forcemajor) {
// do not compact old files above a configurable threshold
// save all references. we MUST compact them
@ -888,9 +968,6 @@ public class Store implements HeapSize {
// major compact on user action or age (caveat: we have too many files)
boolean majorcompaction = (forcemajor || isMajorCompaction(filesToCompact))
&& filesToCompact.size() < this.maxFilesToCompact;
if (majorcompaction) {
this.majorCompactionTime = getNextMajorCompactTime();
}
if (!majorcompaction && !hasReferences(filesToCompact)) {
// we're doing a minor compaction, let's see what files are applicable
@ -1054,9 +1131,6 @@ public class Store implements HeapSize {
}
/*
* It's assumed that the compactLock will be acquired prior to calling this
* method! Otherwise, it is not thread-safe!
*
* <p>It works by processing a compaction that's been written to disk.
*
* <p>It is usually invoked at the end of a compaction, but might also be
@ -1097,18 +1171,13 @@ public class Store implements HeapSize {
this.lock.writeLock().lock();
try {
try {
// 2. Unloading
// 3. Loading the new TreeMap.
// Change this.storefiles so it reflects new state but do not
// delete old store files until we have sent out notification of
// change in case old files are still being accessed by outstanding
// scanners.
ArrayList<StoreFile> newStoreFiles = new ArrayList<StoreFile>();
for (StoreFile sf : storefiles) {
if (!compactedFiles.contains(sf)) {
newStoreFiles.add(sf);
}
}
ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles);
newStoreFiles.removeAll(compactedFiles);
filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock()
// If a StoreFile result, move it into place. May be null.
if (result != null) {
@ -1318,13 +1387,13 @@ public class Store implements HeapSize {
}
/**
* Determines if HStore can be split
* @param force Whether to force a split or not.
* @return a StoreSize if store can be split, null otherwise.
* Determines if Store should be split
* @return byte[] if store should be split, null otherwise.
*/
StoreSize checkSplit(final boolean force) {
byte[] checkSplit() {
this.lock.readLock().lock();
try {
boolean force = this.region.shouldForceSplit();
// sanity checks
if (this.storefiles.isEmpty()) {
return null;
@ -1369,7 +1438,7 @@ public class Store implements HeapSize {
}
// if the user explicit set a split point, use that
if (this.region.getSplitPoint() != null) {
return new StoreSize(maxSize, this.region.getSplitPoint());
return this.region.getSplitPoint();
}
StoreFile.Reader r = largestSf.getReader();
if (r == null) {
@ -1396,7 +1465,7 @@ public class Store implements HeapSize {
}
return null;
}
return new StoreSize(maxSize, mk.getRow());
return mk.getRow();
}
} catch(IOException e) {
LOG.warn("Failed getting store size for " + this.storeNameStr, e);
@ -1416,8 +1485,8 @@ public class Store implements HeapSize {
return storeSize;
}
void setForceMajorCompaction(final boolean b) {
this.forceMajor = b;
void triggerMajorCompaction() {
this.forceMajor = true;
}
boolean getForceMajorCompaction() {
@ -1493,28 +1562,6 @@ public class Store implements HeapSize {
return this.blockingStoreFileCount - this.storefiles.size();
}
/**
* Datastructure that holds size and row to split a file around.
* TODO: Take a KeyValue rather than row.
*/
static class StoreSize {
private final long size;
private final byte [] row;
StoreSize(long size, byte [] row) {
this.size = size;
this.row = row;
}
/* @return the size */
long getSize() {
return size;
}
byte [] getSplitRow() {
return this.row;
}
}
HRegion getHRegion() {
return this.region;
}
@ -1624,8 +1671,8 @@ public class Store implements HeapSize {
* @return true if number of store files is greater than
* the number defined in minFilesToCompact
*/
public boolean hasTooManyStoreFiles() {
return this.storefiles.size() > this.minFilesToCompact;
public boolean needsCompaction() {
return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
}
public static final long FIXED_OVERHEAD = ClassSize.align(

View File

@ -20,11 +20,13 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.util.Date;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
/**
* This class represents a compaction request and holds the region, priority,
@ -34,30 +36,37 @@ import org.apache.hadoop.hbase.regionserver.Store;
static final Log LOG = LogFactory.getLog(CompactionRequest.class);
private final HRegion r;
private final Store s;
private final List<StoreFile> files;
private final long totalSize;
private final boolean isMajor;
private int p;
private final Date date;
public CompactionRequest(HRegion r, Store s) {
this(r, s, s.getCompactPriority());
this(r, s, null, false, s.getCompactPriority());
}
public CompactionRequest(HRegion r, Store s, int p) {
this(r, s, p, null);
this(r, s, null, false, p);
}
public CompactionRequest(HRegion r, Store s, int p, Date d) {
public CompactionRequest(HRegion r, Store s,
List<StoreFile> files, boolean isMajor, int p) {
if (r == null) {
throw new NullPointerException("HRegion cannot be null");
}
if (d == null) {
d = new Date();
}
this.r = r;
this.s = s;
this.files = files;
long sz = 0;
for (StoreFile sf : files) {
sz += sf.getReader().length();
}
this.totalSize = sz;
this.isMajor = isMajor;
this.p = p;
this.date = d;
this.date = new Date();
}
/**
@ -89,8 +98,8 @@ import org.apache.hadoop.hbase.regionserver.Store;
return compareVal;
}
//break the tie arbitrarily
return -1;
// break the tie based on hash code
return this.hashCode() - request.hashCode();
}
/** Gets the HRegion for the request */
@ -103,6 +112,20 @@ import org.apache.hadoop.hbase.regionserver.Store;
return s;
}
/** Gets the StoreFiles for the request */
public List<StoreFile> getFiles() {
return files;
}
/** Gets the total size of all StoreFiles in compaction */
public long getSize() {
return totalSize;
}
public boolean isMajor() {
return this.isMajor;
}
/** Gets the priority for the request */
public int getPriority() {
return p;
@ -115,8 +138,8 @@ import org.apache.hadoop.hbase.regionserver.Store;
public String toString() {
return "regionName=" + r.getRegionNameAsString() +
((s == null) ? ""
: "storeName = " + new String(s.getFamily().getName())) +
", storeName=" + new String(s.getFamily().getName()) +
", fileCount=" + files.size() +
", priority=" + p + ", date=" + date;
}
}

View File

@ -314,11 +314,12 @@ public class RegionServerMetrics implements Updater {
}
/**
* @param compact history in <time, size>
* @param time time that compaction took
* @param size bytesize of storefiles in the compaction
*/
public synchronized void addCompaction(final Pair<Long,Long> compact) {
this.compactionTime.inc(compact.getFirst());
this.compactionSize.inc(compact.getSecond());
public synchronized void addCompaction(long time, long size) {
this.compactionTime.inc(time);
this.compactionSize.inc(size);
}
/**

View File

@ -158,7 +158,9 @@ public class TestCompactSelection extends TestCase {
void compactEquals(List<StoreFile> candidates, boolean forcemajor,
long ... expected)
throws IOException {
List<StoreFile> actual = store.compactSelection(candidates, forcemajor);
store.forceMajor = forcemajor;
List<StoreFile> actual = store.compactSelection(candidates);
store.forceMajor = false;
assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
}
@ -187,7 +189,7 @@ public class TestCompactSelection extends TestCase {
*/
// don't exceed max file compact threshold
assertEquals(maxFiles,
store.compactSelection(sfCreate(7,6,5,4,3,2,1), false).size());
store.compactSelection(sfCreate(7,6,5,4,3,2,1)).size());
/* MAJOR COMPACTION */
// if a major compaction has been forced, then compact everything
@ -197,8 +199,11 @@ public class TestCompactSelection extends TestCase {
// even if one of those files is too big
compactEquals(sfCreate(tooBig, 12,12), true, tooBig, 12, 12);
// don't exceed max file compact threshold, even with major compaction
store.forceMajor = true;
assertEquals(maxFiles,
store.compactSelection(sfCreate(7,6,5,4,3,2,1), true).size());
store.compactSelection(sfCreate(7,6,5,4,3,2,1)).size());
store.forceMajor = false;
// if we exceed maxCompactSize, downgrade to minor
// if not, it creates a 'snowball effect' when files >> maxCompactSize:
// the last file in compaction is the aggregate of all previous compactions
@ -217,7 +222,7 @@ public class TestCompactSelection extends TestCase {
compactEquals(sfCreate(true, tooBig, 12,12), tooBig, 12, 12);
// reference files should obey max file compact to avoid OOM
assertEquals(maxFiles,
store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1), false).size());
store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1)).size());
// empty case
compactEquals(new ArrayList<StoreFile>() /* empty */);

View File

@ -1,224 +0,0 @@
/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
* Test class for the priority compaction queue
*/
public class TestPriorityCompactionQueue {
static final Log LOG = LogFactory.getLog(TestPriorityCompactionQueue.class);
@Before
public void setUp() {
}
@After
public void tearDown() {
}
class DummyHRegion extends HRegion {
String name;
DummyHRegion(String name) {
super();
this.name = name;
}
public int hashCode() {
return name.hashCode();
}
public boolean equals(DummyHRegion r) {
return name.equals(r.name);
}
public String toString() {
return "[DummyHRegion " + name + "]";
}
public byte[] getRegionName() {
return Bytes.toBytes(name);
}
public String getRegionNameAsString() {
return name;
}
}
protected void getAndCheckRegion(PriorityCompactionQueue pq,
HRegion checkRegion) {
HRegion r = pq.remove().getHRegion();
if (r != checkRegion) {
Assert.assertTrue("Didn't get expected " + checkRegion + " got " + r, r
.equals(checkRegion));
}
}
protected void addRegion(PriorityCompactionQueue pq, HRegion r, int p) {
pq.add(new CompactionRequest(r, null, p));
try {
// Sleep 1 millisecond so 2 things are not put in the queue within the
// same millisecond. The queue breaks ties arbitrarily between two
// requests inserted at the same time. We want the ordering to
// be consistent for our unit test.
Thread.sleep(1);
} catch (InterruptedException ex) {
// continue
}
}
// ////////////////////////////////////////////////////////////////////////////
// tests
// ////////////////////////////////////////////////////////////////////////////
/** tests general functionality of the compaction queue */
@Test public void testPriorityQueue() throws InterruptedException {
PriorityCompactionQueue pq = new PriorityCompactionQueue();
HRegion r1 = new DummyHRegion("r1");
HRegion r2 = new DummyHRegion("r2");
HRegion r3 = new DummyHRegion("r3");
HRegion r4 = new DummyHRegion("r4");
HRegion r5 = new DummyHRegion("r5");
// test 1
// check fifo w/priority
addRegion(pq, r1, 0);
addRegion(pq, r2, 0);
addRegion(pq, r3, 0);
addRegion(pq, r4, 0);
addRegion(pq, r5, 0);
getAndCheckRegion(pq, r1);
getAndCheckRegion(pq, r2);
getAndCheckRegion(pq, r3);
getAndCheckRegion(pq, r4);
getAndCheckRegion(pq, r5);
// test 2
// check fifo w/mixed priority
addRegion(pq, r1, 0);
addRegion(pq, r2, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r3, 0);
addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r5, 0);
getAndCheckRegion(pq, r1);
getAndCheckRegion(pq, r3);
getAndCheckRegion(pq, r5);
getAndCheckRegion(pq, r2);
getAndCheckRegion(pq, r4);
// test 3
// check fifo w/mixed priority
addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r2, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r3, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r5, 0);
getAndCheckRegion(pq, r5);
getAndCheckRegion(pq, r1);
getAndCheckRegion(pq, r2);
getAndCheckRegion(pq, r3);
getAndCheckRegion(pq, r4);
// test 4
// check fifo w/mixed priority elevation time
addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r2, 0);
addRegion(pq, r3, CompactSplitThread.PRIORITY_USER);
Thread.sleep(1000);
addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r5, 0);
getAndCheckRegion(pq, r2);
getAndCheckRegion(pq, r5);
getAndCheckRegion(pq, r1);
getAndCheckRegion(pq, r3);
getAndCheckRegion(pq, r4);
// reset the priority compaction queue back to a normal queue
pq = new PriorityCompactionQueue();
// test 5
// test that lower priority are removed from the queue when a high priority
// is added
addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r2, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r3, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r5, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r3, 0);
getAndCheckRegion(pq, r3);
getAndCheckRegion(pq, r1);
getAndCheckRegion(pq, r2);
getAndCheckRegion(pq, r4);
getAndCheckRegion(pq, r5);
Assert.assertTrue("Queue should be empty.", pq.size() == 0);
// test 6
// don't add the same region more than once
addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r2, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r3, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r5, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r2, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r3, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r4, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r5, CompactSplitThread.PRIORITY_USER);
getAndCheckRegion(pq, r1);
getAndCheckRegion(pq, r2);
getAndCheckRegion(pq, r3);
getAndCheckRegion(pq, r4);
getAndCheckRegion(pq, r5);
Assert.assertTrue("Queue should be empty.", pq.size() == 0);
// test 7
// we can handle negative priorities
addRegion(pq, r1, CompactSplitThread.PRIORITY_USER);
addRegion(pq, r2, -1);
addRegion(pq, r3, 0);
addRegion(pq, r4, -2);
getAndCheckRegion(pq, r4);
getAndCheckRegion(pq, r2);
getAndCheckRegion(pq, r3);
getAndCheckRegion(pq, r1);
Assert.assertTrue("Queue should be empty.", pq.size() == 0);
}
}

View File

@ -156,7 +156,7 @@ public class TestStore extends TestCase {
assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);
// after compact; check the lowest time stamp
store.compact();
store.compact(store.requestCompaction());
lowestTimeStampFromStore = Store.getLowestTimestamp(store.getStorefiles());
lowestTimeStampFromFS = getLowestTimeStampFromFS(fs,store.getStorefiles());
assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);
@ -688,7 +688,9 @@ public class TestStore extends TestCase {
*/
public void testSplitWithEmptyColFam() throws IOException {
init(this.getName());
assertNull(store.checkSplit(false));
assertNull(store.checkSplit(true));
assertNull(store.checkSplit());
store.getHRegion().forceSplit(null);
assertNull(store.checkSplit());
store.getHRegion().clearSplit_TESTS_ONLY();
}
}