HBASE-6427 Pluggable compaction and scan policies via coprocessors

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1367361 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
larsh 2012-07-30 23:14:21 +00:00
parent cee7c32732
commit 3fd458ad88
20 changed files with 847 additions and 59 deletions

View File

@ -17,7 +17,7 @@
package org.apache.hadoop.hbase.coprocessor;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import com.google.common.collect.ImmutableList;
@ -37,7 +37,9 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
@ -74,6 +76,13 @@ public abstract class BaseRegionObserver implements RegionObserver {
public void postClose(ObserverContext<RegionCoprocessorEnvironment> e,
boolean abortRequested) { }
@Override
public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s)
throws IOException {
return null;
}
@Override
public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
}
@ -82,6 +91,17 @@ public abstract class BaseRegionObserver implements RegionObserver {
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
}
@Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
InternalScanner scanner) throws IOException {
return scanner;
}
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e, Store store,
StoreFile resultFile) throws IOException {
}
@Override
public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
}
@ -105,6 +125,13 @@ public abstract class BaseRegionObserver implements RegionObserver {
return scanner;
}
@Override
public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
final long earliestPutTs, final InternalScanner s) throws IOException {
return null;
}
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, final Store store,
final StoreFile resultFile) throws IOException {
@ -241,6 +268,13 @@ public abstract class BaseRegionObserver implements RegionObserver {
return s;
}
@Override
public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
final KeyValueScanner s) throws IOException {
return null;
}
@Override
public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> e,
final Scan scan, final RegionScanner s) throws IOException {

View File

@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.util.List;
import java.util.NavigableSet;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -35,9 +36,12 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -64,20 +68,63 @@ public interface RegionObserver extends Coprocessor {
*/
void postOpen(final ObserverContext<RegionCoprocessorEnvironment> c);
/**
* Called before a memstore is flushed to disk and prior to creating the scanner to read from
* the memstore. To override or modify how a memstore is flushed,
* implementing classes can return a new scanner to provide the KeyValues to be
* stored into the new {@code StoreFile} or null to perform the default processing.
* Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
* effect in this hook.
* @param c the environment provided by the region server
* @param store the store being flushed
* @param memstoreScanner the scanner for the memstore that is flushed
* @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
* @return the scanner to use during the flush. {@code null} if the default implementation
* is to be used.
* @throws IOException if an error occurred on the coprocessor
*/
InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final KeyValueScanner memstoreScanner, final InternalScanner s)
throws IOException;
/**
* Called before the memstore is flushed to disk.
* @param c the environment provided by the region server
* @throws IOException if an error occurred on the coprocessor
* @deprecated use {@link #preFlush(ObserverContext, Store, InternalScanner)} instead
*/
void preFlush(final ObserverContext<RegionCoprocessorEnvironment> c) throws IOException;
/**
* Called before a Store's memstore is flushed to disk.
* @param c the environment provided by the region server
* @param store the store where compaction is being requested
* @param scanner the scanner over existing data used in the store file
* @return the scanner to use during compaction. Should not be {@code null}
* unless the implementation is writing new store files on its own.
* @throws IOException if an error occurred on the coprocessor
*/
InternalScanner preFlush(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
final InternalScanner scanner) throws IOException;
/**
* Called after the memstore is flushed to disk.
* @param c the environment provided by the region server
* @throws IOException if an error occurred on the coprocessor
* @deprecated use {@link #preFlush(ObserverContext, Store, InternalScanner)} instead.
*/
void postFlush(final ObserverContext<RegionCoprocessorEnvironment> c) throws IOException;
/**
* Called after a Store's memstore is flushed to disk.
* @param c the environment provided by the region server
* @param store the store being flushed
* @param resultFile the new store file written out during compaction
* @throws IOException if an error occurred on the coprocessor
*/
void postFlush(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
final StoreFile resultFile) throws IOException;
/**
* Called prior to selecting the {@link StoreFile}s to compact from the list
* of available candidates. To alter the files used for compaction, you may
@ -127,6 +174,29 @@ public interface RegionObserver extends Coprocessor {
InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final InternalScanner scanner) throws IOException;
/**
* Called prior to writing the {@link StoreFile}s selected for compaction into
* a new {@code StoreFile} and prior to creating the scanner used to read the
* input files. To override or modify the compaction process,
* implementing classes can return a new scanner to provide the KeyValues to be
* stored into the new {@code StoreFile} or null to perform the default processing.
* Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
* effect in this hook.
* @param c the environment provided by the region server
* @param store the store being compacted
* @param scanners the list {@link StoreFileScanner}s to be read from
* @param scantype the {@link ScanType} indicating whether this is a major or minor compaction
* @param earliestPutTs timestamp of the earliest put that was found in any of the involved
* store files
* @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
* @return the scanner to use during compaction. {@code null} if the default implementation
* is to be used.
* @throws IOException if an error occurred on the coprocessor
*/
InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
final long earliestPutTs, final InternalScanner s) throws IOException;
/**
* Called after compaction has completed and the new store file has been
* moved in to place.
@ -549,6 +619,30 @@ public interface RegionObserver extends Coprocessor {
final Scan scan, final RegionScanner s)
throws IOException;
/**
* Called before a store opens a new scanner.
* This hook is called when a "user" scanner is opened.
* <p>
* See {@link #preFlushScannerOpen(ObserverContext, Store, KeyValueScanner, InternalScanner)}
* and {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner)}
* to override scanners created for flushes or compactions, resp.
* <p>
* Call CoprocessorEnvironment#complete to skip any subsequent chained
* coprocessors.
* Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
* effect in this hook.
* @param c the environment provided by the region server
* @param store the store being scanned
* @param scan the Scan specification
* @param targetCols columns to be used in the scanner
* @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
* @return a KeyValueScanner instance to use or {@code null} to use the default implementation
* @throws IOException if an error occurred on the coprocessor
*/
KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
final KeyValueScanner s) throws IOException;
/**
* Called after the client opens a new scanner.
* <p>

View File

@ -32,7 +32,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;
@ -127,12 +126,21 @@ class Compactor extends Configured {
try {
InternalScanner scanner = null;
try {
Scan scan = new Scan();
scan.setMaxVersions(store.getFamily().getMaxVersions());
/* Include deletes, unless we are doing a major compaction */
scanner = new StoreScanner(store, scan, scanners,
majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
smallestReadPoint, earliestPutTs);
if (store.getHRegion().getCoprocessorHost() != null) {
scanner = store
.getHRegion()
.getCoprocessorHost()
.preCompactScannerOpen(store, scanners,
majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
}
if (scanner == null) {
Scan scan = new Scan();
scan.setMaxVersions(store.getFamily().getMaxVersions());
/* Include deletes, unless we are doing a major compaction */
scanner = new StoreScanner(store, store.scanInfo, scan, scanners,
majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
smallestReadPoint, earliestPutTs);
}
if (store.getHRegion().getCoprocessorHost() != null) {
InternalScanner cpScanner =
store.getHRegion().getCoprocessorHost().preCompact(store, scanner);

View File

@ -1216,7 +1216,7 @@ public class HRegion implements HeapSize { // , Writable{
* @param majorCompaction True to force a major compaction regardless of thresholds
* @throws IOException e
*/
void compactStores(final boolean majorCompaction)
public void compactStores(final boolean majorCompaction)
throws IOException {
if (majorCompaction) {
this.triggerMajorCompaction();
@ -3469,7 +3469,7 @@ public class HRegion implements HeapSize { // , Writable{
for (Map.Entry<byte[], NavigableSet<byte[]>> entry :
scan.getFamilyMap().entrySet()) {
Store store = stores.get(entry.getKey());
StoreScanner scanner = store.getScanner(scan, entry.getValue());
KeyValueScanner scanner = store.getScanner(scan, entry.getValue());
scanners.add(scanner);
}
this.storeHeap = new KeyValueHeap(scanners, comparator);

View File

@ -303,6 +303,31 @@ public class RegionCoprocessorHost
}
}
/**
* See
* {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner)}
*/
public InternalScanner preCompactScannerOpen(Store store, List<StoreFileScanner> scanners,
ScanType scanType, long earliestPutTs) throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
InternalScanner s = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
s = ((RegionObserver) env.getInstance()).preCompactScannerOpen(ctx, store, scanners,
scanType, earliestPutTs, s);
} catch (Throwable e) {
handleCoprocessorThrowable(env,e);
}
if (ctx.shouldComplete()) {
break;
}
}
}
return s;
}
/**
* Called prior to selecting the {@link StoreFile}s for compaction from
* the list of currently available candidates.
@ -389,7 +414,7 @@ public class RegionCoprocessorHost
* Called after the store compaction has completed.
* @param store the store being compacted
* @param resultFile the new store file written during compaction
* @throws IOException
* @throws IOException
*/
public void postCompact(Store store, StoreFile resultFile) throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
@ -408,6 +433,31 @@ public class RegionCoprocessorHost
}
}
/**
* Invoked before a memstore flush
* @throws IOException
*/
public InternalScanner preFlush(Store store, InternalScanner scanner) throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
boolean bypass = false;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
scanner = ((RegionObserver)env.getInstance()).preFlush(
ctx, store, scanner);
} catch (Throwable e) {
handleCoprocessorThrowable(env,e);
}
bypass |= ctx.shouldBypass();
if (ctx.shouldComplete()) {
break;
}
}
}
return bypass ? null : scanner;
}
/**
* Invoked before a memstore flush
* @throws IOException
@ -429,9 +479,32 @@ public class RegionCoprocessorHost
}
}
/**
* See
* {@link RegionObserver#preFlush(ObserverContext, Store, KeyValueScanner)}
*/
public InternalScanner preFlushScannerOpen(Store store, KeyValueScanner memstoreScanner) throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
InternalScanner s = null;
for (RegionEnvironment env : coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
s = ((RegionObserver) env.getInstance()).preFlushScannerOpen(ctx, store, memstoreScanner, s);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
}
if (ctx.shouldComplete()) {
break;
}
}
}
return s;
}
/**
* Invoked after a memstore flush
* @throws IOException
* @throws IOException
*/
public void postFlush() throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
@ -450,9 +523,30 @@ public class RegionCoprocessorHost
}
}
/**
* Invoked after a memstore flush
* @throws IOException
*/
public void postFlush(final Store store, final StoreFile storeFile) throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
((RegionObserver)env.getInstance()).postFlush(ctx, store, storeFile);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
}
if (ctx.shouldComplete()) {
break;
}
}
}
}
/**
* Invoked just before a split
* @throws IOException
* @throws IOException
*/
public void preSplit() throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
@ -1088,6 +1182,31 @@ public class RegionCoprocessorHost
return bypass ? s : null;
}
/**
* See
* {@link RegionObserver#preStoreScannerOpen(ObserverContext, Store, Scan, NavigableSet, KeyValueScanner)}
*/
public KeyValueScanner preStoreScannerOpen(Store store, Scan scan,
final NavigableSet<byte[]> targetCols) throws IOException {
KeyValueScanner s = null;
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
s = ((RegionObserver) env.getInstance()).preStoreScannerOpen(ctx, store, scan,
targetCols, s);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
}
if (ctx.shouldComplete()) {
break;
}
}
}
return s;
}
/**
* @param scan the Scan specification
* @param s the scanner

View File

@ -34,8 +34,6 @@ import org.apache.hadoop.hbase.regionserver.DeleteTracker.DeleteResult;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
/**
* A query matcher that is specifically designed for the scan case.
*/
@ -138,7 +136,7 @@ public class ScanQueryMatcher {
* based on TTL
*/
public ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo,
NavigableSet<byte[]> columns, StoreScanner.ScanType scanType,
NavigableSet<byte[]> columns, ScanType scanType,
long readPointToUse, long earliestPutTs, long oldestUnexpiredTS) {
this.tr = scan.getTimeRange();
this.rowComparator = scanInfo.getComparator().getRawComparator();
@ -185,7 +183,7 @@ public class ScanQueryMatcher {
*/
ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo,
NavigableSet<byte[]> columns, long oldestUnexpiredTS) {
this(scan, scanInfo, columns, StoreScanner.ScanType.USER_SCAN,
this(scan, scanInfo, columns, ScanType.USER_SCAN,
Long.MAX_VALUE, /* max Readpoint to track versions */
HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS);
}

View File

@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Enum to distinguish general scan types.
*/
@InterfaceAudience.Private
public enum ScanType {
MAJOR_COMPACT,
MINOR_COMPACT,
USER_SCAN
}

View File

@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@ -212,9 +211,7 @@ public class Store extends SchemaConfigured implements HeapSize {
"ms in store " + this);
// Why not just pass a HColumnDescriptor in here altogether? Even if have
// to clone it?
scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
family.getMaxVersions(), ttl, family.getKeepDeletedCells(),
timeToPurgeDeletes, this.comparator);
scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
this.memstore = new MemStore(conf, this.comparator);
// By default, compact if storefile.count >= minFilesToCompact
@ -728,15 +725,30 @@ public class Store extends SchemaConfigured implements HeapSize {
if (set.size() == 0) {
return null;
}
Scan scan = new Scan();
scan.setMaxVersions(scanInfo.getMaxVersions());
// Use a store scanner to find which rows to flush.
// Note that we need to retain deletes, hence
// treat this as a minor compaction.
InternalScanner scanner = new StoreScanner(this, scan, Collections
.singletonList(new CollectionBackedScanner(set, this.comparator)),
ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(),
HConstants.OLDEST_TIMESTAMP);
InternalScanner scanner = null;
KeyValueScanner memstoreScanner = new CollectionBackedScanner(set, this.comparator);
if (getHRegion().getCoprocessorHost() != null) {
scanner = getHRegion().getCoprocessorHost().preFlushScannerOpen(this, memstoreScanner);
}
if (scanner == null) {
Scan scan = new Scan();
scan.setMaxVersions(scanInfo.getMaxVersions());
scanner = new StoreScanner(this, scanInfo, scan, Collections.singletonList(new CollectionBackedScanner(
set, this.comparator)), ScanType.MINOR_COMPACT, this.region.getSmallestReadPoint(),
HConstants.OLDEST_TIMESTAMP);
}
if (getHRegion().getCoprocessorHost() != null) {
InternalScanner cpScanner =
getHRegion().getCoprocessorHost().preFlush(this, scanner);
// NULL scanner returned from coprocessor hooks means skip normal processing
if (cpScanner == null) {
return null;
}
scanner = cpScanner;
}
try {
// TODO: We can fail in the below block before we complete adding this
// flush to list of store files. Add cleanup of anything put on filesystem
@ -1941,11 +1953,18 @@ public class Store extends SchemaConfigured implements HeapSize {
* are not in a compaction.
* @throws IOException
*/
public StoreScanner getScanner(Scan scan,
public KeyValueScanner getScanner(Scan scan,
final NavigableSet<byte []> targetCols) throws IOException {
lock.readLock().lock();
try {
return new StoreScanner(this, scan, targetCols);
KeyValueScanner scanner = null;
if (getHRegion().getCoprocessorHost() != null) {
scanner = getHRegion().getCoprocessorHost().preStoreScannerOpen(this, scan, targetCols);
}
if (scanner == null) {
scanner = new StoreScanner(this, getScanInfo(), scan, targetCols);
}
return scanner;
} finally {
lock.readLock().unlock();
}
@ -2065,7 +2084,7 @@ public class Store extends SchemaConfigured implements HeapSize {
return compactionSize > throttlePoint;
}
HRegion getHRegion() {
public HRegion getHRegion() {
return this.region;
}
@ -2168,6 +2187,12 @@ public class Store extends SchemaConfigured implements HeapSize {
}
storeFile = Store.this.commitFile(storeFilePath, cacheFlushId,
snapshotTimeRangeTracker, flushedSize, status);
if (Store.this.getHRegion().getCoprocessorHost() != null) {
Store.this.getHRegion()
.getCoprocessorHost()
.postFlush(Store.this, storeFile);
}
// Add new file to store files. Clear snapshot too while we have
// the Store write lock.
return Store.this.updateStorefiles(storeFile, snapshot);
@ -2210,6 +2235,10 @@ public class Store extends SchemaConfigured implements HeapSize {
return comparator;
}
public ScanInfo getScanInfo() {
return scanInfo;
}
/**
* Immutable information for scans over a store.
*/
@ -2226,6 +2255,17 @@ public class Store extends SchemaConfigured implements HeapSize {
+ (2 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_INT)
+ Bytes.SIZEOF_LONG + Bytes.SIZEOF_BOOLEAN);
/**
* @param family {@link HColumnDescriptor} describing the column family
* @param ttl Store's TTL (in ms)
* @param timeToPurgeDeletes duration in ms after which a delete marker can
* be purged during a major compaction.
* @param comparator The store's comparator
*/
public ScanInfo(HColumnDescriptor family, long ttl, long timeToPurgeDeletes, KVComparator comparator) {
this(family.getName(), family.getMinVersions(), family.getMaxVersions(), ttl, family
.getKeepDeletedCells(), timeToPurgeDeletes, comparator);
}
/**
* @param family Name of this store's column family
* @param minVersions Store's MIN_VERSIONS setting

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
import org.apache.hadoop.hbase.regionserver.metrics.RegionMetricsStorage;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;
@ -43,7 +44,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
* into List<KeyValue> for a single row.
*/
@InterfaceAudience.Private
class StoreScanner extends NonLazyKeyValueScanner
public class StoreScanner extends NonLazyKeyValueScanner
implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
static final Log LOG = LogFactory.getLog(StoreScanner.class);
private Store store;
@ -106,16 +107,16 @@ class StoreScanner extends NonLazyKeyValueScanner
* @param columns which columns we are scanning
* @throws IOException
*/
StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns)
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan, final NavigableSet<byte[]> columns)
throws IOException {
this(store, scan.getCacheBlocks(), scan, columns, store.scanInfo.getTtl(),
store.scanInfo.getMinVersions());
this(store, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
scanInfo.getMinVersions());
initializeMetricNames();
if (columns != null && scan.isRaw()) {
throw new DoNotRetryIOException(
"Cannot specify any column for a raw scan");
}
matcher = new ScanQueryMatcher(scan, store.scanInfo, columns,
matcher = new ScanQueryMatcher(scan, scanInfo, columns,
ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
oldestUnexpiredTS);
@ -158,13 +159,13 @@ class StoreScanner extends NonLazyKeyValueScanner
* @param smallestReadPoint the readPoint that we should use for tracking
* versions
*/
StoreScanner(Store store, Scan scan,
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
List<? extends KeyValueScanner> scanners, ScanType scanType,
long smallestReadPoint, long earliestPutTs) throws IOException {
this(store, false, scan, null, store.scanInfo.getTtl(),
store.scanInfo.getMinVersions());
this(store, false, scan, null, scanInfo.getTtl(),
scanInfo.getMinVersions());
initializeMetricNames();
matcher = new ScanQueryMatcher(scan, store.scanInfo, null, scanType,
matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType,
smallestReadPoint, earliestPutTs, oldestUnexpiredTS);
// Filter the list of scanners using Bloom filters, time range, TTL, etc.
@ -181,7 +182,7 @@ class StoreScanner extends NonLazyKeyValueScanner
/** Constructor for testing. */
StoreScanner(final Scan scan, Store.ScanInfo scanInfo,
StoreScanner.ScanType scanType, final NavigableSet<byte[]> columns,
ScanType scanType, final NavigableSet<byte[]> columns,
final List<KeyValueScanner> scanners) throws IOException {
this(scan, scanInfo, scanType, columns, scanners,
HConstants.LATEST_TIMESTAMP);
@ -189,7 +190,7 @@ class StoreScanner extends NonLazyKeyValueScanner
// Constructor for testing.
StoreScanner(final Scan scan, Store.ScanInfo scanInfo,
StoreScanner.ScanType scanType, final NavigableSet<byte[]> columns,
ScanType scanType, final NavigableSet<byte[]> columns,
final List<KeyValueScanner> scanners, long earliestPutTs)
throws IOException {
this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
@ -598,14 +599,5 @@ class StoreScanner extends NonLazyKeyValueScanner
static void enableLazySeekGlobally(boolean enable) {
lazySeekEnabledGlobally = enable;
}
/**
* Enum to distinguish general scan types.
*/
public static enum ScanType {
MAJOR_COMPACT,
MINOR_COMPACT,
USER_SCAN
}
}

View File

@ -788,6 +788,22 @@ public class HBaseTestingUtility {
this.hbaseCluster.flushcache(tableName);
}
/**
* Compact all regions in the mini hbase cluster
* @throws IOException
*/
public void compact(boolean major) throws IOException {
this.hbaseCluster.compact(major);
}
/**
* Compact all of a table's reagion in the mini hbase cluster
* @throws IOException
*/
public void compact(byte [] tableName, boolean major) throws IOException {
this.hbaseCluster.compact(tableName, major);
}
/**
* Create a table.

View File

@ -454,6 +454,34 @@ public class MiniHBaseCluster {
}
}
/**
* Call flushCache on all regions on all participating regionservers.
* @throws IOException
*/
public void compact(boolean major) throws IOException {
for (JVMClusterUtil.RegionServerThread t:
this.hbaseCluster.getRegionServers()) {
for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
r.compactStores(major);
}
}
}
/**
* Call flushCache on all regions of the specified table.
* @throws IOException
*/
public void compact(byte [] tableName, boolean major) throws IOException {
for (JVMClusterUtil.RegionServerThread t:
this.hbaseCluster.getRegionServers()) {
for(HRegion r: t.getRegionServer().getOnlineRegionsLocalContext()) {
if(Bytes.equals(r.getTableDesc().getName(), tableName)) {
r.compactStores(major);
}
}
}
}
/**
* @return List of region server threads.
*/

View File

@ -90,12 +90,12 @@ import static org.junit.Assert.*;
@Category(LargeTests.class)
public class TestFromClientSide {
final Log LOG = LogFactory.getLog(getClass());
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static byte [] ROW = Bytes.toBytes("testRow");
private static byte [] FAMILY = Bytes.toBytes("testFamily");
private static byte [] QUALIFIER = Bytes.toBytes("testQualifier");
private static byte [] VALUE = Bytes.toBytes("testValue");
private static int SLAVES = 3;
protected static int SLAVES = 3;
/**
* @throws java.lang.Exception

View File

@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
import org.apache.hadoop.hbase.regionserver.NoOpScanPolicyObserver;
import org.junit.BeforeClass;
import org.junit.experimental.categories.Category;
/**
* Test all client operations with a coprocessor that
* just implements the default flush/compact/scan policy
*/
@Category(LargeTests.class)
public class TestFromClientSideWithCoprocessor extends TestFromClientSide {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
MultiRowMutationEndpoint.class.getName(), NoOpScanPolicyObserver.class.getName());
// We need more than one region server in this test
TEST_UTIL.startMiniCluster(SLAVES);
}
}

View File

@ -29,6 +29,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Arrays;
import java.util.NavigableSet;
import com.google.common.collect.ImmutableList;
import org.apache.commons.logging.Log;
@ -42,7 +43,9 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@ -63,11 +66,13 @@ public class SimpleRegionObserver extends BaseRegionObserver {
boolean hadPreClose;
boolean hadPostClose;
boolean hadPreFlush;
boolean hadPreFlushScannerOpen;
boolean hadPostFlush;
boolean hadPreSplit;
boolean hadPostSplit;
boolean hadPreCompactSelect;
boolean hadPostCompactSelect;
boolean hadPreCompactScanner;
boolean hadPreCompact;
boolean hadPostCompact;
boolean hadPreGet = false;
@ -87,6 +92,7 @@ public class SimpleRegionObserver extends BaseRegionObserver {
boolean hadPreScannerClose = false;
boolean hadPostScannerClose = false;
boolean hadPreScannerOpen = false;
boolean hadPreStoreScannerOpen = false;
boolean hadPostScannerOpen = false;
boolean hadPreBulkLoadHFile = false;
boolean hadPostBulkLoadHFile = false;
@ -120,12 +126,20 @@ public class SimpleRegionObserver extends BaseRegionObserver {
}
@Override
public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c) {
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store, InternalScanner scanner) {
hadPreFlush = true;
return scanner;
}
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c) {
public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
hadPreFlushScannerOpen = true;
return null;
}
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store, StoreFile resultFile) {
hadPostFlush = true;
}
@ -166,6 +180,14 @@ public class SimpleRegionObserver extends BaseRegionObserver {
return scanner;
}
@Override
public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s) throws IOException {
hadPreCompactScanner = true;
return null;
}
@Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
Store store, StoreFile resultFile) {
@ -184,6 +206,14 @@ public class SimpleRegionObserver extends BaseRegionObserver {
return null;
}
@Override
public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final Scan scan, final NavigableSet<byte[]> targetCols,
final KeyValueScanner s) throws IOException {
hadPreStoreScannerOpen = true;
return null;
}
@Override
public RegionScanner postScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final RegionScanner s)

View File

@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFilePrettyPrinter;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.LoadTestTool;
import org.apache.hadoop.hbase.util.MD5Hash;
@ -408,7 +407,7 @@ public class HFileReadWriteTest {
Scan scan = new Scan();
// Include deletes
scanner = new StoreScanner(store, scan, scanners,
scanner = new StoreScanner(store, store.scanInfo, scan, scanners,
ScanType.MAJOR_COMPACT, Long.MIN_VALUE, Long.MIN_VALUE);
ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();

View File

@ -0,0 +1,62 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.NavigableSet;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TestFromClientSideWithCoprocessor;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
/**
* RegionObserver that just reimplements the default behavior,
* in order to validate that all the necessary APIs for this are public
* This observer is also used in {@link TestFromClientSideWithCoprocessor} and
* {@link TestCompactionWithCoprocessor} to make sure that a wide range
* of functionality still behaves as expected.
*/
public class NoOpScanPolicyObserver extends BaseRegionObserver {
/**
* Reimplement the default behavior
*/
@Override
public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
Store.ScanInfo oldSI = store.getScanInfo();
Store.ScanInfo scanInfo = new Store.ScanInfo(store.getFamily(), oldSI.getTtl(),
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
Scan scan = new Scan();
scan.setMaxVersions(oldSI.getMaxVersions());
return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
ScanType.MINOR_COMPACT, store.getHRegion().getSmallestReadPoint(),
HConstants.OLDEST_TIMESTAMP);
}
/**
* Reimplement the default behavior
*/
@Override
public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType, long earliestPutTs,
InternalScanner s) throws IOException {
// this demonstrates how to override the scanners default behavior
Store.ScanInfo oldSI = store.getScanInfo();
Store.ScanInfo scanInfo = new Store.ScanInfo(store.getFamily(), oldSI.getTtl(),
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
Scan scan = new Scan();
scan.setMaxVersions(oldSI.getMaxVersions());
return new StoreScanner(store, scanInfo, scan, scanners, scanType, store.getHRegion()
.getSmallestReadPoint(), earliestPutTs);
}
@Override
public KeyValueScanner preStoreScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
Store store, final Scan scan, final NavigableSet<byte[]> targetCols, KeyValueScanner s)
throws IOException {
return new StoreScanner(store, store.getScanInfo(), scan, targetCols);
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.junit.experimental.categories.Category;
/**
* Make sure all compaction tests still pass with the preFlush and preCompact
* overridden to implement the default behavior
*/
@Category(MediumTests.class)
public class TestCompactionWithCoprocessor extends TestCompaction {
/** constructor */
public TestCompactionWithCoprocessor() throws Exception {
super();
conf.setStrings(CoprocessorHost.USER_REGION_COPROCESSOR_CONF_KEY,
NoOpScanPolicyObserver.class.getName());
}
}

View File

@ -36,7 +36,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.*;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;

View File

@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.KeyValueTestUtil;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
import org.apache.hadoop.hbase.regionserver.metrics.SchemaMetrics;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
@ -559,7 +558,7 @@ public class TestStoreScanner extends TestCase {
KeyValue.COMPARATOR);
StoreScanner scanner =
new StoreScanner(scan, scanInfo,
StoreScanner.ScanType.MAJOR_COMPACT, null, scanners,
ScanType.MAJOR_COMPACT, null, scanners,
HConstants.OLDEST_TIMESTAMP);
List<KeyValue> results = new ArrayList<KeyValue>();
results = new ArrayList<KeyValue>();

View File

@ -0,0 +1,262 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;
// this is deliberately not in the o.a.h.h.regionserver package
// in order to make sure all required classes/method are available
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MediumTests;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import static org.junit.Assert.*;
@Category(MediumTests.class)
public class TestCoprocessorScanPolicy {
final Log LOG = LogFactory.getLog(getClass());
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final byte[] F = Bytes.toBytes("fam");
private static final byte[] Q = Bytes.toBytes("qual");
private static final byte[] R = Bytes.toBytes("row");
@BeforeClass
public static void setUpBeforeClass() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
ScanObserver.class.getName());
TEST_UTIL.startMiniCluster();
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
@Test
public void testBaseCases() throws Exception {
byte[] tableName = Bytes.toBytes("baseCases");
HTable t = TEST_UTIL.createTable(tableName, F, 1);
// set the version override to 2
Put p = new Put(R);
p.setAttribute("versions", new byte[]{});
p.add(F, tableName, Bytes.toBytes(2));
t.put(p);
// insert 2 versions
p = new Put(R);
p.add(F, Q, Q);
t.put(p);
p = new Put(R);
p.add(F, Q, Q);
t.put(p);
Get g = new Get(R);
g.setMaxVersions(10);
Result r = t.get(g);
assertEquals(2, r.size());
TEST_UTIL.flush(tableName);
TEST_UTIL.compact(tableName, true);
// both version are still visible even after a flush/compaction
g = new Get(R);
g.setMaxVersions(10);
r = t.get(g);
assertEquals(2, r.size());
// insert a 3rd version
p = new Put(R);
p.add(F, Q, Q);
t.put(p);
g = new Get(R);
g.setMaxVersions(10);
r = t.get(g);
// still only two version visible
assertEquals(2, r.size());
t.close();
}
@Test
public void testTTL() throws Exception {
byte[] tableName = Bytes.toBytes("testTTL");
HTableDescriptor desc = new HTableDescriptor(tableName);
HColumnDescriptor hcd = new HColumnDescriptor(F)
.setMaxVersions(10)
.setTimeToLive(1);
desc.addFamily(hcd);
TEST_UTIL.getHBaseAdmin().createTable(desc);
HTable t = new HTable(new Configuration(TEST_UTIL.getConfiguration()), tableName);
long now = EnvironmentEdgeManager.currentTimeMillis();
ManualEnvironmentEdge me = new ManualEnvironmentEdge();
me.setValue(now);
EnvironmentEdgeManagerTestHelper.injectEdge(me);
// 2s in the past
long ts = now - 2000;
// Set the TTL override to 3s
Put p = new Put(R);
p.setAttribute("ttl", new byte[]{});
p.add(F, tableName, Bytes.toBytes(3000L));
t.put(p);
p = new Put(R);
p.add(F, Q, ts, Q);
t.put(p);
p = new Put(R);
p.add(F, Q, ts+1, Q);
t.put(p);
// these two should be expired but for the override
// (their ts was 2s in the past)
Get g = new Get(R);
g.setMaxVersions(10);
Result r = t.get(g);
// still there?
assertEquals(2, r.size());
TEST_UTIL.flush(tableName);
TEST_UTIL.compact(tableName, true);
g = new Get(R);
g.setMaxVersions(10);
r = t.get(g);
// still there?
assertEquals(2, r.size());
// roll time forward 2s.
me.setValue(now + 2000);
// now verify that data eventually does expire
g = new Get(R);
g.setMaxVersions(10);
r = t.get(g);
// should be gone now
assertEquals(0, r.size());
t.close();
}
public static class ScanObserver extends BaseRegionObserver {
private Map<String, Long> ttls = new HashMap<String,Long>();
private Map<String, Integer> versions = new HashMap<String,Integer>();
// lame way to communicate with the coprocessor,
// since it is loaded by a different class loader
@Override
public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final Put put,
final WALEdit edit, final boolean writeToWAL) throws IOException {
if (put.getAttribute("ttl") != null) {
KeyValue kv = put.getFamilyMap().values().iterator().next().get(0);
ttls.put(Bytes.toString(kv.getQualifier()), Bytes.toLong(kv.getValue()));
c.bypass();
} else if (put.getAttribute("versions") != null) {
KeyValue kv = put.getFamilyMap().values().iterator().next().get(0);
versions.put(Bytes.toString(kv.getQualifier()), Bytes.toInt(kv.getValue()));
c.bypass();
}
}
@Override
public InternalScanner preFlushScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
Store store, KeyValueScanner memstoreScanner, InternalScanner s) throws IOException {
Long newTtl = ttls.get(store.getTableName());
if (newTtl != null) {
System.out.println("PreFlush:" + newTtl);
}
Integer newVersions = versions.get(store.getTableName());
Store.ScanInfo oldSI = store.getScanInfo();
HColumnDescriptor family = store.getFamily();
Store.ScanInfo scanInfo = new Store.ScanInfo(family.getName(), family.getMinVersions(),
newVersions == null ? family.getMaxVersions() : newVersions,
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
Scan scan = new Scan();
scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
return new StoreScanner(store, scanInfo, scan, Collections.singletonList(memstoreScanner),
ScanType.MINOR_COMPACT, store.getHRegion().getSmallestReadPoint(),
HConstants.OLDEST_TIMESTAMP);
}
@Override
public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
long earliestPutTs, InternalScanner s) throws IOException {
Long newTtl = ttls.get(store.getTableName());
Integer newVersions = versions.get(store.getTableName());
Store.ScanInfo oldSI = store.getScanInfo();
HColumnDescriptor family = store.getFamily();
Store.ScanInfo scanInfo = new Store.ScanInfo(family.getName(), family.getMinVersions(),
newVersions == null ? family.getMaxVersions() : newVersions,
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
Scan scan = new Scan();
scan.setMaxVersions(newVersions == null ? oldSI.getMaxVersions() : newVersions);
return new StoreScanner(store, scanInfo, scan, scanners, scanType, store.getHRegion()
.getSmallestReadPoint(), earliestPutTs);
}
@Override
public KeyValueScanner preStoreScannerOpen(
final ObserverContext<RegionCoprocessorEnvironment> c, Store store, final Scan scan,
final NavigableSet<byte[]> targetCols, KeyValueScanner s) throws IOException {
Long newTtl = ttls.get(store.getTableName());
Integer newVersions = versions.get(store.getTableName());
Store.ScanInfo oldSI = store.getScanInfo();
HColumnDescriptor family = store.getFamily();
Store.ScanInfo scanInfo = new Store.ScanInfo(family.getName(), family.getMinVersions(),
newVersions == null ? family.getMaxVersions() : newVersions,
newTtl == null ? oldSI.getTtl() : newTtl, family.getKeepDeletedCells(),
oldSI.getTimeToPurgeDeletes(), oldSI.getComparator());
return new StoreScanner(store, scanInfo, scan, targetCols);
}
}
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}