HBASE-24321 - Add writable MinVersions and read-only Scan to coproc S… (#1655)
Signed-off-by: Andrew Purtell <apurtell@apache.org> Signed-off-by: Duo Zhang <zhangduo@apache.org> Signed-off-by: Lars Hofhansl <larsh@apache.org> Signed-off-by: Anoop Sam John <anoopsamjohn@gmail.com> Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
parent
ff85daf1e2
commit
ca81283fe5
|
@ -17,7 +17,9 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.regionserver;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.hbase.KeepDeletedCells;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
|
@ -34,8 +36,22 @@ public class CustomizedScanInfoBuilder implements ScanOptions {
|
|||
|
||||
private KeepDeletedCells keepDeletedCells = null;
|
||||
|
||||
private Integer minVersions;
|
||||
|
||||
private final Scan scan;
|
||||
|
||||
public CustomizedScanInfoBuilder(ScanInfo scanInfo) {
|
||||
this.scanInfo = scanInfo;
|
||||
this.scan = new Scan();
|
||||
}
|
||||
public CustomizedScanInfoBuilder(ScanInfo scanInfo, Scan scan) {
|
||||
this.scanInfo = scanInfo;
|
||||
//copy the scan so no coproc using this ScanOptions can alter the "real" scan
|
||||
try {
|
||||
this.scan = new Scan(scan);
|
||||
} catch (IOException e) {
|
||||
throw new AssertionError("Scan should not throw IOException", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -62,12 +78,13 @@ public class CustomizedScanInfoBuilder implements ScanOptions {
|
|||
if (maxVersions == null && ttl == null && keepDeletedCells == null) {
|
||||
return scanInfo;
|
||||
}
|
||||
return scanInfo.customize(getMaxVersions(), getTTL(), getKeepDeletedCells());
|
||||
return scanInfo.customize(getMaxVersions(), getTTL(), getKeepDeletedCells(), getMinVersions());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "ScanOptions [maxVersions=" + getMaxVersions() + ", TTL=" + getTTL() + "]";
|
||||
return "ScanOptions [maxVersions=" + getMaxVersions() + ", TTL=" + getTTL() +
|
||||
", KeepDeletedCells=" + getKeepDeletedCells() + ", MinVersions=" + getMinVersions() + "]";
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -80,4 +97,23 @@ public class CustomizedScanInfoBuilder implements ScanOptions {
|
|||
return keepDeletedCells != null ? keepDeletedCells : scanInfo.getKeepDeletedCells();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMinVersions() {
|
||||
return minVersions != null ? minVersions : scanInfo.getMinVersions();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMinVersions(int minVersions) {
|
||||
this.minVersions = minVersions;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Scan getScan() {
|
||||
try {
|
||||
return new Scan(scan);
|
||||
} catch(IOException e) {
|
||||
throw new AssertionError("Scan should not throw IOException anymore", e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -2128,7 +2128,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
|
|||
try {
|
||||
ScanInfo scanInfo;
|
||||
if (this.getCoprocessorHost() != null) {
|
||||
scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this);
|
||||
scanInfo = this.getCoprocessorHost().preStoreScannerOpen(this, scan);
|
||||
} else {
|
||||
scanInfo = getScanInfo();
|
||||
}
|
||||
|
|
|
@ -1625,9 +1625,9 @@ public class RegionCoprocessorHost
|
|||
/**
|
||||
* Called before open store scanner for user scan.
|
||||
*/
|
||||
public ScanInfo preStoreScannerOpen(HStore store) throws IOException {
|
||||
public ScanInfo preStoreScannerOpen(HStore store, Scan scan) throws IOException {
|
||||
if (coprocEnvironments.isEmpty()) return store.getScanInfo();
|
||||
CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
|
||||
CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo(), scan);
|
||||
execOperation(new RegionObserverOperationWithoutResult() {
|
||||
@Override
|
||||
public void call(RegionObserver observer) throws IOException {
|
||||
|
|
|
@ -174,8 +174,13 @@ public class ScanInfo {
|
|||
* Used for CP users for customizing max versions, ttl and keepDeletedCells.
|
||||
*/
|
||||
ScanInfo customize(int maxVersions, long ttl, KeepDeletedCells keepDeletedCells) {
|
||||
return customize(maxVersions, ttl, keepDeletedCells, minVersions);
|
||||
}
|
||||
|
||||
ScanInfo customize(int maxVersions, long ttl, KeepDeletedCells keepDeletedCells,
|
||||
int minVersions) {
|
||||
return new ScanInfo(family, minVersions, maxVersions, ttl, keepDeletedCells, timeToPurgeDeletes,
|
||||
comparator, tableMaxRowSize, usePread, cellsPerTimeoutCheck, parallelSeekEnabled,
|
||||
preadMaxBytes, newVersionBehavior);
|
||||
comparator, tableMaxRowSize, usePread, cellsPerTimeoutCheck, parallelSeekEnabled,
|
||||
preadMaxBytes, newVersionBehavior);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
|
|||
|
||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||
import org.apache.hadoop.hbase.KeepDeletedCells;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
|
||||
|
@ -26,8 +27,9 @@ import org.apache.yetus.audience.InterfaceStability;
|
|||
* This class gives you the ability to change the max versions and TTL options before opening a
|
||||
* scanner for a Store. And also gives you some information for the scan.
|
||||
* <p>
|
||||
* Changing max versions and TTL are usually safe even for flush/compaction, so here we provide a
|
||||
* way to do it for you. If you want to do other complicated stuffs such as filtering, please wrap
|
||||
* Changing max versions, min versins, KeepDeletedCells, and TTL are usually safe even
|
||||
* for flush/compaction, so here we provide a way to do it for you. If you want to do other
|
||||
* complicated operations such as filtering, please wrap
|
||||
* the {@link InternalScanner} in the {@code preCompact} and {@code preFlush} methods in
|
||||
* {@link org.apache.hadoop.hbase.coprocessor.RegionObserver}.
|
||||
* <p>
|
||||
|
@ -64,4 +66,13 @@ public interface ScanOptions {
|
|||
void setKeepDeletedCells(KeepDeletedCells keepDeletedCells);
|
||||
|
||||
KeepDeletedCells getKeepDeletedCells();
|
||||
|
||||
int getMinVersions();
|
||||
|
||||
void setMinVersions(int minVersions);
|
||||
|
||||
/**
|
||||
* Returns a copy of the Scan object. Modifying it will have no effect.
|
||||
*/
|
||||
Scan getScan();
|
||||
}
|
||||
|
|
|
@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.Cell;
|
|||
import org.apache.hadoop.hbase.CellUtil;
|
||||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.hadoop.hbase.CoprocessorEnvironment;
|
||||
import org.apache.hadoop.hbase.KeepDeletedCells;
|
||||
import org.apache.hadoop.hbase.client.Append;
|
||||
import org.apache.hadoop.hbase.client.Delete;
|
||||
import org.apache.hadoop.hbase.client.Durability;
|
||||
|
@ -58,6 +59,7 @@ import org.apache.hadoop.hbase.regionserver.InternalScanner;
|
|||
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
|
||||
import org.apache.hadoop.hbase.regionserver.Region.Operation;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionScanner;
|
||||
import org.apache.hadoop.hbase.regionserver.ScanOptions;
|
||||
import org.apache.hadoop.hbase.regionserver.ScanType;
|
||||
import org.apache.hadoop.hbase.regionserver.Store;
|
||||
import org.apache.hadoop.hbase.regionserver.StoreFile;
|
||||
|
@ -685,6 +687,40 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
|
|||
return reader;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStoreScannerOpen(ObserverContext<RegionCoprocessorEnvironment> ctx,
|
||||
Store store, ScanOptions options) throws IOException {
|
||||
if (options.getScan().getTimeRange().isAllTime()) {
|
||||
setScanOptions(options);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||
ScanType scanType, ScanOptions options, CompactionLifeCycleTracker tracker,
|
||||
CompactionRequest request) throws IOException {
|
||||
setScanOptions(options);
|
||||
}
|
||||
|
||||
public void preFlushScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||
ScanOptions options,FlushLifeCycleTracker tracker) throws IOException {
|
||||
setScanOptions(options);
|
||||
}
|
||||
|
||||
public void preMemStoreCompactionCompactScannerOpen(
|
||||
ObserverContext<RegionCoprocessorEnvironment> c, Store store, ScanOptions options)
|
||||
throws IOException {
|
||||
setScanOptions(options);
|
||||
}
|
||||
|
||||
private void setScanOptions(ScanOptions options) {
|
||||
options.setMaxVersions(TestRegionCoprocessorHost.MAX_VERSIONS);
|
||||
options.setMinVersions(TestRegionCoprocessorHost.MIN_VERSIONS);
|
||||
options.setKeepDeletedCells(KeepDeletedCells.TRUE);
|
||||
options.setTTL(TestRegionCoprocessorHost.TTL);
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void preWALAppend(ObserverContext<RegionCoprocessorEnvironment> ctx,
|
||||
WALKey key, WALEdit edit) throws IOException {
|
||||
|
|
|
@ -22,48 +22,80 @@ import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.REGION_COPROCE
|
|||
import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR;
|
||||
import static org.apache.hadoop.hbase.coprocessor.CoprocessorHost.USER_COPROCESSORS_ENABLED_CONF_KEY;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.CellComparator;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.KeepDeletedCells;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.client.RegionInfo;
|
||||
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||
import org.apache.hadoop.hbase.io.TimeRange;
|
||||
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
|
||||
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
|
||||
import org.apache.hadoop.hbase.regionserver.ScanInfo;
|
||||
import org.apache.hadoop.hbase.regionserver.ScanType;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
|
||||
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.junit.Before;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.junit.rules.TestName;
|
||||
import java.io.IOException;
|
||||
|
||||
@Category({SmallTests.class})
|
||||
public class TestRegionCoprocessorHost {
|
||||
private Configuration conf;
|
||||
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TestRegionCoprocessorHost.class);
|
||||
|
||||
@Test
|
||||
public void testLoadDuplicateCoprocessor() throws Exception {
|
||||
Configuration conf = HBaseConfiguration.create();
|
||||
@Rule
|
||||
public final TestName name = new TestName();
|
||||
private RegionInfo regionInfo;
|
||||
private HRegion region;
|
||||
private RegionServerServices rsServices;
|
||||
public static final int MAX_VERSIONS = 3;
|
||||
public static final int MIN_VERSIONS = 2;
|
||||
public static final int TTL = 1000;
|
||||
|
||||
@Before
|
||||
public void setup() throws IOException {
|
||||
conf = HBaseConfiguration.create();
|
||||
conf.setBoolean(COPROCESSORS_ENABLED_CONF_KEY, true);
|
||||
conf.setBoolean(USER_COPROCESSORS_ENABLED_CONF_KEY, true);
|
||||
conf.setBoolean(SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR, true);
|
||||
conf.set(REGION_COPROCESSOR_CONF_KEY, SimpleRegionObserver.class.getName());
|
||||
TableName tableName = TableName.valueOf("testDoubleLoadingCoprocessor");
|
||||
RegionInfo regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
|
||||
TableName tableName = TableName.valueOf(name.getMethodName());
|
||||
regionInfo = RegionInfoBuilder.newBuilder(tableName).build();
|
||||
// config a same coprocessor with system coprocessor
|
||||
TableDescriptor tableDesc = TableDescriptorBuilder.newBuilder(tableName)
|
||||
.setCoprocessor(SimpleRegionObserver.class.getName()).build();
|
||||
HRegion region = mock(HRegion.class);
|
||||
.setCoprocessor(SimpleRegionObserver.class.getName()).build();
|
||||
region = mock(HRegion.class);
|
||||
when(region.getRegionInfo()).thenReturn(regionInfo);
|
||||
when(region.getTableDescriptor()).thenReturn(tableDesc);
|
||||
RegionServerServices rsServices = mock(RegionServerServices.class);
|
||||
rsServices = mock(RegionServerServices.class);
|
||||
}
|
||||
@Test
|
||||
public void testLoadDuplicateCoprocessor() throws Exception {
|
||||
conf.setBoolean(SKIP_LOAD_DUPLICATE_TABLE_COPROCESSOR, true);
|
||||
conf.set(REGION_COPROCESSOR_CONF_KEY, SimpleRegionObserver.class.getName());
|
||||
RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
|
||||
// Only one coprocessor SimpleRegionObserver loaded
|
||||
assertEquals(1, host.coprocEnvironments.size());
|
||||
|
@ -74,4 +106,73 @@ public class TestRegionCoprocessorHost {
|
|||
// Two duplicate coprocessors loaded
|
||||
assertEquals(2, host.coprocEnvironments.size());
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreStoreScannerOpen() throws IOException {
|
||||
|
||||
RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
|
||||
Scan scan = new Scan();
|
||||
scan.setTimeRange(TimeRange.INITIAL_MIN_TIMESTAMP, TimeRange.INITIAL_MAX_TIMESTAMP);
|
||||
assertTrue("Scan is not for all time", scan.getTimeRange().isAllTime());
|
||||
//SimpleRegionObserver is set to update the ScanInfo parameters if the passed-in scan
|
||||
//is for all time. this lets us exercise both that the Scan is wired up properly in the coproc
|
||||
//and that we can customize the metadata
|
||||
|
||||
ScanInfo oldScanInfo = getScanInfo();
|
||||
|
||||
HStore store = mock(HStore.class);
|
||||
when(store.getScanInfo()).thenReturn(oldScanInfo);
|
||||
ScanInfo newScanInfo = host.preStoreScannerOpen(store, scan);
|
||||
|
||||
verifyScanInfo(newScanInfo);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreCompactScannerOpen() throws IOException {
|
||||
RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
|
||||
ScanInfo oldScanInfo = getScanInfo();
|
||||
HStore store = mock(HStore.class);
|
||||
when(store.getScanInfo()).thenReturn(oldScanInfo);
|
||||
ScanInfo newScanInfo = host.preCompactScannerOpen(store, ScanType.COMPACT_DROP_DELETES,
|
||||
mock(CompactionLifeCycleTracker.class), mock(CompactionRequest.class), mock(User.class));
|
||||
verifyScanInfo(newScanInfo);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreFlushScannerOpen() throws IOException {
|
||||
RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
|
||||
ScanInfo oldScanInfo = getScanInfo();
|
||||
HStore store = mock(HStore.class);
|
||||
when(store.getScanInfo()).thenReturn(oldScanInfo);
|
||||
ScanInfo newScanInfo = host.preFlushScannerOpen(store, mock(FlushLifeCycleTracker.class));
|
||||
verifyScanInfo(newScanInfo);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPreMemStoreCompactionCompactScannerOpen() throws IOException {
|
||||
RegionCoprocessorHost host = new RegionCoprocessorHost(region, rsServices, conf);
|
||||
ScanInfo oldScanInfo = getScanInfo();
|
||||
HStore store = mock(HStore.class);
|
||||
when(store.getScanInfo()).thenReturn(oldScanInfo);
|
||||
ScanInfo newScanInfo = host.preMemStoreCompactionCompactScannerOpen(store);
|
||||
verifyScanInfo(newScanInfo);
|
||||
}
|
||||
|
||||
private void verifyScanInfo(ScanInfo newScanInfo) {
|
||||
assertEquals(KeepDeletedCells.TRUE, newScanInfo.getKeepDeletedCells());
|
||||
assertEquals(MAX_VERSIONS, newScanInfo.getMaxVersions());
|
||||
assertEquals(MIN_VERSIONS, newScanInfo.getMinVersions());
|
||||
assertEquals(TTL, newScanInfo.getTtl());
|
||||
}
|
||||
|
||||
private ScanInfo getScanInfo() {
|
||||
int oldMaxVersions = 1;
|
||||
int oldMinVersions = 0;
|
||||
long oldTTL = 10000;
|
||||
|
||||
return new ScanInfo(conf, Bytes.toBytes("cf"), oldMinVersions, oldMaxVersions, oldTTL,
|
||||
KeepDeletedCells.FALSE, HConstants.FOREVER, 1000,
|
||||
CellComparator.getInstance(), true);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue