HBASE-2646 Compaction requests should be prioritized to prevent blocking
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1002019 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5affba7171
commit
f4ac670a42
|
@ -947,6 +947,8 @@ Release 0.21.0 - Unreleased
|
||||||
(Andy Chen via Stack)
|
(Andy Chen via Stack)
|
||||||
HBASE-3030 The return code of many filesystem operations are not checked
|
HBASE-3030 The return code of many filesystem operations are not checked
|
||||||
(dhruba borthakur via Stack)
|
(dhruba borthakur via Stack)
|
||||||
|
HBASE-2646 Compaction requests should be prioritized to prevent blocking
|
||||||
|
(Jeff Whiting via Stack)
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
HBASE-1961 HBase EC2 scripts
|
HBASE-1961 HBase EC2 scripts
|
||||||
|
|
|
@ -20,9 +20,6 @@
|
||||||
package org.apache.hadoop.hbase.regionserver;
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.concurrent.BlockingQueue;
|
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
@ -43,10 +40,36 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
|
||||||
private final HRegionServer server;
|
private final HRegionServer server;
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
|
|
||||||
private final BlockingQueue<HRegion> compactionQueue =
|
private final PriorityCompactionQueue compactionQueue =
|
||||||
new LinkedBlockingQueue<HRegion>();
|
new PriorityCompactionQueue();
|
||||||
|
|
||||||
private final HashSet<HRegion> regionsInQueue = new HashSet<HRegion>();
|
/** The priorities for a compaction request. */
|
||||||
|
public enum Priority implements Comparable<Priority> {
|
||||||
|
//NOTE: All priorities should be numbered consecutively starting with 1.
|
||||||
|
//The highest priority should be 1 followed by all lower priorities.
|
||||||
|
//Priorities can be changed at anytime without requiring any changes to the
|
||||||
|
//queue.
|
||||||
|
|
||||||
|
/** HIGH_BLOCKING should only be used when an operation is blocked until a
|
||||||
|
* compact / split is done (e.g. a MemStore can't flush because it has
|
||||||
|
* "too many store files" and is blocking until a compact / split is done)
|
||||||
|
*/
|
||||||
|
HIGH_BLOCKING(1),
|
||||||
|
/** A normal compaction / split request */
|
||||||
|
NORMAL(2),
|
||||||
|
/** A low compaction / split request -- not currently used */
|
||||||
|
LOW(3);
|
||||||
|
|
||||||
|
int value;
|
||||||
|
|
||||||
|
Priority(int value) {
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
int getInt() {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Splitting should not take place if the total number of regions exceed this.
|
* Splitting should not take place if the total number of regions exceed this.
|
||||||
|
@ -74,9 +97,6 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
|
||||||
try {
|
try {
|
||||||
r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
|
r = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
|
||||||
if (r != null && !this.server.isStopped()) {
|
if (r != null && !this.server.isStopped()) {
|
||||||
synchronized (regionsInQueue) {
|
|
||||||
regionsInQueue.remove(r);
|
|
||||||
}
|
|
||||||
lock.lock();
|
lock.lock();
|
||||||
try {
|
try {
|
||||||
// Don't interrupt us while we are working
|
// Don't interrupt us while we are working
|
||||||
|
@ -107,14 +127,23 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
regionsInQueue.clear();
|
|
||||||
compactionQueue.clear();
|
compactionQueue.clear();
|
||||||
LOG.info(getName() + " exiting");
|
LOG.info(getName() + " exiting");
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void requestCompaction(final HRegion r,
|
public synchronized void requestCompaction(final HRegion r,
|
||||||
final String why) {
|
final String why) {
|
||||||
requestCompaction(r, false, why);
|
requestCompaction(r, false, why, Priority.NORMAL);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void requestCompaction(final HRegion r,
|
||||||
|
final String why, Priority p) {
|
||||||
|
requestCompaction(r, false, why, p);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void requestCompaction(final HRegion r,
|
||||||
|
final boolean force, final String why) {
|
||||||
|
requestCompaction(r, force, why, Priority.NORMAL);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -123,7 +152,7 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
|
||||||
* @param why Why compaction requested -- used in debug messages
|
* @param why Why compaction requested -- used in debug messages
|
||||||
*/
|
*/
|
||||||
public synchronized void requestCompaction(final HRegion r,
|
public synchronized void requestCompaction(final HRegion r,
|
||||||
final boolean force, final String why) {
|
final boolean force, final String why, Priority priority) {
|
||||||
if (this.server.isStopped()) {
|
if (this.server.isStopped()) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -131,14 +160,10 @@ public class CompactSplitThread extends Thread implements CompactionRequestor {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Compaction " + (force? "(major) ": "") +
|
LOG.debug("Compaction " + (force? "(major) ": "") +
|
||||||
"requested for region " + r.getRegionNameAsString() +
|
"requested for region " + r.getRegionNameAsString() +
|
||||||
(why != null && !why.isEmpty()? " because: " + why: ""));
|
(why != null && !why.isEmpty()? " because: " + why: "") +
|
||||||
}
|
"; Priority: " + priority + "; Compaction queue size: " + compactionQueue.size());
|
||||||
synchronized (regionsInQueue) {
|
|
||||||
if (!regionsInQueue.contains(r)) {
|
|
||||||
compactionQueue.add(r);
|
|
||||||
regionsInQueue.add(r);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
compactionQueue.add(r, priority);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void split(final HRegion parent, final byte [] midKey)
|
private void split(final HRegion parent, final byte [] midKey)
|
||||||
|
|
|
@ -212,7 +212,8 @@ class MemStoreFlusher extends Thread implements FlushRequester {
|
||||||
LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
|
LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
|
||||||
"store files; delaying flush up to " + this.blockingWaitTime + "ms");
|
"store files; delaying flush up to " + this.blockingWaitTime + "ms");
|
||||||
}
|
}
|
||||||
this.server.compactSplitThread.requestCompaction(region, getName());
|
this.server.compactSplitThread.requestCompaction(region, getName(),
|
||||||
|
CompactSplitThread.Priority.HIGH_BLOCKING);
|
||||||
// Put back on the queue. Have it come back out of the queue
|
// Put back on the queue. Have it come back out of the queue
|
||||||
// after a delay of this.blockingWaitTime / 100 ms.
|
// after a delay of this.blockingWaitTime / 100 ms.
|
||||||
this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
|
this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));
|
||||||
|
|
|
@ -0,0 +1,375 @@
|
||||||
|
/**
|
||||||
|
* Copyright 2010 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.PriorityBlockingQueue;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.CompactSplitThread.Priority;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class delegates to the BlockingQueue but wraps all HRegions in
|
||||||
|
* compaction requests that hold the priority and the date requested.
|
||||||
|
*
|
||||||
|
* Implementation Note: With an elevation time of -1 there is the potential for
|
||||||
|
* starvation of the lower priority compaction requests as long as there is a
|
||||||
|
* constant stream of high priority requests.
|
||||||
|
*/
|
||||||
|
public class PriorityCompactionQueue implements BlockingQueue<HRegion> {
|
||||||
|
static final Log LOG = LogFactory.getLog(PriorityCompactionQueue.class);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class represents a compaction request and holds the region, priority,
|
||||||
|
* and time submitted.
|
||||||
|
*/
|
||||||
|
private class CompactionRequest implements Comparable<CompactionRequest> {
|
||||||
|
private final HRegion r;
|
||||||
|
private final Priority p;
|
||||||
|
private final Date date;
|
||||||
|
|
||||||
|
public CompactionRequest(HRegion r, Priority p) {
|
||||||
|
this(r, p, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public CompactionRequest(HRegion r, Priority p, Date d) {
|
||||||
|
if (r == null) {
|
||||||
|
throw new NullPointerException("HRegion cannot be null");
|
||||||
|
}
|
||||||
|
|
||||||
|
if (p == null) {
|
||||||
|
p = Priority.NORMAL; //the default priority
|
||||||
|
}
|
||||||
|
|
||||||
|
if (d == null) {
|
||||||
|
d = new Date();
|
||||||
|
}
|
||||||
|
|
||||||
|
this.r = r;
|
||||||
|
this.p = p;
|
||||||
|
this.date = d;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This function will define where in the priority queue the request will
|
||||||
|
* end up. Those with the highest priorities will be first. When the
|
||||||
|
* priorities are the same it will It will first compare priority then date
|
||||||
|
* to maintain a FIFO functionality.
|
||||||
|
*
|
||||||
|
* <p>Note: The date is only accurate to the millisecond which means it is
|
||||||
|
* possible that two requests were inserted into the queue within a
|
||||||
|
* millisecond. When that is the case this function will break the tie
|
||||||
|
* arbitrarily.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int compareTo(CompactionRequest request) {
|
||||||
|
//NOTE: The head of the priority queue is the least element
|
||||||
|
if (this.equals(request)) {
|
||||||
|
return 0; //they are the same request
|
||||||
|
}
|
||||||
|
int compareVal;
|
||||||
|
|
||||||
|
compareVal = p.compareTo(request.p); //compare priority
|
||||||
|
if (compareVal != 0) {
|
||||||
|
return compareVal;
|
||||||
|
}
|
||||||
|
|
||||||
|
compareVal = date.compareTo(request.date);
|
||||||
|
if (compareVal != 0) {
|
||||||
|
return compareVal;
|
||||||
|
}
|
||||||
|
|
||||||
|
//break the tie arbitrarily
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Gets the HRegion for the request */
|
||||||
|
HRegion getHRegion() {
|
||||||
|
return r;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Gets the priority for the request */
|
||||||
|
Priority getPriority() {
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return "regionName=" + r.getRegionNameAsString() +
|
||||||
|
", priority=" + p + ", date=" + date;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/** The actual blocking queue we delegate to */
|
||||||
|
protected final BlockingQueue<CompactionRequest> queue =
|
||||||
|
new PriorityBlockingQueue<CompactionRequest>();
|
||||||
|
|
||||||
|
/** Hash map of the HRegions contained within the Compaction Queue */
|
||||||
|
private final HashMap<HRegion, CompactionRequest> regionsInQueue =
|
||||||
|
new HashMap<HRegion, CompactionRequest>();
|
||||||
|
|
||||||
|
/** Creates a new PriorityCompactionQueue with no priority elevation time */
|
||||||
|
public PriorityCompactionQueue() {
|
||||||
|
LOG.debug("Create PriorityCompactionQueue");
|
||||||
|
}
|
||||||
|
|
||||||
|
/** If the region is not already in the queue it will add it and return a
|
||||||
|
* new compaction request object. If it is already present in the queue
|
||||||
|
* then it will return null.
|
||||||
|
* @param p If null it will use the default priority
|
||||||
|
* @return returns a compaction request if it isn't already in the queue
|
||||||
|
*/
|
||||||
|
protected CompactionRequest addToRegionsInQueue(HRegion r, Priority p) {
|
||||||
|
CompactionRequest queuedRequest = null;
|
||||||
|
CompactionRequest newRequest = new CompactionRequest(r, p);
|
||||||
|
synchronized (regionsInQueue) {
|
||||||
|
queuedRequest = regionsInQueue.get(r);
|
||||||
|
if (queuedRequest == null ||
|
||||||
|
newRequest.getPriority().compareTo(queuedRequest.getPriority()) < 0) {
|
||||||
|
LOG.trace("Inserting region in queue. " + newRequest);
|
||||||
|
regionsInQueue.put(r, newRequest);
|
||||||
|
} else {
|
||||||
|
LOG.trace("Region already in queue, skipping. Queued: " + queuedRequest +
|
||||||
|
", requested: " + newRequest);
|
||||||
|
newRequest = null; // It is already present so don't add it
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (newRequest != null && queuedRequest != null) {
|
||||||
|
// Remove the lower priority request
|
||||||
|
queue.remove(queuedRequest);
|
||||||
|
}
|
||||||
|
|
||||||
|
return newRequest;
|
||||||
|
}
|
||||||
|
|
||||||
|
/** Removes the request from the regions in queue
|
||||||
|
* @param p If null it will use the default priority
|
||||||
|
*/
|
||||||
|
protected CompactionRequest removeFromRegionsInQueue(HRegion r) {
|
||||||
|
if (r == null) return null;
|
||||||
|
|
||||||
|
synchronized (regionsInQueue) {
|
||||||
|
CompactionRequest cr = regionsInQueue.remove(r);
|
||||||
|
if (cr == null) {
|
||||||
|
LOG.warn("Removed a region it couldn't find in regionsInQueue: " + r);
|
||||||
|
}
|
||||||
|
return cr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean add(HRegion e, Priority p) {
|
||||||
|
CompactionRequest request = this.addToRegionsInQueue(e, p);
|
||||||
|
if (request != null) {
|
||||||
|
boolean result = queue.add(request);
|
||||||
|
queue.peek();
|
||||||
|
return result;
|
||||||
|
} else {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean add(HRegion e) {
|
||||||
|
return add(e, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean offer(HRegion e, Priority p) {
|
||||||
|
CompactionRequest request = this.addToRegionsInQueue(e, p);
|
||||||
|
return (request != null)? queue.offer(request): false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean offer(HRegion e) {
|
||||||
|
return offer(e, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void put(HRegion e, Priority p) throws InterruptedException {
|
||||||
|
CompactionRequest request = this.addToRegionsInQueue(e, p);
|
||||||
|
if (request != null) {
|
||||||
|
queue.put(request);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void put(HRegion e) throws InterruptedException {
|
||||||
|
put(e, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean offer(HRegion e, Priority p, long timeout, TimeUnit unit)
|
||||||
|
throws InterruptedException {
|
||||||
|
CompactionRequest request = this.addToRegionsInQueue(e, p);
|
||||||
|
return (request != null)? queue.offer(request, timeout, unit): false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean offer(HRegion e, long timeout, TimeUnit unit)
|
||||||
|
throws InterruptedException {
|
||||||
|
return offer(e, null, timeout, unit);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HRegion take() throws InterruptedException {
|
||||||
|
CompactionRequest cr = queue.take();
|
||||||
|
if (cr != null) {
|
||||||
|
removeFromRegionsInQueue(cr.getHRegion());
|
||||||
|
return cr.getHRegion();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HRegion poll(long timeout, TimeUnit unit) throws InterruptedException {
|
||||||
|
CompactionRequest cr = queue.poll(timeout, unit);
|
||||||
|
if (cr != null) {
|
||||||
|
removeFromRegionsInQueue(cr.getHRegion());
|
||||||
|
return cr.getHRegion();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean remove(Object r) {
|
||||||
|
if (r instanceof HRegion) {
|
||||||
|
CompactionRequest cr = removeFromRegionsInQueue((HRegion) r);
|
||||||
|
if (cr != null) {
|
||||||
|
return queue.remove(cr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HRegion remove() {
|
||||||
|
CompactionRequest cr = queue.remove();
|
||||||
|
if (cr != null) {
|
||||||
|
removeFromRegionsInQueue(cr.getHRegion());
|
||||||
|
return cr.getHRegion();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HRegion poll() {
|
||||||
|
CompactionRequest cr = queue.poll();
|
||||||
|
if (cr != null) {
|
||||||
|
removeFromRegionsInQueue(cr.getHRegion());
|
||||||
|
return cr.getHRegion();
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int remainingCapacity() {
|
||||||
|
return queue.remainingCapacity();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean contains(Object r) {
|
||||||
|
if (r instanceof HRegion) {
|
||||||
|
synchronized (regionsInQueue) {
|
||||||
|
return regionsInQueue.containsKey((HRegion) r);
|
||||||
|
}
|
||||||
|
} else if (r instanceof CompactionRequest) {
|
||||||
|
return queue.contains(r);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HRegion element() {
|
||||||
|
CompactionRequest cr = queue.element();
|
||||||
|
return (cr != null)? cr.getHRegion(): null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public HRegion peek() {
|
||||||
|
CompactionRequest cr = queue.peek();
|
||||||
|
return (cr != null)? cr.getHRegion(): null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int size() {
|
||||||
|
return queue.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isEmpty() {
|
||||||
|
return queue.isEmpty();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void clear() {
|
||||||
|
regionsInQueue.clear();
|
||||||
|
queue.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Unimplemented methods, collection methods
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Iterator<HRegion> iterator() {
|
||||||
|
throw new UnsupportedOperationException("Not supported.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Object[] toArray() {
|
||||||
|
throw new UnsupportedOperationException("Not supported.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public <T> T[] toArray(T[] a) {
|
||||||
|
throw new UnsupportedOperationException("Not supported.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean containsAll(Collection<?> c) {
|
||||||
|
throw new UnsupportedOperationException("Not supported.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean addAll(Collection<? extends HRegion> c) {
|
||||||
|
throw new UnsupportedOperationException("Not supported.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean removeAll(Collection<?> c) {
|
||||||
|
throw new UnsupportedOperationException("Not supported.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean retainAll(Collection<?> c) {
|
||||||
|
throw new UnsupportedOperationException("Not supported.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int drainTo(Collection<? super HRegion> c) {
|
||||||
|
throw new UnsupportedOperationException("Not supported.");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int drainTo(Collection<? super HRegion> c, int maxElements) {
|
||||||
|
throw new UnsupportedOperationException("Not supported.");
|
||||||
|
}
|
||||||
|
}
|
|
@ -20,7 +20,7 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.regionserver.wal;
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
import java.io.EOFException;
|
import java.io.FilterInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.lang.Class;
|
import java.lang.Class;
|
||||||
import java.lang.reflect.Constructor;
|
import java.lang.reflect.Constructor;
|
||||||
|
@ -78,18 +78,43 @@ public class SequenceFileLogReader implements HLog.Reader {
|
||||||
this.length = l;
|
this.length = l;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// This section can be confusing. It is specific to how HDFS works.
|
||||||
|
// Let me try to break it down. This is the problem:
|
||||||
|
//
|
||||||
|
// 1. HDFS DataNodes update the NameNode about a filename's length
|
||||||
|
// on block boundaries or when a file is closed. Therefore,
|
||||||
|
// if an RS dies, then the NN's fs.getLength() can be out of date
|
||||||
|
// 2. this.in.available() would work, but it returns int &
|
||||||
|
// therefore breaks for files > 2GB (happens on big clusters)
|
||||||
|
// 3. DFSInputStream.getFileLength() gets the actual length from the DNs
|
||||||
|
// 4. DFSInputStream is wrapped 2 levels deep : this.in.in
|
||||||
|
//
|
||||||
|
// So, here we adjust getPos() using getFileLength() so the
|
||||||
|
// SequenceFile.Reader constructor (aka: first invocation) comes out
|
||||||
|
// with the correct end of the file:
|
||||||
|
// this.end = in.getPos() + length;
|
||||||
@Override
|
@Override
|
||||||
public long getPos() throws IOException {
|
public long getPos() throws IOException {
|
||||||
if (this.firstGetPosInvocation) {
|
if (this.firstGetPosInvocation) {
|
||||||
this.firstGetPosInvocation = false;
|
this.firstGetPosInvocation = false;
|
||||||
// Tell a lie. We're doing this just so that this line up in
|
long adjust = 0;
|
||||||
// SequenceFile.Reader constructor comes out with the correct length
|
|
||||||
// on the file:
|
try {
|
||||||
// this.end = in.getPos() + length;
|
Field fIn = FilterInputStream.class.getDeclaredField("in");
|
||||||
long available = this.in.available();
|
fIn.setAccessible(true);
|
||||||
// Length gets added up in the SF.Reader constructor so subtract the
|
Object realIn = fIn.get(this.in);
|
||||||
// difference. If available < this.length, then return this.length.
|
long realLength = ((Long)realIn.getClass().
|
||||||
return available >= this.length? available - this.length: this.length;
|
getMethod("getFileLength", new Class<?> []{}).
|
||||||
|
invoke(realIn, new Object []{})).longValue();
|
||||||
|
assert(realLength >= this.length);
|
||||||
|
adjust = realLength - this.length;
|
||||||
|
} catch(Exception e) {
|
||||||
|
SequenceFileLogReader.LOG.warn(
|
||||||
|
"Error while trying to get accurate file length. " +
|
||||||
|
"Truncation / data loss may occur if RegionServers die.", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
return adjust + super.getPos();
|
||||||
}
|
}
|
||||||
return super.getPos();
|
return super.getPos();
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,219 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2010 The Apache Software Foundation
|
||||||
|
*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.regionserver;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.CompactSplitThread.Priority;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test class for the priority compaction queue
|
||||||
|
*/
|
||||||
|
public class TestPriorityCompactionQueue {
|
||||||
|
static final Log LOG = LogFactory.getLog(TestPriorityCompactionQueue.class);
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
class DummyHRegion extends HRegion {
|
||||||
|
String name;
|
||||||
|
|
||||||
|
DummyHRegion(String name) {
|
||||||
|
super();
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int hashCode() {
|
||||||
|
return name.hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean equals(DummyHRegion r) {
|
||||||
|
return name.equals(r.name);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String toString() {
|
||||||
|
return "[DummyHRegion " + name + "]";
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getRegionNameAsString() {
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void getAndCheckRegion(PriorityCompactionQueue pq,
|
||||||
|
HRegion checkRegion) {
|
||||||
|
HRegion r = pq.remove();
|
||||||
|
if (r != checkRegion) {
|
||||||
|
Assert.assertTrue("Didn't get expected " + checkRegion + " got " + r, r
|
||||||
|
.equals(checkRegion));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void addRegion(PriorityCompactionQueue pq, HRegion r, Priority p) {
|
||||||
|
pq.add(r, p);
|
||||||
|
try {
|
||||||
|
// Sleep 10 millisecond so 2 things are not put in the queue within the
|
||||||
|
// same millisecond. The queue breaks ties arbitrarily between two
|
||||||
|
// requests inserted at the same time. We want the ordering to
|
||||||
|
// be consistent for our unit test.
|
||||||
|
Thread.sleep(1);
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
// continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ////////////////////////////////////////////////////////////////////////////
|
||||||
|
// tests
|
||||||
|
// ////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
/** tests general functionality of the compaction queue */
|
||||||
|
@Test public void testPriorityQueue() throws InterruptedException {
|
||||||
|
PriorityCompactionQueue pq = new PriorityCompactionQueue();
|
||||||
|
|
||||||
|
HRegion r1 = new DummyHRegion("r1");
|
||||||
|
HRegion r2 = new DummyHRegion("r2");
|
||||||
|
HRegion r3 = new DummyHRegion("r3");
|
||||||
|
HRegion r4 = new DummyHRegion("r4");
|
||||||
|
HRegion r5 = new DummyHRegion("r5");
|
||||||
|
|
||||||
|
// test 1
|
||||||
|
// check fifo w/priority
|
||||||
|
addRegion(pq, r1, Priority.HIGH_BLOCKING);
|
||||||
|
addRegion(pq, r2, Priority.HIGH_BLOCKING);
|
||||||
|
addRegion(pq, r3, Priority.HIGH_BLOCKING);
|
||||||
|
addRegion(pq, r4, Priority.HIGH_BLOCKING);
|
||||||
|
addRegion(pq, r5, Priority.HIGH_BLOCKING);
|
||||||
|
|
||||||
|
getAndCheckRegion(pq, r1);
|
||||||
|
getAndCheckRegion(pq, r2);
|
||||||
|
getAndCheckRegion(pq, r3);
|
||||||
|
getAndCheckRegion(pq, r4);
|
||||||
|
getAndCheckRegion(pq, r5);
|
||||||
|
|
||||||
|
// test 2
|
||||||
|
// check fifo
|
||||||
|
addRegion(pq, r1, null);
|
||||||
|
addRegion(pq, r2, null);
|
||||||
|
addRegion(pq, r3, null);
|
||||||
|
addRegion(pq, r4, null);
|
||||||
|
addRegion(pq, r5, null);
|
||||||
|
|
||||||
|
getAndCheckRegion(pq, r1);
|
||||||
|
getAndCheckRegion(pq, r2);
|
||||||
|
getAndCheckRegion(pq, r3);
|
||||||
|
getAndCheckRegion(pq, r4);
|
||||||
|
getAndCheckRegion(pq, r5);
|
||||||
|
|
||||||
|
// test 3
|
||||||
|
// check fifo w/mixed priority
|
||||||
|
addRegion(pq, r1, Priority.HIGH_BLOCKING);
|
||||||
|
addRegion(pq, r2, Priority.NORMAL);
|
||||||
|
addRegion(pq, r3, Priority.HIGH_BLOCKING);
|
||||||
|
addRegion(pq, r4, Priority.NORMAL);
|
||||||
|
addRegion(pq, r5, Priority.HIGH_BLOCKING);
|
||||||
|
|
||||||
|
getAndCheckRegion(pq, r1);
|
||||||
|
getAndCheckRegion(pq, r3);
|
||||||
|
getAndCheckRegion(pq, r5);
|
||||||
|
getAndCheckRegion(pq, r2);
|
||||||
|
getAndCheckRegion(pq, r4);
|
||||||
|
|
||||||
|
// test 4
|
||||||
|
// check fifo w/mixed priority
|
||||||
|
addRegion(pq, r1, Priority.NORMAL);
|
||||||
|
addRegion(pq, r2, Priority.NORMAL);
|
||||||
|
addRegion(pq, r3, Priority.NORMAL);
|
||||||
|
addRegion(pq, r4, Priority.NORMAL);
|
||||||
|
addRegion(pq, r5, Priority.HIGH_BLOCKING);
|
||||||
|
|
||||||
|
getAndCheckRegion(pq, r5);
|
||||||
|
getAndCheckRegion(pq, r1);
|
||||||
|
getAndCheckRegion(pq, r2);
|
||||||
|
getAndCheckRegion(pq, r3);
|
||||||
|
getAndCheckRegion(pq, r4);
|
||||||
|
|
||||||
|
// test 5
|
||||||
|
// check fifo w/mixed priority elevation time
|
||||||
|
addRegion(pq, r1, Priority.NORMAL);
|
||||||
|
addRegion(pq, r2, Priority.HIGH_BLOCKING);
|
||||||
|
addRegion(pq, r3, Priority.NORMAL);
|
||||||
|
Thread.sleep(1000);
|
||||||
|
addRegion(pq, r4, Priority.NORMAL);
|
||||||
|
addRegion(pq, r5, Priority.HIGH_BLOCKING);
|
||||||
|
|
||||||
|
getAndCheckRegion(pq, r2);
|
||||||
|
getAndCheckRegion(pq, r5);
|
||||||
|
getAndCheckRegion(pq, r1);
|
||||||
|
getAndCheckRegion(pq, r3);
|
||||||
|
getAndCheckRegion(pq, r4);
|
||||||
|
|
||||||
|
// reset the priority compaction queue back to a normal queue
|
||||||
|
pq = new PriorityCompactionQueue();
|
||||||
|
|
||||||
|
// test 7
|
||||||
|
// test that lower priority are removed from the queue when a high priority
|
||||||
|
// is added
|
||||||
|
addRegion(pq, r1, Priority.NORMAL);
|
||||||
|
addRegion(pq, r2, Priority.NORMAL);
|
||||||
|
addRegion(pq, r3, Priority.NORMAL);
|
||||||
|
addRegion(pq, r4, Priority.NORMAL);
|
||||||
|
addRegion(pq, r5, Priority.NORMAL);
|
||||||
|
addRegion(pq, r3, Priority.HIGH_BLOCKING);
|
||||||
|
|
||||||
|
getAndCheckRegion(pq, r3);
|
||||||
|
getAndCheckRegion(pq, r1);
|
||||||
|
getAndCheckRegion(pq, r2);
|
||||||
|
getAndCheckRegion(pq, r4);
|
||||||
|
getAndCheckRegion(pq, r5);
|
||||||
|
|
||||||
|
Assert.assertTrue("Queue should be empty.", pq.size() == 0);
|
||||||
|
|
||||||
|
// test 8
|
||||||
|
// don't add the same region more than once
|
||||||
|
addRegion(pq, r1, Priority.NORMAL);
|
||||||
|
addRegion(pq, r2, Priority.NORMAL);
|
||||||
|
addRegion(pq, r3, Priority.NORMAL);
|
||||||
|
addRegion(pq, r4, Priority.NORMAL);
|
||||||
|
addRegion(pq, r5, Priority.NORMAL);
|
||||||
|
addRegion(pq, r1, Priority.NORMAL);
|
||||||
|
addRegion(pq, r2, Priority.NORMAL);
|
||||||
|
addRegion(pq, r3, Priority.NORMAL);
|
||||||
|
addRegion(pq, r4, Priority.NORMAL);
|
||||||
|
addRegion(pq, r5, Priority.NORMAL);
|
||||||
|
|
||||||
|
getAndCheckRegion(pq, r1);
|
||||||
|
getAndCheckRegion(pq, r2);
|
||||||
|
getAndCheckRegion(pq, r3);
|
||||||
|
getAndCheckRegion(pq, r4);
|
||||||
|
getAndCheckRegion(pq, r5);
|
||||||
|
|
||||||
|
Assert.assertTrue("Queue should be empty.", pq.size() == 0);
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue