HBASE-19095 Add CP hooks in RegionObserver for in memory compaction

zhangduo 2017-11-04 21:45:39 +08:00
parent 33ae6dce42
commit 143e4949fe
9 changed files with 377 additions and 133 deletions


@@ -162,6 +162,20 @@ public class WriteHeavyIncrementObserver implements RegionCoprocessor, RegionObserver
return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
}
@Override
public void preMemStoreCompactionCompactScannerOpen(
ObserverContext<RegionCoprocessorEnvironment> c, Store store, ScanOptions options)
throws IOException {
options.readAllVersions();
}
@Override
public InternalScanner preMemStoreCompactionCompact(
ObserverContext<RegionCoprocessorEnvironment> c, Store store, InternalScanner scanner)
throws IOException {
return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
}
@Override
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<Cell> result)
throws IOException {


@@ -20,58 +20,25 @@ package org.apache.hadoop.hbase.coprocessor.example;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ CoprocessorTests.class, MediumTests.class })
public class TestWriteHeavyIncrementObserver {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static TableName NAME = TableName.valueOf("TestCP");
private static byte[] FAMILY = Bytes.toBytes("cf");
private static byte[] ROW = Bytes.toBytes("row");
private static byte[] CQ1 = Bytes.toBytes("cq1");
private static byte[] CQ2 = Bytes.toBytes("cq2");
private static Table TABLE;
private static long UPPER = 1000;
private static int THREADS = 10;
public class TestWriteHeavyIncrementObserver extends WriteHeavyIncrementObserverTestBase {
@BeforeClass
public static void setUp() throws Exception {
UTIL.getConfiguration().setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 64 * 1024L);
UTIL.getConfiguration().setLong("hbase.hregion.memstore.flush.size.limit", 1024L);
UTIL.startMiniCluster(3);
WriteHeavyIncrementObserverTestBase.setUp();
UTIL.getAdmin()
.createTable(TableDescriptorBuilder.newBuilder(NAME)
.addCoprocessor(WriteHeavyIncrementObserver.class.getName())
@@ -79,45 +46,9 @@ public class TestWriteHeavyIncrementObserver {
TABLE = UTIL.getConnection().getTable(NAME);
}
@AfterClass
public static void tearDown() throws Exception {
if (TABLE != null) {
TABLE.close();
}
UTIL.shutdownMiniCluster();
}
private static void increment() throws IOException {
for (long i = 1; i <= UPPER; i++) {
TABLE.increment(new Increment(ROW).addColumn(FAMILY, CQ1, i).addColumn(FAMILY, CQ2, 2 * i));
try {
Thread.sleep(ThreadLocalRandom.current().nextInt(5, 10));
} catch (InterruptedException e) {
}
}
}
private void assertSum() throws IOException {
Result result = TABLE.get(new Get(ROW).addColumn(FAMILY, CQ1).addColumn(FAMILY, CQ2));
assertEquals(THREADS * (1 + UPPER) * UPPER / 2, Bytes.toLong(result.getValue(FAMILY, CQ1)));
assertEquals(THREADS * (1 + UPPER) * UPPER, Bytes.toLong(result.getValue(FAMILY, CQ2)));
}
@Test
public void test() throws Exception {
Thread[] threads = IntStream.range(0, THREADS).mapToObj(i -> new Thread(() -> {
try {
increment();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}, "increment-" + i)).toArray(Thread[]::new);
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
doIncrement(0);
assertSum();
// we do not hack the scan operation, so using scan we can still get the original values added
// into the table.

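The closing comment of the previous test notes that the observer does not intercept scans, so a Get returns the aggregated sum while a raw multi-version scan still surfaces the individual increments. A minimal sketch of that difference, reusing the test's static fields and assuming Scan#readAllVersions is available (older clients would call setMaxVersions() instead):

// aggregated view: the summed value that assertSum() checks
Result aggregated = TABLE.get(new Get(ROW).addColumn(FAMILY, CQ1));
long sum = Bytes.toLong(aggregated.getValue(FAMILY, CQ1));

// raw view: a multi-version scan still returns the original increment cells
try (ResultScanner scanner = TABLE.getScanner(new Scan().readAllVersions())) {
  for (Result r : scanner) {
    for (Cell cell : r.rawCells()) {
      // each cell here is one of the values written by increment()
    }
  }
}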

@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor.example;
import static org.junit.Assert.assertEquals;
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({ CoprocessorTests.class, MediumTests.class })
public class TestWriteHeavyIncrementObserverWithMemStoreCompaction
extends WriteHeavyIncrementObserverTestBase {
@BeforeClass
public static void setUp() throws Exception {
WriteHeavyIncrementObserverTestBase.setUp();
UTIL.getAdmin()
.createTable(TableDescriptorBuilder.newBuilder(NAME)
.addCoprocessor(WriteHeavyIncrementObserver.class.getName())
.setValue(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
MemoryCompactionPolicy.EAGER.name())
.addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build());
TABLE = UTIL.getConnection().getTable(NAME);
}
@Test
public void test() throws Exception {
// sleep every 10 loops to give memstore compaction enough time to finish before reaching the
// flush size.
doIncrement(10);
assertSum();
HStore store = UTIL.getHBaseCluster().findRegionsForTable(NAME).get(0).getStore(FAMILY);
// should have no store files created as we have done all the aggregation in memory
assertEquals(0, store.getStorefilesCount());
}
}

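For comparison with the table-wide property used in setUp() above, the same eager policy can be scoped to a single column family. A minimal sketch, assuming ColumnFamilyDescriptorBuilder exposes setInMemoryCompaction as the per-family counterpart of CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY (verify against the HBase version in use):

UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(NAME)
    .addCoprocessor(WriteHeavyIncrementObserver.class.getName())
    // hypothetical per-family setting replacing the table-level property above
    .addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY)
        .setInMemoryCompaction(MemoryCompactionPolicy.EAGER)
        .build())
    .build());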

@@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor.example;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.stream.IntStream;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
public class WriteHeavyIncrementObserverTestBase {
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
protected static TableName NAME = TableName.valueOf("TestCP");
protected static byte[] FAMILY = Bytes.toBytes("cf");
protected static byte[] ROW = Bytes.toBytes("row");
protected static byte[] CQ1 = Bytes.toBytes("cq1");
protected static byte[] CQ2 = Bytes.toBytes("cq2");
protected static Table TABLE;
protected static long UPPER = 1000;
protected static int THREADS = 10;
@BeforeClass
public static void setUp() throws Exception {
UTIL.getConfiguration().setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 64 * 1024L);
UTIL.getConfiguration().setLong("hbase.hregion.memstore.flush.size.limit", 1024L);
UTIL.startMiniCluster(3);
}
@AfterClass
public static void tearDown() throws Exception {
if (TABLE != null) {
TABLE.close();
}
UTIL.shutdownMiniCluster();
}
private static void increment(int sleepSteps) throws IOException {
for (long i = 1; i <= UPPER; i++) {
TABLE.increment(new Increment(ROW).addColumn(FAMILY, CQ1, i).addColumn(FAMILY, CQ2, 2 * i));
if (sleepSteps > 0 && i % sleepSteps == 0) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
}
}
}
}
protected final void assertSum() throws IOException {
Result result = TABLE.get(new Get(ROW).addColumn(FAMILY, CQ1).addColumn(FAMILY, CQ2));
assertEquals(THREADS * (1 + UPPER) * UPPER / 2, Bytes.toLong(result.getValue(FAMILY, CQ1)));
assertEquals(THREADS * (1 + UPPER) * UPPER, Bytes.toLong(result.getValue(FAMILY, CQ2)));
}
protected final void doIncrement(int sleepSteps) throws InterruptedException {
Thread[] threads = IntStream.range(0, THREADS).mapToObj(i -> new Thread(() -> {
try {
increment(sleepSteps);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}, "increment-" + i)).toArray(Thread[]::new);
for (Thread thread : threads) {
thread.start();
}
for (Thread thread : threads) {
thread.join();
}
}
}


@@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.coprocessor;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.io.IOException;
import java.util.List;
import java.util.Map;
@@ -144,10 +146,10 @@ public interface RegionObserver {
* Called before a Store's memstore is flushed to disk.
* @param c the environment provided by the region server
* @param store the store where flush is being requested
* @param scanner the scanner over existing data used in the store file
* @param scanner the scanner over existing data used in the memstore
* @param tracker tracker used to track the life cycle of a flush
* @return the scanner to use during compaction. Should not be {@code null}
* unless the implementation is writing new store files on its own.
* @return the scanner to use during flush. Should not be {@code null} unless the implementation
* is writing new store files on its own.
*/
default InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
@@ -173,6 +175,51 @@ public interface RegionObserver {
default void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException {}
/**
* Called before in memory compaction starts.
* @param c the environment provided by the region server
* @param store the store where in memory compaction is being requested
*/
default void preMemStoreCompaction(ObserverContext<RegionCoprocessorEnvironment> c, Store store)
throws IOException {}
/**
* Called before we open store scanner for in memory compaction. You can use the {@code options}
* to change max versions and TTL for the scanner being opened. Notice that this method will only
* be called when you use {@code eager} mode. For {@code basic} mode we will not drop any cells
* thus we do not open a store scanner.
* @param c the environment provided by the region server
* @param store the store where in memory compaction is being requested
* @param options used to change max versions and TTL for the scanner being opened
*/
default void preMemStoreCompactionCompactScannerOpen(
ObserverContext<RegionCoprocessorEnvironment> c, Store store, ScanOptions options)
throws IOException {}
/**
* Called before we do in memory compaction. Notice that this method will only be called when you
* use {@code eager} mode. For {@code basic} mode we will not drop any cells thus there is no
* {@link InternalScanner}.
* @param c the environment provided by the region server
* @param store the store where in memory compaction is being executed
* @param scanner the scanner over existing data used in the memstore segments being compacted
* @return the scanner to use during in memory compaction. Must be non-null.
*/
@NonNull
default InternalScanner preMemStoreCompactionCompact(
ObserverContext<RegionCoprocessorEnvironment> c, Store store, InternalScanner scanner)
throws IOException {
return scanner;
}
/**
* Called after the in memory compaction is finished.
* @param c the environment provided by the region server
* @param store the store where in memory compaction is being executed
*/
default void postMemStoreCompaction(ObserverContext<RegionCoprocessorEnvironment> c, Store store)
throws IOException {}
/**
* Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of
* available candidates. To alter the files used for compaction, you may mutate the passed in list

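Taken together, the four new hooks above bracket a single in-memory compaction: preMemStoreCompaction fires first, then, in eager mode only, preMemStoreCompactionCompactScannerOpen and preMemStoreCompactionCompact, and finally postMemStoreCompaction. A minimal sketch of a coprocessor wiring all four up, using the signatures shown in this diff (the class name is illustrative; import locations follow the packages used elsewhere in this commit):

package org.apache.hadoop.hbase.coprocessor.example;

import java.io.IOException;
import java.util.Optional;

import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanOptions;
import org.apache.hadoop.hbase.regionserver.Store;

public class InMemoryCompactionObserverSketch implements RegionCoprocessor, RegionObserver {

  @Override
  public Optional<RegionObserver> getRegionObserver() {
    return Optional.of(this);
  }

  @Override
  public void preMemStoreCompaction(ObserverContext<RegionCoprocessorEnvironment> c, Store store)
      throws IOException {
    // fires before every in-memory compaction of this store, regardless of policy
  }

  @Override
  public void preMemStoreCompactionCompactScannerOpen(
      ObserverContext<RegionCoprocessorEnvironment> c, Store store, ScanOptions options)
      throws IOException {
    // eager mode only: adjust the max versions/TTL seen by the compaction scanner
    options.readAllVersions();
  }

  @Override
  public InternalScanner preMemStoreCompactionCompact(
      ObserverContext<RegionCoprocessorEnvironment> c, Store store, InternalScanner scanner)
      throws IOException {
    // eager mode only: must return a non-null scanner; wrap it here to rewrite cells
    return scanner;
  }

  @Override
  public void postMemStoreCompaction(ObserverContext<RegionCoprocessorEnvironment> c, Store store)
      throws IOException {
    // fires after the in-memory compaction has finished
  }
}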

@@ -96,8 +96,18 @@ public class MemStoreCompactor {
LOG.debug("Starting the In-Memory Compaction for store "
+ compactingMemStore.getStore().getColumnFamilyName());
}
HStore store = compactingMemStore.getStore();
RegionCoprocessorHost cpHost = store.getCoprocessorHost();
if (cpHost != null) {
cpHost.preMemStoreCompaction(store);
}
try {
doCompaction();
} finally {
if (cpHost != null) {
cpHost.postMemStoreCompaction(store);
}
}
return true;
}


@@ -23,12 +23,18 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.io.Closeables;
/**
* The MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
* and performs the scan for compaction operation meaning it is based on SQM
@@ -36,12 +42,14 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator {
private List<Cell> kvs = new ArrayList<>();
private boolean hasMore;
private static final Log LOG = LogFactory.getLog(MemStoreCompactorSegmentsIterator.class);
private final List<Cell> kvs = new ArrayList<>();
private boolean hasMore = true;
private Iterator<Cell> kvsIterator;
// scanner on top of pipeline scanner that uses ScanQueryMatcher
private StoreScanner compactingScanner;
private InternalScanner compactingScanner;
// C-tor
public MemStoreCompactorSegmentsIterator(List<ImmutableSegment> segments,
@@ -56,13 +64,7 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
// build the scanner based on Query Matcher
// reinitialize the compacting scanner for each instance of iterator
compactingScanner = createScanner(store, scanners);
hasMore = compactingScanner.next(kvs, scannerContext);
if (!kvs.isEmpty()) {
kvsIterator = kvs.iterator();
}
refillKVS();
}
@Override
@@ -70,30 +72,26 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
if (kvsIterator == null) { // for the case when the result is empty
return false;
}
if (!kvsIterator.hasNext()) {
// refillKVS() method should be invoked only if !kvsIterator.hasNext()
if (!refillKVS()) {
return false;
}
}
return kvsIterator.hasNext();
// return true if either we have cells in the buffer or we can get more.
return kvsIterator.hasNext() || refillKVS();
}
@Override
public Cell next() {
if (kvsIterator == null) { // for the case when the result is empty
return null;
if (!hasNext()) {
throw new NoSuchElementException();
}
if (!kvsIterator.hasNext()) {
// refillKVS() method should be invoked only if !kvsIterator.hasNext()
if (!refillKVS()) return null;
}
return (!hasMore) ? null : kvsIterator.next();
return kvsIterator.next();
}
public void close() {
try {
compactingScanner.close();
} catch (IOException e) {
LOG.warn("close store scanner failed", e);
}
compactingScanner = null;
kvs.clear();
}
@Override
@@ -105,39 +103,64 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
* Creates the scanner for compacting the pipeline.
* @return the scanner
*/
private StoreScanner createScanner(HStore store, List<KeyValueScanner> scanners)
private InternalScanner createScanner(HStore store, List<KeyValueScanner> scanners)
throws IOException {
// FIXME: This is the old comment 'Get all available versions'
// But actually if we really reset the ScanInfo to get all available versions then lots of UTs
// will fail
return new StoreScanner(store, store.getScanInfo(), scanners, ScanType.COMPACT_RETAIN_DELETES,
InternalScanner scanner = null;
boolean success = false;
try {
RegionCoprocessorHost cpHost = store.getCoprocessorHost();
ScanInfo scanInfo;
if (cpHost != null) {
scanInfo = cpHost.preMemStoreCompactionCompactScannerOpen(store);
} else {
scanInfo = store.getScanInfo();
}
scanner = new StoreScanner(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES,
store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
if (cpHost != null) {
InternalScanner scannerFromCp = cpHost.preMemStoreCompactionCompact(store, scanner);
if (scannerFromCp == null) {
throw new CoprocessorException("Got a null InternalScanner when calling" +
" preMemStoreCompactionCompact which is not acceptable");
}
success = true;
return scannerFromCp;
} else {
success = true;
return scanner;
}
} finally {
if (!success) {
Closeables.close(scanner, true);
scanners.forEach(KeyValueScanner::close);
}
}
}
/* Refill kev-value set (should be invoked only when KVS is empty)
* Returns true if KVS is non-empty */
/*
* Refill the key-value set (should be invoked only when KVS is empty). Returns true if KVS is
* non-empty.
*/
private boolean refillKVS() {
kvs.clear(); // clear previous KVS, first initiated in the constructor
if (!hasMore) { // if there is nothing expected next in compactingScanner
// if there is nothing expected next in compactingScanner
if (!hasMore) {
return false;
}
try { // try to get next KVS
// clear previous KVS, first initiated in the constructor
kvs.clear();
for (;;) {
try {
hasMore = compactingScanner.next(kvs, scannerContext);
} catch (IOException ie) {
throw new IllegalStateException(ie);
} catch (IOException e) {
// should not happen as all data are in memory
throw new IllegalStateException(e);
}
if (!kvs.isEmpty() ) {// is the new KVS empty ?
if (!kvs.isEmpty()) {
kvsIterator = kvs.iterator();
return true;
} else {
// KVS is empty, but hasMore still true?
if (hasMore) { // try to move to next row
return refillKVS();
}
}
return hasMore;
} else if (!hasMore) {
return false;
}
}
}
}

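The rewritten hasNext()/next()/refillKVS() above follow a common buffered-refill pattern: answer from the current batch first, and only ask the underlying scanner for another batch once the buffer runs dry. A generic, self-contained sketch of that pattern in plain Java (not HBase code), for readers tracing the control flow:

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Predicate;

final class BatchRefillIterator<T> implements Iterator<T> {

  /** Fills the supplied batch and returns true while more batches may still follow. */
  private final Predicate<List<T>> source;
  private Iterator<T> batchIterator = Collections.emptyIterator();
  private boolean sourceHasMore = true;

  BatchRefillIterator(Predicate<List<T>> source) {
    this.source = source;
  }

  @Override
  public boolean hasNext() {
    // answer from the current batch first; only then ask the source to refill
    return batchIterator.hasNext() || refill();
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return batchIterator.next();
  }

  /** Pulls batches until one is non-empty (true) or the source is drained (false). */
  private boolean refill() {
    while (sourceHasMore) {
      List<T> batch = new ArrayList<>();
      sourceHasMore = source.test(batch);
      if (!batch.isEmpty()) {
        batchIterator = batch.iterator();
        return true;
      }
    }
    return false;
  }
}

In MemStoreCompactorSegmentsIterator the role of the Predicate is played by compactingScanner.next(kvs, scannerContext), and throwing NoSuchElementException from next() matches the behaviour this commit introduces.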

@@ -776,6 +776,61 @@ public class RegionCoprocessorHost
});
}
/**
* Invoked before in memory compaction.
*/
public void preMemStoreCompaction(HStore store) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preMemStoreCompaction(this, store);
}
});
}
/**
* Invoked before creating the StoreScanner for in memory compaction.
*/
public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException {
CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preMemStoreCompactionCompactScannerOpen(this, store, builder);
}
});
return builder.build();
}
/**
* Invoked before compacting memstore.
*/
public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner)
throws IOException {
if (coprocEnvironments.isEmpty()) {
return scanner;
}
return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
regionObserverGetter, scanner) {
@Override
public InternalScanner call(RegionObserver observer) throws IOException {
return observer.preMemStoreCompactionCompact(this, store, getResult());
}
});
}
/**
* Invoked after in memory compaction.
*/
public void postMemStoreCompaction(HStore store) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postMemStoreCompaction(this, store);
}
});
}
/**
* Invoked after a memstore flush
* @throws IOException


@@ -268,9 +268,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
private static final Scan SCAN_FOR_COMPACTION = new Scan();
/**
* Used for compactions.
* Used for store file compaction and memstore compaction.
* <p>
* Opens a scanner across specified StoreFiles.
* Opens a scanner across specified StoreFiles/MemStoreSegments.
* @param store who we scan
* @param scanners ancillary scanners
* @param smallestReadPoint the readPoint that we should use for tracking versions