HBASE-18905 Allow CPs to request flush on Region and know the completion of the requested flush

This commit is contained in:
zhangduo 2017-10-25 11:00:44 +08:00
parent 705b3fa98c
commit 0b799fdbf0
34 changed files with 589 additions and 215 deletions

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.regionserver.ScannerContext;
@ -188,7 +189,7 @@ public class ZooKeeperScanPolicyObserver implements RegionCoprocessor, RegionObs
@Override @Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store, public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner) throws IOException { InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
return wrap(scanner); return wrap(scanner);
} }

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.OperationStatus; import org.apache.hadoop.hbase.regionserver.OperationStatus;
@ -123,37 +124,43 @@ public interface RegionObserver {
/** /**
* Called before the memstore is flushed to disk. * Called before the memstore is flushed to disk.
* @param c the environment provided by the region server * @param c the environment provided by the region server
* @param tracker tracker used to track the life cycle of a flush
*/ */
default void preFlush(final ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {} default void preFlush(final ObserverContext<RegionCoprocessorEnvironment> c,
FlushLifeCycleTracker tracker) throws IOException {}
/** /**
* Called before a Store's memstore is flushed to disk. * Called before a Store's memstore is flushed to disk.
* @param c the environment provided by the region server * @param c the environment provided by the region server
* @param store the store where compaction is being requested * @param store the store where compaction is being requested
* @param scanner the scanner over existing data used in the store file * @param scanner the scanner over existing data used in the store file
* @param tracker tracker used to track the life cycle of a flush
* @return the scanner to use during compaction. Should not be {@code null} * @return the scanner to use during compaction. Should not be {@code null}
* unless the implementation is writing new store files on its own. * unless the implementation is writing new store files on its own.
*/ */
default InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store, default InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner) throws IOException { InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
return scanner; return scanner;
} }
/** /**
* Called after the memstore is flushed to disk. * Called after the memstore is flushed to disk.
* @param c the environment provided by the region server * @param c the environment provided by the region server
* @param tracker tracker used to track the life cycle of a flush
* @throws IOException if an error occurred on the coprocessor * @throws IOException if an error occurred on the coprocessor
*/ */
default void postFlush(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException {} default void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
FlushLifeCycleTracker tracker) throws IOException {}
/** /**
* Called after a Store's memstore is flushed to disk. * Called after a Store's memstore is flushed to disk.
* @param c the environment provided by the region server * @param c the environment provided by the region server
* @param store the store being flushed * @param store the store being flushed
* @param resultFile the new store file written out during compaction * @param resultFile the new store file written out during compaction
* @param tracker tracker used to track the life cycle of a flush
*/ */
default void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store, default void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
StoreFile resultFile) throws IOException {} StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException {}
/** /**
* Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of * Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HMobStore; import org.apache.hadoop.hbase.regionserver.HMobStore;
import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
@ -100,14 +101,15 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher {
*/ */
@Override @Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController) throws IOException { MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException {
ArrayList<Path> result = new ArrayList<>(); ArrayList<Path> result = new ArrayList<>();
long cellsCount = snapshot.getCellsCount(); long cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries if (cellsCount == 0) return result; // don't flush if there are no entries
// Use a store scanner to find which rows to flush. // Use a store scanner to find which rows to flush.
long smallestReadPoint = store.getSmallestReadPoint(); long smallestReadPoint = store.getSmallestReadPoint();
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint); InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
if (scanner == null) { if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing return result; // NULL scanner returned from coprocessor hooks means skip normal processing
} }

View File

@ -45,14 +45,15 @@ public class DefaultStoreFlusher extends StoreFlusher {
@Override @Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController) throws IOException { MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException {
ArrayList<Path> result = new ArrayList<>(); ArrayList<Path> result = new ArrayList<>();
int cellsCount = snapshot.getCellsCount(); int cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries if (cellsCount == 0) return result; // don't flush if there are no entries
// Use a store scanner to find which rows to flush. // Use a store scanner to find which rows to flush.
long smallestReadPoint = store.getSmallestReadPoint(); long smallestReadPoint = store.getSmallestReadPoint();
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint); InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
if (scanner == null) { if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing return result; // NULL scanner returned from coprocessor hooks means skip normal processing
} }

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
/**
* Used to track flush execution.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC)
@InterfaceStability.Evolving
public interface FlushLifeCycleTracker {
static FlushLifeCycleTracker DUMMY = new FlushLifeCycleTracker() {
};
/**
* Called if the flush request fails for some reason.
*/
default void notExecuted(String reason) {
}
/**
* Called before flush is executed.
*/
default void beforeExecution() {
}
/**
* Called after flush is executed.
*/
default void afterExecution() {
}
}

View File

@ -33,7 +33,7 @@ public interface FlushRequester {
* @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log * @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log
* rolling. * rolling.
*/ */
void requestFlush(HRegion region, boolean forceFlushAllStores); void requestFlush(HRegion region, boolean forceFlushAllStores, FlushLifeCycleTracker tracker);
/** /**
* Tell the listener the cache needs to be flushed after a delay * Tell the listener the cache needs to be flushed after a delay

View File

@ -2203,7 +2203,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
*/ */
// TODO HBASE-18905. We might have to expose a requestFlush API for CPs // TODO HBASE-18905. We might have to expose a requestFlush API for CPs
public FlushResult flush(boolean force) throws IOException { public FlushResult flush(boolean force) throws IOException {
return flushcache(force, false); return flushcache(force, false, FlushLifeCycleTracker.DUMMY);
} }
public static interface FlushResult { public static interface FlushResult {
@ -2241,6 +2241,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* time-sensitive thread. * time-sensitive thread.
* @param forceFlushAllStores whether we want to flush all stores * @param forceFlushAllStores whether we want to flush all stores
* @param writeFlushRequestWalMarker whether to write the flush request marker to WAL * @param writeFlushRequestWalMarker whether to write the flush request marker to WAL
* @param tracker used to track the life cycle of this flush
* @return whether the flush is success and whether the region needs compacting * @return whether the flush is success and whether the region needs compacting
* *
* @throws IOException general io exceptions * @throws IOException general io exceptions
@ -2248,8 +2249,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* because a Snapshot was not properly persisted. The region is put in closing mode, and the * because a Snapshot was not properly persisted. The region is put in closing mode, and the
* caller MUST abort after this. * caller MUST abort after this.
*/ */
public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker) public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker,
throws IOException { FlushLifeCycleTracker tracker) throws IOException {
// fail-fast instead of waiting on the lock // fail-fast instead of waiting on the lock
if (this.closing.get()) { if (this.closing.get()) {
String msg = "Skipping flush on " + this + " because closing"; String msg = "Skipping flush on " + this + " because closing";
@ -2269,7 +2270,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
if (coprocessorHost != null) { if (coprocessorHost != null) {
status.setStatus("Running coprocessor pre-flush hooks"); status.setStatus("Running coprocessor pre-flush hooks");
coprocessorHost.preFlush(); coprocessorHost.preFlush(tracker);
} }
// TODO: this should be managed within memstore with the snapshot, updated only after flush // TODO: this should be managed within memstore with the snapshot, updated only after flush
// successful // successful
@ -2298,11 +2299,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
Collection<HStore> specificStoresToFlush = Collection<HStore> specificStoresToFlush =
forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush(); forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush();
FlushResultImpl fs = FlushResultImpl fs =
internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker); internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker, tracker);
if (coprocessorHost != null) { if (coprocessorHost != null) {
status.setStatus("Running post-flush coprocessor hooks"); status.setStatus("Running post-flush coprocessor hooks");
coprocessorHost.postFlush(); coprocessorHost.postFlush(tracker);
} }
if(fs.isFlushSucceeded()) { if(fs.isFlushSucceeded()) {
@ -2398,7 +2399,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @see #internalFlushcache(Collection, MonitoredTask, boolean) * @see #internalFlushcache(Collection, MonitoredTask, boolean)
*/ */
private FlushResult internalFlushcache(MonitoredTask status) throws IOException { private FlushResult internalFlushcache(MonitoredTask status) throws IOException {
return internalFlushcache(stores.values(), status, false); return internalFlushcache(stores.values(), status, false, FlushLifeCycleTracker.DUMMY);
} }
/** /**
@ -2406,9 +2407,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @see #internalFlushcache(WAL, long, Collection, MonitoredTask, boolean) * @see #internalFlushcache(WAL, long, Collection, MonitoredTask, boolean)
*/ */
private FlushResultImpl internalFlushcache(Collection<HStore> storesToFlush, MonitoredTask status, private FlushResultImpl internalFlushcache(Collection<HStore> storesToFlush, MonitoredTask status,
boolean writeFlushWalMarker) throws IOException { boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException {
return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush, status, return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush, status,
writeFlushWalMarker); writeFlushWalMarker, tracker);
} }
/** /**
@ -2429,10 +2430,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
* @throws IOException general io exceptions * @throws IOException general io exceptions
* @throws DroppedSnapshotException Thrown when replay of WAL is required. * @throws DroppedSnapshotException Thrown when replay of WAL is required.
*/ */
protected FlushResultImpl internalFlushcache(WAL wal, long myseqid, Collection<HStore> storesToFlush, protected FlushResultImpl internalFlushcache(WAL wal, long myseqid,
MonitoredTask status, boolean writeFlushWalMarker) throws IOException { Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker,
PrepareFlushResult result FlushLifeCycleTracker tracker) throws IOException {
= internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker); PrepareFlushResult result =
internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker, tracker);
if (result.result == null) { if (result.result == null) {
return internalFlushCacheAndCommit(wal, status, result, storesToFlush); return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
} else { } else {
@ -2443,8 +2445,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DLS_DEAD_LOCAL_STORE", @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DLS_DEAD_LOCAL_STORE",
justification="FindBugs seems confused about trxId") justification="FindBugs seems confused about trxId")
protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid, protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid,
Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker) Collection<HStore> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker,
throws IOException { FlushLifeCycleTracker tracker) throws IOException {
if (this.rsServices != null && this.rsServices.isAborted()) { if (this.rsServices != null && this.rsServices.isAborted()) {
// Don't flush when server aborting, it's unsafe // Don't flush when server aborting, it's unsafe
throw new IOException("Aborting flush because server is aborted..."); throw new IOException("Aborting flush because server is aborted...");
@ -2469,9 +2471,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (wal != null) { if (wal != null) {
writeEntry = mvcc.begin(); writeEntry = mvcc.begin();
long flushOpSeqId = writeEntry.getWriteNumber(); long flushOpSeqId = writeEntry.getWriteNumber();
FlushResultImpl flushResult = new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, FlushResultImpl flushResult =
flushOpSeqId, "Nothing to flush", new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushOpSeqId,
writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker)); "Nothing to flush", writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker));
mvcc.completeAndWait(writeEntry); mvcc.completeAndWait(writeEntry);
// Set to null so we don't complete it again down in finally block. // Set to null so we don't complete it again down in finally block.
writeEntry = null; writeEntry = null;
@ -2547,8 +2549,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
MemStoreSize flushableSize = s.getFlushableSize(); MemStoreSize flushableSize = s.getFlushableSize();
totalSizeOfFlushableStores.incMemStoreSize(flushableSize); totalSizeOfFlushableStores.incMemStoreSize(flushableSize);
storeFlushCtxs.put(s.getColumnFamilyDescriptor().getName(), storeFlushCtxs.put(s.getColumnFamilyDescriptor().getName(),
s.createFlushContext(flushOpSeqId)); s.createFlushContext(flushOpSeqId, tracker));
committedFiles.put(s.getColumnFamilyDescriptor().getName(), null); // for writing stores to WAL // for writing stores to WAL
committedFiles.put(s.getColumnFamilyDescriptor().getName(), null);
storeFlushableSize.put(s.getColumnFamilyDescriptor().getName(), flushableSize); storeFlushableSize.put(s.getColumnFamilyDescriptor().getName(), flushableSize);
} }
@ -4079,29 +4082,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
} }
private void requestFlushIfNeeded(long memstoreTotalSize) throws RegionTooBusyException {
if(memstoreTotalSize > this.getMemStoreFlushSize()) {
requestFlush();
}
}
private void requestFlush() {
if (this.rsServices == null) {
return;
}
synchronized (writestate) {
if (this.writestate.isFlushRequested()) {
return;
}
writestate.flushRequested = true;
}
// Make request outside of synchronize block; HBASE-818.
this.rsServices.getFlushRequester().requestFlush(this, false);
if (LOG.isDebugEnabled()) {
LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
}
}
/* /*
* @param size * @param size
* @return True if size is over the flush threshold * @return True if size is over the flush threshold
@ -4216,7 +4196,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
if (seqid > minSeqIdForTheRegion) { if (seqid > minSeqIdForTheRegion) {
// Then we added some edits to memory. Flush and cleanup split edit files. // Then we added some edits to memory. Flush and cleanup split edit files.
internalFlushcache(null, seqid, stores.values(), status, false); internalFlushcache(null, seqid, stores.values(), status, false, FlushLifeCycleTracker.DUMMY);
} }
// Now delete the content of recovered edits. We're done w/ them. // Now delete the content of recovered edits. We're done w/ them.
if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) { if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) {
@ -4400,7 +4380,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
} }
flush = isFlushSize(this.addAndGetMemStoreSize(memstoreSize)); flush = isFlushSize(this.addAndGetMemStoreSize(memstoreSize));
if (flush) { if (flush) {
internalFlushcache(null, currentEditSeqId, stores.values(), status, false); internalFlushcache(null, currentEditSeqId, stores.values(), status, false,
FlushLifeCycleTracker.DUMMY);
} }
if (coprocessorHost != null) { if (coprocessorHost != null) {
@ -4603,8 +4584,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// we can just snapshot our memstores and continue as normal. // we can just snapshot our memstores and continue as normal.
// invoke prepareFlushCache. Send null as wal since we do not want the flush events in wal // invoke prepareFlushCache. Send null as wal since we do not want the flush events in wal
PrepareFlushResult prepareResult = internalPrepareFlushCache(null, PrepareFlushResult prepareResult = internalPrepareFlushCache(null, flushSeqId,
flushSeqId, storesToFlush, status, false); storesToFlush, status, false, FlushLifeCycleTracker.DUMMY);
if (prepareResult.result == null) { if (prepareResult.result == null) {
// save the PrepareFlushResult so that we can use it later from commit flush // save the PrepareFlushResult so that we can use it later from commit flush
this.writestate.flushing = true; this.writestate.flushing = true;
@ -4818,7 +4799,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
StoreFlushContext ctx = null; StoreFlushContext ctx = null;
long startTime = EnvironmentEdgeManager.currentTime(); long startTime = EnvironmentEdgeManager.currentTime();
if (prepareFlushResult == null || prepareFlushResult.storeFlushCtxs == null) { if (prepareFlushResult == null || prepareFlushResult.storeFlushCtxs == null) {
ctx = store.createFlushContext(flush.getFlushSequenceNumber()); ctx = store.createFlushContext(flush.getFlushSequenceNumber(), FlushLifeCycleTracker.DUMMY);
} else { } else {
ctx = prepareFlushResult.storeFlushCtxs.get(family); ctx = prepareFlushResult.storeFlushCtxs.get(family);
startTime = prepareFlushResult.startTime; startTime = prepareFlushResult.startTime;
@ -4878,7 +4859,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
throws IOException { throws IOException {
MemStoreSize flushableSize = s.getFlushableSize(); MemStoreSize flushableSize = s.getFlushableSize();
this.decrMemStoreSize(flushableSize); this.decrMemStoreSize(flushableSize);
StoreFlushContext ctx = s.createFlushContext(currentSeqId); StoreFlushContext ctx = s.createFlushContext(currentSeqId, FlushLifeCycleTracker.DUMMY);
ctx.prepare(); ctx.prepare();
ctx.abort(); ctx.abort();
return flushableSize; return flushableSize;
@ -5724,7 +5705,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is
// a sequence id that we can be sure is beyond the last hfile written). // a sequence id that we can be sure is beyond the last hfile written).
if (assignSeqId) { if (assignSeqId) {
FlushResult fs = flushcache(true, false); FlushResult fs = flushcache(true, false, FlushLifeCycleTracker.DUMMY);
if (fs.isFlushSucceeded()) { if (fs.isFlushSucceeded()) {
seqId = ((FlushResultImpl)fs).flushSequenceId; seqId = ((FlushResultImpl)fs).flushSequenceId;
} else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) { } else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
@ -8234,4 +8215,41 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
rsServices.getCompactionRequestor().requestCompaction(this, store, why, priority, tracker, rsServices.getCompactionRequestor().requestCompaction(this, store, why, priority, tracker,
RpcServer.getRequestUser().orElse(null)); RpcServer.getRequestUser().orElse(null));
} }
private void requestFlushIfNeeded(long memstoreTotalSize) throws RegionTooBusyException {
if (memstoreTotalSize > this.getMemStoreFlushSize()) {
requestFlush();
}
}
private void requestFlush() {
if (this.rsServices == null) {
return;
}
requestFlush0(FlushLifeCycleTracker.DUMMY);
}
private void requestFlush0(FlushLifeCycleTracker tracker) {
boolean shouldFlush = false;
synchronized (writestate) {
if (!this.writestate.isFlushRequested()) {
shouldFlush = true;
writestate.flushRequested = true;
}
}
if (shouldFlush) {
// Make request outside of synchronize block; HBASE-818.
this.rsServices.getFlushRequester().requestFlush(this, false, tracker);
if (LOG.isDebugEnabled()) {
LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName());
}
} else {
tracker.notExecuted("Flush already requested on " + this);
}
}
@Override
public void requestFlush(FlushLifeCycleTracker tracker) throws IOException {
requestFlush0(tracker);
}
} }

View File

@ -952,7 +952,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
* @throws IOException if exception occurs during process * @throws IOException if exception occurs during process
*/ */
protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot, protected List<Path> flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot,
MonitoredTask status, ThroughputController throughputController) throws IOException { MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException {
// If an exception happens flushing, we let it out without clearing // If an exception happens flushing, we let it out without clearing
// the memstore snapshot. The old snapshot will be returned when we say // the memstore snapshot. The old snapshot will be returned when we say
// 'snapshot', the next time flush comes around. // 'snapshot', the next time flush comes around.
@ -963,7 +964,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
for (int i = 0; i < flushRetriesNumber; i++) { for (int i = 0; i < flushRetriesNumber; i++) {
try { try {
List<Path> pathNames = List<Path> pathNames =
flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController); flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController, tracker);
Path lastPathName = null; Path lastPathName = null;
try { try {
for (Path pathName : pathNames) { for (Path pathName : pathNames) {
@ -2152,13 +2153,14 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
} }
} }
public StoreFlushContext createFlushContext(long cacheFlushId) { public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) {
return new StoreFlusherImpl(cacheFlushId); return new StoreFlusherImpl(cacheFlushId, tracker);
} }
private final class StoreFlusherImpl implements StoreFlushContext { private final class StoreFlusherImpl implements StoreFlushContext {
private long cacheFlushSeqNum; private final FlushLifeCycleTracker tracker;
private final long cacheFlushSeqNum;
private MemStoreSnapshot snapshot; private MemStoreSnapshot snapshot;
private List<Path> tempFiles; private List<Path> tempFiles;
private List<Path> committedFiles; private List<Path> committedFiles;
@ -2166,8 +2168,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
private long cacheFlushSize; private long cacheFlushSize;
private long outputFileSize; private long outputFileSize;
private StoreFlusherImpl(long cacheFlushSeqNum) { private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) {
this.cacheFlushSeqNum = cacheFlushSeqNum; this.cacheFlushSeqNum = cacheFlushSeqNum;
this.tracker = tracker;
} }
/** /**
@ -2188,7 +2191,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
RegionServerServices rsService = region.getRegionServerServices(); RegionServerServices rsService = region.getRegionServerServices();
ThroughputController throughputController = ThroughputController throughputController =
rsService == null ? null : rsService.getFlushThroughputController(); rsService == null ? null : rsService.getFlushThroughputController();
tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController); tempFiles =
HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, tracker);
} }
@Override @Override
@ -2220,7 +2224,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat
for (HStoreFile sf : storeFiles) { for (HStoreFile sf : storeFiles) {
if (HStore.this.getCoprocessorHost() != null) { if (HStore.this.getCoprocessorHost() != null) {
HStore.this.getCoprocessorHost().postFlush(HStore.this, sf); HStore.this.getCoprocessorHost().postFlush(HStore.this, sf, tracker);
} }
committedFiles.add(sf.getPath()); committedFiles.add(sf.getPath());
} }

View File

@ -210,7 +210,7 @@ public class LogRoller extends HasThread implements Closeable {
requester = this.services.getFlushRequester(); requester = this.services.getFlushRequester();
if (requester != null) { if (requester != null) {
// force flushing all stores to clean old logs // force flushing all stores to clean old logs
requester.requestFlush(r, true); requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY);
scheduled = true; scheduled = true;
} }
} }

View File

@ -18,8 +18,6 @@
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.util.StringUtils.humanReadableInt;
import java.io.IOException; import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler; import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList; import java.util.ArrayList;
@ -44,20 +42,20 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix;
import org.apache.htrace.Trace; import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope; import org.apache.htrace.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions;
/** /**
* Thread that flushes cache on request * Thread that flushes cache on request
@ -183,12 +181,12 @@ class MemStoreFlusher implements FlushRequester {
ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) && ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) &&
(bestRegionReplica.getMemStoreSize() (bestRegionReplica.getMemStoreSize()
> secondaryMultiplier * regionToFlush.getMemStoreSize()))) { > secondaryMultiplier * regionToFlush.getMemStoreSize()))) {
LOG.info("Refreshing storefiles of region " + bestRegionReplica LOG.info("Refreshing storefiles of region " + bestRegionReplica +
+ " due to global heap pressure. Total memstore datasize=" " due to global heap pressure. Total memstore datasize=" +
+ StringUtils TraditionalBinaryPrefix.long2String(
.humanReadableInt(server.getRegionServerAccounting().getGlobalMemStoreDataSize()) server.getRegionServerAccounting().getGlobalMemStoreDataSize(), "", 1) +
+ " memstore heap size=" + StringUtils.humanReadableInt( " memstore heap size=" + TraditionalBinaryPrefix.long2String(
server.getRegionServerAccounting().getGlobalMemStoreHeapSize())); server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1));
flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica); flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica);
if (!flushedOne) { if (!flushedOne) {
LOG.info("Excluding secondary region " + bestRegionReplica + LOG.info("Excluding secondary region " + bestRegionReplica +
@ -196,12 +194,13 @@ class MemStoreFlusher implements FlushRequester {
excludedRegions.add(bestRegionReplica); excludedRegions.add(bestRegionReplica);
} }
} else { } else {
LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. " LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. " +
+ "Total Memstore size=" "Total Memstore size=" +
+ humanReadableInt(server.getRegionServerAccounting().getGlobalMemStoreDataSize()) TraditionalBinaryPrefix.long2String(
+ ", Region memstore size=" server.getRegionServerAccounting().getGlobalMemStoreDataSize(), "", 1) +
+ humanReadableInt(regionToFlush.getMemStoreSize())); ", Region memstore size=" +
flushedOne = flushRegion(regionToFlush, true, false); TraditionalBinaryPrefix.long2String(regionToFlush.getMemStoreSize(), "", 1));
flushedOne = flushRegion(regionToFlush, true, false, FlushLifeCycleTracker.DUMMY);
if (!flushedOne) { if (!flushedOne) {
LOG.info("Excluding unflushable region " + regionToFlush + LOG.info("Excluding unflushable region " + regionToFlush +
@ -348,15 +347,17 @@ class MemStoreFlusher implements FlushRequester {
} }
@Override @Override
public void requestFlush(HRegion r, boolean forceFlushAllStores) { public void requestFlush(HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) {
r.incrementFlushesQueuedCount(); r.incrementFlushesQueuedCount();
synchronized (regionsInQueue) { synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) { if (!regionsInQueue.containsKey(r)) {
// This entry has no delay so it will be added at the top of the flush // This entry has no delay so it will be added at the top of the flush
// queue. It'll come out near immediately. // queue. It'll come out near immediately.
FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores); FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, tracker);
this.regionsInQueue.put(r, fqe); this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe); this.flushQueue.add(fqe);
} else {
tracker.notExecuted("Flush already requested on " + r);
} }
} }
} }
@ -367,7 +368,8 @@ class MemStoreFlusher implements FlushRequester {
synchronized (regionsInQueue) { synchronized (regionsInQueue) {
if (!regionsInQueue.containsKey(r)) { if (!regionsInQueue.containsKey(r)) {
// This entry has some delay // This entry has some delay
FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores); FlushRegionEntry fqe =
new FlushRegionEntry(r, forceFlushAllStores, FlushLifeCycleTracker.DUMMY);
fqe.requeue(delay); fqe.requeue(delay);
this.regionsInQueue.put(r, fqe); this.regionsInQueue.put(r, fqe);
this.flushQueue.add(fqe); this.flushQueue.add(fqe);
@ -463,7 +465,7 @@ class MemStoreFlusher implements FlushRequester {
return true; return true;
} }
} }
return flushRegion(region, false, fqe.isForceFlushAllStores()); return flushRegion(region, false, fqe.isForceFlushAllStores(), fqe.getTracker());
} }
/** /**
@ -478,22 +480,23 @@ class MemStoreFlusher implements FlushRequester {
* false, there will be accompanying log messages explaining why the region was * false, there will be accompanying log messages explaining why the region was
* not flushed. * not flushed.
*/ */
private boolean flushRegion(final HRegion region, final boolean emergencyFlush, private boolean flushRegion(HRegion region, boolean emergencyFlush, boolean forceFlushAllStores,
boolean forceFlushAllStores) { FlushLifeCycleTracker tracker) {
synchronized (this.regionsInQueue) { synchronized (this.regionsInQueue) {
FlushRegionEntry fqe = this.regionsInQueue.remove(region); FlushRegionEntry fqe = this.regionsInQueue.remove(region);
// Use the start time of the FlushRegionEntry if available // Use the start time of the FlushRegionEntry if available
if (fqe != null && emergencyFlush) { if (fqe != null && emergencyFlush) {
// Need to remove from region from delay queue. When NOT an // Need to remove from region from delay queue. When NOT an
// emergencyFlush, then item was removed via a flushQueue.poll. // emergencyFlush, then item was removed via a flushQueue.poll.
flushQueue.remove(fqe); flushQueue.remove(fqe);
} }
} }
tracker.beforeExecution();
lock.readLock().lock(); lock.readLock().lock();
try { try {
notifyFlushRequest(region, emergencyFlush); notifyFlushRequest(region, emergencyFlush);
FlushResult flushResult = region.flush(forceFlushAllStores); FlushResult flushResult = region.flushcache(forceFlushAllStores, false, tracker);
boolean shouldCompact = flushResult.isCompactionNeeded(); boolean shouldCompact = flushResult.isCompactionNeeded();
// We just want to check the size // We just want to check the size
boolean shouldSplit = region.checkSplit() != null; boolean shouldSplit = region.checkSplit() != null;
@ -523,6 +526,7 @@ class MemStoreFlusher implements FlushRequester {
} finally { } finally {
lock.readLock().unlock(); lock.readLock().unlock();
wakeUpIfBlocking(); wakeUpIfBlocking();
tracker.afterExecution();
} }
return true; return true;
} }
@ -732,13 +736,16 @@ class MemStoreFlusher implements FlushRequester {
private long whenToExpire; private long whenToExpire;
private int requeueCount = 0; private int requeueCount = 0;
private boolean forceFlushAllStores; private final boolean forceFlushAllStores;
FlushRegionEntry(final HRegion r, boolean forceFlushAllStores) { private final FlushLifeCycleTracker tracker;
FlushRegionEntry(final HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) {
this.region = r; this.region = r;
this.createTime = EnvironmentEdgeManager.currentTime(); this.createTime = EnvironmentEdgeManager.currentTime();
this.whenToExpire = this.createTime; this.whenToExpire = this.createTime;
this.forceFlushAllStores = forceFlushAllStores; this.forceFlushAllStores = forceFlushAllStores;
this.tracker = tracker;
} }
/** /**
@ -764,6 +771,10 @@ class MemStoreFlusher implements FlushRequester {
return forceFlushAllStores; return forceFlushAllStores;
} }
public FlushLifeCycleTracker getTracker() {
return tracker;
}
/** /**
* @param when When to expire, when to come up out of the queue. * @param when When to expire, when to come up out of the queue.
* Specify in milliseconds. This method adds EnvironmentEdgeManager.currentTime() * Specify in milliseconds. This method adds EnvironmentEdgeManager.currentTime()

View File

@ -1611,7 +1611,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ? boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ?
request.getWriteFlushWalMarker() : false; request.getWriteFlushWalMarker() : false;
// Go behind the curtain so we can manage writing of the flush WAL marker // Go behind the curtain so we can manage writing of the flush WAL marker
HRegion.FlushResultImpl flushResult = region.flushcache(true, writeFlushWalMarker); HRegion.FlushResultImpl flushResult =
region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY);
boolean compactionNeeded = flushResult.isCompactionNeeded(); boolean compactionNeeded = flushResult.isCompactionNeeded();
if (compactionNeeded) { if (compactionNeeded) {
regionServer.compactSplitThread.requestSystemCompaction(region, regionServer.compactSplitThread.requestSystemCompaction(region,

View File

@ -468,4 +468,9 @@ public interface Region extends ConfigurationObserver {
*/ */
void requestCompaction(byte[] family, String why, int priority, boolean major, void requestCompaction(byte[] family, String why, int priority, boolean major,
CompactionLifeCycleTracker tracker) throws IOException; CompactionLifeCycleTracker tracker) throws IOException;
/**
* Request flush on this region.
*/
void requestFlush(FlushLifeCycleTracker tracker) throws IOException;
} }

View File

@ -669,13 +669,13 @@ public class RegionCoprocessorHost
* Invoked before a memstore flush * Invoked before a memstore flush
* @throws IOException * @throws IOException
*/ */
public InternalScanner preFlush(HStore store, final InternalScanner scanner) public InternalScanner preFlush(HStore store, InternalScanner scanner,
throws IOException { FlushLifeCycleTracker tracker) throws IOException {
return execOperationWithResult(false, scanner, coprocEnvironments.isEmpty() ? null : return execOperationWithResult(false, scanner, coprocEnvironments.isEmpty() ? null
new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter) { : new ObserverOperationWithResult<RegionObserver, InternalScanner>(regionObserverGetter) {
@Override @Override
public InternalScanner call(RegionObserver observer) throws IOException { public InternalScanner call(RegionObserver observer) throws IOException {
return observer.preFlush(this, store, getResult()); return observer.preFlush(this, store, getResult(), tracker);
} }
}); });
} }
@ -684,11 +684,11 @@ public class RegionCoprocessorHost
* Invoked before a memstore flush * Invoked before a memstore flush
* @throws IOException * @throws IOException
*/ */
public void preFlush() throws IOException { public void preFlush(FlushLifeCycleTracker tracker) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override @Override
public void call(RegionObserver observer) throws IOException { public void call(RegionObserver observer) throws IOException {
observer.preFlush(this); observer.preFlush(this, tracker);
} }
}); });
} }
@ -697,11 +697,11 @@ public class RegionCoprocessorHost
* Invoked after a memstore flush * Invoked after a memstore flush
* @throws IOException * @throws IOException
*/ */
public void postFlush() throws IOException { public void postFlush(FlushLifeCycleTracker tracker) throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override @Override
public void call(RegionObserver observer) throws IOException { public void call(RegionObserver observer) throws IOException {
observer.postFlush(this); observer.postFlush(this, tracker);
} }
}); });
} }
@ -710,11 +710,12 @@ public class RegionCoprocessorHost
* Invoked after a memstore flush * Invoked after a memstore flush
* @throws IOException * @throws IOException
*/ */
public void postFlush(final HStore store, final HStoreFile storeFile) throws IOException { public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker)
throws IOException {
execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() {
@Override @Override
public void call(RegionObserver observer) throws IOException { public void call(RegionObserver observer) throws IOException {
observer.postFlush(this, store, storeFile); observer.postFlush(this, store, storeFile, tracker);
} }
}); });
} }

