Merge online snapshot branch with trunk 2/14/13

git-svn-id: https://svn.apache.org/repos/asf/hbase/branches/hbase-7290v2@1446179 13f79535-47bb-0310-9956-ffa450edef68
Jonathan Hsieh 2013-02-14 13:59:06 +00:00
commit 8606f57f29
35 changed files with 651 additions and 502 deletions


@@ -235,6 +235,7 @@ case $startStop in
     rm -f "$HBASE_START_FILE"
     if [ -f $pid ]; then
       pidToKill=`cat $pid`
+      processedAt=`date +%s`
       # kill -0 == see if the PID exists
       if kill -0 $pidToKill > /dev/null 2>&1; then
         echo -n stopping $command
@@ -244,7 +245,16 @@ case $startStop in
         do
           echo -n "."
           sleep 1;
+          # if process persists more than $HBASE_STOP_TIMEOUT (default 1200 sec) no mercy
+          if [ $(( `date +%s` - $processedAt )) -gt ${HBASE_STOP_TIMEOUT:-1200} ]; then
+            break;
+          fi
         done
+        # process still there : kill -9
+        if kill -0 $pidToKill > /dev/null 2>&1; then
+          echo -n force stopping $command
+          kill -9 $pidToKill > /dev/null 2>&1
+        fi
         rm $pid
         echo
       else


@@ -32,8 +32,8 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.regionserver.HStore.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
@@ -175,7 +175,7 @@ public class ZooKeeperScanPolicyObserver extends BaseRegionObserver {
     // nothing to do here
   }
-  protected ScanInfo getScanInfo(HStore store, RegionCoprocessorEnvironment e) {
+  protected ScanInfo getScanInfo(Store store, RegionCoprocessorEnvironment e) {
     byte[] data = ((ZKWatcher)e.getSharedData().get(zkkey)).getData();
     if (data == null) {
       return null;
@@ -184,15 +184,16 @@ public class ZooKeeperScanPolicyObserver extends BaseRegionObserver {
     if (oldSI.getTtl() == Long.MAX_VALUE) {
       return null;
     }
-    long ttl = Math.max(EnvironmentEdgeManager.currentTimeMillis() - Bytes.toLong(data), oldSI.getTtl());
+    long ttl = Math.max(EnvironmentEdgeManager.currentTimeMillis() -
+        Bytes.toLong(data), oldSI.getTtl());
     return new ScanInfo(store.getFamily(), ttl,
         oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
   }
   @Override
   public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
-      HStore store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
-    HStore.ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
+      Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
+    ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
     if (scanInfo == null) {
       // take default action
       return null;
@@ -200,30 +201,30 @@ public class ZooKeeperScanPolicyObserver extends BaseRegionObserver {
     Scan scan = new Scan();
     scan.setMaxVersions(scanInfo.getMaxVersions());
     return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
-        ScanType.MINOR_COMPACT, store.getHRegion().getSmallestReadPoint(),
-        HConstants.OLDEST_TIMESTAMP);
+        ScanType.MINOR_COMPACT, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
   }
   @Override
-  public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
-      HStore store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
+  public InternalScanner preCompactScannerOpen(
+      final ObserverContext<RegionCoprocessorEnvironment> c,
+      Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
       InternalScanner s) throws IOException {
-    HStore.ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
+    ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
     if (scanInfo == null) {
       // take default action
       return null;
     }
     Scan scan = new Scan();
     scan.setMaxVersions(scanInfo.getMaxVersions());
-    return new StoreScanner(store, scanInfo, scan, scanners, scanType, store.getHRegion()
-        .getSmallestReadPoint(), earliestPutTs);
+    return new StoreScanner(store, scanInfo, scan, scanners, scanType,
+        store.getSmallestReadPoint(), earliestPutTs);
   }
   @Override
   public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final HStore store, final Scan scan, final NavigableSet<byte[]> targetCols,
+      final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
       final KeyValueScanner s) throws IOException {
-    HStore.ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
+    ScanInfo scanInfo = getScanInfo(store, c.getEnvironment());
     if (scanInfo == null) {
       // take default action
       return null;


@@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
-import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -77,7 +77,7 @@ public abstract class BaseRegionObserver implements RegionObserver {
   @Override
   public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final HStore store, final KeyValueScanner memstoreScanner, final InternalScanner s)
+      final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s)
       throws IOException {
     return null;
   }
@@ -91,13 +91,13 @@ public abstract class BaseRegionObserver implements RegionObserver {
   }
   @Override
-  public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> e, HStore store,
+  public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
       InternalScanner scanner) throws IOException {
     return scanner;
   }
   @Override
-  public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e, HStore store,
+  public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
       StoreFile resultFile) throws IOException {
   }
@@ -132,28 +132,28 @@ public abstract class BaseRegionObserver implements RegionObserver {
   @Override
   public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final HStore store, final List<StoreFile> candidates) throws IOException { }
+      final Store store, final List<StoreFile> candidates) throws IOException { }
   @Override
   public void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final HStore store, final ImmutableList<StoreFile> selected) { }
+      final Store store, final ImmutableList<StoreFile> selected) { }
   @Override
   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
-      final HStore store, final InternalScanner scanner, final ScanType scanType)
+      final Store store, final InternalScanner scanner, final ScanType scanType)
       throws IOException {
     return scanner;
   }
   @Override
   public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final HStore store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
+      final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
       final long earliestPutTs, final InternalScanner s) throws IOException {
     return null;
   }
   @Override
-  public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, final HStore store,
+  public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, final Store store,
       final StoreFile resultFile) throws IOException {
   }
@@ -290,7 +290,7 @@ public abstract class BaseRegionObserver implements RegionObserver {
   @Override
   public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final HStore store, final Scan scan, final NavigableSet<byte[]> targetCols,
+      final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
       final KeyValueScanner s) throws IOException {
     return null;
   }


@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
-import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@@ -84,14 +84,14 @@ public interface RegionObserver extends Coprocessor {
    * @throws IOException if an error occurred on the coprocessor
    */
   InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final HStore store, final KeyValueScanner memstoreScanner, final InternalScanner s)
+      final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s)
       throws IOException;
   /**
    * Called before the memstore is flushed to disk.
    * @param c the environment provided by the region server
    * @throws IOException if an error occurred on the coprocessor
-   * @deprecated use {@link #preFlush(ObserverContext, HStore, InternalScanner)} instead
+   * @deprecated use {@link #preFlush(ObserverContext, Store, InternalScanner)} instead
    */
   void preFlush(final ObserverContext<RegionCoprocessorEnvironment> c) throws IOException;
@@ -104,14 +104,14 @@ public interface RegionObserver extends Coprocessor {
    * unless the implementation is writing new store files on its own.
    * @throws IOException if an error occurred on the coprocessor
    */
-  InternalScanner preFlush(final ObserverContext<RegionCoprocessorEnvironment> c, final HStore store,
+  InternalScanner preFlush(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
      final InternalScanner scanner) throws IOException;
   /**
    * Called after the memstore is flushed to disk.
    * @param c the environment provided by the region server
    * @throws IOException if an error occurred on the coprocessor
-   * @deprecated use {@link #preFlush(ObserverContext, HStore, InternalScanner)} instead.
+   * @deprecated use {@link #preFlush(ObserverContext, Store, InternalScanner)} instead.
    */
   void postFlush(final ObserverContext<RegionCoprocessorEnvironment> c) throws IOException;
@@ -122,7 +122,7 @@ public interface RegionObserver extends Coprocessor {
    * @param resultFile the new store file written out during compaction
    * @throws IOException if an error occurred on the coprocessor
    */
-  void postFlush(final ObserverContext<RegionCoprocessorEnvironment> c, final HStore store,
+  void postFlush(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
      final StoreFile resultFile) throws IOException;
   /**
@@ -135,7 +135,7 @@ public interface RegionObserver extends Coprocessor {
    * @throws IOException if an error occurred on the coprocessor
    */
   void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final HStore store, final List<StoreFile> candidates) throws IOException;
+      final Store store, final List<StoreFile> candidates) throws IOException;
   /**
    * Called after the {@link StoreFile}s to compact have been selected from the
@@ -145,7 +145,7 @@ public interface RegionObserver extends Coprocessor {
    * @param selected the store files selected to compact
    */
   void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final HStore store, final ImmutableList<StoreFile> selected);
+      final Store store, final ImmutableList<StoreFile> selected);
   /**
    * Called prior to writing the {@link StoreFile}s selected for compaction into
@@ -173,7 +173,7 @@ public interface RegionObserver extends Coprocessor {
    * @throws IOException if an error occurred on the coprocessor
    */
   InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final HStore store, final InternalScanner scanner,
+      final Store store, final InternalScanner scanner,
       final ScanType scanType) throws IOException;
   /**
@@ -196,7 +196,7 @@ public interface RegionObserver extends Coprocessor {
    * @throws IOException if an error occurred on the coprocessor
    */
   InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final HStore store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
+      final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
       final long earliestPutTs, final InternalScanner s) throws IOException;
   /**
@@ -207,7 +207,7 @@ public interface RegionObserver extends Coprocessor {
    * @param resultFile the new store file written out during compaction
    * @throws IOException if an error occurred on the coprocessor
    */
-  void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final HStore store,
+  void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
      StoreFile resultFile) throws IOException;
   /**
@@ -215,17 +215,19 @@ public interface RegionObserver extends Coprocessor {
    * @param c the environment provided by the region server
    * (e.getRegion() returns the parent region)
    * @throws IOException if an error occurred on the coprocessor
-   * @deprecated Use preSplit(final ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow)
+   * @deprecated Use preSplit(
+   *    final ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow)
    */
   void preSplit(final ObserverContext<RegionCoprocessorEnvironment> c) throws IOException;
   /**
    * Called before the region is split.
    * @param c the environment provided by the region server
    * (e.getRegion() returns the parent region)
    * @throws IOException if an error occurred on the coprocessor
    */
-  void preSplit(final ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow) throws IOException;
+  void preSplit(final ObserverContext<RegionCoprocessorEnvironment> c, byte[] splitRow)
+      throws IOException;
   /**
    * Called after the region is split.
@@ -238,28 +240,30 @@ public interface RegionObserver extends Coprocessor {
    */
   void postSplit(final ObserverContext<RegionCoprocessorEnvironment> c, final HRegion l,
       final HRegion r) throws IOException;
   /**
    * This will be called before the roll back of the split region is completed
    * @param ctx
    * @throws IOException
    */
   void preRollBackSplit(final ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
   /**
    * This will be called after the roll back of the split region is completed
    * @param ctx
    * @throws IOException
    */
-  void postRollBackSplit(final ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
+  void postRollBackSplit(final ObserverContext<RegionCoprocessorEnvironment> ctx)
+      throws IOException;
   /**
    * Called after any split request is processed. This will be called irrespective of success or
    * failure of the split.
    * @param ctx
    * @throws IOException
    */
-  void postCompleteSplit(final ObserverContext<RegionCoprocessorEnvironment> ctx) throws IOException;
+  void postCompleteSplit(final ObserverContext<RegionCoprocessorEnvironment> ctx)
+      throws IOException;
   /**
    * Called before the region is reported as closed to the master.
    * @param c the environment provided by the region server
@@ -661,8 +665,9 @@ public interface RegionObserver extends Coprocessor {
    * Called before a store opens a new scanner.
    * This hook is called when a "user" scanner is opened.
    * <p>
-   * See {@link #preFlushScannerOpen(ObserverContext, HStore, KeyValueScanner, InternalScanner)}
-   * and {@link #preCompactScannerOpen(ObserverContext, HStore, List, ScanType, long, InternalScanner)}
+   * See {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner)}
+   * and {@link #preCompactScannerOpen(ObserverContext,
+   *    Store, List, ScanType, long, InternalScanner)}
    * to override scanners created for flushes or compactions, resp.
    * <p>
    * Call CoprocessorEnvironment#complete to skip any subsequent chained
@@ -678,7 +683,7 @@ public interface RegionObserver extends Coprocessor {
    * @throws IOException if an error occurred on the coprocessor
    */
   KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final HStore store, final Scan scan, final NavigableSet<byte[]> targetCols,
+      final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
      final KeyValueScanner s) throws IOException;
   /**


@@ -112,6 +112,7 @@ import org.apache.hadoop.hbase.ipc.RpcCallContext;
 import org.apache.hadoop.hbase.ipc.UnknownProtocolException;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
@@ -296,6 +297,9 @@ public class HRegion implements HeapSize { // , Writable{
    */
   private boolean isLoadingCfsOnDemandDefault = false;
+  private final AtomicInteger majorInProgress = new AtomicInteger(0);
+  private final AtomicInteger minorInProgress = new AtomicInteger(0);
   /**
    * @return The smallest mvcc readPoint across all the scanners in this
    * region. Writes older than this readPoint, are included in every
@@ -5093,7 +5097,7 @@ public class HRegion implements HeapSize { // , Writable{
   public static final long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT +
       ClassSize.ARRAY +
-      39 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
+      41 * ClassSize.REFERENCE + 2 * Bytes.SIZEOF_INT +
       (10 * Bytes.SIZEOF_LONG) +
       Bytes.SIZEOF_BOOLEAN);
@@ -5564,6 +5568,24 @@ public class HRegion implements HeapSize { // , Writable{
     return this.openSeqNum;
   }
+  /**
+   * @return if a given region is in compaction now.
+   */
+  public CompactionState getCompactionState() {
+    boolean hasMajor = majorInProgress.get() > 0, hasMinor = minorInProgress.get() > 0;
+    return (hasMajor ? (hasMinor ? CompactionState.MAJOR_AND_MINOR : CompactionState.MAJOR)
+        : (hasMinor ? CompactionState.MINOR : CompactionState.NONE));
+  }
+
+  public void reportCompactionRequestStart(boolean isMajor){
+    (isMajor ? majorInProgress : minorInProgress).incrementAndGet();
+  }
+
+  public void reportCompactionRequestEnd(boolean isMajor){
+    int newValue = (isMajor ? majorInProgress : minorInProgress).decrementAndGet();
+    assert newValue >= 0;
+  }
   /**
    * Listener class to enable callers of
    * bulkLoadHFile() to perform any necessary
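
The counters and report methods added above are driven from elsewhere in this patch: HStore's compaction-request path calls reportCompactionRequestStart() when a request is handed out and reportCompactionRequestEnd() from finishRequest(), while HRegionServer answers GetRegionInfo RPCs with getCompactionState(). A minimal sketch of that call pattern (the compactOnce wrapper, the requestCompaction() name, and the try/finally framing are illustrative assumptions, not code from this commit):

  // Sketch only: how the new compaction-state hooks pair up end to end.
  void compactOnce(HRegion region, HStore store) throws IOException {
    CompactionRequest cr = store.requestCompaction();    // internally: region.reportCompactionRequestStart(cr.isMajor())
    if (cr == null) {
      return;                                            // nothing was selected for compaction
    }
    try {
      // ... run the compaction described by cr ...
    } finally {
      store.finishRequest(cr);                           // internally: region.reportCompactionRequestEnd(cr.isMajor())
    }
    // Concurrently, the admin RPC can report NONE, MINOR, MAJOR or MAJOR_AND_MINOR:
    CompactionState state = region.getCompactionState();
  }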


@@ -3328,8 +3328,7 @@ public class HRegionServer implements ClientProtocol,
       GetRegionInfoResponse.Builder builder = GetRegionInfoResponse.newBuilder();
       builder.setRegionInfo(HRegionInfo.convert(info));
       if (request.hasCompactionState() && request.getCompactionState()) {
-        builder.setCompactionState(
-            CompactionRequest.getCompactionState(info.getRegionId()));
+        builder.setCompactionState(region.getCompactionState());
       }
       return builder.build();
     } catch (IOException ie) {


@@ -70,6 +70,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.regionserver.compactions.OffPeakCompactions;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ChecksumType;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -150,9 +151,11 @@ public class HStore implements Store {
   private int bytesPerChecksum;
   // Comparing KeyValues
-  final KeyValue.KVComparator comparator;
+  private final KeyValue.KVComparator comparator;
   private Compactor compactor;
+  private OffPeakCompactions offPeakCompactions;
   private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
   private static int flush_retries_number;
@@ -207,11 +210,11 @@ public class HStore implements Store {
     // to clone it?
     scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
     this.memstore = new MemStore(conf, this.comparator);
+    this.offPeakCompactions = new OffPeakCompactions(conf);
     // Setting up cache configuration for this family
     this.cacheConf = new CacheConfig(conf, family);
     this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
     if (HStore.closeCheckInterval == 0) {
@@ -287,6 +290,7 @@ public class HStore implements Store {
     return homedir;
   }
+  @Override
   public FileSystem getFileSystem() {
     return this.fs;
   }
@@ -803,8 +807,8 @@ public class HStore implements Store {
     // treat this as a minor compaction.
     InternalScanner scanner = null;
     KeyValueScanner memstoreScanner = new CollectionBackedScanner(set, this.comparator);
-    if (getHRegion().getCoprocessorHost() != null) {
-      scanner = getHRegion().getCoprocessorHost()
+    if (this.region.getCoprocessorHost() != null) {
+      scanner = this.region.getCoprocessorHost()
           .preFlushScannerOpen(this, memstoreScanner);
     }
     if (scanner == null) {
@@ -814,9 +818,9 @@ public class HStore implements Store {
           Collections.singletonList(memstoreScanner), ScanType.MINOR_COMPACT,
           this.region.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
     }
-    if (getHRegion().getCoprocessorHost() != null) {
+    if (this.region.getCoprocessorHost() != null) {
       InternalScanner cpScanner =
-          getHRegion().getCoprocessorHost().preFlush(this, scanner);
+          this.region.getCoprocessorHost().preFlush(this, scanner);
       // NULL scanner returned from coprocessor hooks means skip normal processing
       if (cpScanner == null) {
         return null;
@@ -1001,7 +1005,8 @@ public class HStore implements Store {
    * the line).
    * @return all scanners for this store
    */
-  protected List<KeyValueScanner> getScanners(boolean cacheBlocks,
+  @Override
+  public List<KeyValueScanner> getScanners(boolean cacheBlocks,
       boolean isGet, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
       byte[] stopRow) throws IOException {
     Collection<StoreFile> storeFilesToScan;
@@ -1029,17 +1034,13 @@ public class HStore implements Store {
     return scanners;
   }
-  /*
-   * @param o Observer who wants to know about changes in set of Readers
-   */
-  void addChangedReaderObserver(ChangedReadersObserver o) {
+  @Override
+  public void addChangedReaderObserver(ChangedReadersObserver o) {
     this.changedReaderObservers.add(o);
   }
-  /*
-   * @param o Observer no longer interested in changes in set of Readers.
-   */
-  void deleteChangedReaderObserver(ChangedReadersObserver o) {
+  @Override
+  public void deleteChangedReaderObserver(ChangedReadersObserver o) {
     // We don't check if observer present; it may not be (legitimately)
     this.changedReaderObservers.remove(o);
   }
@@ -1244,8 +1245,13 @@ public class HStore implements Store {
       filesToCompact = new CompactSelection(candidates);
     } else {
       boolean isUserCompaction = priority == Store.PRIORITY_USER;
+      boolean mayUseOffPeak = this.offPeakCompactions.tryStartOffPeakRequest();
       filesToCompact = compactionPolicy.selectCompaction(candidates, isUserCompaction,
-          forceMajor && filesCompacting.isEmpty());
+          mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
+      if (mayUseOffPeak && !filesToCompact.isOffPeakCompaction()) {
+        // Compaction policy doesn't want to do anything with off-peak.
+        this.offPeakCompactions.endOffPeakRequest();
+      }
     }
     if (region.getCoprocessorHost() != null) {
@@ -1274,7 +1280,7 @@ public class HStore implements Store {
       this.forceMajor = false;
     }
-    LOG.debug(getHRegionInfo().getEncodedName() + " - " +
+    LOG.debug(getRegionInfo().getEncodedName() + " - " +
         getColumnFamilyName() + ": Initiating " +
         (isMajor ? "major" : "minor") + " compaction");
@@ -1286,14 +1292,17 @@ public class HStore implements Store {
       this.lock.readLock().unlock();
     }
     if (ret != null) {
-      CompactionRequest.preRequest(ret);
+      this.region.reportCompactionRequestStart(ret.isMajor());
     }
     return ret;
   }
   public void finishRequest(CompactionRequest cr) {
-    CompactionRequest.postRequest(cr);
-    cr.finishRequest();
+    this.region.reportCompactionRequestEnd(cr.isMajor());
+    if (cr.getCompactSelection().isOffPeakCompaction()) {
+      this.offPeakCompactions.endOffPeakRequest();
+      cr.getCompactSelection().setOffPeak(false);
+    }
     synchronized (filesCompacting) {
       filesCompacting.removeAll(cr.getFiles());
     }
@@ -1645,8 +1654,8 @@ public class HStore implements Store {
     lock.readLock().lock();
     try {
       KeyValueScanner scanner = null;
-      if (getHRegion().getCoprocessorHost() != null) {
-        scanner = getHRegion().getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
+      if (this.region.getCoprocessorHost() != null) {
+        scanner = this.region.getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
      }
      if (scanner == null) {
        scanner = new StoreScanner(this, getScanInfo(), scan, targetCols);
@@ -1743,15 +1752,30 @@ public class HStore implements Store {
     return compactionPolicy.throttleCompaction(compactionSize);
   }
+  @Override
   public HRegion getHRegion() {
     return this.region;
   }
-  HRegionInfo getHRegionInfo() {
+  @Override
+  public RegionCoprocessorHost getCoprocessorHost() {
+    return this.region.getCoprocessorHost();
+  }
+
+  @Override
+  public HRegionInfo getRegionInfo() {
     return this.region.getRegionInfo();
   }
+
+  @Override
+  public boolean areWritesEnabled() {
+    return this.region.areWritesEnabled();
+  }
+
+  @Override
+  public long getSmallestReadPoint() {
+    return this.region.getSmallestReadPoint();
+  }
   /**
    * Used in tests. TODO: Remove
    *
@@ -1832,7 +1856,7 @@ public class HStore implements Store {
       }
       storeFile = HStore.this.commitFile(storeFilePath, cacheFlushId,
           snapshotTimeRangeTracker, flushedSize, status);
-      if (HStore.this.getHRegion().getCoprocessorHost() != null) {
+      if (HStore.this.region.getCoprocessorHost() != null) {
         HStore.this.getHRegion()
             .getCoprocessorHost()
             .postFlush(HStore.this, storeFile);
@@ -1855,7 +1879,7 @@ public class HStore implements Store {
   }
   public static final long FIXED_OVERHEAD =
-      ClassSize.align((20 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
+      ClassSize.align((21 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
          + (2 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
   public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
@@ -1873,6 +1897,7 @@ public class HStore implements Store {
     return comparator;
   }
+  @Override
   public ScanInfo getScanInfo() {
     return scanInfo;
   }
@@ -1884,84 +1909,4 @@ public class HStore implements Store {
   void setScanInfo(ScanInfo scanInfo) {
     this.scanInfo = scanInfo;
   }
-  /**
-   * Immutable information for scans over a store.
-   */
-  public static class ScanInfo {
-    private byte[] family;
-    private int minVersions;
-    private int maxVersions;
-    private long ttl;
-    private boolean keepDeletedCells;
-    private long timeToPurgeDeletes;
-    private KVComparator comparator;
-    public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
-        + (2 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_INT)
-        + Bytes.SIZEOF_LONG + Bytes.SIZEOF_BOOLEAN);
-    /**
-     * @param family {@link HColumnDescriptor} describing the column family
-     * @param ttl Store's TTL (in ms)
-     * @param timeToPurgeDeletes duration in ms after which a delete marker can
-     *        be purged during a major compaction.
-     * @param comparator The store's comparator
-     */
-    public ScanInfo(HColumnDescriptor family, long ttl, long timeToPurgeDeletes, KVComparator comparator) {
-      this(family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, family
-          .getKeepDeletedCells(), timeToPurgeDeletes, comparator);
-    }
-    /**
-     * @param family Name of this store's column family
-     * @param minVersions Store's MIN_VERSIONS setting
-     * @param maxVersions Store's VERSIONS setting
-     * @param ttl Store's TTL (in ms)
-     * @param timeToPurgeDeletes duration in ms after which a delete marker can
-     *        be purged during a major compaction.
-     * @param keepDeletedCells Store's keepDeletedCells setting
-     * @param comparator The store's comparator
-     */
-    public ScanInfo(byte[] family, int minVersions, int maxVersions, long ttl,
-        boolean keepDeletedCells, long timeToPurgeDeletes,
-        KVComparator comparator) {
-      this.family = family;
-      this.minVersions = minVersions;
-      this.maxVersions = maxVersions;
-      this.ttl = ttl;
-      this.keepDeletedCells = keepDeletedCells;
-      this.timeToPurgeDeletes = timeToPurgeDeletes;
-      this.comparator = comparator;
-    }
-    public byte[] getFamily() {
-      return family;
-    }
-    public int getMinVersions() {
-      return minVersions;
-    }
-    public int getMaxVersions() {
-      return maxVersions;
-    }
-    public long getTtl() {
-      return ttl;
-    }
-    public boolean getKeepDeletedCells() {
-      return keepDeletedCells;
-    }
-    public long getTimeToPurgeDeletes() {
-      return timeToPurgeDeletes;
-    }
-    public KVComparator getComparator() {
-      return comparator;
-    }
-  }
 }
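
The off-peak accounting introduced in the hunks above is an acquire/release discipline split across two methods; pulled together (condensed from the selection and finishRequest hunks above, not new behaviour), the pattern is:

  // Sketch: take an off-peak slot before selection, release it if unused or when the request finishes.
  boolean mayUseOffPeak = offPeakCompactions.tryStartOffPeakRequest();
  CompactSelection filesToCompact = compactionPolicy.selectCompaction(
      candidates, isUserCompaction, mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
  if (mayUseOffPeak && !filesToCompact.isOffPeakCompaction()) {
    offPeakCompactions.endOffPeakRequest();              // policy declined off-peak: give the slot back now
  }
  // ... later, when the request completes (HStore#finishRequest):
  if (cr.getCompactSelection().isOffPeakCompaction()) {
    offPeakCompactions.endOffPeakRequest();              // release the slot held since selection
    cr.getCompactSelection().setOffPeak(false);
  }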


@@ -246,7 +246,8 @@ public class RegionCoprocessorHost
    * {@link org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost#postOpen()} are such hooks.
    *
    * See also
-   * {@link org.apache.hadoop.hbase.master.MasterCoprocessorHost#handleCoprocessorThrowable(CoprocessorEnvironment, Throwable)}
+   * {@link org.apache.hadoop.hbase.master.MasterCoprocessorHost#handleCoprocessorThrowable(
+   *    CoprocessorEnvironment, Throwable)}
    * @param env The coprocessor that threw the exception.
    * @param e The exception that was thrown.
    */
@@ -256,8 +257,9 @@ public class RegionCoprocessorHost
       handleCoprocessorThrowable(env,e);
     } catch (IOException ioe) {
       // We cannot throw exceptions from the caller hook, so ignore.
-      LOG.warn("handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " + e
-          + ". Ignoring.",e);
+      LOG.warn(
+        "handleCoprocessorThrowable() threw an IOException while attempting to handle Throwable " +
+        e + ". Ignoring.",e);
     }
   }
@@ -343,9 +345,10 @@ public class RegionCoprocessorHost
   /**
    * See
-   * {@link RegionObserver#preCompactScannerOpen(ObserverContext, HStore, List, ScanType, long, InternalScanner)}
+   * {@link RegionObserver#preCompactScannerOpen(ObserverContext,
+   *    Store, List, ScanType, long, InternalScanner)}
    */
-  public InternalScanner preCompactScannerOpen(HStore store, List<StoreFileScanner> scanners,
+  public InternalScanner preCompactScannerOpen(Store store, List<StoreFileScanner> scanners,
       ScanType scanType, long earliestPutTs) throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     InternalScanner s = null;
@@ -374,7 +377,7 @@ public class RegionCoprocessorHost
    * @return If {@code true}, skip the normal selection process and use the current list
    * @throws IOException
    */
-  public boolean preCompactSelection(HStore store, List<StoreFile> candidates) throws IOException {
+  public boolean preCompactSelection(Store store, List<StoreFile> candidates) throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     boolean bypass = false;
     for (RegionEnvironment env: coprocessors) {
@@ -402,7 +405,7 @@ public class RegionCoprocessorHost
    * @param store The store where compaction is being requested
    * @param selected The store files selected to compact
    */
-  public void postCompactSelection(HStore store,
+  public void postCompactSelection(Store store,
       ImmutableList<StoreFile> selected) {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     for (RegionEnvironment env: coprocessors) {
@@ -428,7 +431,7 @@ public class RegionCoprocessorHost
    * @param scanType type of Scan
    * @throws IOException
    */
-  public InternalScanner preCompact(HStore store, InternalScanner scanner,
+  public InternalScanner preCompact(Store store, InternalScanner scanner,
       ScanType scanType) throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     boolean bypass = false;
@@ -456,7 +459,7 @@ public class RegionCoprocessorHost
    * @param resultFile the new store file written during compaction
    * @throws IOException
    */
-  public void postCompact(HStore store, StoreFile resultFile) throws IOException {
+  public void postCompact(Store store, StoreFile resultFile) throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     for (RegionEnvironment env: coprocessors) {
       if (env.getInstance() instanceof RegionObserver) {
@@ -477,7 +480,7 @@ public class RegionCoprocessorHost
    * Invoked before a memstore flush
    * @throws IOException
    */
-  public InternalScanner preFlush(HStore store, InternalScanner scanner) throws IOException {
+  public InternalScanner preFlush(Store store, InternalScanner scanner) throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     boolean bypass = false;
     for (RegionEnvironment env: coprocessors) {
@@ -521,16 +524,19 @@ public class RegionCoprocessorHost
   /**
    * See
-   * {@link RegionObserver#preFlushScannerOpen(ObserverContext, HStore, KeyValueScanner, InternalScanner)}
+   * {@link RegionObserver#preFlushScannerOpen(ObserverContext,
+   *    Store, KeyValueScanner, InternalScanner)}
    */
-  public InternalScanner preFlushScannerOpen(HStore store, KeyValueScanner memstoreScanner) throws IOException {
+  public InternalScanner preFlushScannerOpen(Store store, KeyValueScanner memstoreScanner)
+      throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     InternalScanner s = null;
     for (RegionEnvironment env : coprocessors) {
       if (env.getInstance() instanceof RegionObserver) {
         ctx = ObserverContext.createAndPrepare(env, ctx);
         try {
-          s = ((RegionObserver) env.getInstance()).preFlushScannerOpen(ctx, store, memstoreScanner, s);
+          s = ((RegionObserver) env.getInstance())
+              .preFlushScannerOpen(ctx, store, memstoreScanner, s);
         } catch (Throwable e) {
           handleCoprocessorThrowable(env, e);
         }
@@ -567,7 +573,7 @@ public class RegionCoprocessorHost
    * Invoked after a memstore flush
    * @throws IOException
    */
-  public void postFlush(final HStore store, final StoreFile storeFile) throws IOException {
+  public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     for (RegionEnvironment env: coprocessors) {
       if (env.getInstance() instanceof RegionObserver) {
@@ -1243,9 +1249,10 @@ public class RegionCoprocessorHost
   /**
    * See
-   * {@link RegionObserver#preStoreScannerOpen(ObserverContext, HStore, Scan, NavigableSet, KeyValueScanner)}
+   * {@link RegionObserver#preStoreScannerOpen(ObserverContext,
+   *    Store, Scan, NavigableSet, KeyValueScanner)}
    */
-  public KeyValueScanner preStoreScannerOpen(HStore store, Scan scan,
+  public KeyValueScanner preStoreScannerOpen(Store store, Scan scan,
       final NavigableSet<byte[]> targetCols) throws IOException {
     KeyValueScanner s = null;
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;


@@ -0,0 +1,106 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
/**
* Immutable information for scans over a store.
*/
@InterfaceAudience.Private
public class ScanInfo {
private byte[] family;
private int minVersions;
private int maxVersions;
private long ttl;
private boolean keepDeletedCells;
private long timeToPurgeDeletes;
private KVComparator comparator;
public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ (2 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_INT)
+ Bytes.SIZEOF_LONG + Bytes.SIZEOF_BOOLEAN);
/**
* @param family {@link HColumnDescriptor} describing the column family
* @param ttl Store's TTL (in ms)
* @param timeToPurgeDeletes duration in ms after which a delete marker can
* be purged during a major compaction.
* @param comparator The store's comparator
*/
public ScanInfo(final HColumnDescriptor family, final long ttl, final long timeToPurgeDeletes,
final KVComparator comparator) {
this(family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, family
.getKeepDeletedCells(), timeToPurgeDeletes, comparator);
}
/**
* @param family Name of this store's column family
* @param minVersions Store's MIN_VERSIONS setting
* @param maxVersions Store's VERSIONS setting
* @param ttl Store's TTL (in ms)
* @param timeToPurgeDeletes duration in ms after which a delete marker can
* be purged during a major compaction.
* @param keepDeletedCells Store's keepDeletedCells setting
* @param comparator The store's comparator
*/
public ScanInfo(final byte[] family, final int minVersions, final int maxVersions,
final long ttl, final boolean keepDeletedCells, final long timeToPurgeDeletes,
final KVComparator comparator) {
this.family = family;
this.minVersions = minVersions;
this.maxVersions = maxVersions;
this.ttl = ttl;
this.keepDeletedCells = keepDeletedCells;
this.timeToPurgeDeletes = timeToPurgeDeletes;
this.comparator = comparator;
}
public byte[] getFamily() {
return family;
}
public int getMinVersions() {
return minVersions;
}
public int getMaxVersions() {
return maxVersions;
}
public long getTtl() {
return ttl;
}
public boolean getKeepDeletedCells() {
return keepDeletedCells;
}
public long getTimeToPurgeDeletes() {
return timeToPurgeDeletes;
}
public KVComparator getComparator() {
return comparator;
}
}
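
A short usage sketch of the extracted class, mirroring how ZooKeeperScanPolicyObserver above derives a new ScanInfo with a different TTL (the 15-minute value and local variable names are made-up examples):

  // Sketch only: clone a store's scan metadata with a custom TTL.
  ScanInfo oldSI = store.getScanInfo();
  long customTtl = 15 * 60 * 1000L;                      // hypothetical 15 minutes, in ms
  ScanInfo newSI = new ScanInfo(store.getFamily(), customTtl,
      oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());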


@@ -136,7 +136,7 @@ public class ScanQueryMatcher {
    * @param oldestUnexpiredTS the oldest timestamp we are interested in,
    *        based on TTL
    */
-  public ScanQueryMatcher(Scan scan, HStore.ScanInfo scanInfo,
+  public ScanQueryMatcher(Scan scan, ScanInfo scanInfo,
       NavigableSet<byte[]> columns, ScanType scanType,
       long readPointToUse, long earliestPutTs, long oldestUnexpiredTS) {
     this.tr = scan.getTimeRange();
@@ -182,7 +182,7 @@ public class ScanQueryMatcher {
   /*
    * Constructor for tests
    */
-  ScanQueryMatcher(Scan scan, HStore.ScanInfo scanInfo,
+  ScanQueryMatcher(Scan scan, ScanInfo scanInfo,
       NavigableSet<byte[]> columns, long oldestUnexpiredTS) {
     this(scan, scanInfo, columns, ScanType.USER_SCAN,
         Long.MAX_VALUE, /* max Readpoint to track versions */


@@ -24,11 +24,14 @@ import java.util.NavigableSet;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HeapSize;
+import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
@@ -74,6 +77,23 @@ public interface Store extends HeapSize, StoreConfigInformation {
   public KeyValueScanner getScanner(Scan scan, final NavigableSet<byte[]> targetCols)
       throws IOException;
+  /**
+   * Get all scanners with no filtering based on TTL (that happens further down
+   * the line).
+   * @param cacheBlocks
+   * @param isGet
+   * @param isCompaction
+   * @param matcher
+   * @param startRow
+   * @param stopRow
+   * @return all scanners for this store
+   */
+  public List<KeyValueScanner> getScanners(boolean cacheBlocks,
+      boolean isGet, boolean isCompaction, ScanQueryMatcher matcher, byte[] startRow,
+      byte[] stopRow) throws IOException;
+
+  public ScanInfo getScanInfo();
   /**
    * Adds or replaces the specified KeyValues.
    * <p>
@@ -117,6 +137,17 @@ public interface Store extends HeapSize, StoreConfigInformation {
    */
   public KeyValue getRowKeyAtOrBefore(final byte[] row) throws IOException;
+  public FileSystem getFileSystem();
+
+  /*
+   * @param maxKeyCount
+   * @param compression Compression algorithm to use
+   * @param isCompaction whether we are creating a new file in a compaction
+   * @return Writer for a new StoreFile in the tmp dir.
+   */
+  public StoreFile.Writer createWriterInTmp(int maxKeyCount,
+      Compression.Algorithm compression, boolean isCompaction) throws IOException;
   // Compaction oriented methods
   public boolean throttleCompaction(long compactionSize);
@@ -259,11 +290,32 @@ public interface Store extends HeapSize, StoreConfigInformation {
   public CacheConfig getCacheConfig();
   /**
-   * @return the parent region hosting this store
+   * @return the parent region info hosting this store
    */
-  public HRegion getHRegion();
+  public HRegionInfo getRegionInfo();
+
+  public RegionCoprocessorHost getCoprocessorHost();
+
+  public boolean areWritesEnabled();
+
+  /**
+   * @return The smallest mvcc readPoint across all the scanners in this
+   *         region. Writes older than this readPoint, are included in every
+   *         read operation.
+   */
+  public long getSmallestReadPoint();
   public String getColumnFamilyName();
   public String getTableName();
+
+  /*
+   * @param o Observer who wants to know about changes in set of Readers
+   */
+  public void addChangedReaderObserver(ChangedReadersObserver o);
+
+  /*
+   * @param o Observer no longer interested in changes in set of Readers.
+   */
+  public void deleteChangedReaderObserver(ChangedReadersObserver o);
 }
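
With these additions, coprocessor-facing helpers can be written purely against the Store interface instead of the HStore class; a minimal sketch of that shape (the helper name is illustrative, the calls are the ones exposed above and used by ZooKeeperScanPolicyObserver earlier in this commit):

  // Sketch only: open a compaction scanner using nothing beyond the widened Store interface.
  static InternalScanner openCompactionScanner(Store store, List<? extends KeyValueScanner> scanners,
      ScanType scanType, long earliestPutTs) throws IOException {
    Scan scan = new Scan();
    scan.setMaxVersions(store.getScanInfo().getMaxVersions());
    return new StoreScanner(store, store.getScanInfo(), scan, scanners, scanType,
        store.getSmallestReadPoint(), earliestPutTs);
  }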


@@ -32,19 +32,19 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.regionserver.HStore.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 /**
- * Scanner scans both the memstore and the HStore. Coalesce KeyValue stream
+ * Scanner scans both the memstore and the Store. Coalesce KeyValue stream
  * into List<KeyValue> for a single row.
  */
 @InterfaceAudience.Private
 public class StoreScanner extends NonLazyKeyValueScanner
     implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
   static final Log LOG = LogFactory.getLog(StoreScanner.class);
-  protected HStore store;
+  protected Store store;
   protected ScanQueryMatcher matcher;
   protected KeyValueHeap heap;
   protected boolean cacheBlocks;
@@ -75,7 +75,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
   protected KeyValue lastTop = null;
   /** An internal constructor. */
-  protected StoreScanner(HStore store, boolean cacheBlocks, Scan scan,
+  protected StoreScanner(Store store, boolean cacheBlocks, Scan scan,
       final NavigableSet<byte[]> columns, long ttl, int minVersions) {
     this.store = store;
     this.cacheBlocks = cacheBlocks;
@@ -103,7 +103,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
    * @param columns which columns we are scanning
    * @throws IOException
    */
-  public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns)
+  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns)
       throws IOException {
     this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
         scanInfo.getMinVersions());
@@ -139,7 +139,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
     this.storeOffset = scan.getRowOffsetPerColumnFamily();
     // Combine all seeked scanners with a heap
-    heap = new KeyValueHeap(scanners, store.comparator);
+    heap = new KeyValueHeap(scanners, store.getComparator());
     this.store.addChangedReaderObserver(this);
   }
@@ -154,7 +154,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
    * @param smallestReadPoint the readPoint that we should use for tracking
    *        versions
    */
-  public StoreScanner(HStore store, ScanInfo scanInfo, Scan scan,
+  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
       List<? extends KeyValueScanner> scanners, ScanType scanType,
       long smallestReadPoint, long earliestPutTs) throws IOException {
     this(store, false, scan, null, scanInfo.getTtl(),
@@ -171,11 +171,11 @@ public class StoreScanner extends NonLazyKeyValueScanner
   }
   // Combine all seeked scanners with a heap
heap = new KeyValueHeap(scanners, store.comparator); heap = new KeyValueHeap(scanners, store.getComparator());
} }
/** Constructor for testing. */ /** Constructor for testing. */
StoreScanner(final Scan scan, HStore.ScanInfo scanInfo, StoreScanner(final Scan scan, ScanInfo scanInfo,
ScanType scanType, final NavigableSet<byte[]> columns, ScanType scanType, final NavigableSet<byte[]> columns,
final List<KeyValueScanner> scanners) throws IOException { final List<KeyValueScanner> scanners) throws IOException {
this(scan, scanInfo, scanType, columns, scanners, this(scan, scanInfo, scanType, columns, scanners,
@ -183,7 +183,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
} }
// Constructor for testing. // Constructor for testing.
StoreScanner(final Scan scan, HStore.ScanInfo scanInfo, StoreScanner(final Scan scan, ScanInfo scanInfo,
ScanType scanType, final NavigableSet<byte[]> columns, ScanType scanType, final NavigableSet<byte[]> columns,
final List<KeyValueScanner> scanners, long earliestPutTs) final List<KeyValueScanner> scanners, long earliestPutTs)
throws IOException { throws IOException {
@ -325,7 +325,8 @@ public class StoreScanner extends NonLazyKeyValueScanner
byte[] row = peeked.getBuffer(); byte[] row = peeked.getBuffer();
int offset = peeked.getRowOffset(); int offset = peeked.getRowOffset();
short length = peeked.getRowLength(); short length = peeked.getRowLength();
if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) { if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row,
matcher.rowOffset, matcher.rowLength)) {
this.countPerRow = 0; this.countPerRow = 0;
matcher.setRow(row, offset, length); matcher.setRow(row, offset, length);
} }
@ -490,7 +491,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
if (this.heap == null && this.lastTop != null) { if (this.heap == null && this.lastTop != null) {
resetScannerStack(this.lastTop); resetScannerStack(this.lastTop);
if (this.heap.peek() == null if (this.heap.peek() == null
|| store.comparator.compareRows(this.lastTop, this.heap.peek()) != 0) { || store.getComparator().compareRows(this.lastTop, this.heap.peek()) != 0) {
LOG.debug("Storescanner.peek() is changed where before = " LOG.debug("Storescanner.peek() is changed where before = "
+ this.lastTop.toString() + ",and after = " + this.heap.peek()); + this.lastTop.toString() + ",and after = " + this.heap.peek());
this.lastTop = null; this.lastTop = null;
@ -517,7 +518,7 @@ public class StoreScanner extends NonLazyKeyValueScanner
} }
// Combine all seeked scanners with a heap // Combine all seeked scanners with a heap
heap = new KeyValueHeap(scanners, store.comparator); heap = new KeyValueHeap(scanners, store.getComparator());
// Reset the state of the Query Matcher and set to top row. // Reset the state of the Query Matcher and set to top row.
// Only reset and call setRow if the row changes; avoids confusing the // Only reset and call setRow if the row changes; avoids confusing the
@ -529,7 +530,8 @@ public class StoreScanner extends NonLazyKeyValueScanner
byte[] row = kv.getBuffer(); byte[] row = kv.getBuffer();
int offset = kv.getRowOffset(); int offset = kv.getRowOffset();
short length = kv.getRowLength(); short length = kv.getRowLength();
if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row, matcher.rowOffset, matcher.rowLength)) { if ((matcher.row == null) || !Bytes.equals(row, offset, length, matcher.row,
matcher.rowOffset, matcher.rowLength)) {
this.countPerRow = 0; this.countPerRow = 0;
matcher.reset(); matcher.reset();
matcher.setRow(row, offset, length); matcher.setRow(row, offset, length);
View File
@ -31,21 +31,8 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
public class CompactSelection { public class CompactSelection {
private static final long serialVersionUID = 1L; private static final long serialVersionUID = 1L;
static final Log LOG = LogFactory.getLog(CompactSelection.class); static final Log LOG = LogFactory.getLog(CompactSelection.class);
// the actual list - this is needed to handle methods like "sublist" // the actual list - this is needed to handle methods like "sublist" correctly
// correctly
List<StoreFile> filesToCompact = new ArrayList<StoreFile>(); List<StoreFile> filesToCompact = new ArrayList<StoreFile>();
/**
* Number of off peak compactions either in the compaction queue or
* happening now. Please lock compactionCountLock before modifying.
*/
static long numOutstandingOffPeakCompactions = 0;
/**
* Lock object for numOutstandingOffPeakCompactions
*/
private final static Object compactionCountLock = new Object();
// was this compaction promoted to an off-peak // was this compaction promoted to an off-peak
boolean isOffPeakCompaction = false; boolean isOffPeakCompaction = false;
// CompactSelection object creation time. // CompactSelection object creation time.
@ -57,23 +44,6 @@ public class CompactSelection {
this.isOffPeakCompaction = false; this.isOffPeakCompaction = false;
} }
/**
* The current compaction finished, so reset the off peak compactions count
* if this was an off peak compaction.
*/
public void finishRequest() {
if (isOffPeakCompaction) {
long newValueToLog = -1;
synchronized(compactionCountLock) {
assert !isOffPeakCompaction : "Double-counting off-peak count for compaction";
newValueToLog = --numOutstandingOffPeakCompactions;
isOffPeakCompaction = false;
}
LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " +
newValueToLog);
}
}
public List<StoreFile> getFilesToCompact() { public List<StoreFile> getFilesToCompact() {
return filesToCompact; return filesToCompact;
} }
@ -84,42 +54,14 @@ public class CompactSelection {
*/ */
public void emptyFileList() { public void emptyFileList() {
filesToCompact.clear(); filesToCompact.clear();
if (isOffPeakCompaction) {
long newValueToLog = -1;
synchronized(compactionCountLock) {
// reset the off peak count
newValueToLog = --numOutstandingOffPeakCompactions;
isOffPeakCompaction = false;
}
LOG.info("Nothing to compact, numOutstandingOffPeakCompactions is now " +
newValueToLog);
}
} }
public boolean isOffPeakCompaction() { public boolean isOffPeakCompaction() {
return this.isOffPeakCompaction; return this.isOffPeakCompaction;
} }
public static long getNumOutStandingOffPeakCompactions() { public void setOffPeak(boolean value) {
synchronized(compactionCountLock) { this.isOffPeakCompaction = value;
return numOutstandingOffPeakCompactions;
}
}
/**
* Tries making the compaction off-peak.
* Only checks internal compaction constraints, not timing.
* @return Eventual value of isOffPeakCompaction.
*/
public boolean trySetOffpeak() {
assert !isOffPeakCompaction : "Double-setting off-peak for compaction " + this;
synchronized(compactionCountLock) {
if (numOutstandingOffPeakCompactions == 0) {
numOutstandingOffPeakCompactions++;
isOffPeakCompaction = true;
}
}
return isOffPeakCompaction;
} }
public long getSelectionTime() { public long getSelectionTime() {
View File
@ -58,8 +58,6 @@ public class CompactionConfiguration {
int maxFilesToCompact; int maxFilesToCompact;
double compactionRatio; double compactionRatio;
double offPeekCompactionRatio; double offPeekCompactionRatio;
int offPeakStartHour;
int offPeakEndHour;
long throttlePoint; long throttlePoint;
boolean shouldDeleteExpired; boolean shouldDeleteExpired;
long majorCompactionPeriod; long majorCompactionPeriod;
@ -78,17 +76,6 @@ public class CompactionConfiguration {
maxFilesToCompact = conf.getInt(CONFIG_PREFIX + "max", 10); maxFilesToCompact = conf.getInt(CONFIG_PREFIX + "max", 10);
compactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio", 1.2F); compactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio", 1.2F);
offPeekCompactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio.offpeak", 5.0F); offPeekCompactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio.offpeak", 5.0F);
offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
if (!isValidHour(offPeakStartHour) || !isValidHour(offPeakEndHour)) {
if (!(offPeakStartHour == -1 && offPeakEndHour == -1)) {
LOG.warn("Ignoring invalid start/end hour for peak hour : start = " +
this.offPeakStartHour + " end = " + this.offPeakEndHour +
". Valid numbers are [0-23]");
}
this.offPeakStartHour = this.offPeakEndHour = -1;
}
throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle", throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize()); 2 * maxFilesToCompact * storeConfigInfo.getMemstoreFlushSize());
@ -104,16 +91,14 @@ public class CompactionConfiguration {
@Override @Override
public String toString() { public String toString() {
return String.format( return String.format(
"size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; off-peak hours %d-%d; " "size [%d, %d); files [%d, %d); ratio %f; off-peak ratio %f; throttle point %d;"
+ "throttle point %d;%s delete expired; major period %d, major jitter %f", + "%s delete expired; major period %d, major jitter %f",
minCompactSize, minCompactSize,
maxCompactSize, maxCompactSize,
minFilesToCompact, minFilesToCompact,
maxFilesToCompact, maxFilesToCompact,
compactionRatio, compactionRatio,
offPeekCompactionRatio, offPeekCompactionRatio,
offPeakStartHour,
offPeakEndHour,
throttlePoint, throttlePoint,
shouldDeleteExpired ? "" : " don't", shouldDeleteExpired ? "" : " don't",
majorCompactionPeriod, majorCompactionPeriod,
@ -169,20 +154,6 @@ public class CompactionConfiguration {
return offPeekCompactionRatio; return offPeekCompactionRatio;
} }
/**
* @return Hour at which off-peak compactions start
*/
int getOffPeakStartHour() {
return offPeakStartHour;
}
/**
* @return Hour at which off-peak compactions end
*/
int getOffPeakEndHour() {
return offPeakEndHour;
}
/** /**
* @return ThrottlePoint used for classifying small and large compactions * @return ThrottlePoint used for classifying small and large compactions
*/ */
@ -212,8 +183,4 @@ public class CompactionConfiguration {
boolean shouldDeleteExpired() { boolean shouldDeleteExpired() {
return shouldDeleteExpired; return shouldDeleteExpired;
} }
private static boolean isValidHour(int hour) {
return (hour >= 0 && hour <= 23);
}
} }
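For context, a hedged configuration sketch: after this change the compaction ratios remain CompactionConfiguration's concern, while the off-peak window keys are read by the new OffPeakCompactions class introduced further down in this diff. The "hbase.hstore.compaction." prefix is assumed from the test settings shown later, and the specific hours are arbitrary examples.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CompactionConfSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Still parsed by CompactionConfiguration (prefix assumed to be hbase.hstore.compaction.):
    conf.setFloat("hbase.hstore.compaction.ratio", 1.2F);
    conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
    // Now parsed by OffPeakCompactions; with start=22 and end=6 the off-peak
    // window covers 22:00 through 05:59, wrapping past midnight.
    conf.setInt("hbase.offpeak.start.hour", 22);
    conf.setInt("hbase.offpeak.end.hour", 6);
  }
}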
View File
@ -70,7 +70,7 @@ public abstract class CompactionPolicy extends Configured {
*/ */
public abstract CompactSelection selectCompaction( public abstract CompactSelection selectCompaction(
final List<StoreFile> candidateFiles, final boolean isUserCompaction, final List<StoreFile> candidateFiles, final boolean isUserCompaction,
final boolean forceMajor) throws IOException; final boolean mayUseOffPeak, final boolean forceMajor) throws IOException;
/** /**
* @param storeFiles Store files in the store. * @param storeFiles Store files in the store.
View File
@ -50,30 +50,22 @@ import com.google.common.collect.Collections2;
public class CompactionRequest implements Comparable<CompactionRequest>, public class CompactionRequest implements Comparable<CompactionRequest>,
Runnable { Runnable {
static final Log LOG = LogFactory.getLog(CompactionRequest.class); static final Log LOG = LogFactory.getLog(CompactionRequest.class);
private final HRegion r; private final HRegion region;
private final HStore s; private final HStore store;
private final CompactSelection compactSelection; private final CompactSelection compactSelection;
private final long totalSize; private final long totalSize;
private final boolean isMajor; private final boolean isMajor;
private int p; private int priority;
private final Long timeInNanos; private final Long timeInNanos;
private HRegionServer server = null; private HRegionServer server = null;
/** public CompactionRequest(HRegion region, HStore store,
* Map to track the number of compactions requested per region (id) CompactSelection files, boolean isMajor, int priority) {
*/ Preconditions.checkNotNull(region);
private static final ConcurrentHashMap<Long, AtomicInteger>
majorCompactions = new ConcurrentHashMap<Long, AtomicInteger>();
private static final ConcurrentHashMap<Long, AtomicInteger>
minorCompactions = new ConcurrentHashMap<Long, AtomicInteger>();
public CompactionRequest(HRegion r, HStore s,
CompactSelection files, boolean isMajor, int p) {
Preconditions.checkNotNull(r);
Preconditions.checkNotNull(files); Preconditions.checkNotNull(files);
this.r = r; this.region = region;
this.s = s; this.store = store;
this.compactSelection = files; this.compactSelection = files;
long sz = 0; long sz = 0;
for (StoreFile sf : files.getFilesToCompact()) { for (StoreFile sf : files.getFilesToCompact()) {
@ -81,66 +73,10 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
} }
this.totalSize = sz; this.totalSize = sz;
this.isMajor = isMajor; this.isMajor = isMajor;
this.p = p; this.priority = priority;
this.timeInNanos = System.nanoTime(); this.timeInNanos = System.nanoTime();
} }
/**
* Find out if a given region is in compaction now.
*
* @param regionId
* @return a CompactionState
*/
public static CompactionState getCompactionState(
final long regionId) {
Long key = Long.valueOf(regionId);
AtomicInteger major = majorCompactions.get(key);
AtomicInteger minor = minorCompactions.get(key);
int state = 0;
if (minor != null && minor.get() > 0) {
state += 1; // use 1 to indicate minor here
}
if (major != null && major.get() > 0) {
state += 2; // use 2 to indicate major here
}
switch (state) {
case 3: // 3 = 2 + 1, so both major and minor
return CompactionState.MAJOR_AND_MINOR;
case 2:
return CompactionState.MAJOR;
case 1:
return CompactionState.MINOR;
default:
return CompactionState.NONE;
}
}
public static void preRequest(final CompactionRequest cr){
Long key = Long.valueOf(cr.getHRegion().getRegionId());
ConcurrentHashMap<Long, AtomicInteger> compactions =
cr.isMajor() ? majorCompactions : minorCompactions;
AtomicInteger count = compactions.get(key);
if (count == null) {
compactions.putIfAbsent(key, new AtomicInteger(0));
count = compactions.get(key);
}
count.incrementAndGet();
}
public static void postRequest(final CompactionRequest cr){
Long key = Long.valueOf(cr.getHRegion().getRegionId());
ConcurrentHashMap<Long, AtomicInteger> compactions =
cr.isMajor() ? majorCompactions : minorCompactions;
AtomicInteger count = compactions.get(key);
if (count != null) {
count.decrementAndGet();
}
}
public void finishRequest() {
this.compactSelection.finishRequest();
}
/** /**
* This function will define where in the priority queue the request will * This function will define where in the priority queue the request will
* end up. Those with the highest priorities will be first. When the * end up. Those with the highest priorities will be first. When the
@ -160,7 +96,7 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
} }
int compareVal; int compareVal;
compareVal = p - request.p; //compare priority compareVal = priority - request.priority; //compare priority
if (compareVal != 0) { if (compareVal != 0) {
return compareVal; return compareVal;
} }
@ -181,12 +117,12 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
/** Gets the HRegion for the request */ /** Gets the HRegion for the request */
public HRegion getHRegion() { public HRegion getHRegion() {
return r; return region;
} }
/** Gets the Store for the request */ /** Gets the Store for the request */
public HStore getStore() { public HStore getStore() {
return s; return store;
} }
/** Gets the compact selection object for the request */ /** Gets the compact selection object for the request */
@ -210,7 +146,7 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
/** Gets the priority for the request */ /** Gets the priority for the request */
public int getPriority() { public int getPriority() {
return p; return priority;
} }
public long getSelectionTime() { public long getSelectionTime() {
@ -219,7 +155,7 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
/** Gets the priority for the request */ /** Gets the priority for the request */
public void setPriority(int p) { public void setPriority(int p) {
this.p = p; this.priority = p;
} }
public void setServer(HRegionServer hrs) { public void setServer(HRegionServer hrs) {
@ -241,12 +177,12 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
} }
})); }));
return "regionName=" + r.getRegionNameAsString() + return "regionName=" + region.getRegionNameAsString() +
", storeName=" + new String(s.getFamily().getName()) + ", storeName=" + new String(store.getFamily().getName()) +
", fileCount=" + compactSelection.getFilesToCompact().size() + ", fileCount=" + compactSelection.getFilesToCompact().size() +
", fileSize=" + StringUtils.humanReadableInt(totalSize) + ", fileSize=" + StringUtils.humanReadableInt(totalSize) +
((fsList.isEmpty()) ? "" : " (" + fsList + ")") + ((fsList.isEmpty()) ? "" : " (" + fsList + ")") +
", priority=" + p + ", time=" + timeInNanos; ", priority=" + priority + ", time=" + timeInNanos;
} }
@Override @Override
@ -257,18 +193,18 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
} }
try { try {
long start = EnvironmentEdgeManager.currentTimeMillis(); long start = EnvironmentEdgeManager.currentTimeMillis();
boolean completed = r.compact(this); boolean completed = region.compact(this);
long now = EnvironmentEdgeManager.currentTimeMillis(); long now = EnvironmentEdgeManager.currentTimeMillis();
LOG.info(((completed) ? "completed" : "aborted") + " compaction: " + LOG.info(((completed) ? "completed" : "aborted") + " compaction: " +
this + "; duration=" + StringUtils.formatTimeDiff(now, start)); this + "; duration=" + StringUtils.formatTimeDiff(now, start));
if (completed) { if (completed) {
// degenerate case: blocked regions require recursive enqueues // degenerate case: blocked regions require recursive enqueues
if (s.getCompactPriority() <= 0) { if (store.getCompactPriority() <= 0) {
server.compactSplitThread server.compactSplitThread
.requestCompaction(r, s, "Recursive enqueue"); .requestCompaction(region, store, "Recursive enqueue");
} else { } else {
// see if the compaction has caused us to exceed max region size // see if the compaction has caused us to exceed max region size
server.compactSplitThread.requestSplit(r); server.compactSplitThread.requestSplit(region);
} }
} }
} catch (IOException ex) { } catch (IOException ex) {
@ -279,7 +215,7 @@ public class CompactionRequest implements Comparable<CompactionRequest>,
LOG.error("Compaction failed " + this, ex); LOG.error("Compaction failed " + this, ex);
server.checkFileSystem(); server.checkFileSystem();
} finally { } finally {
s.finishRequest(this); store.finishRequest(this);
LOG.debug("CompactSplitThread Status: " + server.compactSplitThread); LOG.debug("CompactSplitThread Status: " + server.compactSplitThread);
} }
} }
View File
@ -48,7 +48,6 @@ import com.google.common.collect.Collections2;
public class DefaultCompactionPolicy extends CompactionPolicy { public class DefaultCompactionPolicy extends CompactionPolicy {
private static final Log LOG = LogFactory.getLog(DefaultCompactionPolicy.class); private static final Log LOG = LogFactory.getLog(DefaultCompactionPolicy.class);
private final static Calendar calendar = new GregorianCalendar();
public DefaultCompactionPolicy() { public DefaultCompactionPolicy() {
compactor = new DefaultCompactor(this); compactor = new DefaultCompactor(this);
@ -80,7 +79,7 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
* @throws java.io.IOException * @throws java.io.IOException
*/ */
public CompactSelection selectCompaction(List<StoreFile> candidateFiles, public CompactSelection selectCompaction(List<StoreFile> candidateFiles,
boolean isUserCompaction, boolean forceMajor) final boolean isUserCompaction, final boolean mayUseOffPeak, final boolean forceMajor)
throws IOException { throws IOException {
// Preliminary compaction subject to filters // Preliminary compaction subject to filters
CompactSelection candidateSelection = new CompactSelection(candidateFiles); CompactSelection candidateSelection = new CompactSelection(candidateFiles);
@ -110,6 +109,7 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
if (!majorCompaction) { if (!majorCompaction) {
// we're doing a minor compaction, let's see what files are applicable // we're doing a minor compaction, let's see what files are applicable
candidateSelection.setOffPeak(mayUseOffPeak);
candidateSelection = filterBulk(candidateSelection); candidateSelection = filterBulk(candidateSelection);
candidateSelection = applyCompactionPolicy(candidateSelection); candidateSelection = applyCompactionPolicy(candidateSelection);
candidateSelection = checkMinFilesCriteria(candidateSelection); candidateSelection = checkMinFilesCriteria(candidateSelection);
@ -232,6 +232,7 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
" files ready for compaction. Need " + minFiles + " to initiate."); " files ready for compaction. Need " + minFiles + " to initiate.");
} }
candidates.emptyFileList(); candidates.emptyFileList();
candidates.setOffPeak(false);
} }
return candidates; return candidates;
} }
@ -274,11 +275,9 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
// we're doing a minor compaction, let's see what files are applicable // we're doing a minor compaction, let's see what files are applicable
int start = 0; int start = 0;
double ratio = comConf.getCompactionRatio(); double ratio = comConf.getCompactionRatio();
if (isOffPeakHour() && candidates.trySetOffpeak()) { if (candidates.isOffPeakCompaction()) {
ratio = comConf.getCompactionRatioOffPeak(); ratio = comConf.getCompactionRatioOffPeak();
LOG.info("Running an off-peak compaction, selection ratio = " + ratio LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
+ ", numOutstandingOffPeakCompactions is now "
+ CompactSelection.getNumOutStandingOffPeakCompactions());
} }
// get store file sizes for incremental compacting selection. // get store file sizes for incremental compacting selection.
@ -394,21 +393,4 @@ public class DefaultCompactionPolicy extends CompactionPolicy {
int numCandidates = storeFiles.size() - filesCompacting.size(); int numCandidates = storeFiles.size() - filesCompacting.size();
return numCandidates > comConf.getMinFilesToCompact(); return numCandidates > comConf.getMinFilesToCompact();
} }
/**
* @return whether this is off-peak hour
*/
private boolean isOffPeakHour() {
int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
int startHour = comConf.getOffPeakStartHour();
int endHour = comConf.getOffPeakEndHour();
// If offpeak time checking is disabled just return false.
if (startHour == endHour) {
return false;
}
if (startHour < endHour) {
return (currentHour >= startHour && currentHour < endHour);
}
return (currentHour >= startHour || currentHour < endHour);
}
} }
View File
@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression; import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl; import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl;
@ -71,7 +72,7 @@ class DefaultCompactor extends Compactor {
// Calculate maximum key count after compaction (for blooms) // Calculate maximum key count after compaction (for blooms)
// Also calculate earliest put timestamp if major compaction // Also calculate earliest put timestamp if major compaction
int maxKeyCount = 0; int maxKeyCount = 0;
HStore store = policy.store; Store store = policy.store;
long earliestPutTs = HConstants.LATEST_TIMESTAMP; long earliestPutTs = HConstants.LATEST_TIMESTAMP;
for (StoreFile file: filesToCompact) { for (StoreFile file: filesToCompact) {
StoreFile.Reader r = file.getReader(); StoreFile.Reader r = file.getReader();
@ -127,14 +128,13 @@ class DefaultCompactor extends Compactor {
StoreFile.Writer writer = null; StoreFile.Writer writer = null;
List<Path> newFiles = new ArrayList<Path>(); List<Path> newFiles = new ArrayList<Path>();
// Find the smallest read point across all the Scanners. // Find the smallest read point across all the Scanners.
long smallestReadPoint = store.getHRegion().getSmallestReadPoint(); long smallestReadPoint = store.getSmallestReadPoint();
MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint); MultiVersionConsistencyControl.setThreadReadPoint(smallestReadPoint);
try { try {
InternalScanner scanner = null; InternalScanner scanner = null;
try { try {
if (store.getHRegion().getCoprocessorHost() != null) { if (store.getCoprocessorHost() != null) {
scanner = store scanner = store
.getHRegion()
.getCoprocessorHost() .getCoprocessorHost()
.preCompactScannerOpen(store, scanners, .preCompactScannerOpen(store, scanners,
majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs); majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
@ -147,9 +147,9 @@ class DefaultCompactor extends Compactor {
scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners, scanner = new StoreScanner(store, store.getScanInfo(), scan, scanners,
scanType, smallestReadPoint, earliestPutTs); scanType, smallestReadPoint, earliestPutTs);
} }
if (store.getHRegion().getCoprocessorHost() != null) { if (store.getCoprocessorHost() != null) {
InternalScanner cpScanner = InternalScanner cpScanner =
store.getHRegion().getCoprocessorHost().preCompact(store, scanner, scanType); store.getCoprocessorHost().preCompact(store, scanner, scanType);
// NULL scanner returned from coprocessor hooks means skip normal processing // NULL scanner returned from coprocessor hooks means skip normal processing
if (cpScanner == null) { if (cpScanner == null) {
return newFiles; // an empty list return newFiles; // an empty list
@ -209,13 +209,14 @@ class DefaultCompactor extends Compactor {
return newFiles; return newFiles;
} }
void isInterrupted(final HStore store, final StoreFile.Writer writer) void isInterrupted(final Store store, final StoreFile.Writer writer)
throws IOException { throws IOException {
if (store.getHRegion().areWritesEnabled()) return; if (store.areWritesEnabled()) return;
// Else cleanup. // Else cleanup.
writer.close(); writer.close();
store.getFileSystem().delete(writer.getPath(), false); store.getFileSystem().delete(writer.getPath(), false);
throw new InterruptedIOException( "Aborting compaction of store " + store + throw new InterruptedIOException( "Aborting compaction of store " + store +
" in region " + store.getHRegion() + " because it was interrupted."); " in region " + store.getRegionInfo().getRegionNameAsString() +
" because it was interrupted.");
} }
} }
View File
@ -0,0 +1,111 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.compactions;
import java.util.Calendar;
import java.util.GregorianCalendar;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
/**
* The class used to track off-peak hours and compactions. Off-peak compaction counter
* is global for the entire server, hours can be different per instance of this class,
* based on the configuration of the corresponding store.
*/
@InterfaceAudience.Private
public class OffPeakCompactions {
private static final Log LOG = LogFactory.getLog(OffPeakCompactions.class);
private final static Calendar calendar = new GregorianCalendar();
private int offPeakStartHour;
private int offPeakEndHour;
// TODO: replace with AtomicLong, see HBASE-7437.
/**
* Number of off peak compactions either in the compaction queue or
* happening now. Please lock compactionCountLock before modifying.
*/
private static long numOutstanding = 0;
/**
* Lock object for numOutstandingOffPeakCompactions
*/
private static final Object compactionCountLock = new Object();
public OffPeakCompactions(Configuration conf) {
offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
if (!isValidHour(offPeakStartHour) || !isValidHour(offPeakEndHour)) {
if (!(offPeakStartHour == -1 && offPeakEndHour == -1)) {
LOG.warn("Ignoring invalid start/end hour for peak hour : start = " +
this.offPeakStartHour + " end = " + this.offPeakEndHour +
". Valid numbers are [0-23]");
}
this.offPeakStartHour = this.offPeakEndHour = -1;
}
}
/**
* Tries making the compaction off-peak.
* @return Whether the compaction can be made off-peak.
*/
public boolean tryStartOffPeakRequest() {
if (!isOffPeakHour()) return false;
synchronized(compactionCountLock) {
if (numOutstanding == 0) {
numOutstanding++;
return true;
}
}
return false;
}
/**
* The current compaction finished, so reset the off peak compactions count
* if this was an off peak compaction.
*/
public void endOffPeakRequest() {
long newValueToLog = -1;
synchronized(compactionCountLock) {
newValueToLog = --numOutstanding;
}
LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " + newValueToLog);
}
/**
* @return whether this is off-peak hour
*/
private boolean isOffPeakHour() {
int currentHour = calendar.get(Calendar.HOUR_OF_DAY);
// If offpeak time checking is disabled just return false.
if (this.offPeakStartHour == this.offPeakEndHour) {
return false;
}
if (this.offPeakStartHour < this.offPeakEndHour) {
return (currentHour >= this.offPeakStartHour && currentHour < this.offPeakEndHour);
}
return (currentHour >= this.offPeakStartHour || currentHour < this.offPeakEndHour);
}
private static boolean isValidHour(int hour) {
return (hour >= 0 && hour <= 23);
}
}
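A minimal end-to-end sketch (editorial, not part of this patch) of how the off-peak pieces fit together: OffPeakCompactions claims the single server-wide off-peak slot, the flag is passed to selectCompaction(), and the slot is released once the compaction is done. The wrapper class, method name, and control flow here are assumptions; the OffPeakCompactions, CompactionPolicy, and CompactSelection calls follow the signatures shown in this diff.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakCompactions;

public final class OffPeakSelectionSketch {
  private OffPeakSelectionSketch() {}

  public static void compactWithOffPeakSlot(Configuration conf, CompactionPolicy policy,
      List<StoreFile> candidates, boolean isUserCompaction, boolean forceMajor)
      throws IOException {
    OffPeakCompactions offPeak = new OffPeakCompactions(conf);
    // True only when the current hour is off-peak and no other off-peak
    // compaction is outstanding on this server.
    boolean mayUseOffPeak = offPeak.tryStartOffPeakRequest();
    CompactSelection selection =
        policy.selectCompaction(candidates, isUserCompaction, mayUseOffPeak, forceMajor);
    try {
      for (StoreFile sf : selection.getFilesToCompact()) {
        // ... hand the selected files to the compactor here ...
      }
    } finally {
      if (mayUseOffPeak) {
        // Release the off-peak slot once the compaction request is finished.
        offPeak.endOffPeakRequest();
      }
    }
  }
}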
View File
@ -58,7 +58,7 @@ import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescriptio
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -816,7 +816,7 @@ public class AccessController extends BaseRegionObserver
@Override @Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
final HStore store, final InternalScanner scanner, final ScanType scanType) final Store store, final InternalScanner scanner, final ScanType scanType)
throws IOException { throws IOException {
requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN); requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
return scanner; return scanner;
@ -824,7 +824,7 @@ public class AccessController extends BaseRegionObserver
@Override @Override
public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> e, public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> e,
final HStore store, final List<StoreFile> candidates) throws IOException { final Store store, final List<StoreFile> candidates) throws IOException {
requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN); requirePermission("compact", getTableName(e.getEnvironment()), null, null, Action.ADMIN);
} }
View File
@ -759,10 +759,11 @@ public class ZKAssign {
// Verify it is in expected state // Verify it is in expected state
EventType et = rt.getEventType(); EventType et = rt.getEventType();
if (!et.equals(beginState)) { if (!et.equals(beginState)) {
LOG.warn(zkw.prefix("Attempt to transition the " + String existingServer = (rt.getServerName() == null)
"unassigned node for " + encoded + ? "<unknown>" : rt.getServerName().toString();
" from " + beginState + " to " + endState + " failed, " + LOG.warn(zkw.prefix("Attempt to transition the unassigned node for " + encoded
"the node existed but was in the state " + et + " set by the server " + serverName)); + " from " + beginState + " to " + endState + " failed, the node existed but"
+ " was in the state " + et + " set by the server " + existingServer));
return -1; return -1;
} }
View File
@ -47,7 +47,7 @@ import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.Leases; import org.apache.hadoop.hbase.regionserver.Leases;
import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit; import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -137,20 +137,22 @@ public class SimpleRegionObserver extends BaseRegionObserver {
} }
@Override @Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, HStore store, InternalScanner scanner) { public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, InternalScanner scanner) {
hadPreFlush = true; hadPreFlush = true;
return scanner; return scanner;
} }
@Override @Override
public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
HStore store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException { Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
hadPreFlushScannerOpen = true; hadPreFlushScannerOpen = true;
return null; return null;
} }
@Override @Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, HStore store, StoreFile resultFile) { public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, StoreFile resultFile) {
hadPostFlush = true; hadPostFlush = true;
} }
@ -174,26 +176,27 @@ public class SimpleRegionObserver extends BaseRegionObserver {
@Override @Override
public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, public void preCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c,
HStore store, List<StoreFile> candidates) { Store store, List<StoreFile> candidates) {
hadPreCompactSelect = true; hadPreCompactSelect = true;
} }
@Override @Override
public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c, public void postCompactSelection(ObserverContext<RegionCoprocessorEnvironment> c,
HStore store, ImmutableList<StoreFile> selected) { Store store, ImmutableList<StoreFile> selected) {
hadPostCompactSelect = true; hadPostCompactSelect = true;
} }
@Override @Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
HStore store, InternalScanner scanner, ScanType scanType) { Store store, InternalScanner scanner, ScanType scanType) {
hadPreCompact = true; hadPreCompact = true;
return scanner; return scanner;
} }
@Override @Override
public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, public InternalScanner preCompactScannerOpen(
HStore store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, final ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s) throws IOException { InternalScanner s) throws IOException {
hadPreCompactScanner = true; hadPreCompactScanner = true;
return null; return null;
@ -201,7 +204,7 @@ public class SimpleRegionObserver extends BaseRegionObserver {
@Override @Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
HStore store, StoreFile resultFile) { Store store, StoreFile resultFile) {
hadPostCompact = true; hadPostCompact = true;
} }
@ -219,7 +222,7 @@ public class SimpleRegionObserver extends BaseRegionObserver {
@Override @Override
public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final HStore store, final Scan scan, final NavigableSet<byte[]> targetCols, final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
final KeyValueScanner s) throws IOException { final KeyValueScanner s) throws IOException {
hadPreStoreScannerOpen = true; hadPreStoreScannerOpen = true;
return null; return null;
@ -450,7 +453,7 @@ public class SimpleRegionObserver extends BaseRegionObserver {
@Override @Override
public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx, public boolean postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException { List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
RegionCoprocessorEnvironment e = ctx.getEnvironment(); RegionCoprocessorEnvironment e = ctx.getEnvironment();
assertNotNull(e); assertNotNull(e);
assertNotNull(e.getRegion()); assertNotNull(e.getRegion());
View File
@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner;
@ -187,13 +187,13 @@ public class TestCoprocessorInterface extends HBaseTestCase {
} }
@Override @Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
HStore store, InternalScanner scanner, ScanType scanType) { Store store, InternalScanner scanner, ScanType scanType) {
preCompactCalled = true; preCompactCalled = true;
return scanner; return scanner;
} }
@Override @Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
HStore store, StoreFile resultFile) { Store store, StoreFile resultFile) {
postCompactCalled = true; postCompactCalled = true;
} }
@Override @Override
View File
@ -60,7 +60,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.StoreFile; import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -316,7 +316,7 @@ public class TestRegionObserverInterface {
@Override @Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e, public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
HStore store, final InternalScanner scanner, final ScanType scanType) { Store store, final InternalScanner scanner, final ScanType scanType) {
return new InternalScanner() { return new InternalScanner() {
@Override @Override
public boolean next(List<KeyValue> results) throws IOException { public boolean next(List<KeyValue> results) throws IOException {
@ -368,7 +368,7 @@ public class TestRegionObserverInterface {
@Override @Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
HStore store, StoreFile resultFile) { Store store, StoreFile resultFile) {
lastCompaction = EnvironmentEdgeManager.currentTimeMillis(); lastCompaction = EnvironmentEdgeManager.currentTimeMillis();
} }
View File
@ -25,15 +25,14 @@ public class NoOpScanPolicyObserver extends BaseRegionObserver {
*/ */
@Override @Override
public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
HStore store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException { Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
HStore.ScanInfo oldSI = store.getScanInfo(); ScanInfo oldSI = store.getScanInfo();
HStore.ScanInfo scanInfo = new HStore.ScanInfo(store.getFamily(), oldSI.getTtl(), ScanInfo scanInfo = new ScanInfo(store.getFamily(), oldSI.getTtl(),
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
Scan scan = new Scan(); Scan scan = new Scan();
scan.setMaxVersions(oldSI.getMaxVersions()); scan.setMaxVersions(oldSI.getMaxVersions());
return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner), return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
ScanType.MINOR_COMPACT, store.getHRegion().getSmallestReadPoint(), ScanType.MINOR_COMPACT, store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
HConstants.OLDEST_TIMESTAMP);
} }
/** /**
@ -41,21 +40,21 @@ public class NoOpScanPolicyObserver extends BaseRegionObserver {
*/ */
@Override @Override
public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
HStore store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs, Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s) throws IOException { InternalScanner s) throws IOException {
// this demonstrates how to override the scanners default behavior // this demonstrates how to override the scanners default behavior
HStore.ScanInfo oldSI = store.getScanInfo(); ScanInfo oldSI = store.getScanInfo();
HStore.ScanInfo scanInfo = new HStore.ScanInfo(store.getFamily(), oldSI.getTtl(), ScanInfo scanInfo = new ScanInfo(store.getFamily(), oldSI.getTtl(),
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator()); oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
Scan scan = new Scan(); Scan scan = new Scan();
scan.setMaxVersions(oldSI.getMaxVersions()); scan.setMaxVersions(oldSI.getMaxVersions());
return new StoreScanner(store, scanInfo, scan, scanners, scanType, store.getHRegion() return new StoreScanner(store, scanInfo, scan, scanners, scanType,
.getSmallestReadPoint(), earliestPutTs); store.getSmallestReadPoint(), earliestPutTs);
} }
@Override @Override
public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
HStore store, final Scan scan, final NavigableSet<byte[]> targetCols, KeyValueScanner s) Store store, final Scan scan, final NavigableSet<byte[]> targetCols, KeyValueScanner s)
throws IOException { throws IOException {
return new StoreScanner(store, store.getScanInfo(), scan, targetCols); return new StoreScanner(store, store.getScanInfo(), scan, targetCols);
} }
View File
@ -281,8 +281,8 @@ public class TestCompaction extends HBaseTestCase {
final int ttl = 1000; final int ttl = 1000;
for (Store hstore : this.r.stores.values()) { for (Store hstore : this.r.stores.values()) {
HStore store = ((HStore) hstore); HStore store = ((HStore) hstore);
HStore.ScanInfo old = store.getScanInfo(); ScanInfo old = store.getScanInfo();
HStore.ScanInfo si = new HStore.ScanInfo(old.getFamily(), ScanInfo si = new ScanInfo(old.getFamily(),
old.getMinVersions(), old.getMaxVersions(), ttl, old.getMinVersions(), old.getMaxVersions(), ttl,
old.getKeepDeletedCells(), 0, old.getComparator()); old.getKeepDeletedCells(), 0, old.getComparator());
store.setScanInfo(si); store.setScanInfo(si);
@ -540,8 +540,8 @@ public class TestCompaction extends HBaseTestCase {
final int ttl = 1000; final int ttl = 1000;
for (Store hstore: this.r.stores.values()) { for (Store hstore: this.r.stores.values()) {
HStore store = (HStore)hstore; HStore store = (HStore)hstore;
HStore.ScanInfo old = store.getScanInfo(); ScanInfo old = store.getScanInfo();
HStore.ScanInfo si = new HStore.ScanInfo(old.getFamily(), ScanInfo si = new ScanInfo(old.getFamily(),
old.getMinVersions(), old.getMaxVersions(), ttl, old.getMinVersions(), old.getMaxVersions(), ttl,
old.getKeepDeletedCells(), 0, old.getComparator()); old.getKeepDeletedCells(), 0, old.getComparator());
store.setScanInfo(si); store.setScanInfo(si);
View File
@ -163,7 +163,7 @@ public class TestCompactionState {
// otherwise, the compaction should have already been done // otherwise, the compaction should have already been done
if (expectedState != state) { if (expectedState != state) {
for (HRegion region: regions) { for (HRegion region: regions) {
state = CompactionRequest.getCompactionState(region.getRegionId()); state = region.getCompactionState();
assertEquals(CompactionState.NONE, state); assertEquals(CompactionState.NONE, state);
} }
} else { } else {
View File
@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder; import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.wal.HLog; import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory; import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -220,16 +221,25 @@ public class TestDefaultCompactSelection extends TestCase {
void compactEquals(List<StoreFile> candidates, long... expected) void compactEquals(List<StoreFile> candidates, long... expected)
throws IOException { throws IOException {
compactEquals(candidates, false, expected); compactEquals(candidates, false, false, expected);
} }
void compactEquals(List<StoreFile> candidates, boolean forcemajor, void compactEquals(List<StoreFile> candidates, boolean forcemajor, long... expected)
throws IOException {
compactEquals(candidates, forcemajor, false, expected);
}
void compactEquals(List<StoreFile> candidates, boolean forcemajor, boolean isOffPeak,
long ... expected) long ... expected)
throws IOException { throws IOException {
store.forceMajor = forcemajor; store.forceMajor = forcemajor;
//Test Default compactions //Test Default compactions
List<StoreFile> actual = store.compactionPolicy CompactSelection result = store.compactionPolicy
.selectCompaction(candidates, false, forcemajor).getFilesToCompact(); .selectCompaction(candidates, false, isOffPeak, forcemajor);
List<StoreFile> actual = result.getFilesToCompact();
if (isOffPeak && !forcemajor) {
assertTrue(result.isOffPeakCompaction());
}
assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual))); assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
store.forceMajor = false; store.forceMajor = false;
} }
@ -309,36 +319,11 @@ public class TestDefaultCompactSelection extends TestCase {
* current compaction algorithm. Developed to ensure that refactoring * current compaction algorithm. Developed to ensure that refactoring
* doesn't implicitly alter this. * doesn't implicitly alter this.
*/ */
//long tooBig = maxSize + 1;
Calendar calendar = new GregorianCalendar();
int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY);
LOG.debug("Hour of day = " + hourOfDay);
int hourPlusOne = ((hourOfDay+1)%24);
int hourMinusOne = ((hourOfDay-1+24)%24);
int hourMinusTwo = ((hourOfDay-2+24)%24);
// check compact selection without peak hour setting
LOG.debug("Testing compact selection without off-peak settings...");
compactEquals(sfCreate(999,50,12,12,1), 12, 12, 1);
// set an off-peak compaction threshold // set an off-peak compaction threshold
this.conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F); this.conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
// set peak hour to current time and check compact selection
this.conf.setLong("hbase.offpeak.start.hour", hourMinusOne);
this.conf.setLong("hbase.offpeak.end.hour", hourPlusOne);
LOG.debug("Testing compact selection with off-peak settings (" +
hourMinusOne + ", " + hourPlusOne + ")");
store.compactionPolicy.updateConfiguration(); store.compactionPolicy.updateConfiguration();
compactEquals(sfCreate(999, 50, 12, 12, 1), 50, 12, 12, 1); // Test with and without the flag.
compactEquals(sfCreate(999, 50, 12, 12, 1), false, true, 50, 12, 12, 1);
// set peak hour outside current selection and check compact selection compactEquals(sfCreate(999, 50, 12, 12, 1), 12, 12, 1);
this.conf.setLong("hbase.offpeak.start.hour", hourMinusTwo);
this.conf.setLong("hbase.offpeak.end.hour", hourMinusOne);
store.compactionPolicy.updateConfiguration();
LOG.debug("Testing compact selection with off-peak settings (" +
hourMinusTwo + ", " + hourMinusOne + ")");
compactEquals(sfCreate(999,50,12,12, 1), 12, 12, 1);
} }
} }
View File
@ -34,7 +34,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*; import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HStore.ScanInfo; import org.apache.hadoop.hbase.regionserver.ScanInfo;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
View File
@ -0,0 +1,70 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.Calendar;
import java.util.GregorianCalendar;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakCompactions;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(SmallTests.class)
public class TestOffPeakCompactions {
private final static Log LOG = LogFactory.getLog(TestDefaultCompactSelection.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
@Test
public void testOffPeakHours() throws IOException {
Calendar calendar = new GregorianCalendar();
int hourOfDay = calendar.get(Calendar.HOUR_OF_DAY);
LOG.debug("Hour of day = " + hourOfDay);
int hourPlusOne = ((hourOfDay+1)%24);
int hourMinusOne = ((hourOfDay-1+24)%24);
int hourMinusTwo = ((hourOfDay-2+24)%24);
Configuration conf = TEST_UTIL.getConfiguration();
OffPeakCompactions opc = new OffPeakCompactions(conf);
LOG.debug("Testing without off-peak settings...");
assertFalse(opc.tryStartOffPeakRequest());
// set peak hour to current time and check compact selection
conf.setLong("hbase.offpeak.start.hour", hourMinusOne);
conf.setLong("hbase.offpeak.end.hour", hourPlusOne);
opc = new OffPeakCompactions(conf);
LOG.debug("Testing compact selection with off-peak settings (" +
hourMinusOne + ", " + hourPlusOne + ")");
assertTrue(opc.tryStartOffPeakRequest());
opc.endOffPeakRequest();
// set peak hour outside current selection and check compact selection
conf.setLong("hbase.offpeak.start.hour", hourMinusTwo);
conf.setLong("hbase.offpeak.end.hour", hourMinusOne);
opc = new OffPeakCompactions(conf);
assertFalse(opc.tryStartOffPeakRequest());
}
}
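A minimal usage sketch of the new helper, assuming only the configuration keys and methods exercised by this test; the try/finally pairing of tryStartOffPeakRequest()/endOffPeakRequest() is an assumption about intended use, not documented behaviour.

  Configuration conf = HBaseConfiguration.create();
  // Off-peak window plus the more aggressive off-peak ratio used earlier in this patch.
  conf.setLong("hbase.offpeak.start.hour", 1);
  conf.setLong("hbase.offpeak.end.hour", 5);
  conf.setFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
  OffPeakCompactions opc = new OffPeakCompactions(conf);
  if (opc.tryStartOffPeakRequest()) {
    try {
      // Run the compaction that was selected with the off-peak ratio.
    } finally {
      opc.endOffPeakRequest(); // release the off-peak slot for other requests
    }
  }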
@@ -98,7 +98,7 @@ public class TestQueryMatcher extends HBaseTestCase {
   // 2,4,5
-  ScanQueryMatcher qm = new ScanQueryMatcher(scan, new HStore.ScanInfo(fam2,
+  ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
     0, 1, ttl, false, 0, rowComparator), get.getFamilyMap().get(fam2),
     EnvironmentEdgeManager.currentTimeMillis() - ttl);
@@ -144,7 +144,7 @@ public class TestQueryMatcher extends HBaseTestCase {
   expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
   expected.add(ScanQueryMatcher.MatchCode.DONE);
-  ScanQueryMatcher qm = new ScanQueryMatcher(scan, new HStore.ScanInfo(fam2,
+  ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
     0, 1, ttl, false, 0, rowComparator), null,
     EnvironmentEdgeManager.currentTimeMillis() - ttl);
@@ -198,7 +198,7 @@ public class TestQueryMatcher extends HBaseTestCase {
   };
   long now = EnvironmentEdgeManager.currentTimeMillis();
-  ScanQueryMatcher qm = new ScanQueryMatcher(scan, new HStore.ScanInfo(fam2,
+  ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
     0, 1, testTTL, false, 0, rowComparator), get.getFamilyMap().get(fam2),
     now - testTTL);
@@ -252,7 +252,7 @@ public class TestQueryMatcher extends HBaseTestCase {
   };
   long now = EnvironmentEdgeManager.currentTimeMillis();
-  ScanQueryMatcher qm = new ScanQueryMatcher(scan, new HStore.ScanInfo(fam2,
+  ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
     0, 1, testTTL, false, 0, rowComparator), null,
     now - testTTL);
@@ -324,7 +324,7 @@ public class TestStore extends TestCase {
   this.store = new HStore(storedir.getParent().getParent(),
       this.store.getHRegion(),
       this.store.getFamily(), fs, c);
-  System.out.println(this.store.getHRegionInfo().getEncodedName());
+  System.out.println(this.store.getRegionInfo().getEncodedName());
   assertEquals(2, this.store.getStorefilesCount());
   result = HBaseTestingUtility.getFromStoreFile(store,
@@ -36,7 +36,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueTestUtil;
 import org.apache.hadoop.hbase.MediumTests;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.regionserver.HStore.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdge;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
@@ -547,7 +547,7 @@ public class TestStoreScanner extends TestCase {
   List<KeyValueScanner> scanners = scanFixture(kvs);
   Scan scan = new Scan();
   scan.setMaxVersions(2);
-  HStore.ScanInfo scanInfo = new HStore.ScanInfo(Bytes.toBytes("cf"),
+  ScanInfo scanInfo = new ScanInfo(Bytes.toBytes("cf"),
     0 /* minVersions */,
     2 /* maxVersions */, 500 /* ttl */,
     false /* keepDeletedCells */,
@@ -170,7 +170,7 @@ public class PerfTestCompactionPolicies {
  private List<StoreFile> runIteration(List<StoreFile> startingStoreFiles) throws IOException {
    List<StoreFile> storeFiles = new ArrayList<StoreFile>(startingStoreFiles);
-   CompactSelection sel = cp.selectCompaction(storeFiles, false, false);
+   CompactSelection sel = cp.selectCompaction(storeFiles, false, false, false);
    int newFileSize = 0;
    List<StoreFile> filesToCompact = sel.getFilesToCompact();
@@ -47,7 +47,8 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
-import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.ScanInfo;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -204,53 +205,55 @@ public class TestCoprocessorScanPolicy {
  }

  @Override
- public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
-     HStore store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
+ public InternalScanner preFlushScannerOpen(
+     final ObserverContext<RegionCoprocessorEnvironment> c,
+     Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
    Long newTtl = ttls.get(store.getTableName());
    if (newTtl != null) {
      System.out.println("PreFlush:" + newTtl);
    }
    Integer newVersions = versions.get(store.getTableName());
-   HStore.ScanInfo oldSI = store.getScanInfo();
+   ScanInfo oldSI = store.getScanInfo();
    HColumnDescriptor family = store.getFamily();
-   HStore.ScanInfo scanInfo = new HStore.ScanInfo(family.getName(), family.getMinVersions(),
+   ScanInfo scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
        newVersions == null ? family.getMaxVersions() : newVersions,
        newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
        oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
    Scan scan = new Scan();
    scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
    return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
-       ScanType.MINOR_COMPACT, store.getHRegion().getSmallestReadPoint(),
+       ScanType.MINOR_COMPACT, store.getSmallestReadPoint(),
        HConstants.OLDEST_TIMESTAMP);
  }

  @Override
- public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
-     HStore store, List<? extends KeyValueScanner> scanners, ScanType scanType,
+ public InternalScanner preCompactScannerOpen(
+     final ObserverContext<RegionCoprocessorEnvironment> c,
+     Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
      long earliestPutTs, InternalScanner s) throws IOException {
    Long newTtl = ttls.get(store.getTableName());
    Integer newVersions = versions.get(store.getTableName());
-   HStore.ScanInfo oldSI = store.getScanInfo();
+   ScanInfo oldSI = store.getScanInfo();
    HColumnDescriptor family = store.getFamily();
-   HStore.ScanInfo scanInfo = new HStore.ScanInfo(family.getName(), family.getMinVersions(),
+   ScanInfo scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
        newVersions == null ? family.getMaxVersions() : newVersions,
        newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
        oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
    Scan scan = new Scan();
    scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
-   return new StoreScanner(store, scanInfo, scan, scanners, scanType, store.getHRegion()
-       .getSmallestReadPoint(), earliestPutTs);
+   return new StoreScanner(store, scanInfo, scan, scanners, scanType,
+       store.getSmallestReadPoint(), earliestPutTs);
  }

  @Override
  public KeyValueScanner preStoreScannerOpen(
-     final ObserverContext<RegionCoprocessorEnvironment> c, HStore store, final Scan scan,
+     final ObserverContext<RegionCoprocessorEnvironment> c, Store store, final Scan scan,
      final NavigableSet<byte[]> targetCols, KeyValueScanner s) throws IOException {
    Long newTtl = ttls.get(store.getTableName());
    Integer newVersions = versions.get(store.getTableName());
-   HStore.ScanInfo oldSI = store.getScanInfo();
+   ScanInfo oldSI = store.getScanInfo();
    HColumnDescriptor family = store.getFamily();
-   HStore.ScanInfo scanInfo = new HStore.ScanInfo(family.getName(), family.getMinVersions(),
+   ScanInfo scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
        newVersions == null ? family.getMaxVersions() : newVersions,
        newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
        oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());