HBASE-19095 Add CP hooks in RegionObserver for in memory compaction
This commit is contained in:
parent
3a0f59d031
commit
28cdf4afb8
|
@ -162,6 +162,20 @@ public class WriteHeavyIncrementObserver implements RegionCoprocessor, RegionObs
|
||||||
return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
|
return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void preMemStoreCompactionCompactScannerOpen(
|
||||||
|
ObserverContext<RegionCoprocessorEnvironment> c, Store store, ScanOptions options)
|
||||||
|
throws IOException {
|
||||||
|
options.readAllVersions();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public InternalScanner preMemStoreCompactionCompact(
|
||||||
|
ObserverContext<RegionCoprocessorEnvironment> c, Store store, InternalScanner scanner)
|
||||||
|
throws IOException {
|
||||||
|
return wrap(store.getColumnFamilyDescriptor().getName(), scanner);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<Cell> result)
|
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get, List<Cell> result)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
|
@ -20,58 +20,25 @@ package org.apache.hadoop.hbase.coprocessor.example;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.io.UncheckedIOException;
|
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
|
||||||
import java.util.stream.IntStream;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.TableName;
|
|
||||||
import org.apache.hadoop.hbase.Waiter;
|
import org.apache.hadoop.hbase.Waiter;
|
||||||
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.client.Get;
|
|
||||||
import org.apache.hadoop.hbase.client.Increment;
|
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
import org.apache.hadoop.hbase.regionserver.HStore;
|
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||||
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
|
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
|
||||||
import org.junit.AfterClass;
|
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
@Category({ CoprocessorTests.class, MediumTests.class })
|
@Category({ CoprocessorTests.class, MediumTests.class })
|
||||||
public class TestWriteHeavyIncrementObserver {
|
public class TestWriteHeavyIncrementObserver extends WriteHeavyIncrementObserverTestBase {
|
||||||
|
|
||||||
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
|
||||||
|
|
||||||
private static TableName NAME = TableName.valueOf("TestCP");
|
|
||||||
|
|
||||||
private static byte[] FAMILY = Bytes.toBytes("cf");
|
|
||||||
|
|
||||||
private static byte[] ROW = Bytes.toBytes("row");
|
|
||||||
|
|
||||||
private static byte[] CQ1 = Bytes.toBytes("cq1");
|
|
||||||
|
|
||||||
private static byte[] CQ2 = Bytes.toBytes("cq2");
|
|
||||||
|
|
||||||
private static Table TABLE;
|
|
||||||
|
|
||||||
private static long UPPER = 1000;
|
|
||||||
|
|
||||||
private static int THREADS = 10;
|
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
public static void setUp() throws Exception {
|
public static void setUp() throws Exception {
|
||||||
UTIL.getConfiguration().setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 64 * 1024L);
|
WriteHeavyIncrementObserverTestBase.setUp();
|
||||||
UTIL.getConfiguration().setLong("hbase.hregion.memstore.flush.size.limit", 1024L);
|
|
||||||
UTIL.startMiniCluster(3);
|
|
||||||
UTIL.getAdmin()
|
UTIL.getAdmin()
|
||||||
.createTable(TableDescriptorBuilder.newBuilder(NAME)
|
.createTable(TableDescriptorBuilder.newBuilder(NAME)
|
||||||
.addCoprocessor(WriteHeavyIncrementObserver.class.getName())
|
.addCoprocessor(WriteHeavyIncrementObserver.class.getName())
|
||||||
|
@ -79,45 +46,9 @@ public class TestWriteHeavyIncrementObserver {
|
||||||
TABLE = UTIL.getConnection().getTable(NAME);
|
TABLE = UTIL.getConnection().getTable(NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
@AfterClass
|
|
||||||
public static void tearDown() throws Exception {
|
|
||||||
if (TABLE != null) {
|
|
||||||
TABLE.close();
|
|
||||||
}
|
|
||||||
UTIL.shutdownMiniCluster();
|
|
||||||
}
|
|
||||||
|
|
||||||
private static void increment() throws IOException {
|
|
||||||
for (long i = 1; i <= UPPER; i++) {
|
|
||||||
TABLE.increment(new Increment(ROW).addColumn(FAMILY, CQ1, i).addColumn(FAMILY, CQ2, 2 * i));
|
|
||||||
try {
|
|
||||||
Thread.sleep(ThreadLocalRandom.current().nextInt(5, 10));
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void assertSum() throws IOException {
|
|
||||||
Result result = TABLE.get(new Get(ROW).addColumn(FAMILY, CQ1).addColumn(FAMILY, CQ2));
|
|
||||||
assertEquals(THREADS * (1 + UPPER) * UPPER / 2, Bytes.toLong(result.getValue(FAMILY, CQ1)));
|
|
||||||
assertEquals(THREADS * (1 + UPPER) * UPPER, Bytes.toLong(result.getValue(FAMILY, CQ2)));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void test() throws Exception {
|
public void test() throws Exception {
|
||||||
Thread[] threads = IntStream.range(0, THREADS).mapToObj(i -> new Thread(() -> {
|
doIncrement(0);
|
||||||
try {
|
|
||||||
increment();
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new UncheckedIOException(e);
|
|
||||||
}
|
|
||||||
}, "increment-" + i)).toArray(Thread[]::new);
|
|
||||||
for (Thread thread : threads) {
|
|
||||||
thread.start();
|
|
||||||
}
|
|
||||||
for (Thread thread : threads) {
|
|
||||||
thread.join();
|
|
||||||
}
|
|
||||||
assertSum();
|
assertSum();
|
||||||
// we do not hack scan operation so using scan we could get the original values added into the
|
// we do not hack scan operation so using scan we could get the original values added into the
|
||||||
// table.
|
// table.
|
||||||
|
|
|
@ -0,0 +1,59 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.coprocessor.example;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
|
||||||
|
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.CompactingMemStore;
|
||||||
|
import org.apache.hadoop.hbase.regionserver.HStore;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category({ CoprocessorTests.class, MediumTests.class })
|
||||||
|
public class TestWriteHeavyIncrementObserverWithMemStoreCompaction
|
||||||
|
extends WriteHeavyIncrementObserverTestBase {
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
WriteHeavyIncrementObserverTestBase.setUp();
|
||||||
|
UTIL.getAdmin()
|
||||||
|
.createTable(TableDescriptorBuilder.newBuilder(NAME)
|
||||||
|
.addCoprocessor(WriteHeavyIncrementObserver.class.getName())
|
||||||
|
.setValue(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
|
||||||
|
MemoryCompactionPolicy.EAGER.name())
|
||||||
|
.addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build());
|
||||||
|
TABLE = UTIL.getConnection().getTable(NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void test() throws Exception {
|
||||||
|
// sleep every 10 loops to give memstore compaction enough time to finish before reaching the
|
||||||
|
// flush size.
|
||||||
|
doIncrement(10);
|
||||||
|
assertSum();
|
||||||
|
HStore store = UTIL.getHBaseCluster().findRegionsForTable(NAME).get(0).getStore(FAMILY);
|
||||||
|
// should have no store files created as we have done aggregating all in memory
|
||||||
|
assertEquals(0, store.getStorefilesCount());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,105 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.coprocessor.example;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.UncheckedIOException;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.client.Get;
|
||||||
|
import org.apache.hadoop.hbase.client.Increment;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
|
||||||
|
public class WriteHeavyIncrementObserverTestBase {
|
||||||
|
|
||||||
|
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
protected static TableName NAME = TableName.valueOf("TestCP");
|
||||||
|
|
||||||
|
protected static byte[] FAMILY = Bytes.toBytes("cf");
|
||||||
|
|
||||||
|
protected static byte[] ROW = Bytes.toBytes("row");
|
||||||
|
|
||||||
|
protected static byte[] CQ1 = Bytes.toBytes("cq1");
|
||||||
|
|
||||||
|
protected static byte[] CQ2 = Bytes.toBytes("cq2");
|
||||||
|
|
||||||
|
protected static Table TABLE;
|
||||||
|
|
||||||
|
protected static long UPPER = 1000;
|
||||||
|
|
||||||
|
protected static int THREADS = 10;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUp() throws Exception {
|
||||||
|
UTIL.getConfiguration().setLong(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, 64 * 1024L);
|
||||||
|
UTIL.getConfiguration().setLong("hbase.hregion.memstore.flush.size.limit", 1024L);
|
||||||
|
UTIL.startMiniCluster(3);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void tearDown() throws Exception {
|
||||||
|
if (TABLE != null) {
|
||||||
|
TABLE.close();
|
||||||
|
}
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void increment(int sleepSteps) throws IOException {
|
||||||
|
for (long i = 1; i <= UPPER; i++) {
|
||||||
|
TABLE.increment(new Increment(ROW).addColumn(FAMILY, CQ1, i).addColumn(FAMILY, CQ2, 2 * i));
|
||||||
|
if (sleepSteps > 0 && i % sleepSteps == 0) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(10);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected final void assertSum() throws IOException {
|
||||||
|
Result result = TABLE.get(new Get(ROW).addColumn(FAMILY, CQ1).addColumn(FAMILY, CQ2));
|
||||||
|
assertEquals(THREADS * (1 + UPPER) * UPPER / 2, Bytes.toLong(result.getValue(FAMILY, CQ1)));
|
||||||
|
assertEquals(THREADS * (1 + UPPER) * UPPER, Bytes.toLong(result.getValue(FAMILY, CQ2)));
|
||||||
|
}
|
||||||
|
|
||||||
|
protected final void doIncrement(int sleepSteps) throws InterruptedException {
|
||||||
|
Thread[] threads = IntStream.range(0, THREADS).mapToObj(i -> new Thread(() -> {
|
||||||
|
try {
|
||||||
|
increment(sleepSteps);
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new UncheckedIOException(e);
|
||||||
|
}
|
||||||
|
}, "increment-" + i)).toArray(Thread[]::new);
|
||||||
|
for (Thread thread : threads) {
|
||||||
|
thread.start();
|
||||||
|
}
|
||||||
|
for (Thread thread : threads) {
|
||||||
|
thread.join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,4 +1,4 @@
|
||||||
/*
|
/**
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -19,6 +19,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.coprocessor;
|
package org.apache.hadoop.hbase.coprocessor;
|
||||||
|
|
||||||
|
import edu.umd.cs.findbugs.annotations.NonNull;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -144,10 +146,10 @@ public interface RegionObserver {
|
||||||
* Called before a Store's memstore is flushed to disk.
|
* Called before a Store's memstore is flushed to disk.
|
||||||
* @param c the environment provided by the region server
|
* @param c the environment provided by the region server
|
||||||
* @param store the store where flush is being requested
|
* @param store the store where flush is being requested
|
||||||
* @param scanner the scanner over existing data used in the store file
|
* @param scanner the scanner over existing data used in the memstore
|
||||||
* @param tracker tracker used to track the life cycle of a flush
|
* @param tracker tracker used to track the life cycle of a flush
|
||||||
* @return the scanner to use during compaction. Should not be {@code null}
|
* @return the scanner to use during flush. Should not be {@code null} unless the implementation
|
||||||
* unless the implementation is writing new store files on its own.
|
* is writing new store files on its own.
|
||||||
*/
|
*/
|
||||||
default InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
default InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||||
InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
|
InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
|
||||||
|
@ -173,6 +175,51 @@ public interface RegionObserver {
|
||||||
default void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
default void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
|
||||||
StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException {}
|
StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called before in memory compaction started.
|
||||||
|
* @param c the environment provided by the region server
|
||||||
|
* @param store the store where in memory compaction is being requested
|
||||||
|
*/
|
||||||
|
default void preMemStoreCompaction(ObserverContext<RegionCoprocessorEnvironment> c, Store store)
|
||||||
|
throws IOException {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called before we open store scanner for in memory compaction. You can use the {@code options}
|
||||||
|
* to change max versions and TTL for the scanner being opened. Notice that this method will only
|
||||||
|
* be called when you use {@code eager} mode. For {@code basic} mode we will not drop any cells
|
||||||
|
* thus we do not open a store scanner.
|
||||||
|
* @param c the environment provided by the region server
|
||||||
|
* @param store the store where in memory compaction is being requested
|
||||||
|
* @param options used to change max versions and TTL for the scanner being opened
|
||||||
|
*/
|
||||||
|
default void preMemStoreCompactionCompactScannerOpen(
|
||||||
|
ObserverContext<RegionCoprocessorEnvironment> c, Store store, ScanOptions options)
|
||||||
|
throws IOException {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called before we do in memory compaction. Notice that this method will only be called when you
|
||||||
|
* use {@code eager} mode. For {@code basic} mode we will not drop any cells thus there is no
|
||||||
|
* {@link InternalScanner}.
|
||||||
|
* @param c the environment provided by the region server
|
||||||
|
* @param store the store where in memory compaction is being executed
|
||||||
|
* @param scanner the scanner over existing data used in the memstore segments being compact
|
||||||
|
* @return the scanner to use during in memory compaction. Must be non-null.
|
||||||
|
*/
|
||||||
|
@NonNull
|
||||||
|
default InternalScanner preMemStoreCompactionCompact(
|
||||||
|
ObserverContext<RegionCoprocessorEnvironment> c, Store store, InternalScanner scanner)
|
||||||
|
throws IOException {
|
||||||
|
return scanner;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Called after the in memory compaction is finished.
|
||||||
|
* @param c the environment provided by the region server
|
||||||
|
* @param store the store where in memory compaction is being executed
|
||||||
|
*/
|
||||||
|
default void postMemStoreCompaction(ObserverContext<RegionCoprocessorEnvironment> c, Store store)
|
||||||
|
throws IOException {}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of
|
* Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of
|
||||||
* available candidates. To alter the files used for compaction, you may mutate the passed in list
|
* available candidates. To alter the files used for compaction, you may mutate the passed in list
|
||||||
|
|
|
@ -96,8 +96,18 @@ public class MemStoreCompactor {
|
||||||
LOG.debug("Starting the In-Memory Compaction for store "
|
LOG.debug("Starting the In-Memory Compaction for store "
|
||||||
+ compactingMemStore.getStore().getColumnFamilyName());
|
+ compactingMemStore.getStore().getColumnFamilyName());
|
||||||
}
|
}
|
||||||
|
HStore store = compactingMemStore.getStore();
|
||||||
doCompaction();
|
RegionCoprocessorHost cpHost = store.getCoprocessorHost();
|
||||||
|
if (cpHost != null) {
|
||||||
|
cpHost.preMemStoreCompaction(store);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
doCompaction();
|
||||||
|
} finally {
|
||||||
|
if (cpHost != null) {
|
||||||
|
cpHost.postMemStoreCompaction(store);
|
||||||
|
}
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -23,12 +23,18 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.NoSuchElementException;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.hbase.Cell;
|
import org.apache.hadoop.hbase.Cell;
|
||||||
import org.apache.hadoop.hbase.CellComparator;
|
import org.apache.hadoop.hbase.CellComparator;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.shaded.com.google.common.io.Closeables;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
|
* The MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
|
||||||
* and performs the scan for compaction operation meaning it is based on SQM
|
* and performs the scan for compaction operation meaning it is based on SQM
|
||||||
|
@ -36,12 +42,14 @@ import org.apache.yetus.audience.InterfaceAudience;
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator {
|
public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator {
|
||||||
|
|
||||||
private List<Cell> kvs = new ArrayList<>();
|
private static final Log LOG = LogFactory.getLog(MemStoreCompactorSegmentsIterator.class);
|
||||||
private boolean hasMore;
|
|
||||||
|
private final List<Cell> kvs = new ArrayList<>();
|
||||||
|
private boolean hasMore = true;
|
||||||
private Iterator<Cell> kvsIterator;
|
private Iterator<Cell> kvsIterator;
|
||||||
|
|
||||||
// scanner on top of pipeline scanner that uses ScanQueryMatcher
|
// scanner on top of pipeline scanner that uses ScanQueryMatcher
|
||||||
private StoreScanner compactingScanner;
|
private InternalScanner compactingScanner;
|
||||||
|
|
||||||
// C-tor
|
// C-tor
|
||||||
public MemStoreCompactorSegmentsIterator(List<ImmutableSegment> segments,
|
public MemStoreCompactorSegmentsIterator(List<ImmutableSegment> segments,
|
||||||
|
@ -56,44 +64,34 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
|
||||||
// build the scanner based on Query Matcher
|
// build the scanner based on Query Matcher
|
||||||
// reinitialize the compacting scanner for each instance of iterator
|
// reinitialize the compacting scanner for each instance of iterator
|
||||||
compactingScanner = createScanner(store, scanners);
|
compactingScanner = createScanner(store, scanners);
|
||||||
|
refillKVS();
|
||||||
hasMore = compactingScanner.next(kvs, scannerContext);
|
|
||||||
|
|
||||||
if (!kvs.isEmpty()) {
|
|
||||||
kvsIterator = kvs.iterator();
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean hasNext() {
|
public boolean hasNext() {
|
||||||
if (kvsIterator == null) { // for the case when the result is empty
|
if (kvsIterator == null) { // for the case when the result is empty
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (!kvsIterator.hasNext()) {
|
// return true either we have cells in buffer or we can get more.
|
||||||
// refillKVS() method should be invoked only if !kvsIterator.hasNext()
|
return kvsIterator.hasNext() || refillKVS();
|
||||||
if (!refillKVS()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return kvsIterator.hasNext();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Cell next() {
|
public Cell next() {
|
||||||
if (kvsIterator == null) { // for the case when the result is empty
|
if (!hasNext()) {
|
||||||
return null;
|
throw new NoSuchElementException();
|
||||||
}
|
}
|
||||||
if (!kvsIterator.hasNext()) {
|
return kvsIterator.next();
|
||||||
// refillKVS() method should be invoked only if !kvsIterator.hasNext()
|
|
||||||
if (!refillKVS()) return null;
|
|
||||||
}
|
|
||||||
return (!hasMore) ? null : kvsIterator.next();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void close() {
|
public void close() {
|
||||||
compactingScanner.close();
|
try {
|
||||||
|
compactingScanner.close();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("close store scanner failed", e);
|
||||||
|
}
|
||||||
compactingScanner = null;
|
compactingScanner = null;
|
||||||
|
kvs.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -105,39 +103,64 @@ public class MemStoreCompactorSegmentsIterator extends MemStoreSegmentsIterator
|
||||||
* Creates the scanner for compacting the pipeline.
|
* Creates the scanner for compacting the pipeline.
|
||||||
* @return the scanner
|
* @return the scanner
|
||||||
*/
|
*/
|
||||||
private StoreScanner createScanner(HStore store, List<KeyValueScanner> scanners)
|
private InternalScanner createScanner(HStore store, List<KeyValueScanner> scanners)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
// FIXME: This is the old comment 'Get all available versions'
|
InternalScanner scanner = null;
|
||||||
// But actually if we really reset the ScanInfo to get all available versions then lots of UTs
|
boolean success = false;
|
||||||
// will fail
|
try {
|
||||||
return new StoreScanner(store, store.getScanInfo(), scanners, ScanType.COMPACT_RETAIN_DELETES,
|
RegionCoprocessorHost cpHost = store.getCoprocessorHost();
|
||||||
store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
|
ScanInfo scanInfo;
|
||||||
|
if (cpHost != null) {
|
||||||
|
scanInfo = cpHost.preMemStoreCompactionCompactScannerOpen(store);
|
||||||
|
} else {
|
||||||
|
scanInfo = store.getScanInfo();
|
||||||
|
}
|
||||||
|
scanner = new StoreScanner(store, scanInfo, scanners, ScanType.COMPACT_RETAIN_DELETES,
|
||||||
|
store.getSmallestReadPoint(), HConstants.OLDEST_TIMESTAMP);
|
||||||
|
if (cpHost != null) {
|
||||||
|
InternalScanner scannerFromCp = cpHost.preMemStoreCompactionCompact(store, scanner);
|
||||||
|
if (scannerFromCp == null) {
|
||||||
|
throw new CoprocessorException("Got a null InternalScanner when calling" +
|
||||||
|
" preMemStoreCompactionCompact which is not acceptable");
|
||||||
|
}
|
||||||
|
success = true;
|
||||||
|
return scannerFromCp;
|
||||||
|
} else {
|
||||||
|
success = true;
|
||||||
|
return scanner;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (!success) {
|
||||||
|
Closeables.close(scanner, true);
|
||||||
|
scanners.forEach(KeyValueScanner::close);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Refill kev-value set (should be invoked only when KVS is empty)
|
/*
|
||||||
* Returns true if KVS is non-empty */
|
* Refill kev-value set (should be invoked only when KVS is empty) Returns true if KVS is
|
||||||
|
* non-empty
|
||||||
|
*/
|
||||||
private boolean refillKVS() {
|
private boolean refillKVS() {
|
||||||
kvs.clear(); // clear previous KVS, first initiated in the constructor
|
// if there is nothing expected next in compactingScanner
|
||||||
if (!hasMore) { // if there is nothing expected next in compactingScanner
|
if (!hasMore) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
// clear previous KVS, first initiated in the constructor
|
||||||
try { // try to get next KVS
|
kvs.clear();
|
||||||
hasMore = compactingScanner.next(kvs, scannerContext);
|
for (;;) {
|
||||||
} catch (IOException ie) {
|
try {
|
||||||
throw new IllegalStateException(ie);
|
hasMore = compactingScanner.next(kvs, scannerContext);
|
||||||
}
|
} catch (IOException e) {
|
||||||
|
// should not happen as all data are in memory
|
||||||
if (!kvs.isEmpty() ) {// is the new KVS empty ?
|
throw new IllegalStateException(e);
|
||||||
kvsIterator = kvs.iterator();
|
}
|
||||||
return true;
|
if (!kvs.isEmpty()) {
|
||||||
} else {
|
kvsIterator = kvs.iterator();
|
||||||
// KVS is empty, but hasMore still true?
|
return true;
|
||||||
if (hasMore) { // try to move to next row
|
} else if (!hasMore) {
|
||||||
return refillKVS();
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
return hasMore;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -776,6 +776,61 @@ public class RegionCoprocessorHost
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invoked before in memory compaction.
|
||||||
|
*/
|
||||||
|
public void preMemStoreCompaction(HStore store) throws IOException {
|
||||||
|
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
|
||||||
|
@Override
|
||||||
|
public void call(RegionObserver observer) throws IOException {
|
||||||
|
observer.preMemStoreCompaction(this, store);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invoked before create StoreScanner for in memory compaction.
|
||||||
|
*/
|
||||||
|
public ScanInfo preMemStoreCompactionCompactScannerOpen(HStore store) throws IOException {
|
||||||
|
CustomizedScanInfoBuilder builder = new CustomizedScanInfoBuilder(store.getScanInfo());
|
||||||
|
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
|
||||||
|
@Override
|
||||||
|
public void call(RegionObserver observer) throws IOException {
|
||||||
|
observer.preMemStoreCompactionCompactScannerOpen(this, store, builder);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return builder.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invoked before compacting memstore.
|
||||||
|
*/
|
||||||
|
public InternalScanner preMemStoreCompactionCompact(HStore store, InternalScanner scanner)
|
||||||
|
throws IOException {
|
||||||
|
if (coprocEnvironments.isEmpty()) {
|
||||||
|
return scanner;
|
||||||
|
}
|
||||||
|
return execOperationWithResult(new ObserverOperationWithResult<RegionObserver, InternalScanner>(
|
||||||
|
regionObserverGetter, scanner) {
|
||||||
|
@Override
|
||||||
|
public InternalScanner call(RegionObserver observer) throws IOException {
|
||||||
|
return observer.preMemStoreCompactionCompact(this, store, getResult());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Invoked after in memory compaction.
|
||||||
|
*/
|
||||||
|
public void postMemStoreCompaction(HStore store) throws IOException {
|
||||||
|
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperationWithoutResult() {
|
||||||
|
@Override
|
||||||
|
public void call(RegionObserver observer) throws IOException {
|
||||||
|
observer.postMemStoreCompaction(this, store);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Invoked after a memstore flush
|
* Invoked after a memstore flush
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
|
|
|
@ -268,9 +268,9 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
|
||||||
private static final Scan SCAN_FOR_COMPACTION = new Scan();
|
private static final Scan SCAN_FOR_COMPACTION = new Scan();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Used for compactions.
|
* Used for store file compaction and memstore compaction.
|
||||||
* <p>
|
* <p>
|
||||||
* Opens a scanner across specified StoreFiles.
|
* Opens a scanner across specified StoreFiles/MemStoreSegments.
|
||||||
* @param store who we scan
|
* @param store who we scan
|
||||||
* @param scanners ancillary scanners
|
* @param scanners ancillary scanners
|
||||||
* @param smallestReadPoint the readPoint that we should use for tracking versions
|
* @param smallestReadPoint the readPoint that we should use for tracking versions
|
||||||
|
|
Loading…
Reference in New Issue