View File

@ -57,7 +57,8 @@ abstract class StoreFlusher {
* @return List of files written. Can be empty; must not be null. * @return List of files written. Can be empty; must not be null.
*/ */
public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum, public abstract List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
MonitoredTask status, ThroughputController throughputController) throws IOException; MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException;
protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum, protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum,
MonitoredTask status) throws IOException { MonitoredTask status) throws IOException {
@ -77,15 +78,15 @@ abstract class StoreFlusher {
* @param smallestReadPoint * @param smallestReadPoint
* @return The scanner; null if coprocessor is canceling the flush. * @return The scanner; null if coprocessor is canceling the flush.
*/ */
protected InternalScanner createScanner(List<KeyValueScanner> snapshotScanners, protected final InternalScanner createScanner(List<KeyValueScanner> snapshotScanners,
long smallestReadPoint) throws IOException { long smallestReadPoint, FlushLifeCycleTracker tracker) throws IOException {
InternalScanner scanner = InternalScanner scanner =
new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), snapshotScanners, new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), snapshotScanners,
ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP); ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
assert scanner != null; assert scanner != null;
if (store.getCoprocessorHost() != null) { if (store.getCoprocessorHost() != null) {
try { try {
return store.getCoprocessorHost().preFlush(store, scanner); return store.getCoprocessorHost().preFlush(store, scanner, tracker);
} catch (IOException ioe) { } catch (IOException ioe) {
scanner.close(); scanner.close();
throw ioe; throw ioe;

View File

@ -56,13 +56,14 @@ public class StripeStoreFlusher extends StoreFlusher {
@Override @Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum, public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum,
MonitoredTask status, ThroughputController throughputController) throws IOException { MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException {
List<Path> result = new ArrayList<>(); List<Path> result = new ArrayList<>();
int cellsCount = snapshot.getCellsCount(); int cellsCount = snapshot.getCellsCount();
if (cellsCount == 0) return result; // don't flush if there are no entries if (cellsCount == 0) return result; // don't flush if there are no entries
long smallestReadPoint = store.getSmallestReadPoint(); long smallestReadPoint = store.getSmallestReadPoint();
InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint); InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker);
if (scanner == null) { if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing return result; // NULL scanner returned from coprocessor hooks means skip normal processing
} }

View File

@ -110,6 +110,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
import org.apache.hadoop.hbase.quotas.GlobalQuotaSettings; import org.apache.hadoop.hbase.quotas.GlobalQuotaSettings;
import org.apache.hadoop.hbase.regionserver.BloomType; import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region;
@ -1592,9 +1593,10 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor,
} }
@Override @Override
public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c) throws IOException { public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c,
FlushLifeCycleTracker tracker) throws IOException {
requirePermission(getActiveUser(c), "flush", getTableName(c.getEnvironment()), null, null, requirePermission(getActiveUser(c), "flush", getTableName(c.getEnvironment()), null, null,
Action.ADMIN, Action.CREATE); Action.ADMIN, Action.CREATE);
} }
@Override @Override

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner; import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner;
import org.apache.hadoop.hbase.mob.MobConstants; import org.apache.hadoop.hbase.mob.MobConstants;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils; import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils;
import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.ClientTests;
@ -147,7 +148,8 @@ public class TestMobCloneSnapshotFromClient extends TestCloneSnapshotFromClient
} }
@Override @Override
public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) throws IOException { public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e,
FlushLifeCycleTracker tracker) throws IOException {
if (delayFlush) { if (delayFlush) {
try { try {
if (Bytes.compareTo(e.getEnvironment().getRegionInfo().getStartKey(), if (Bytes.compareTo(e.getEnvironment().getRegionInfo().getStartKey(),

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
@ -171,14 +172,14 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver {
@Override @Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, InternalScanner scanner) throws IOException { Store store, InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
ctPreFlush.incrementAndGet(); ctPreFlush.incrementAndGet();
return scanner; return scanner;
} }
@Override @Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
Store store, StoreFile resultFile) throws IOException { Store store, StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException {
ctPostFlush.incrementAndGet(); ctPostFlush.incrementAndGet();
if (throwOnPostFlush.get()){ if (throwOnPostFlush.get()){
throw new IOException("throwOnPostFlush is true in postFlush"); throw new IOException("throwOnPostFlush is true in postFlush");

View File

@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
@ -214,12 +215,16 @@ public class TestCoprocessorInterface {
CompactionRequest request) { CompactionRequest request) {
postCompactCalled = true; postCompactCalled = true;
} }
@Override @Override
public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) { public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e,
FlushLifeCycleTracker tracker) {
preFlushCalled = true; preFlushCalled = true;
} }
@Override @Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) { public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e,
FlushLifeCycleTracker tracker) {
postFlushCalled = true; postFlushCalled = true;
} }

View File

@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContext;
import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext; import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext;
@ -465,7 +466,8 @@ public class TestRegionObserverInterface {
} }
@Override @Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) { public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e,
FlushLifeCycleTracker tracker) {
lastFlush = EnvironmentEdgeManager.currentTime(); lastFlush = EnvironmentEdgeManager.currentTime();
} }
} }

View File

@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.filter.FilterBase;
import org.apache.hadoop.hbase.regionserver.ChunkCreator; import org.apache.hadoop.hbase.regionserver.ChunkCreator;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.HStore;
@ -157,7 +158,7 @@ public class TestRegionObserverScannerOpenHook {
@Override @Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store, public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner) throws IOException { InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
return NO_DATA; return NO_DATA;
} }
} }

