HBASE-18905 Allow CPs to request flush on Region and know the completion of the requested flush

This commit is contained in:
zhangduo 2017-10-25 11:00:44 +08:00
parent c2dbef1465
commit ca79a91566
34 changed files with 589 additions and 215 deletions

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
@ -188,7 +189,7 @@ public class ZooKeeperScanPolicyObserver implements RegionCoprocessor, RegionObs
@Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner) throws IOException {
InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
return wrap(scanner);
}

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.OperationStatus;
@ -123,37 +124,43 @@ public interface RegionObserver {
/**
* Called before the memstore is flushed to disk.
* @param c the environment provided by the region server
* @param tracker tracker used to track the life cycle of a flush
*/
default void preFlush(final ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {}
default void preFlush(final ObserverContext<RegionCoprocessorEnvironment> c,
FlushLifeCycleTracker tracker) throws IOException {}
/**
* Called before a Store's memstore is flushed to disk.
* @param c the environment provided by the region server
* @param store the store where compaction is being requested
* @param scanner the scanner over existing data used in the store file
* @param tracker tracker used to track the life cycle of a flush
* @return the scanner to use during compaction. Should not be {@code null}
* unless the implementation is writing new store files on its own.
*/
default InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner) throws IOException {
InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
return scanner;
}
/**
* Called after the memstore is flushed to disk.
* @param c the environment provided by the region server
* @param tracker tracker used to track the life cycle of a flush
* @throws IOException if an error occurred on the coprocessor
*/
default void postFlush(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {}
default void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
FlushLifeCycleTracker tracker) throws IOException {}
/**
* Called after a Store's memstore is flushed to disk.
* @param c the environment provided by the region server
* @param store the store being flushed
* @param resultFile the new store file written out during compaction
* @param tracker tracker used to track the life cycle of a flush
*/
default void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
StoreFile resultFile) throws IOException {}
StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException {}
/**
* Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
@ -100,14 +101,15 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
*/
@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController) throws IOException {
MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException {
ArrayList<Path> result = new ArrayList<>();
long cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries
// Use a store scanner to find which rows to flush.
long smallestReadPoint = store.getSmallestReadPoint();
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint);
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
}

View File

@ -45,14 +45,15 @@ public class DefaultStoreFlusher extends StoreFlusher {
@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController) throws IOException {
MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException {
ArrayList<Path> result = new ArrayList<>();
int cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries
// Use a store scanner to find which rows to flush.
long smallestReadPoint = store.getSmallestReadPoint();
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint);
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
/**
* Used to track flush execution.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public interface FlushLifeCycleTracker {
static FlushLifeCycleTracker DUMMY = new FlushLifeCycleTracker() {
};
/**
* Called if the flush request fails for some reason.
*/
default void notExecuted(String reason) {
}
/**
* Called before flush is executed.
*/
default void beforeExecution() {
}
/**
* Called after flush is executed.
*/
default void afterExecution() {
}
}

View File

@ -33,7 +33,7 @@ public interface FlushRequester {
* @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log
* rolling.
*/
void requestFlush(HRegion region, boolean forceFlushAllStores);
void requestFlush(HRegion region, boolean forceFlushAllStores, FlushLifeCycleTracker tracker);
/**
* Tell the listener the cache needs to be flushed after a delay

View File

@ -2203,7 +2203,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/
// TODO HBASE-18905. We might have to expose a requestFlush API for CPs
public FlushResult flush(boolean force) throws IOException {
return flushcache(force, false);
return flushcache(force, false, FlushLifeCycleTracker.DUMMY);
}
public static interface FlushResult {
@ -2241,6 +2241,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* time-sensitive thread.
* @param forceFlushAllStores whether we want to flush all stores
* @param writeFlushRequestWalMarker whether to write the flush request marker to WAL
* @param tracker used to track the life cycle of this flush
* @return whether the flush is success and whether the region needs compacting
*
* @throws IOException general io exceptions
@ -2248,8 +2249,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* because a Snapshot was not properly persisted. The region is put in closing mode, and the
* caller MUST abort after this.
*/
public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker)
throws IOException {
public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker,
FlushLifeCycleTracker tracker) throws IOException {
// fail-fast instead of waiting on the lock
if (this.closing.get()) {
String msg = "Skipping flush on " + this + " because closing";
@ -2269,7 +2270,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
if (coprocessorHost != null) {
status.setStatus("Running coprocessor pre-flush hooks");
coprocessorHost.preFlush();
coprocessorHost.preFlush(tracker);
}
// TODO: this should be managed within memstore with the snapshot, updated only after flush
// successful
@ -2298,11 +2299,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Collection<HStore> specificStoresToFlush =
forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
FlushResultImpl fs =
internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker);
internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker, tracker);
if (coprocessorHost != null) {
status.setStatus("Running post-flush coprocessor hooks");
coprocessorHost.postFlush();
coprocessorHost.postFlush(tracker);
}
if(fs.isFlushSucceeded()) {
@ -2398,7 +2399,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @see #internalFlushcache(Collection, MonitoredTask, boolean)
*/
private FlushResult internalFlushcache(MonitoredTask status) throws IOException {
return internalFlushcache(stores.values(), status, false);
return internalFlushcache(stores.values(), status, false, FlushLifeCycleTracker.DUMMY);
}
/**
@ -2406,9 +2407,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @see #internalFlushcache(WAL, long, Collection, MonitoredTask, boolean)
*/
private FlushResultImpl internalFlushcache(Collection<HStore> storesToFlush, MonitoredTask status,
boolean writeFlushWalMarker) throws IOException {
boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException {
return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush, status,
writeFlushWalMarker);
writeFlushWalMarker, tracker);
}
/**
@ -2429,10 +2430,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @throws IOException general io exceptions
* @throws DroppedSnapshotException Thrown when replay of WAL is required.
*/
protected FlushResultImpl internalFlushcache(WAL wal, long myseqid, Collection<HStore> storesToFlush,
MonitoredTask status, boolean writeFlushWalMarker) throws IOException {
PrepareFlushResult result
= internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker);
protected FlushResultImpl internalFlushcache(WAL wal, long myseqid,
Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker,
FlushLifeCycleTracker tracker) throws IOException {
PrepareFlushResult result =
internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker, tracker);
if (result.result == null) {
return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
} else {
@ -2443,8 +2445,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DLS_DEAD_LOCAL_STORE",
justification="FindBugs seems confused about trxId")
protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
throws IOException {
Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker,
FlushLifeCycleTracker tracker) throws IOException {
if (this.rsServices != null && this.rsServices.isAborted()) {
// Don't flush when server aborting, it's unsafe
throw new IOException("Aborting flush because server is aborted...");
@ -2469,9 +2471,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (wal != null) {
writeEntry = mvcc.begin();
long flushOpSeqId = writeEntry.getWriteNumber();
FlushResultImpl flushResult = new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY,
flushOpSeqId, "Nothing to flush",
writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
FlushResultImpl flushResult =
new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushOpSeqId,
"Nothing to flush", writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
mvcc.completeAndWait(writeEntry);
// Set to null so we don't complete it again down in finally block.
writeEntry = null;
@ -2547,8 +2549,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
MemStoreSize flushableSize = s.getFlushableSize();
totalSizeOfFlushableStores.incMemStoreSize(flushableSize);
storeFlushCtxs.put(s.getColumnFamilyDescriptor().getName(),
s.createFlushContext(flushOpSeqId));
committedFiles.put(s.getColumnFamilyDescriptor().getName(), null); // for writing stores to WAL
s.createFlushContext(flushOpSeqId, tracker));
// for writing stores to WAL
committedFiles.put(s.getColumnFamilyDescriptor().getName(), null);
storeFlushableSize.put(s.getColumnFamilyDescriptor().getName(), flushableSize);
}
@ -4079,29 +4082,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
private void requestFlushIfNeeded(long memstoreTotalSize) throws RegionTooBusyException {
if(memstoreTotalSize > this.getMemStoreFlushSize()) {
requestFlush();
}
}
private void requestFlush() {
if (this.rsServices == null) {
return;
}
synchronized (writestate) {
if (this.writestate.isFlushRequested()) {
return;
}
writestate.flushRequested = true;
}
// Make request outside of synchronize block; HBASE-818.
this.rsServices.getFlushRequester().requestFlush(this, false);
if (LOG.isDebugEnabled()) {
LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
}
}
/*
* @param size
* @return True if size is over the flush threshold
@ -4216,7 +4196,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
if (seqid > minSeqIdForTheRegion) {
// Then we added some edits to memory. Flush and cleanup split edit files.
internalFlushcache(null, seqid, stores.values(), status, false);
internalFlushcache(null, seqid, stores.values(), status, false, FlushLifeCycleTracker.DUMMY);
}
// Now delete the content of recovered edits. We're done w/ them.
if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
@ -4400,7 +4380,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
flush = isFlushSize(this.addAndGetMemStoreSize(memstoreSize));
if (flush) {
internalFlushcache(null, currentEditSeqId, stores.values(), status, false);
internalFlushcache(null, currentEditSeqId, stores.values(), status, false,
FlushLifeCycleTracker.DUMMY);
}
if (coprocessorHost != null) {
@ -4603,8 +4584,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// we can just snapshot our memstores and continue as normal.
// invoke prepareFlushCache. Send null as wal since we do not want the flush events in wal
PrepareFlushResult prepareResult = internalPrepareFlushCache(null,
flushSeqId, storesToFlush, status, false);
PrepareFlushResult prepareResult = internalPrepareFlushCache(null, flushSeqId,
storesToFlush, status, false, FlushLifeCycleTracker.DUMMY);
if (prepareResult.result == null) {
// save the PrepareFlushResult so that we can use it later from commit flush
this.writestate.flushing = true;
@ -4818,7 +4799,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
StoreFlushContext ctx = null;
long startTime = EnvironmentEdgeManager.currentTime();
if (prepareFlushResult == null || prepareFlushResult.storeFlushCtxs == null) {
ctx = store.createFlushContext(flush.getFlushSequenceNumber());
ctx = store.createFlushContext(flush.getFlushSequenceNumber(), FlushLifeCycleTracker.DUMMY);
} else {
ctx = prepareFlushResult.storeFlushCtxs.get(family);
startTime = prepareFlushResult.startTime;
@ -4878,7 +4859,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
throws IOException {
MemStoreSize flushableSize = s.getFlushableSize();
this.decrMemStoreSize(flushableSize);
StoreFlushContext ctx = s.createFlushContext(currentSeqId);
StoreFlushContext ctx = s.createFlushContext(currentSeqId, FlushLifeCycleTracker.DUMMY);
ctx.prepare();
ctx.abort();
return flushableSize;
@ -5724,7 +5705,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is
// a sequence id that we can be sure is beyond the last hfile written).
if (assignSeqId) {
FlushResult fs = flushcache(true, false);
FlushResult fs = flushcache(true, false, FlushLifeCycleTracker.DUMMY);
if (fs.isFlushSucceeded()) {
seqId = ((FlushResultImpl)fs).flushSequenceId;
} else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
@ -8234,4 +8215,41 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
rsServices.getCompactionRequestor().requestCompaction(this, store, why, priority, tracker,
RpcServer.getRequestUser().orElse(null));
}
private void requestFlushIfNeeded(long memstoreTotalSize) throws RegionTooBusyException {
if (memstoreTotalSize > this.getMemStoreFlushSize()) {
requestFlush();
}
}
private void requestFlush() {
if (this.rsServices == null) {
return;
}
requestFlush0(FlushLifeCycleTracker.DUMMY);
}
private void requestFlush0(FlushLifeCycleTracker tracker) {
boolean shouldFlush = false;
synchronized (writestate) {
if (!this.writestate.isFlushRequested()) {
shouldFlush = true;
writestate.flushRequested = true;
}
}
if (shouldFlush) {
// Make request outside of synchronize block; HBASE-818.
this.rsServices.getFlushRequester().requestFlush(this, false, tracker);
if (LOG.isDebugEnabled()) {
LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
}
} else {
tracker.notExecuted("Flush already requested on " + this);
}
}
@Override
public void requestFlush(FlushLifeCycleTracker tracker) throws IOException {
requestFlush0(tracker);
}
}

View File

@ -952,7 +952,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
* @throws IOException if exception occurs during process
*/
protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
MonitoredTask status, ThroughputController throughputController) throws IOException {
MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException {
// If an exception happens flushing, we let it out without clearing
// the memstore snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around.
@ -963,7 +964,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
for (int i = 0; i < flushRetriesNumber; i++) {
try {
List<Path> pathNames =
flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController);
flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController, tracker);
Path lastPathName = null;
try {
for (Path pathName : pathNames) {
@ -2152,13 +2153,14 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
}
}
public StoreFlushContext createFlushContext(long cacheFlushId) {
return new StoreFlusherImpl(cacheFlushId);
public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) {
return new StoreFlusherImpl(cacheFlushId, tracker);
}
private final class StoreFlusherImpl implements StoreFlushContext {
private long cacheFlushSeqNum;
private final FlushLifeCycleTracker tracker;
private final long cacheFlushSeqNum;
private MemStoreSnapshot snapshot;
private List<Path> tempFiles;
private List<Path> committedFiles;
@ -2166,8 +2168,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
private long cacheFlushSize;
private long outputFileSize;
private StoreFlusherImpl(long cacheFlushSeqNum) {
private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) {
this.cacheFlushSeqNum = cacheFlushSeqNum;
this.tracker = tracker;
}
/**
@ -2188,7 +2191,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
RegionServerServices rsService = region.getRegionServerServices();
ThroughputController throughputController =
rsService == null ? null : rsService.getFlushThroughputController();
tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController);
tempFiles =
HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, tracker);
}
@Override
@ -2220,7 +2224,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
for (HStoreFile sf : storeFiles) {
if (HStore.this.getCoprocessorHost() != null) {
HStore.this.getCoprocessorHost().postFlush(HStore.this, sf);
HStore.this.getCoprocessorHost().postFlush(HStore.this, sf, tracker);
}
committedFiles.add(sf.getPath());
}

View File

@ -210,7 +210,7 @@ public class LogRoller extends HasThread implements Closeable {
requester = this.services.getFlushRequester();
if (requester != null) {
// force flushing all stores to clean old logs
requester.requestFlush(r, true);
requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
scheduled = true;
}
}

View File

@ -18,8 +18,6 @@
*/
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.util.StringUtils.humanReadableInt;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
@ -44,20 +42,20 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
/**
* Thread that flushes cache on request
@ -183,12 +181,12 @@ class MemStoreFlusher implements FlushRequester {
ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
(bestRegionReplica.getMemStoreSize()
> secondaryMultiplier * regionToFlush.getMemStoreSize()))) {
LOG.info("Refreshing storefiles of region " + bestRegionReplica
+ " due to global heap pressure. Total memstore datasize="
+ StringUtils
.humanReadableInt(server.getRegionServerAccounting().getGlobalMemStoreDataSize())
+ " memstore heap size=" + StringUtils.humanReadableInt(
server.getRegionServerAccounting().getGlobalMemStoreHeapSize()));
LOG.info("Refreshing storefiles of region " + bestRegionReplica +
" due to global heap pressure. Total memstore datasize=" +
TraditionalBinaryPrefix.long2String(
server.getRegionServerAccounting().getGlobalMemStoreDataSize(), "", 1) +
" memstore heap size=" + TraditionalBinaryPrefix.long2String(
server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1));
flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
if (!flushedOne) {
LOG.info("Excluding secondary region " + bestRegionReplica +
@ -196,12 +194,13 @@ class MemStoreFlusher implements FlushRequester {
excludedRegions.add(bestRegionReplica);
}
} else {
LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. "
+ "Total Memstore size="
+ humanReadableInt(server.getRegionServerAccounting().getGlobalMemStoreDataSize())
+ ", Region memstore size="
+ humanReadableInt(regionToFlush.getMemStoreSize()));
flushedOne = flushRegion(regionToFlush, true, false);
LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. " +
"Total Memstore size=" +
TraditionalBinaryPrefix.long2String(
server.getRegionServerAccounting().getGlobalMemStoreDataSize(), "", 1) +
", Region memstore size=" +
TraditionalBinaryPrefix.long2String(regionToFlush.getMemStoreSize(), "", 1));
flushedOne = flushRegion(regionToFlush, true, false, FlushLifeCycleTracker.DUMMY);
if (!flushedOne) {
LOG.info("Excluding unflushable region " + regionToFlush +
@ -348,15 +347,17 @@ class MemStoreFlusher implements FlushRequester {
}
@Override
public void requestFlush(HRegion r, boolean forceFlushAllStores) {
public void requestFlush(HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) {
r.incrementFlushesQueuedCount();
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
// This entry has no delay so it will be added at the top of the flush
// queue. It'll come out near immediately.
FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, tracker);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
} else {
tracker.notExecuted("Flush already requested on " + r);
}
}
}
@ -367,7 +368,8 @@ class MemStoreFlusher implements FlushRequester {
synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) {
// This entry has some delay
FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
FlushRegionEntry fqe =
new FlushRegionEntry(r, forceFlushAllStores, FlushLifeCycleTracker.DUMMY);
fqe.requeue(delay);
this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe);
@ -463,7 +465,7 @@ class MemStoreFlusher implements FlushRequester {
return true;
}
}
return flushRegion(region, false, fqe.isForceFlushAllStores());
return flushRegion(region, false, fqe.isForceFlushAllStores(), fqe.getTracker());
}
/**
@ -478,8 +480,8 @@ class MemStoreFlusher implements FlushRequester {
* false, there will be accompanying log messages explaining why the region was
* not flushed.
*/
private boolean flushRegion(final HRegion region, final boolean emergencyFlush,
boolean forceFlushAllStores) {
private boolean flushRegion(HRegion region, boolean emergencyFlush, boolean forceFlushAllStores,
FlushLifeCycleTracker tracker) {
synchronized (this.regionsInQueue) {
FlushRegionEntry fqe = this.regionsInQueue.remove(region);
// Use the start time of the FlushRegionEntry if available
@ -490,10 +492,11 @@ class MemStoreFlusher implements FlushRequester {
}
}
tracker.beforeExecution();
lock.readLock().lock();
try {
notifyFlushRequest(region, emergencyFlush);
FlushResult flushResult = region.flush(forceFlushAllStores);
FlushResult flushResult = region.flushcache(forceFlushAllStores, false, tracker);
boolean shouldCompact = flushResult.isCompactionNeeded();
// We just want to check the size
boolean shouldSplit = region.checkSplit() != null;
@ -523,6 +526,7 @@ class MemStoreFlusher implements FlushRequester {
} finally {
lock.readLock().unlock();
wakeUpIfBlocking();
tracker.afterExecution();
}
return true;
}
@ -732,13 +736,16 @@ class MemStoreFlusher implements FlushRequester {
private long whenToExpire;
private int requeueCount = 0;
private boolean forceFlushAllStores;
private final boolean forceFlushAllStores;
FlushRegionEntry(final HRegion r, boolean forceFlushAllStores) {
private final FlushLifeCycleTracker tracker;
FlushRegionEntry(final HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) {
this.region = r;
this.createTime = EnvironmentEdgeManager.currentTime();
this.whenToExpire = this.createTime;
this.forceFlushAllStores = forceFlushAllStores;
this.tracker = tracker;
}
/**
@ -764,6 +771,10 @@ class MemStoreFlusher implements FlushRequester {
return forceFlushAllStores;
}
public FlushLifeCycleTracker getTracker() {
return tracker;
}
/**
* @param when When to expire, when to come up out of the queue.
* Specify in milliseconds. This method adds EnvironmentEdgeManager.currentTime()

View File

@ -1611,7 +1611,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ?
request.getWriteFlushWalMarker() : false;
// Go behind the curtain so we can manage writing of the flush WAL marker
HRegion.FlushResultImpl flushResult = region.flushcache(true, writeFlushWalMarker);
HRegion.FlushResultImpl flushResult =
region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
boolean compactionNeeded = flushResult.isCompactionNeeded();
if (compactionNeeded) {
regionServer.compactSplitThread.requestSystemCompaction(region,

View File

@ -468,4 +468,9 @@ public interface Region extends ConfigurationObserver {
*/
void requestCompaction(byte[] family, String why, int priority, boolean major,
CompactionLifeCycleTracker tracker) throws IOException;
/**
* Request flush on this region.
*/
void requestFlush(FlushLifeCycleTracker tracker) throws IOException;
}

View File

@ -669,13 +669,13 @@ public class RegionCoprocessorHost
* Invoked before a memstore flush
* @throws IOException
*/
public InternalScanner preFlush(HStore store, final InternalScanner scanner)
throws IOException {
return execOperationWithResult(false, scanner, coprocEnvironments.isEmpty() ? null :
new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter) {
public InternalScanner preFlush(HStore store, InternalScanner scanner,
FlushLifeCycleTracker tracker) throws IOException {
return execOperationWithResult(false, scanner, coprocEnvironments.isEmpty() ? null
: new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter) {
@Override
public InternalScanner call(RegionObserver observer) throws IOException {
return observer.preFlush(this, store, getResult());
return observer.preFlush(this, store, getResult(), tracker);
}
});
}
@ -684,11 +684,11 @@ public class RegionCoprocessorHost
* Invoked before a memstore flush
* @throws IOException
*/
public void preFlush() throws IOException {
public void preFlush(FlushLifeCycleTracker tracker) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.preFlush(this);
observer.preFlush(this, tracker);
}
});
}
@ -697,11 +697,11 @@ public class RegionCoprocessorHost
* Invoked after a memstore flush
* @throws IOException
*/
public void postFlush() throws IOException {
public void postFlush(FlushLifeCycleTracker tracker) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postFlush(this);
observer.postFlush(this, tracker);
}
});
}
@ -710,11 +710,12 @@ public class RegionCoprocessorHost
* Invoked after a memstore flush
* @throws IOException
*/
public void postFlush(final HStore store, final HStoreFile storeFile) throws IOException {
public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override
public void call(RegionObserver observer) throws IOException {
observer.postFlush(this, store, storeFile);
observer.postFlush(this, store, storeFile, tracker);
}
});
}

