HBASE-23968 Periodically check whether a system stop is requested in compaction by time. (#1274)

Signed-off-by: stack <stack@apache.org>
Minwoo Kang 2020-05-05 06:45:20 +09:00 committed by GitHub
parent 1878db843c
commit 1b11ea2e1f
8 changed files with 289 additions and 69 deletions
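
In short: the old implementation polled for a requested stop only after a configurable number of written bytes (HStore.closeCheckInterval), so a compaction that produced little output could keep running long after its region was asked to close. This commit moves that check into a small stateful CloseChecker and adds a second, time-based trigger. A condensed, illustrative sketch of the resulting loop shape (names follow the patch below; writeCell is a hypothetical stand-in for the per-cell write, not real patch code):

    // Illustrative sketch only -- see the real call sites in the diffs below.
    long currentTime = EnvironmentEdgeManager.currentTime();
    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
    do {
      hasMore = scanner.next(cells, scannerContext);
      // Time-based check (new): runs once per scanner batch, so a stop
      // request is noticed even while few output bytes are produced.
      if (closeChecker.isTimeLimit(store, EnvironmentEdgeManager.currentTime())) {
        progress.cancel();
        return false;
      }
      for (Cell c : cells) {
        long len = writeCell(writer, c); // hypothetical helper
        // Size-based check: same semantics as the old inline byte counter.
        if (closeChecker.isSizeLimit(store, len)) {
          progress.cancel();
          return false;
        }
      }
      cells.clear();
    } while (hasMore);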

hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreCompactor.java

@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.regionserver.ShipperListener;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
 import org.apache.hadoop.hbase.regionserver.StoreScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
@@ -297,7 +298,6 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
   protected boolean performCompaction(FileDetails fd, InternalScanner scanner, CellSink writer,
       long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
       boolean major, int numofFilesToCompact) throws IOException {
-    long bytesWrittenProgressForCloseCheck = 0;
     long bytesWrittenProgressForLog = 0;
     long bytesWrittenProgressForShippedCall = 0;
     // Clear old mob references
@@ -321,11 +321,12 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
     // we have to use a do/while loop.
     List<Cell> cells = new ArrayList<>();
     // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
-    int closeCheckSizeLimit = HStore.getCloseCheckInterval();
+    long currentTime = EnvironmentEdgeManager.currentTime();
     long lastMillis = 0;
     if (LOG.isDebugEnabled()) {
-      lastMillis = EnvironmentEdgeManager.currentTime();
+      lastMillis = currentTime;
     }
+    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
     String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
     long now = 0;
     boolean hasMore;
@@ -354,7 +355,14 @@
       do {
         hasMore = scanner.next(cells, scannerContext);
-        now = EnvironmentEdgeManager.currentTime();
+        currentTime = EnvironmentEdgeManager.currentTime();
+        if (LOG.isDebugEnabled()) {
+          now = currentTime;
+        }
+        if (closeChecker.isTimeLimit(store, currentTime)) {
+          progress.cancel();
+          return false;
+        }
         for (Cell c : cells) {
           if (compactMOBs) {
             if (MobUtils.isMobReferenceCell(c)) {
@@ -531,16 +539,9 @@ public class DefaultMobStoreCompactor extends DefaultCompactor {
             bytesWrittenProgressForLog += len;
           }
           throughputController.control(compactionName, len);
-          // check periodically to see if a system stop is requested
-          if (closeCheckSizeLimit > 0) {
-            bytesWrittenProgressForCloseCheck += len;
-            if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
-              bytesWrittenProgressForCloseCheck = 0;
-              if (!store.areWritesEnabled()) {
-                progress.cancel();
-                return false;
-              }
-            }
-          }
+          if (closeChecker.isSizeLimit(store, len)) {
+            progress.cancel();
+            return false;
+          }
           if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
             ((ShipperListener) writer).beforeShipped();

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

@@ -2219,15 +2219,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region {
    * }
    * Also in compactor.performCompaction():
    * check periodically to see if a system stop is requested
-   * if (closeCheckInterval > 0) {
-   *   bytesWritten += len;
-   *   if (bytesWritten > closeCheckInterval) {
-   *     bytesWritten = 0;
-   *     if (!store.areWritesEnabled()) {
-   *       progress.cancel();
-   *       return false;
-   *     }
-   *   }
-   * }
+   * if (closeChecker != null && closeChecker.isTimeLimit(store, now)) {
+   *   progress.cancel();
+   *   return false;
+   * }
+   * if (closeChecker != null && closeChecker.isSizeLimit(store, len)) {
+   *   progress.cancel();
+   *   return false;
+   * }
    */
   public boolean compact(CompactionContext compaction, HStore store,

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java

@@ -154,8 +154,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
   protected CacheConfig cacheConf;
   private long lastCompactSize = 0;
   volatile boolean forceMajor = false;
-  /* how many bytes to write between status checks */
-  static int closeCheckInterval = 0;
   private AtomicLong storeSize = new AtomicLong();
   private AtomicLong totalUncompressedBytes = new AtomicLong();
@@ -297,11 +295,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
       this.compactionCheckMultiplier = DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER;
     }
-    if (HStore.closeCheckInterval == 0) {
-      HStore.closeCheckInterval = conf.getInt(
-          "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
-    }
     this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
     List<HStoreFile> hStoreFiles = loadStoreFiles(warmup);
     // Move the storeSize calculation out of loadStoreFiles() method, because the secondary read
@@ -490,13 +483,6 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, PropagatingConfigurationObserver {
     }
   }
-  /**
-   * @return how many bytes to write between status checks
-   */
-  public static int getCloseCheckInterval() {
-    return closeCheckInterval;
-  }
   @Override
   public ColumnFamilyDescriptor getColumnFamilyDescriptor() {
     return this.family;

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CloseChecker.java

@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Check periodically to see if a system stop is requested.
+ */
+@InterfaceAudience.Private
+public class CloseChecker {
+  public static final String SIZE_LIMIT_KEY = "hbase.hstore.close.check.interval";
+  public static final String TIME_LIMIT_KEY = "hbase.hstore.close.check.time.interval";
+
+  private final int closeCheckSizeLimit;
+  private final long closeCheckTimeLimit;
+  private long bytesWrittenProgressForCloseCheck;
+  private long lastCloseCheckMillis;
+
+  public CloseChecker(Configuration conf, long currentTime) {
+    this.closeCheckSizeLimit = conf.getInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);
+    this.closeCheckTimeLimit = conf.getLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */);
+    this.bytesWrittenProgressForCloseCheck = 0;
+    this.lastCloseCheckMillis = currentTime;
+  }
+
+  /**
+   * Check whether a system stop is requested once the bytes written since the last check
+   * exceed the size limit.
+   *
+   * @return true if a system stop is requested
+   */
+  public boolean isSizeLimit(Store store, long bytesWritten) {
+    if (closeCheckSizeLimit <= 0) {
+      return false;
+    }
+
+    bytesWrittenProgressForCloseCheck += bytesWritten;
+    if (bytesWrittenProgressForCloseCheck <= closeCheckSizeLimit) {
+      return false;
+    }
+
+    bytesWrittenProgressForCloseCheck = 0;
+    return !store.areWritesEnabled();
+  }
+
+  /**
+   * Check whether a system stop is requested once the elapsed time since the last check
+   * exceeds the time limit.
+   *
+   * @return true if a system stop is requested
+   */
+  public boolean isTimeLimit(Store store, long now) {
+    if (closeCheckTimeLimit <= 0) {
+      return false;
+    }
+
+    final long elapsedMillis = now - lastCloseCheckMillis;
+    if (elapsedMillis <= closeCheckTimeLimit) {
+      return false;
+    }
+
+    lastCloseCheckMillis = now;
+    return !store.areWritesEnabled();
+  }
+}
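
Note that both limits are read from configuration, that a non-positive value disables the corresponding check, and that isTimeLimit() only advances lastCloseCheckMillis once the limit has been exceeded. A hypothetical tuning snippet (the values are illustrative, not the defaults):

    Configuration conf = HBaseConfiguration.create();
    conf.setInt(CloseChecker.SIZE_LIMIT_KEY, 0);      // <= 0 disables the size-based check
    conf.setLong(CloseChecker.TIME_LIMIT_KEY, 5000L); // poll for close requests every 5 s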

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java

@@ -368,17 +368,17 @@ public abstract class Compactor<T extends CellSink> {
       long smallestReadPoint, boolean cleanSeqId, ThroughputController throughputController,
       boolean major, int numofFilesToCompact) throws IOException {
     assert writer instanceof ShipperListener;
-    long bytesWrittenProgressForCloseCheck = 0;
     long bytesWrittenProgressForLog = 0;
     long bytesWrittenProgressForShippedCall = 0;
     // Since scanner.next() can return 'false' but still be delivering data,
     // we have to use a do/while loop.
     List<Cell> cells = new ArrayList<>();
-    long closeCheckSizeLimit = HStore.getCloseCheckInterval();
+    long currentTime = EnvironmentEdgeManager.currentTime();
     long lastMillis = 0;
     if (LOG.isDebugEnabled()) {
-      lastMillis = EnvironmentEdgeManager.currentTime();
+      lastMillis = currentTime;
     }
+    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
     String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
     long now = 0;
     boolean hasMore;
@@ -392,8 +392,13 @@ public abstract class Compactor<T extends CellSink> {
     try {
       do {
        hasMore = scanner.next(cells, scannerContext);
+        currentTime = EnvironmentEdgeManager.currentTime();
         if (LOG.isDebugEnabled()) {
-          now = EnvironmentEdgeManager.currentTime();
+          now = currentTime;
         }
+        if (closeChecker.isTimeLimit(store, currentTime)) {
+          progress.cancel();
+          return false;
+        }
         // output to writer:
         Cell lastCleanCell = null;
@@ -416,16 +421,9 @@ public abstract class Compactor<T extends CellSink> {
            bytesWrittenProgressForLog += len;
           }
           throughputController.control(compactionName, len);
-          // check periodically to see if a system stop is requested
-          if (closeCheckSizeLimit > 0) {
-            bytesWrittenProgressForCloseCheck += len;
-            if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
-              bytesWrittenProgressForCloseCheck = 0;
-              if (!store.areWritesEnabled()) {
-                progress.cancel();
-                return false;
-              }
-            }
-          }
+          if (closeChecker.isSizeLimit(store, len)) {
+            progress.cancel();
+            return false;
+          }
           if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
             if (lastCleanCell != null) {

hbase-server/src/test/java/org/apache/hadoop/hbase/mob/FaultyMobStoreCompactor.java

@@ -43,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
 import org.apache.hadoop.hbase.regionserver.ShipperListener;
 import org.apache.hadoop.hbase.regionserver.StoreFileWriter;
+import org.apache.hadoop.hbase.regionserver.compactions.CloseChecker;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputControlUtil;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -95,7 +96,6 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
     if (major) {
       totalMajorCompactions.incrementAndGet();
     }
-    long bytesWrittenProgressForCloseCheck = 0;
     long bytesWrittenProgressForLog = 0;
     long bytesWrittenProgressForShippedCall = 0;
     // Clear old mob references
@@ -121,11 +121,12 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
     // we have to use a do/while loop.
     List<Cell> cells = new ArrayList<>();
     // Limit to "hbase.hstore.compaction.kv.max" (default 10) to avoid OOME
-    int closeCheckSizeLimit = HStore.getCloseCheckInterval();
+    long currentTime = EnvironmentEdgeManager.currentTime();
     long lastMillis = 0;
     if (LOG.isDebugEnabled()) {
-      lastMillis = EnvironmentEdgeManager.currentTime();
+      lastMillis = currentTime;
     }
+    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
     String compactionName = ThroughputControlUtil.getNameForThrottling(store, "compaction");
     long now = 0;
     boolean hasMore;
@@ -170,8 +171,13 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
       }
       do {
         hasMore = scanner.next(cells, scannerContext);
+        currentTime = EnvironmentEdgeManager.currentTime();
         if (LOG.isDebugEnabled()) {
-          now = EnvironmentEdgeManager.currentTime();
+          now = currentTime;
         }
+        if (closeChecker.isTimeLimit(store, currentTime)) {
+          progress.cancel();
+          return false;
+        }
         for (Cell c : cells) {
           counter++;
@@ -305,16 +311,9 @@ public class FaultyMobStoreCompactor extends DefaultMobStoreCompactor {
             bytesWrittenProgressForLog += len;
           }
           throughputController.control(compactionName, len);
-          // check periodically to see if a system stop is requested
-          if (closeCheckSizeLimit > 0) {
-            bytesWrittenProgressForCloseCheck += len;
-            if (bytesWrittenProgressForCloseCheck > closeCheckSizeLimit) {
-              bytesWrittenProgressForCloseCheck = 0;
-              if (!store.areWritesEnabled()) {
-                progress.cancel();
-                return false;
-              }
-            }
-          }
+          if (closeChecker.isSizeLimit(store, len)) {
+            progress.cancel();
+            return false;
+          }
           if (kvs != null && bytesWrittenProgressForShippedCall > shippedCallSizeLimit) {
             ((ShipperListener) writer).beforeShipped();

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java

@@ -21,6 +21,8 @@ import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY;
 import static org.apache.hadoop.hbase.HBaseTestingUtility.START_KEY_BYTES;
 import static org.apache.hadoop.hbase.HBaseTestingUtility.fam1;
 import static org.apache.hadoop.hbase.regionserver.Store.PRIORITY_USER;
+import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY;
+import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -153,12 +155,11 @@ public class TestCompaction {
    * @throws Exception
    */
   @Test
-  public void testInterruptCompaction() throws Exception {
+  public void testInterruptCompactionBySize() throws Exception {
     assertEquals(0, count());
     // lower the polling interval for this test
-    int origWI = HStore.closeCheckInterval;
-    HStore.closeCheckInterval = 10*1000; // 10 KB
+    conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 /* 10 KB */);
 
     try {
       // Create a couple store files w/ 15KB (over 10KB interval)
@@ -203,7 +204,84 @@
     } finally {
       // don't mess up future tests
       r.writestate.writesEnabled = true;
-      HStore.closeCheckInterval = origWI;
+      conf.setInt(SIZE_LIMIT_KEY, 10 * 1000 * 1000 /* 10 MB */);
+
+      // Delete all Store information once done using
+      for (int i = 0; i < compactionThreshold; i++) {
+        Delete delete = new Delete(Bytes.add(STARTROW, Bytes.toBytes(i)));
+        byte [][] famAndQf = {COLUMN_FAMILY, null};
+        delete.addFamily(famAndQf[0]);
+        r.delete(delete);
+      }
+      r.flush(true);
+
+      // Multiple versions allowed for an entry, so the delete isn't enough
+      // Lower TTL and expire to ensure that all our entries have been wiped
+      final int ttl = 1000;
+      for (HStore store : this.r.stores.values()) {
+        ScanInfo old = store.getScanInfo();
+        ScanInfo si = old.customize(old.getMaxVersions(), ttl, old.getKeepDeletedCells());
+        store.setScanInfo(si);
+      }
+      Thread.sleep(ttl);
+
+      r.compact(true);
+      assertEquals(0, count());
+    }
+  }
+
+  @Test
+  public void testInterruptCompactionByTime() throws Exception {
+    assertEquals(0, count());
+    // lower the polling interval for this test
+    conf.setLong(TIME_LIMIT_KEY, 1 /* 1ms */);
+
+    try {
+      // Create a couple store files w/ 15KB (over 10KB interval)
+      int jmax = (int) Math.ceil(15.0/compactionThreshold);
+      byte [] pad = new byte[1000]; // 1 KB chunk
+      for (int i = 0; i < compactionThreshold; i++) {
+        Table loader = new RegionAsTable(r);
+        Put p = new Put(Bytes.add(STARTROW, Bytes.toBytes(i)));
+        p.setDurability(Durability.SKIP_WAL);
+        for (int j = 0; j < jmax; j++) {
+          p.addColumn(COLUMN_FAMILY, Bytes.toBytes(j), pad);
+        }
+        HBaseTestCase.addContent(loader, Bytes.toString(COLUMN_FAMILY));
+        loader.put(p);
+        r.flush(true);
+      }
+
+      HRegion spyR = spy(r);
+      doAnswer(new Answer() {
+        @Override
+        public Object answer(InvocationOnMock invocation) throws Throwable {
+          r.writestate.writesEnabled = false;
+          return invocation.callRealMethod();
+        }
+      }).when(spyR).doRegionCompactionPrep();
+
+      // force a minor compaction, but not before requesting a stop
+      spyR.compactStores();
+
+      // ensure that the compaction stopped, all old files are intact,
+      HStore s = r.getStore(COLUMN_FAMILY);
+      assertEquals(compactionThreshold, s.getStorefilesCount());
+      assertTrue(s.getStorefilesSize() > 15*1000);
+
+      // and no new store files persisted past compactStores()
+      // only one empty dir exists in temp dir
+      FileStatus[] ls = r.getFilesystem().listStatus(r.getRegionFileSystem().getTempDir());
+      assertEquals(1, ls.length);
+      Path storeTempDir =
+        new Path(r.getRegionFileSystem().getTempDir(), Bytes.toString(COLUMN_FAMILY));
+      assertTrue(r.getFilesystem().exists(storeTempDir));
+      ls = r.getFilesystem().listStatus(storeTempDir);
+      assertEquals(0, ls.length);
+
+    } finally {
+      // don't mess up future tests
+      r.writestate.writesEnabled = true;
+      conf.setLong(TIME_LIMIT_KEY, 10 * 1000L /* 10 s */);
+
       // Delete all Store information once done using
       for (int i = 0; i < compactionThreshold; i++) {

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCloseChecker.java

@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.SIZE_LIMIT_KEY;
+import static org.apache.hadoop.hbase.regionserver.compactions.CloseChecker.TIME_LIMIT_KEY;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestCloseChecker {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestCloseChecker.class);
+
+  @Test
+  public void testIsClosed() {
+    Store enableWrite = mock(Store.class);
+    when(enableWrite.areWritesEnabled()).thenReturn(true);
+
+    Store disableWrite = mock(Store.class);
+    when(disableWrite.areWritesEnabled()).thenReturn(false);
+
+    Configuration conf = new Configuration();
+
+    long currentTime = System.currentTimeMillis();
+
+    conf.setInt(SIZE_LIMIT_KEY, 10);
+    conf.setLong(TIME_LIMIT_KEY, 10);
+
+    CloseChecker closeChecker = new CloseChecker(conf, currentTime);
+    assertFalse(closeChecker.isTimeLimit(enableWrite, currentTime));
+    assertFalse(closeChecker.isSizeLimit(enableWrite, 10L));
+
+    closeChecker = new CloseChecker(conf, currentTime);
+    assertFalse(closeChecker.isTimeLimit(enableWrite, currentTime + 11));
+    assertFalse(closeChecker.isSizeLimit(enableWrite, 11L));
+
+    closeChecker = new CloseChecker(conf, currentTime);
+    assertTrue(closeChecker.isTimeLimit(disableWrite, currentTime + 11));
+    assertTrue(closeChecker.isSizeLimit(disableWrite, 11L));
+
+    for (int i = 0; i < 10; i++) {
+      int plusTime = 5 * i;
+      assertFalse(closeChecker.isTimeLimit(enableWrite, currentTime + plusTime));
+      assertFalse(closeChecker.isSizeLimit(enableWrite, 5L));
+    }
+
+    closeChecker = new CloseChecker(conf, currentTime);
+    assertFalse(closeChecker.isTimeLimit(disableWrite, currentTime + 6));
+    assertFalse(closeChecker.isSizeLimit(disableWrite, 6));
+    assertTrue(closeChecker.isTimeLimit(disableWrite, currentTime + 12));
+    assertTrue(closeChecker.isSizeLimit(disableWrite, 6));
+  }
+}