View File

@ -45,7 +45,7 @@ public class NoOpScanPolicyObserver implements RegionCoprocessor, RegionObserver
@Override @Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store, public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner) throws IOException { InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
return new DelegatingInternalScanner(scanner); return new DelegatingInternalScanner(scanner);
} }

View File

@ -0,0 +1,240 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
* Confirm that the function of FlushLifeCycleTracker is OK as we do not use it in our own code.
*/
@Category({ CoprocessorTests.class, MediumTests.class })
public class TestFlushLifeCycleTracker {
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
private static final TableName NAME =
TableName.valueOf(TestCompactionLifeCycleTracker.class.getSimpleName());
private static final byte[] CF = Bytes.toBytes("CF");
private static final byte[] QUALIFIER = Bytes.toBytes("CQ");
private HRegion region;
private static FlushLifeCycleTracker TRACKER;
private static volatile CountDownLatch ARRIVE;
private static volatile CountDownLatch BLOCK;
public static final class FlushObserver implements RegionObserver, RegionCoprocessor {
@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}
@Override
public void preFlush(ObserverContext<RegionCoprocessorEnvironment> c,
FlushLifeCycleTracker tracker) throws IOException {
if (TRACKER != null) {
assertSame(tracker, TRACKER);
}
}
@Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
if (TRACKER != null) {
assertSame(tracker, TRACKER);
}
return scanner;
}
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c,
FlushLifeCycleTracker tracker) throws IOException {
if (TRACKER != null) {
assertSame(tracker, TRACKER);
}
}
@Override
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException {
if (TRACKER != null) {
assertSame(tracker, TRACKER);
}
// inject here so we can make a flush request to fail because of we already have a flush
// ongoing.
CountDownLatch arrive = ARRIVE;
if (arrive != null) {
arrive.countDown();
}
try {
BLOCK.await();
} catch (InterruptedException e) {
throw new InterruptedIOException();
}
}
}
private static final class Tracker implements FlushLifeCycleTracker {
private String reason;
private boolean beforeExecutionCalled;
private boolean afterExecutionCalled;
private boolean completed = false;
@Override
public synchronized void notExecuted(String reason) {
this.reason = reason;
completed = true;
notifyAll();
}
@Override
public void beforeExecution() {
this.beforeExecutionCalled = true;
}
@Override
public synchronized void afterExecution() {
this.afterExecutionCalled = true;
completed = true;
notifyAll();
}
public synchronized void await() throws InterruptedException {
while (!completed) {
wait();
}
}
}
@BeforeClass
public static void setUpBeforeClass() throws Exception {
UTIL.startMiniCluster(3);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
UTIL.shutdownMiniCluster();
}
@Before
public void setUp() throws IOException {
UTIL.getAdmin()
.createTable(TableDescriptorBuilder.newBuilder(NAME)
.addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF))
.addCoprocessor(FlushObserver.class.getName()).build());
region = UTIL.getHBaseCluster().getRegions(NAME).get(0);
}
@After
public void tearDown() throws IOException {
region = null;
TRACKER = null;
UTIL.deleteTable(NAME);
}
@Test
public void test() throws IOException, InterruptedException {
try (Table table = UTIL.getConnection().getTable(NAME)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addImmutable(CF, QUALIFIER, Bytes.toBytes(i)));
}
}
Tracker tracker = new Tracker();
TRACKER = tracker;
region.requestFlush(tracker);
tracker.await();
assertNull(tracker.reason);
assertTrue(tracker.beforeExecutionCalled);
assertTrue(tracker.afterExecutionCalled);
// request flush on a region with empty memstore should still success
tracker = new Tracker();
TRACKER = tracker;
region.requestFlush(tracker);
tracker.await();
assertNull(tracker.reason);
assertTrue(tracker.beforeExecutionCalled);
assertTrue(tracker.afterExecutionCalled);
}
@Test
public void testNotExecuted() throws IOException, InterruptedException {
try (Table table = UTIL.getConnection().getTable(NAME)) {
for (int i = 0; i < 100; i++) {
table.put(new Put(Bytes.toBytes(i)).addImmutable(CF, QUALIFIER, Bytes.toBytes(i)));
}
}
// here we may have overlap when calling the CP hooks so we do not assert on TRACKER
Tracker tracker1 = new Tracker();
ARRIVE = new CountDownLatch(1);
BLOCK = new CountDownLatch(1);
region.requestFlush(tracker1);
ARRIVE.await();
Tracker tracker2 = new Tracker();
region.requestFlush(tracker2);
tracker2.await();
assertNotNull(tracker2.reason);
assertFalse(tracker2.beforeExecutionCalled);
assertFalse(tracker2.afterExecutionCalled);
BLOCK.countDown();
tracker1.await();
assertNull(tracker1.reason);
assertTrue(tracker1.beforeExecutionCalled);
assertTrue(tracker1.afterExecutionCalled);
}
}