View File

@ -57,7 +57,8 @@ abstract class StoreFlusher {
* @return List of files written. Can be empty; must not be null.
*/
public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
MonitoredTask status, ThroughputController throughputController) throws IOException;
MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException;
protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum,
MonitoredTask status) throws IOException {
@ -77,15 +78,15 @@ abstract class StoreFlusher {
* @param smallestReadPoint
* @return The scanner; null if coprocessor is canceling the flush.
*/
protected InternalScanner createScanner(List<KeyValueScanner> snapshotScanners,
long smallestReadPoint) throws IOException {
protected final InternalScanner createScanner(List<KeyValueScanner> snapshotScanners,
long smallestReadPoint, FlushLifeCycleTracker tracker) throws IOException {
InternalScanner scanner =
new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), snapshotScanners,
ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
assert scanner != null;
if (store.getCoprocessorHost() != null) {
try {
return store.getCoprocessorHost().preFlush(store, scanner);
return store.getCoprocessorHost().preFlush(store, scanner, tracker);
} catch (IOException ioe) {
scanner.close();
throw ioe;

View File

@ -56,13 +56,14 @@ public class StripeStoreFlusher extends StoreFlusher {
@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
MonitoredTask status, ThroughputController throughputController) throws IOException {
MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException {
List<Path> result = new ArrayList<>();
int cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries
long smallestReadPoint = store.getSmallestReadPoint();
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint);
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
}

View File

@ -110,6 +110,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
import org.apache.hadoop.hbase.quotas.GlobalQuotaSettings;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.Region;
@ -1592,7 +1593,8 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
}
@Override
public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {
public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c,
FlushLifeCycleTracker tracker) throws IOException {
requirePermission(getActiveUser(c), "flush", getTableName(c.getEnvironment()), null, null,
Action.ADMIN, Action.CREATE);
}

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.ClientTests;
@ -147,7 +148,8 @@ public class TestMobCloneSnapshotFromClient extends TestCloneSnapshotFromClient
}
@Override
public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException {
public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e,
FlushLifeCycleTracker tracker) throws IOException {
if (delayFlush) {
try {
if (Bytes.compareTo(e.getEnvironment().getRegionInfo().getStartKey(),

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@ -171,14 +172,14 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
@Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, InternalScanner scanner) throws IOException {
Store store, InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
ctPreFlush.incrementAndGet();
return scanner;
}
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, StoreFile resultFile) throws IOException {
Store store, StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException {
ctPostFlush.incrementAndGet();
if (throwOnPostFlush.get()){
throw new IOException("throwOnPostFlush is true in postFlush");

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
@ -214,12 +215,16 @@ public class TestCoprocessorInterface {
CompactionRequest request) {
postCompactCalled = true;
}
@Override
public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) {
public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e,
FlushLifeCycleTracker tracker) {
preFlushCalled = true;
}
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) {
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e,
FlushLifeCycleTracker tracker) {
postFlushCalled = true;
}

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext;
@ -465,7 +466,8 @@ public class TestRegionObserverInterface {
}
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) {
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e,
FlushLifeCycleTracker tracker) {
lastFlush = EnvironmentEdgeManager.currentTime();
}
}

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
@ -157,7 +158,7 @@ public class TestRegionObserverScannerOpenHook {
@Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner) throws IOException {
InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
return NO_DATA;
}
}

View File

@ -45,7 +45,7 @@ public class NoOpScanPolicyObserver implements RegionCoprocessor, RegionObserver
@Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner) throws IOException {
InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
return new DelegatingInternalScanner(scanner);
}

View File

@ -0,0 +1,240 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Confirm that the function of FlushLifeCycleTracker is OK as we do not use it in our own code.
*/
@Category({ CoprocessorTests.class, MediumTests.class })
public class TestFlushLifeCycleTracker {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final TableName NAME =
TableName.valueOf(TestCompactionLifeCycleTracker.class.getSimpleName());
private static final byte[] CF = Bytes.toBytes("CF");
private static final byte[] QUALIFIER = Bytes.toBytes("CQ");
private HRegion region;
private static FlushLifeCycleTracker TRACKER;
private static volatile CountDownLatch ARRIVE;
private static volatile CountDownLatch BLOCK;
public static final class FlushObserver implements RegionObserver, RegionCoprocessor {
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c,
FlushLifeCycleTracker tracker) throws IOException {
if (TRACKER != null) {
assertSame(tracker, TRACKER);
}
}
@Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
if (TRACKER != null) {
assertSame(tracker, TRACKER);
}
return scanner;
}
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
FlushLifeCycleTracker tracker) throws IOException {
if (TRACKER != null) {
assertSame(tracker, TRACKER);
}
}
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException {
if (TRACKER != null) {
assertSame(tracker, TRACKER);
}
// inject here so we can make a flush request to fail because of we already have a flush
// ongoing.
CountDownLatch arrive = ARRIVE;
if (arrive != null) {
arrive.countDown();
}
try {
BLOCK.await();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
}
private static final class Tracker implements FlushLifeCycleTracker {
private String reason;
private boolean beforeExecutionCalled;
private boolean afterExecutionCalled;
private boolean completed = false;
@Override
public synchronized void notExecuted(String reason) {
this.reason = reason;
completed = true;
notifyAll();
}
@Override
public void beforeExecution() {
this.beforeExecutionCalled = true;
}
@Override
public synchronized void afterExecution() {
this.afterExecutionCalled = true;
completed = true;
notifyAll();
}
public synchronized void await() throws InterruptedException {
while (!completed) {
wait();
}
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
UTIL.startMiniCluster(3);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
UTIL.shutdownMiniCluster();
}
@Before
public void setUp() throws IOException {
UTIL.getAdmin()
.createTable(TableDescriptorBuilder.newBuilder(NAME)
.addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF))
.addCoprocessor(FlushObserver.class.getName()).build());
region = UTIL.getHBaseCluster().getRegions(NAME).get(0);
}
@After
public void tearDown() throws IOException {
region = null;
TRACKER = null;
UTIL.deleteTable(NAME);
}
@Test
public void test() throws IOException, InterruptedException {
try (Table table = UTIL.getConnection().getTable(NAME)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addImmutable(CF, QUALIFIER, Bytes.toBytes(i)));
}
}
Tracker tracker = new Tracker();
TRACKER = tracker;
region.requestFlush(tracker);
tracker.await();
assertNull(tracker.reason);
assertTrue(tracker.beforeExecutionCalled);
assertTrue(tracker.afterExecutionCalled);
// request flush on a region with empty memstore should still success
tracker = new Tracker();
TRACKER = tracker;
region.requestFlush(tracker);
tracker.await();
assertNull(tracker.reason);
assertTrue(tracker.beforeExecutionCalled);
assertTrue(tracker.afterExecutionCalled);
}
@Test
public void testNotExecuted() throws IOException, InterruptedException {
try (Table table = UTIL.getConnection().getTable(NAME)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addImmutable(CF, QUALIFIER, Bytes.toBytes(i)));
}
}
// here we may have overlap when calling the CP hooks so we do not assert on TRACKER
Tracker tracker1 = new Tracker();
ARRIVE = new CountDownLatch(1);
BLOCK = new CountDownLatch(1);
region.requestFlush(tracker1);
ARRIVE.await();
Tracker tracker2 = new Tracker();
region.requestFlush(tracker2);
tracker2.await();
assertNotNull(tracker2.reason);
assertFalse(tracker2.beforeExecutionCalled);
assertFalse(tracker2.afterExecutionCalled);
BLOCK.countDown();
tracker1.await();
assertNull(tracker1.reason);
assertTrue(tracker1.beforeExecutionCalled);
assertTrue(tracker1.afterExecutionCalled);
}
}

View File

@ -1,25 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MemStoreFlusher.FlushRegionEntry;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.regionserver.MemStoreFlusher.FlushRegionEntry;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@ -46,15 +55,15 @@ public class TestFlushRegionEntry {
@Test
public void testFlushRegionEntryEquality() {
HRegionInfo hri = new HRegionInfo(1, TableName.valueOf(name.getMethodName()), 0);
RegionInfo hri = RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
.setRegionId(1).setReplicaId(0).build();
HRegion r = mock(HRegion.class);
doReturn(hri).when(r).getRegionInfo();
FlushRegionEntry entry = new FlushRegionEntry(r, true);
FlushRegionEntry other = new FlushRegionEntry(r, true);
FlushRegionEntry entry = new FlushRegionEntry(r, true, FlushLifeCycleTracker.DUMMY);
FlushRegionEntry other = new FlushRegionEntry(r, true, FlushLifeCycleTracker.DUMMY);
assertEquals(entry.hashCode(), other.hashCode());
assertEquals(entry, other);
}
}

View File

@ -482,7 +482,7 @@ public class TestHMobStore {
* @throws IOException
*/
private static void flushStore(HMobStore store, long id) throws IOException {
StoreFlushContext storeFlushCtx = store.createFlushContext(id);
StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
storeFlushCtx.prepare();
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));