View File

@ -1,25 +1,34 @@
/** /**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license * Licensed to the Apache Software Foundation (ASF) under one
* agreements. See the NOTICE file distributed with this work for additional information regarding * or more contributor license agreements. See the NOTICE file
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the * distributed with this work for additional information
* "License"); you may not use this file except in compliance with the License. You may obtain a * regarding copyright ownership. The ASF licenses this file
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable * to you under the Apache License, Version 2.0 (the
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS" * "License"); you may not use this file except in compliance
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License * with the License. You may obtain a copy of the License at
* for the specific language governing permissions and limitations under the License. *
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/ */
package org.apache.hadoop.hbase.regionserver; package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.*; import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.MemStoreFlusher.FlushRegionEntry;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
import org.apache.hadoop.hbase.regionserver.MemStoreFlusher.FlushRegionEntry;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.Rule; import org.junit.Rule;
@ -27,7 +36,7 @@ import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.rules.TestName; import org.junit.rules.TestName;
@Category({RegionServerTests.class, MediumTests.class}) @Category({ RegionServerTests.class, MediumTests.class })
public class TestFlushRegionEntry { public class TestFlushRegionEntry {
@Rule @Rule
public TestName name = new TestName(); public TestName name = new TestName();
@ -46,15 +55,15 @@ public class TestFlushRegionEntry {
@Test @Test
public void testFlushRegionEntryEquality() { public void testFlushRegionEntryEquality() {
HRegionInfo hri = new HRegionInfo(1, TableName.valueOf(name.getMethodName()), 0); RegionInfo hri = RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName()))
.setRegionId(1).setReplicaId(0).build();
HRegion r = mock(HRegion.class); HRegion r = mock(HRegion.class);
doReturn(hri).when(r).getRegionInfo(); doReturn(hri).when(r).getRegionInfo();
FlushRegionEntry entry = new FlushRegionEntry(r, true); FlushRegionEntry entry = new FlushRegionEntry(r, true, FlushLifeCycleTracker.DUMMY);
FlushRegionEntry other = new FlushRegionEntry(r, true); FlushRegionEntry other = new FlushRegionEntry(r, true, FlushLifeCycleTracker.DUMMY);
assertEquals(entry.hashCode(), other.hashCode()); assertEquals(entry.hashCode(), other.hashCode());
assertEquals(entry, other); assertEquals(entry, other);
} }
} }

View File

@ -482,7 +482,7 @@ public class TestHMobStore {
* @throws IOException * @throws IOException
*/ */
private static void flushStore(HMobStore store, long id) throws IOException { private static void flushStore(HMobStore store, long id) throws IOException {
StoreFlushContext storeFlushCtx = store.createFlushContext(id); StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
storeFlushCtx.prepare(); storeFlushCtx.prepare();
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));

View File

@ -293,7 +293,7 @@ public class TestHRegion {
put.addColumn(COLUMN_FAMILY_BYTES, null, value); put.addColumn(COLUMN_FAMILY_BYTES, null, value);
// First put something in current memstore, which will be in snapshot after flusher.prepare() // First put something in current memstore, which will be in snapshot after flusher.prepare()
region.put(put); region.put(put);
StoreFlushContext storeFlushCtx = store.createFlushContext(12345); StoreFlushContext storeFlushCtx = store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
storeFlushCtx.prepare(); storeFlushCtx.prepare();
// Second put something in current memstore // Second put something in current memstore
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
@ -337,7 +337,7 @@ public class TestHRegion {
HStore store = region.getStore(COLUMN_FAMILY_BYTES); HStore store = region.getStore(COLUMN_FAMILY_BYTES);
// Get some random bytes. // Get some random bytes.
byte [] value = Bytes.toBytes(method); byte [] value = Bytes.toBytes(method);
faultyLog.setStoreFlushCtx(store.createFlushContext(12345)); faultyLog.setStoreFlushCtx(store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY));
Put put = new Put(value); Put put = new Put(value);
put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value);
@ -400,8 +400,8 @@ public class TestHRegion {
// save normalCPHost and replaced by mockedCPHost, which will cancel flush requests // save normalCPHost and replaced by mockedCPHost, which will cancel flush requests
RegionCoprocessorHost normalCPHost = region.getCoprocessorHost(); RegionCoprocessorHost normalCPHost = region.getCoprocessorHost();
RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class);
when(mockedCPHost.preFlush(Mockito.isA(HStore.class), Mockito.isA(InternalScanner.class))). when(mockedCPHost.preFlush(Mockito.isA(HStore.class), Mockito.isA(InternalScanner.class),
thenReturn(null); Mockito.isA(FlushLifeCycleTracker.class))).thenReturn(null);
region.setCoprocessorHost(mockedCPHost); region.setCoprocessorHost(mockedCPHost);
region.put(put); region.put(put);
region.flush(true); region.flush(true);
@ -567,7 +567,8 @@ public class TestHRegion {
region.put(p1); region.put(p1);
// Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only. // Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only.
HStore store = region.getStore(COLUMN_FAMILY_BYTES); HStore store = region.getStore(COLUMN_FAMILY_BYTES);
StoreFlushContext storeFlushCtx = store.createFlushContext(12345); StoreFlushContext storeFlushCtx =
store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY);
storeFlushCtx.prepare(); storeFlushCtx.prepare();
// Now add two entries to the foreground memstore. // Now add two entries to the foreground memstore.
Put p2 = new Put(row); Put p2 = new Put(row);
@ -5626,7 +5627,7 @@ public class TestHRegion {
Put put = new Put(Bytes.toBytes("19998")); Put put = new Put(Bytes.toBytes("19998"));
put.addColumn(cf1, col, Bytes.toBytes("val")); put.addColumn(cf1, col, Bytes.toBytes("val"));
region.put(put); region.put(put);
region.flushcache(true, true); region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
Put put2 = new Put(Bytes.toBytes("19997")); Put put2 = new Put(Bytes.toBytes("19997"));
put2.addColumn(cf1, col, Bytes.toBytes("val")); put2.addColumn(cf1, col, Bytes.toBytes("val"));
region.put(put2); region.put(put2);
@ -5642,7 +5643,7 @@ public class TestHRegion {
p.addColumn(cf1, col, Bytes.toBytes("" + i)); p.addColumn(cf1, col, Bytes.toBytes("" + i));
region.put(p); region.put(p);
} }
region.flushcache(true, true); region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
// create one memstore contains many rows will be skipped // create one memstore contains many rows will be skipped
// to check MemStoreScanner.seekToPreviousRow // to check MemStoreScanner.seekToPreviousRow
@ -5689,7 +5690,7 @@ public class TestHRegion {
RegionScanner scanner = region.getScanner(scan); RegionScanner scanner = region.getScanner(scan);
// flush the cache. This will reset the store scanner // flush the cache. This will reset the store scanner
region.flushcache(true, true); region.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
// create one memstore contains many rows will be skipped // create one memstore contains many rows will be skipped
// to check MemStoreScanner.seekToPreviousRow // to check MemStoreScanner.seekToPreviousRow

View File

@ -1207,13 +1207,15 @@ public class TestHRegionReplayEvents {
@Test @Test
public void testWriteFlushRequestMarker() throws IOException { public void testWriteFlushRequestMarker() throws IOException {
// primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false
FlushResultImpl result = (FlushResultImpl)((HRegion)primaryRegion).flushcache(true, false); FlushResultImpl result = (FlushResultImpl) ((HRegion) primaryRegion).flushcache(true, false,
FlushLifeCycleTracker.DUMMY);
assertNotNull(result); assertNotNull(result);
assertEquals(result.result, FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY); assertEquals(result.result, FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY);
assertFalse(result.wroteFlushWalMarker); assertFalse(result.wroteFlushWalMarker);
// request flush again, but this time with writeFlushRequestWalMarker = true // request flush again, but this time with writeFlushRequestWalMarker = true
result = (FlushResultImpl)((HRegion)primaryRegion).flushcache(true, true); result = (FlushResultImpl) ((HRegion) primaryRegion).flushcache(true, true,
FlushLifeCycleTracker.DUMMY);
assertNotNull(result); assertNotNull(result);
assertEquals(result.result, FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY); assertEquals(result.result, FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY);
assertTrue(result.wroteFlushWalMarker); assertTrue(result.wroteFlushWalMarker);
@ -1248,7 +1250,7 @@ public class TestHRegionReplayEvents {
// Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from
// triggered flush restores readsEnabled // triggered flush restores readsEnabled
primaryRegion.flushcache(true, true); primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY);
reader = createWALReaderForPrimary(); reader = createWALReaderForPrimary();
while (true) { while (true) {
WAL.Entry entry = reader.next(); WAL.Entry entry = reader.next();

View File

@ -769,7 +769,7 @@ public class TestHStore {
} }
private static void flushStore(HStore store, long id) throws IOException { private static void flushStore(HStore store, long id) throws IOException {
StoreFlushContext storeFlushCtx = store.createFlushContext(id); StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY);
storeFlushCtx.prepare(); storeFlushCtx.prepare();
storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class));
storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); storeFlushCtx.commit(Mockito.mock(MonitoredTask.class));
@ -1081,7 +1081,7 @@ public class TestHStore {
seqId = Math.max(seqId, c.getSequenceId()); seqId = Math.max(seqId, c.getSequenceId());
} }
inputCellsBeforeSnapshot.forEach(c -> store.add(c, null)); inputCellsBeforeSnapshot.forEach(c -> store.add(c, null));
StoreFlushContext storeFlushCtx = store.createFlushContext(id++); StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
storeFlushCtx.prepare(); storeFlushCtx.prepare();
inputCellsAfterSnapshot.forEach(c -> store.add(c, null)); inputCellsAfterSnapshot.forEach(c -> store.add(c, null));
int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2; int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2;
@ -1287,7 +1287,7 @@ public class TestHStore {
quals.add(qf1); quals.add(qf1);
quals.add(qf2); quals.add(qf2);
quals.add(qf3); quals.add(qf3);
StoreFlushContext storeFlushCtx = store.createFlushContext(id++); StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
MyCompactingMemStore.START_TEST.set(true); MyCompactingMemStore.START_TEST.set(true);
Runnable flush = () -> { Runnable flush = () -> {
// this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5) // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5)
@ -1363,7 +1363,7 @@ public class TestHStore {
myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing); myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing);
long snapshotId = id++; long snapshotId = id++;
// push older data into snapshot -- phase (1/4) // push older data into snapshot -- phase (1/4)
StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId); StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY);
storeFlushCtx.prepare(); storeFlushCtx.prepare();
// insert current data into active -- phase (2/4) // insert current data into active -- phase (2/4)
@ -1475,7 +1475,7 @@ public class TestHStore {
store.add(createCell(qf2, ts, seqId, value), memStoreSizing); store.add(createCell(qf2, ts, seqId, value), memStoreSizing);
store.add(createCell(qf3, ts, seqId, value), memStoreSizing); store.add(createCell(qf3, ts, seqId, value), memStoreSizing);
assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get());
StoreFlushContext storeFlushCtx = store.createFlushContext(id++); StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY);
storeFlushCtx.prepare(); storeFlushCtx.prepare();
// This shouldn't invoke another in-memory flush because the first compactor thread // This shouldn't invoke another in-memory flush because the first compactor thread
// hasn't accomplished the in-memory compaction. // hasn't accomplished the in-memory compaction.

View File

@ -135,11 +135,11 @@ public class TestHeapMemoryManager {
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService); heapMemoryManager.start(choreService);
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK; memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up // Allow the tuner to run once and do necessary memory up
Thread.sleep(1500); Thread.sleep(1500);
// No changes should be made by tuner as we already have lot of empty space // No changes should be made by tuner as we already have lot of empty space
@ -178,10 +178,10 @@ public class TestHeapMemoryManager {
// do some offheap flushes also. So there should be decrease in memstore but // do some offheap flushes also. So there should be decrease in memstore but
// not as that when we don't have offheap flushes // not as that when we don't have offheap flushes
memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK; memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up // Allow the tuner to run once and do necessary memory up
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
@ -226,10 +226,10 @@ public class TestHeapMemoryManager {
// do some offheap flushes also. So there should be decrease in memstore but // do some offheap flushes also. So there should be decrease in memstore but
// not as that when we don't have offheap flushes // not as that when we don't have offheap flushes
memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK; memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up // Allow the tuner to run once and do necessary memory up
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
@ -242,10 +242,10 @@ public class TestHeapMemoryManager {
// flushes are due to onheap overhead. This should once again call for increase in // flushes are due to onheap overhead. This should once again call for increase in
// memstore size but that increase should be to the safe size // memstore size but that increase should be to the safe size
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK; memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up // Allow the tuner to run once and do necessary memory up
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); assertHeapSpaceDelta(maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize);
@ -308,10 +308,10 @@ public class TestHeapMemoryManager {
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService); heapMemoryManager.start(choreService);
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up // Allow the tuner to run once and do necessary memory up
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize, assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
@ -322,8 +322,8 @@ public class TestHeapMemoryManager {
oldBlockCacheSize = blockCache.maxSize; oldBlockCacheSize = blockCache.maxSize;
// Do some more flushes before the next run of HeapMemoryTuner // Do some more flushes before the next run of HeapMemoryTuner
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up // Allow the tuner to run once and do necessary memory up
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize, assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
@ -357,10 +357,10 @@ public class TestHeapMemoryManager {
heapMemoryManager.start(choreService); heapMemoryManager.start(choreService);
// this should not change anything with onheap memstore // this should not change anything with onheap memstore
memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK; memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK;
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up // Allow the tuner to run once and do necessary memory up
Thread.sleep(1500); Thread.sleep(1500);
// No changes should be made by tuner as we already have lot of empty space // No changes should be made by tuner as we already have lot of empty space
@ -444,9 +444,9 @@ public class TestHeapMemoryManager {
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService); heapMemoryManager.start(choreService);
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
blockCache.evictBlock(null); blockCache.evictBlock(null);
// Allow the tuner to run once and do necessary memory up // Allow the tuner to run once and do necessary memory up
Thread.sleep(1500); Thread.sleep(1500);
@ -455,9 +455,9 @@ public class TestHeapMemoryManager {
assertEquals(oldBlockCacheSize, blockCache.maxSize); assertEquals(oldBlockCacheSize, blockCache.maxSize);
// Do some more flushes before the next run of HeapMemoryTuner // Do some more flushes before the next run of HeapMemoryTuner
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
// Allow the tuner to run once and do necessary memory up // Allow the tuner to run once and do necessary memory up
waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize);
assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize, assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize,
@ -490,9 +490,9 @@ public class TestHeapMemoryManager {
final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); final ChoreService choreService = new ChoreService("TEST_SERVER_NAME");
heapMemoryManager.start(choreService); heapMemoryManager.start(choreService);
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK;
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
blockCache.evictBlock(null); blockCache.evictBlock(null);
blockCache.evictBlock(null); blockCache.evictBlock(null);
// Allow the tuner to run once and do necessary memory up // Allow the tuner to run once and do necessary memory up
@ -502,7 +502,7 @@ public class TestHeapMemoryManager {
assertEquals(oldBlockCacheSize, blockCache.maxSize); assertEquals(oldBlockCacheSize, blockCache.maxSize);
// Flushes that block updates // Flushes that block updates
memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK; memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK;
memStoreFlusher.requestFlush(null, false); memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY);
blockCache.evictBlock(null); blockCache.evictBlock(null);
blockCache.evictBlock(null); blockCache.evictBlock(null);
blockCache.evictBlock(null); blockCache.evictBlock(null);
@ -805,7 +805,8 @@ public class TestHeapMemoryManager {
} }
@Override @Override
public void requestFlush(HRegion region, boolean forceFlushAllStores) { public void requestFlush(HRegion region, boolean forceFlushAllStores,
FlushLifeCycleTracker tracker) {
this.listener.flushRequested(flushType, region); this.listener.flushRequested(flushType, region);
} }

View File

@ -31,20 +31,20 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult;
import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@ -83,7 +83,8 @@ public class TestSplitWalDataLoss {
testUtil.startMiniCluster(2); testUtil.startMiniCluster(2);
Admin admin = testUtil.getAdmin(); Admin admin = testUtil.getAdmin();
admin.createNamespace(namespace); admin.createNamespace(namespace);
admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(family))); admin.createTable(TableDescriptorBuilder.newBuilder(tableName)
.addColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build());
testUtil.waitTableAvailable(tableName); testUtil.waitTableAvailable(tableName);
} }
@ -136,7 +137,7 @@ public class TestSplitWalDataLoss {
long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family); long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family);
LOG.info("CHANGE OLDEST " + oldestSeqIdOfStore); LOG.info("CHANGE OLDEST " + oldestSeqIdOfStore);
assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM); assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM);
rs.cacheFlusher.requestFlush(spiedRegion, false); rs.cacheFlusher.requestFlush(spiedRegion, false, FlushLifeCycleTracker.DUMMY);
synchronized (flushed) { synchronized (flushed) {
while (!flushed.booleanValue()) { while (!flushed.booleanValue()) {
flushed.wait(); flushed.wait();

View File

@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.FlushRequestListener; import org.apache.hadoop.hbase.regionserver.FlushRequestListener;
import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
@ -650,15 +651,16 @@ public abstract class AbstractTestWALReplay {
public CustomStoreFlusher(Configuration conf, HStore store) { public CustomStoreFlusher(Configuration conf, HStore store) {
super(conf, store); super(conf, store);
} }
@Override @Override
public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
MonitoredTask status, ThroughputController throughputController) throws IOException { MonitoredTask status, ThroughputController throughputController,
FlushLifeCycleTracker tracker) throws IOException {
if (throwExceptionWhenFlushing.get()) { if (throwExceptionWhenFlushing.get()) {
throw new IOException("Simulated exception by tests"); throw new IOException("Simulated exception by tests");
} }
return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController); return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker);
} }
}; };
/** /**
@ -831,16 +833,14 @@ public abstract class AbstractTestWALReplay {
WAL newWal = createWAL(newConf, hbaseRootDir, logName); WAL newWal = createWAL(newConf, hbaseRootDir, logName);
final AtomicInteger flushcount = new AtomicInteger(0); final AtomicInteger flushcount = new AtomicInteger(0);
try { try {
final HRegion region = final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
@Override @Override
protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid, protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid,
final Collection<HStore> storesToFlush, MonitoredTask status, final Collection<HStore> storesToFlush, MonitoredTask status,
boolean writeFlushWalMarker) boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException {
throws IOException {
LOG.info("InternalFlushCache Invoked"); LOG.info("InternalFlushCache Invoked");
FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush, FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush,
Mockito.mock(MonitoredTask.class), writeFlushWalMarker); Mockito.mock(MonitoredTask.class), writeFlushWalMarker, tracker);
flushcount.incrementAndGet(); flushcount.incrementAndGet();
return fs; return fs;
} }
@ -1117,7 +1117,7 @@ public abstract class AbstractTestWALReplay {
private HRegion r; private HRegion r;
@Override @Override
public void requestFlush(HRegion region, boolean force) { public void requestFlush(HRegion region, boolean force, FlushLifeCycleTracker tracker) {
try { try {
r.flush(force); r.flush(force);
} catch (IOException e) { } catch (IOException e) {
@ -1127,8 +1127,6 @@ public abstract class AbstractTestWALReplay {
@Override @Override
public void requestDelayedFlush(HRegion region, long when, boolean forceFlushAllStores) { public void requestDelayedFlush(HRegion region, long when, boolean forceFlushAllStores) {
// TODO Auto-generated method stub
} }
@Override @Override

View File

@ -112,6 +112,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService;
import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.CheckPermissionsRequest; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.CheckPermissionsRequest;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
@ -807,7 +808,8 @@ public class TestAccessController extends SecureTestUtil {
AccessTestAction action = new AccessTestAction() { AccessTestAction action = new AccessTestAction() {
@Override @Override
public Object run() throws Exception { public Object run() throws Exception {
ACCESS_CONTROLLER.preFlush(ObserverContextImpl.createAndPrepare(RCP_ENV)); ACCESS_CONTROLLER.preFlush(ObserverContextImpl.createAndPrepare(RCP_ENV),
FlushLifeCycleTracker.DUMMY);
return null; return null;
} }
}; };

View File

@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment;
import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
@ -814,7 +815,8 @@ public class TestWithDisabledAuthorization extends SecureTestUtil {
verifyAllowed(new AccessTestAction() { verifyAllowed(new AccessTestAction() {
@Override @Override
public Object run() throws Exception { public Object run() throws Exception {
ACCESS_CONTROLLER.preFlush(ObserverContextImpl.createAndPrepare(RCP_ENV)); ACCESS_CONTROLLER.preFlush(ObserverContextImpl.createAndPrepare(RCP_ENV),
FlushLifeCycleTracker.DUMMY);
return null; return null;
} }
}, SUPERUSER, USER_ADMIN, USER_RW, USER_RO, USER_OWNER, USER_CREATE, USER_QUAL, USER_NONE); }, SUPERUSER, USER_ADMIN, USER_RW, USER_RO, USER_OWNER, USER_CREATE, USER_QUAL, USER_NONE);

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner; import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner;
import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.RegionScanner;
@ -311,7 +312,7 @@ public class TestCoprocessorScanPolicy {
@Override @Override
public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store, public InternalScanner preFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store,
InternalScanner scanner) throws IOException { InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException {
return wrap(store, scanner); return wrap(store, scanner);
} }