View File

@ -293,7 +293,7 @@ public class TestHRegion {
put.addColumn(COLUMN_FAMILY_BYTES, null, value);
// First put something in current memstore, which will be in snapshot after flusher.prepare()
region.put(put);
StoreFlushContext storeFlushCtx = store.createFlushContext(12345);
StoreFlushContext storeFlushCtx = store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
storeFlushCtx.prepare();
// Second put something in current memstore
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
@ -337,7 +337,7 @@ public class TestHRegion {
HStore store = region.getStore(COLUMN_FAMILY_BYTES);
// Get some random bytes.
byte [] value = Bytes.toBytes(method);
faultyLog.setStoreFlushCtx(store.createFlushContext(12345));
faultyLog.setStoreFlushCtx(store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY));
Put put = new Put(value);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
@ -400,8 +400,8 @@ public class TestHRegion {
// save normalCPHost and replaced by mockedCPHost, which will cancel flush requests
RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
when(mockedCPHost.preFlush(Mockito.isA(HStore.class), Mockito.isA(InternalScanner.class))).
thenReturn(null);
when(mockedCPHost.preFlush(Mockito.isA(HStore.class), Mockito.isA(InternalScanner.class),
Mockito.isA(FlushLifeCycleTracker.class))).thenReturn(null);
region.setCoprocessorHost(mockedCPHost);
region.put(put);
region.flush(true);
@ -567,7 +567,8 @@ public class TestHRegion {
region.put(p1);
// Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only.
HStore store = region.getStore(COLUMN_FAMILY_BYTES);
StoreFlushContext storeFlushCtx = store.createFlushContext(12345);
StoreFlushContext storeFlushCtx =
store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
storeFlushCtx.prepare();
// Now add two entries to the foreground memstore.
Put p2 = new Put(row);
@ -5626,7 +5627,7 @@ public class TestHRegion {
Put put = new Put(Bytes.toBytes("19998"));
put.addColumn(cf1, col, Bytes.toBytes("val"));
region.put(put);
region.flushcache(true, true);
region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
Put put2 = new Put(Bytes.toBytes("19997"));
put2.addColumn(cf1, col, Bytes.toBytes("val"));
region.put(put2);
@ -5642,7 +5643,7 @@ public class TestHRegion {
p.addColumn(cf1, col, Bytes.toBytes("" + i));
region.put(p);
}
region.flushcache(true, true);
region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
// create one memstore contains many rows will be skipped
// to check MemStoreScanner.seekToPreviousRow
@ -5689,7 +5690,7 @@ public class TestHRegion {
RegionScanner scanner = region.getScanner(scan);
// flush the cache. This will reset the store scanner
region.flushcache(true, true);
region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
// create one memstore contains many rows will be skipped
// to check MemStoreScanner.seekToPreviousRow

View File

@ -1207,13 +1207,15 @@ public class TestHRegionReplayEvents {
@Test
public void testWriteFlushRequestMarker() throws IOException {
// primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false
FlushResultImpl result = (FlushResultImpl)((HRegion)primaryRegion).flushcache(true, false);
FlushResultImpl result = (FlushResultImpl) ((HRegion) primaryRegion).flushcache(true, false,
FlushLifeCycleTracker.DUMMY);
assertNotNull(result);
assertEquals(result.result, FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY);
assertFalse(result.wroteFlushWalMarker);
// request flush again, but this time with writeFlushRequestWalMarker = true
result = (FlushResultImpl)((HRegion)primaryRegion).flushcache(true, true);
result = (FlushResultImpl) ((HRegion) primaryRegion).flushcache(true, true,
FlushLifeCycleTracker.DUMMY);
assertNotNull(result);
assertEquals(result.result, FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY);
assertTrue(result.wroteFlushWalMarker);
@ -1248,7 +1250,7 @@ public class TestHRegionReplayEvents {
// Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from
// triggered flush restores readsEnabled
primaryRegion.flushcache(true, true);
primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
reader = createWALReaderForPrimary();
while (true) {
WAL.Entry entry = reader.next();

View File

@ -769,7 +769,7 @@ public class TestHStore {
}
private static void flushStore(HStore store, long id) throws IOException {
StoreFlushContext storeFlushCtx = store.createFlushContext(id);
StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
storeFlushCtx.prepare();
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
@ -1081,7 +1081,7 @@ public class TestHStore {
seqId = Math.max(seqId, c.getSequenceId());
}
inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
storeFlushCtx.prepare();
inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
@ -1287,7 +1287,7 @@ public class TestHStore {
quals.add(qf1);
quals.add(qf2);
quals.add(qf3);
StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
MyCompactingMemStore.START_TEST.set(true);
Runnable flush = () -> {
// this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
@ -1363,7 +1363,7 @@ public class TestHStore {
myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
long snapshotId = id++;
// push older data into snapshot -- phase (1/4)
StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId);
StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY);
storeFlushCtx.prepare();
// insert current data into active -- phase (2/4)
@ -1475,7 +1475,7 @@ public class TestHStore {
store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
StoreFlushContext storeFlushCtx = store.createFlushContext(id++);
StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
storeFlushCtx.prepare();
// This shouldn't invoke another in-memory flush because the first compactor thread
// hasn't accomplished the in-memory compaction.

View File

@ -135,11 +135,11 @@ public class TestHeapMemoryManager {
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up
Thread.sleep(1500);
// No changes should be made by tuner as we already have lot of empty space
@ -178,10 +178,10 @@ public class TestHeapMemoryManager {
// do some offheap flushes also. So there should be decrease in memstore but
// not as that when we don't have offheap flushes
memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
@ -226,10 +226,10 @@ public class TestHeapMemoryManager {
// do some offheap flushes also. So there should be decrease in memstore but
// not as that when we don't have offheap flushes
memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
@ -242,10 +242,10 @@ public class TestHeapMemoryManager {
// flushes are due to onheap overhead. This should once again call for increase in
// memstore size but that increase should be to the safe size
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
@ -308,10 +308,10 @@ public class TestHeapMemoryManager {
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
@ -322,8 +322,8 @@ public class TestHeapMemoryManager {
oldBlockCacheSize = blockCache.maxSize;
// Do some more flushes before the next run of HeapMemoryTuner
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
@ -357,10 +357,10 @@ public class TestHeapMemoryManager {
heapMemoryManager.start(choreService);
// this should not change anything with onheap memstore
memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up
Thread.sleep(1500);
// No changes should be made by tuner as we already have lot of empty space
@ -444,9 +444,9 @@ public class TestHeapMemoryManager {
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
blockCache.evictBlock(null);
// Allow the tuner to run once and do necessary memory up
Thread.sleep(1500);
@ -455,9 +455,9 @@ public class TestHeapMemoryManager {
assertEquals(oldBlockCacheSize, blockCache.maxSize);
// Do some more flushes before the next run of HeapMemoryTuner
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
@ -490,9 +490,9 @@ public class TestHeapMemoryManager {
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService);
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
blockCache.evictBlock(null);
blockCache.evictBlock(null);
// Allow the tuner to run once and do necessary memory up
@ -502,7 +502,7 @@ public class TestHeapMemoryManager {
assertEquals(oldBlockCacheSize, blockCache.maxSize);
// Flushes that block updates
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
memStoreFlusher.requestFlush(null, false);
memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
blockCache.evictBlock(null);
blockCache.evictBlock(null);
blockCache.evictBlock(null);
@ -805,7 +805,8 @@ public class TestHeapMemoryManager {
}
@Override
public void requestFlush(HRegion region, boolean forceFlushAllStores) {
public void requestFlush(HRegion region, boolean forceFlushAllStores,
FlushLifeCycleTracker tracker) {
this.listener.flushRequested(flushType, region);
}

View File

@ -31,20 +31,20 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -83,7 +83,8 @@ public class TestSplitWalDataLoss {
testUtil.startMiniCluster(2);
Admin admin = testUtil.getAdmin();
admin.createNamespace(namespace);
admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(family)));
admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
.addColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build());
testUtil.waitTableAvailable(tableName);
}
@ -136,7 +137,7 @@ public class TestSplitWalDataLoss {
long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family);
LOG.info("CHANGE OLDEST " + oldestSeqIdOfStore);
assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM);
rs.cacheFlusher.requestFlush(spiedRegion, false);
rs.cacheFlusher.requestFlush(spiedRegion, false, FlushLifeCycleTracker.DUMMY);
synchronized (flushed) {
while (!flushed.booleanValue()) {
flushed.wait();

View File

@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion;
@ -650,15 +651,16 @@ public abstract class AbstractTestWALReplay {
public CustomStoreFlusher(Configuration conf, HStore store) {
super(conf, store);
}
@Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController) throws IOException {
MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException {
if (throwExceptionWhenFlushing.get()) {
throw new IOException("Simulated exception by tests");
}
return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController);
return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
}
};
/**
@ -831,16 +833,14 @@ public abstract class AbstractTestWALReplay {
WAL newWal = createWAL(newConf, hbaseRootDir, logName);
final AtomicInteger flushcount = new AtomicInteger(0);
try {
final HRegion region =
new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
@Override
protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid,
final Collection<HStore> storesToFlush, MonitoredTask status,
boolean writeFlushWalMarker)
throws IOException {
boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException {
LOG.info("InternalFlushCache Invoked");
FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush,
Mockito.mock(MonitoredTask.class), writeFlushWalMarker);
Mockito.mock(MonitoredTask.class), writeFlushWalMarker, tracker);
flushcount.incrementAndGet();
return fs;
}
@ -1117,7 +1117,7 @@ public abstract class AbstractTestWALReplay {
private HRegion r;
@Override
public void requestFlush(HRegion region, boolean force) {
public void requestFlush(HRegion region, boolean force, FlushLifeCycleTracker tracker) {
try {
r.flush(force);
} catch (IOException e) {
@ -1127,8 +1127,6 @@ public abstract class AbstractTestWALReplay {
@Override
public void requestDelayedFlush(HRegion region, long when, boolean forceFlushAllStores) {
// TODO Auto-generated method stub
}
@Override

View File

@ -112,6 +112,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.CheckPermissionsRequest;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
@ -807,7 +808,8 @@ public class TestAccessController extends SecureTestUtil {
AccessTestAction action = new AccessTestAction() {
@Override
public Object run() throws Exception {
ACCESS_CONTROLLER.preFlush(ObserverContextImpl.createAndPrepare(RCP_ENV));
ACCESS_CONTROLLER.preFlush(ObserverContextImpl.createAndPrepare(RCP_ENV),
FlushLifeCycleTracker.DUMMY);
return null;
}
};

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
@ -814,7 +815,8 @@ public class TestWithDisabledAuthorization extends SecureTestUtil {
verifyAllowed(new AccessTestAction() {
@Override
public Object run() throws Exception {
ACCESS_CONTROLLER.preFlush(ObserverContextImpl.createAndPrepare(RCP_ENV));
ACCESS_CONTROLLER.preFlush(ObserverContextImpl.createAndPrepare(RCP_ENV),
FlushLifeCycleTracker.DUMMY);
return null;
}
}, SUPERUSER, USER_ADMIN, USER_RW, USER_RO, USER_OWNER, USER_CREATE, USER_QUAL, USER_NONE);

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
@ -311,7 +312,7 @@ public class TestCoprocessorScanPolicy {
@Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner) throws IOException {
InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
return wrap(store, scanner);
}