diff --git a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java index f849c8653b4..d6d66bbd5a0 100644 --- a/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java +++ b/hbase-examples/src/main/java/org/apache/hadoop/hbase/coprocessor/example/ZooKeeperScanPolicyObserver.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.ScanType; import org.apache.hadoop.hbase.regionserver.ScannerContext; @@ -188,7 +189,7 @@ public class ZooKeeperScanPolicyObserver implements RegionCoprocessor, RegionObs @Override public InternalScanner preFlush(ObserverContext c, Store store, - InternalScanner scanner) throws IOException { + InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { return wrap(scanner); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java index 5c89149fa42..2ca14952de5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.OperationStatus; @@ -123,37 +124,43 @@ public interface RegionObserver { /** * Called before the memstore is flushed to disk. * @param c the environment provided by the region server + * @param tracker tracker used to track the life cycle of a flush */ - default void preFlush(final ObserverContext c) throws IOException {} + default void preFlush(final ObserverContext c, + FlushLifeCycleTracker tracker) throws IOException {} /** * Called before a Store's memstore is flushed to disk. * @param c the environment provided by the region server * @param store the store where compaction is being requested * @param scanner the scanner over existing data used in the store file + * @param tracker tracker used to track the life cycle of a flush * @return the scanner to use during compaction. Should not be {@code null} * unless the implementation is writing new store files on its own. */ default InternalScanner preFlush(ObserverContext c, Store store, - InternalScanner scanner) throws IOException { + InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { return scanner; } /** * Called after the memstore is flushed to disk. * @param c the environment provided by the region server + * @param tracker tracker used to track the life cycle of a flush * @throws IOException if an error occurred on the coprocessor */ - default void postFlush(ObserverContext c) throws IOException {} + default void postFlush(ObserverContext c, + FlushLifeCycleTracker tracker) throws IOException {} /** * Called after a Store's memstore is flushed to disk. * @param c the environment provided by the region server * @param store the store being flushed * @param resultFile the new store file written out during compaction + * @param tracker tracker used to track the life cycle of a flush */ default void postFlush(ObserverContext c, Store store, - StoreFile resultFile) throws IOException {} + StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException {} /** * Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java index 738f3bc67e1..1bc80688de9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/DefaultMobStoreFlusher.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.HMobStore; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.InternalScanner; @@ -100,14 +101,15 @@ public class DefaultMobStoreFlusher extends DefaultStoreFlusher { */ @Override public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, - MonitoredTask status, ThroughputController throughputController) throws IOException { + MonitoredTask status, ThroughputController throughputController, + FlushLifeCycleTracker tracker) throws IOException { ArrayList result = new ArrayList<>(); long cellsCount = snapshot.getCellsCount(); if (cellsCount == 0) return result; // don't flush if there are no entries // Use a store scanner to find which rows to flush. long smallestReadPoint = store.getSmallestReadPoint(); - InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint); + InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker); if (scanner == null) { return result; // NULL scanner returned from coprocessor hooks means skip normal processing } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java index 2e907e85989..06d47522ae1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java @@ -45,14 +45,15 @@ public class DefaultStoreFlusher extends StoreFlusher { @Override public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, - MonitoredTask status, ThroughputController throughputController) throws IOException { + MonitoredTask status, ThroughputController throughputController, + FlushLifeCycleTracker tracker) throws IOException { ArrayList result = new ArrayList<>(); int cellsCount = snapshot.getCellsCount(); if (cellsCount == 0) return result; // don't flush if there are no entries // Use a store scanner to find which rows to flush. long smallestReadPoint = store.getSmallestReadPoint(); - InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint); + InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker); if (scanner == null) { return result; // NULL scanner returned from coprocessor hooks means skip normal processing } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLifeCycleTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLifeCycleTracker.java new file mode 100644 index 00000000000..f8060232c2a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushLifeCycleTracker.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/** + * Used to track flush execution. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +@InterfaceStability.Evolving +public interface FlushLifeCycleTracker { + + static FlushLifeCycleTracker DUMMY = new FlushLifeCycleTracker() { + }; + + /** + * Called if the flush request fails for some reason. + */ + default void notExecuted(String reason) { + } + + /** + * Called before flush is executed. + */ + default void beforeExecution() { + } + + /** + * Called after flush is executed. + */ + default void afterExecution() { + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java index 931a7374012..c54f771395c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRequester.java @@ -33,7 +33,7 @@ public interface FlushRequester { * @param forceFlushAllStores whether we want to flush all stores. e.g., when request from log * rolling. */ - void requestFlush(HRegion region, boolean forceFlushAllStores); + void requestFlush(HRegion region, boolean forceFlushAllStores, FlushLifeCycleTracker tracker); /** * Tell the listener the cache needs to be flushed after a delay diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 5cbf8895156..f0c9ec28f53 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -2203,7 +2203,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi */ // TODO HBASE-18905. We might have to expose a requestFlush API for CPs public FlushResult flush(boolean force) throws IOException { - return flushcache(force, false); + return flushcache(force, false, FlushLifeCycleTracker.DUMMY); } public static interface FlushResult { @@ -2241,6 +2241,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * time-sensitive thread. * @param forceFlushAllStores whether we want to flush all stores * @param writeFlushRequestWalMarker whether to write the flush request marker to WAL + * @param tracker used to track the life cycle of this flush * @return whether the flush is success and whether the region needs compacting * * @throws IOException general io exceptions @@ -2248,8 +2249,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * because a Snapshot was not properly persisted. The region is put in closing mode, and the * caller MUST abort after this. */ - public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker) - throws IOException { + public FlushResultImpl flushcache(boolean forceFlushAllStores, boolean writeFlushRequestWalMarker, + FlushLifeCycleTracker tracker) throws IOException { // fail-fast instead of waiting on the lock if (this.closing.get()) { String msg = "Skipping flush on " + this + " because closing"; @@ -2269,7 +2270,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } if (coprocessorHost != null) { status.setStatus("Running coprocessor pre-flush hooks"); - coprocessorHost.preFlush(); + coprocessorHost.preFlush(tracker); } // TODO: this should be managed within memstore with the snapshot, updated only after flush // successful @@ -2298,11 +2299,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi Collection specificStoresToFlush = forceFlushAllStores ? stores.values() : flushPolicy.selectStoresToFlush(); FlushResultImpl fs = - internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker); + internalFlushcache(specificStoresToFlush, status, writeFlushRequestWalMarker, tracker); if (coprocessorHost != null) { status.setStatus("Running post-flush coprocessor hooks"); - coprocessorHost.postFlush(); + coprocessorHost.postFlush(tracker); } if(fs.isFlushSucceeded()) { @@ -2398,7 +2399,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @see #internalFlushcache(Collection, MonitoredTask, boolean) */ private FlushResult internalFlushcache(MonitoredTask status) throws IOException { - return internalFlushcache(stores.values(), status, false); + return internalFlushcache(stores.values(), status, false, FlushLifeCycleTracker.DUMMY); } /** @@ -2406,9 +2407,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @see #internalFlushcache(WAL, long, Collection, MonitoredTask, boolean) */ private FlushResultImpl internalFlushcache(Collection storesToFlush, MonitoredTask status, - boolean writeFlushWalMarker) throws IOException { + boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException { return internalFlushcache(this.wal, HConstants.NO_SEQNUM, storesToFlush, status, - writeFlushWalMarker); + writeFlushWalMarker, tracker); } /** @@ -2429,10 +2430,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi * @throws IOException general io exceptions * @throws DroppedSnapshotException Thrown when replay of WAL is required. */ - protected FlushResultImpl internalFlushcache(WAL wal, long myseqid, Collection storesToFlush, - MonitoredTask status, boolean writeFlushWalMarker) throws IOException { - PrepareFlushResult result - = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker); + protected FlushResultImpl internalFlushcache(WAL wal, long myseqid, + Collection storesToFlush, MonitoredTask status, boolean writeFlushWalMarker, + FlushLifeCycleTracker tracker) throws IOException { + PrepareFlushResult result = + internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker, tracker); if (result.result == null) { return internalFlushCacheAndCommit(wal, status, result, storesToFlush); } else { @@ -2443,8 +2445,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DLS_DEAD_LOCAL_STORE", justification="FindBugs seems confused about trxId") protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid, - Collection storesToFlush, MonitoredTask status, boolean writeFlushWalMarker) - throws IOException { + Collection storesToFlush, MonitoredTask status, boolean writeFlushWalMarker, + FlushLifeCycleTracker tracker) throws IOException { if (this.rsServices != null && this.rsServices.isAborted()) { // Don't flush when server aborting, it's unsafe throw new IOException("Aborting flush because server is aborted..."); @@ -2469,9 +2471,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi if (wal != null) { writeEntry = mvcc.begin(); long flushOpSeqId = writeEntry.getWriteNumber(); - FlushResultImpl flushResult = new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, - flushOpSeqId, "Nothing to flush", - writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker)); + FlushResultImpl flushResult = + new FlushResultImpl(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, flushOpSeqId, + "Nothing to flush", writeFlushRequestMarkerToWAL(wal, writeFlushWalMarker)); mvcc.completeAndWait(writeEntry); // Set to null so we don't complete it again down in finally block. writeEntry = null; @@ -2547,8 +2549,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi MemStoreSize flushableSize = s.getFlushableSize(); totalSizeOfFlushableStores.incMemStoreSize(flushableSize); storeFlushCtxs.put(s.getColumnFamilyDescriptor().getName(), - s.createFlushContext(flushOpSeqId)); - committedFiles.put(s.getColumnFamilyDescriptor().getName(), null); // for writing stores to WAL + s.createFlushContext(flushOpSeqId, tracker)); + // for writing stores to WAL + committedFiles.put(s.getColumnFamilyDescriptor().getName(), null); storeFlushableSize.put(s.getColumnFamilyDescriptor().getName(), flushableSize); } @@ -4079,29 +4082,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } } - private void requestFlushIfNeeded(long memstoreTotalSize) throws RegionTooBusyException { - if(memstoreTotalSize > this.getMemStoreFlushSize()) { - requestFlush(); - } - } - - private void requestFlush() { - if (this.rsServices == null) { - return; - } - synchronized (writestate) { - if (this.writestate.isFlushRequested()) { - return; - } - writestate.flushRequested = true; - } - // Make request outside of synchronize block; HBASE-818. - this.rsServices.getFlushRequester().requestFlush(this, false); - if (LOG.isDebugEnabled()) { - LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName()); - } - } - /* * @param size * @return True if size is over the flush threshold @@ -4216,7 +4196,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } if (seqid > minSeqIdForTheRegion) { // Then we added some edits to memory. Flush and cleanup split edit files. - internalFlushcache(null, seqid, stores.values(), status, false); + internalFlushcache(null, seqid, stores.values(), status, false, FlushLifeCycleTracker.DUMMY); } // Now delete the content of recovered edits. We're done w/ them. if (files.size() > 0 && this.conf.getBoolean("hbase.region.archive.recovered.edits", false)) { @@ -4400,7 +4380,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi } flush = isFlushSize(this.addAndGetMemStoreSize(memstoreSize)); if (flush) { - internalFlushcache(null, currentEditSeqId, stores.values(), status, false); + internalFlushcache(null, currentEditSeqId, stores.values(), status, false, + FlushLifeCycleTracker.DUMMY); } if (coprocessorHost != null) { @@ -4603,8 +4584,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // we can just snapshot our memstores and continue as normal. // invoke prepareFlushCache. Send null as wal since we do not want the flush events in wal - PrepareFlushResult prepareResult = internalPrepareFlushCache(null, - flushSeqId, storesToFlush, status, false); + PrepareFlushResult prepareResult = internalPrepareFlushCache(null, flushSeqId, + storesToFlush, status, false, FlushLifeCycleTracker.DUMMY); if (prepareResult.result == null) { // save the PrepareFlushResult so that we can use it later from commit flush this.writestate.flushing = true; @@ -4818,7 +4799,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi StoreFlushContext ctx = null; long startTime = EnvironmentEdgeManager.currentTime(); if (prepareFlushResult == null || prepareFlushResult.storeFlushCtxs == null) { - ctx = store.createFlushContext(flush.getFlushSequenceNumber()); + ctx = store.createFlushContext(flush.getFlushSequenceNumber(), FlushLifeCycleTracker.DUMMY); } else { ctx = prepareFlushResult.storeFlushCtxs.get(family); startTime = prepareFlushResult.startTime; @@ -4878,7 +4859,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi throws IOException { MemStoreSize flushableSize = s.getFlushableSize(); this.decrMemStoreSize(flushableSize); - StoreFlushContext ctx = s.createFlushContext(currentSeqId); + StoreFlushContext ctx = s.createFlushContext(currentSeqId, FlushLifeCycleTracker.DUMMY); ctx.prepare(); ctx.abort(); return flushableSize; @@ -5724,7 +5705,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is // a sequence id that we can be sure is beyond the last hfile written). if (assignSeqId) { - FlushResult fs = flushcache(true, false); + FlushResult fs = flushcache(true, false, FlushLifeCycleTracker.DUMMY); if (fs.isFlushSucceeded()) { seqId = ((FlushResultImpl)fs).flushSequenceId; } else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) { @@ -8234,4 +8215,41 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi rsServices.getCompactionRequestor().requestCompaction(this, store, why, priority, tracker, RpcServer.getRequestUser().orElse(null)); } + + private void requestFlushIfNeeded(long memstoreTotalSize) throws RegionTooBusyException { + if (memstoreTotalSize > this.getMemStoreFlushSize()) { + requestFlush(); + } + } + + private void requestFlush() { + if (this.rsServices == null) { + return; + } + requestFlush0(FlushLifeCycleTracker.DUMMY); + } + + private void requestFlush0(FlushLifeCycleTracker tracker) { + boolean shouldFlush = false; + synchronized (writestate) { + if (!this.writestate.isFlushRequested()) { + shouldFlush = true; + writestate.flushRequested = true; + } + } + if (shouldFlush) { + // Make request outside of synchronize block; HBASE-818. + this.rsServices.getFlushRequester().requestFlush(this, false, tracker); + if (LOG.isDebugEnabled()) { + LOG.debug("Flush requested on " + this.getRegionInfo().getEncodedName()); + } + } else { + tracker.notExecuted("Flush already requested on " + this); + } + } + + @Override + public void requestFlush(FlushLifeCycleTracker tracker) throws IOException { + requestFlush0(tracker); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index e3d6724b3de..52db6997f03 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -952,7 +952,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat * @throws IOException if exception occurs during process */ protected List flushCache(final long logCacheFlushId, MemStoreSnapshot snapshot, - MonitoredTask status, ThroughputController throughputController) throws IOException { + MonitoredTask status, ThroughputController throughputController, + FlushLifeCycleTracker tracker) throws IOException { // If an exception happens flushing, we let it out without clearing // the memstore snapshot. The old snapshot will be returned when we say // 'snapshot', the next time flush comes around. @@ -963,7 +964,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat for (int i = 0; i < flushRetriesNumber; i++) { try { List pathNames = - flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController); + flusher.flushSnapshot(snapshot, logCacheFlushId, status, throughputController, tracker); Path lastPathName = null; try { for (Path pathName : pathNames) { @@ -2152,13 +2153,14 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat } } - public StoreFlushContext createFlushContext(long cacheFlushId) { - return new StoreFlusherImpl(cacheFlushId); + public StoreFlushContext createFlushContext(long cacheFlushId, FlushLifeCycleTracker tracker) { + return new StoreFlusherImpl(cacheFlushId, tracker); } private final class StoreFlusherImpl implements StoreFlushContext { - private long cacheFlushSeqNum; + private final FlushLifeCycleTracker tracker; + private final long cacheFlushSeqNum; private MemStoreSnapshot snapshot; private List tempFiles; private List committedFiles; @@ -2166,8 +2168,9 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat private long cacheFlushSize; private long outputFileSize; - private StoreFlusherImpl(long cacheFlushSeqNum) { + private StoreFlusherImpl(long cacheFlushSeqNum, FlushLifeCycleTracker tracker) { this.cacheFlushSeqNum = cacheFlushSeqNum; + this.tracker = tracker; } /** @@ -2188,7 +2191,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat RegionServerServices rsService = region.getRegionServerServices(); ThroughputController throughputController = rsService == null ? null : rsService.getFlushThroughputController(); - tempFiles = HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController); + tempFiles = + HStore.this.flushCache(cacheFlushSeqNum, snapshot, status, throughputController, tracker); } @Override @@ -2220,7 +2224,7 @@ public class HStore implements Store, HeapSize, StoreConfigInformation, Propagat for (HStoreFile sf : storeFiles) { if (HStore.this.getCoprocessorHost() != null) { - HStore.this.getCoprocessorHost().postFlush(HStore.this, sf); + HStore.this.getCoprocessorHost().postFlush(HStore.this, sf, tracker); } committedFiles.add(sf.getPath()); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 20b6c5f2013..dbdb27a60e8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -210,7 +210,7 @@ public class LogRoller extends HasThread implements Closeable { requester = this.services.getFlushRequester(); if (requester != null) { // force flushing all stores to clean old logs - requester.requestFlush(r, true); + requester.requestFlush(r, true, FlushLifeCycleTracker.DUMMY); scheduled = true; } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 82390bd7fc9..ae4c8ebc2aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -18,8 +18,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import static org.apache.hadoop.util.StringUtils.humanReadableInt; - import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.util.ArrayList; @@ -44,20 +42,20 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HConstants; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; -import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.shaded.com.google.common.base.Preconditions; /** * Thread that flushes cache on request @@ -183,12 +181,12 @@ class MemStoreFlusher implements FlushRequester { ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) && (bestRegionReplica.getMemStoreSize() > secondaryMultiplier * regionToFlush.getMemStoreSize()))) { - LOG.info("Refreshing storefiles of region " + bestRegionReplica - + " due to global heap pressure. Total memstore datasize=" - + StringUtils - .humanReadableInt(server.getRegionServerAccounting().getGlobalMemStoreDataSize()) - + " memstore heap size=" + StringUtils.humanReadableInt( - server.getRegionServerAccounting().getGlobalMemStoreHeapSize())); + LOG.info("Refreshing storefiles of region " + bestRegionReplica + + " due to global heap pressure. Total memstore datasize=" + + TraditionalBinaryPrefix.long2String( + server.getRegionServerAccounting().getGlobalMemStoreDataSize(), "", 1) + + " memstore heap size=" + TraditionalBinaryPrefix.long2String( + server.getRegionServerAccounting().getGlobalMemStoreHeapSize(), "", 1)); flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica); if (!flushedOne) { LOG.info("Excluding secondary region " + bestRegionReplica + @@ -196,12 +194,13 @@ class MemStoreFlusher implements FlushRequester { excludedRegions.add(bestRegionReplica); } } else { - LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. " - + "Total Memstore size=" - + humanReadableInt(server.getRegionServerAccounting().getGlobalMemStoreDataSize()) - + ", Region memstore size=" - + humanReadableInt(regionToFlush.getMemStoreSize())); - flushedOne = flushRegion(regionToFlush, true, false); + LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. " + + "Total Memstore size=" + + TraditionalBinaryPrefix.long2String( + server.getRegionServerAccounting().getGlobalMemStoreDataSize(), "", 1) + + ", Region memstore size=" + + TraditionalBinaryPrefix.long2String(regionToFlush.getMemStoreSize(), "", 1)); + flushedOne = flushRegion(regionToFlush, true, false, FlushLifeCycleTracker.DUMMY); if (!flushedOne) { LOG.info("Excluding unflushable region " + regionToFlush + @@ -348,15 +347,17 @@ class MemStoreFlusher implements FlushRequester { } @Override - public void requestFlush(HRegion r, boolean forceFlushAllStores) { + public void requestFlush(HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) { r.incrementFlushesQueuedCount(); synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { // This entry has no delay so it will be added at the top of the flush - // queue. It'll come out near immediately. - FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores); + // queue. It'll come out near immediately. + FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores, tracker); this.regionsInQueue.put(r, fqe); this.flushQueue.add(fqe); + } else { + tracker.notExecuted("Flush already requested on " + r); } } } @@ -367,7 +368,8 @@ class MemStoreFlusher implements FlushRequester { synchronized (regionsInQueue) { if (!regionsInQueue.containsKey(r)) { // This entry has some delay - FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores); + FlushRegionEntry fqe = + new FlushRegionEntry(r, forceFlushAllStores, FlushLifeCycleTracker.DUMMY); fqe.requeue(delay); this.regionsInQueue.put(r, fqe); this.flushQueue.add(fqe); @@ -463,7 +465,7 @@ class MemStoreFlusher implements FlushRequester { return true; } } - return flushRegion(region, false, fqe.isForceFlushAllStores()); + return flushRegion(region, false, fqe.isForceFlushAllStores(), fqe.getTracker()); } /** @@ -478,22 +480,23 @@ class MemStoreFlusher implements FlushRequester { * false, there will be accompanying log messages explaining why the region was * not flushed. */ - private boolean flushRegion(final HRegion region, final boolean emergencyFlush, - boolean forceFlushAllStores) { + private boolean flushRegion(HRegion region, boolean emergencyFlush, boolean forceFlushAllStores, + FlushLifeCycleTracker tracker) { synchronized (this.regionsInQueue) { FlushRegionEntry fqe = this.regionsInQueue.remove(region); // Use the start time of the FlushRegionEntry if available if (fqe != null && emergencyFlush) { - // Need to remove from region from delay queue. When NOT an + // Need to remove from region from delay queue. When NOT an // emergencyFlush, then item was removed via a flushQueue.poll. flushQueue.remove(fqe); } } + tracker.beforeExecution(); lock.readLock().lock(); try { notifyFlushRequest(region, emergencyFlush); - FlushResult flushResult = region.flush(forceFlushAllStores); + FlushResult flushResult = region.flushcache(forceFlushAllStores, false, tracker); boolean shouldCompact = flushResult.isCompactionNeeded(); // We just want to check the size boolean shouldSplit = region.checkSplit() != null; @@ -523,6 +526,7 @@ class MemStoreFlusher implements FlushRequester { } finally { lock.readLock().unlock(); wakeUpIfBlocking(); + tracker.afterExecution(); } return true; } @@ -732,13 +736,16 @@ class MemStoreFlusher implements FlushRequester { private long whenToExpire; private int requeueCount = 0; - private boolean forceFlushAllStores; + private final boolean forceFlushAllStores; - FlushRegionEntry(final HRegion r, boolean forceFlushAllStores) { + private final FlushLifeCycleTracker tracker; + + FlushRegionEntry(final HRegion r, boolean forceFlushAllStores, FlushLifeCycleTracker tracker) { this.region = r; this.createTime = EnvironmentEdgeManager.currentTime(); this.whenToExpire = this.createTime; this.forceFlushAllStores = forceFlushAllStores; + this.tracker = tracker; } /** @@ -764,6 +771,10 @@ class MemStoreFlusher implements FlushRequester { return forceFlushAllStores; } + public FlushLifeCycleTracker getTracker() { + return tracker; + } + /** * @param when When to expire, when to come up out of the queue. * Specify in milliseconds. This method adds EnvironmentEdgeManager.currentTime() diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 28f73aaf7c2..5d450cc6cda 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1611,7 +1611,8 @@ public class RSRpcServices implements HBaseRPCErrorHandler, boolean writeFlushWalMarker = request.hasWriteFlushWalMarker() ? request.getWriteFlushWalMarker() : false; // Go behind the curtain so we can manage writing of the flush WAL marker - HRegion.FlushResultImpl flushResult = region.flushcache(true, writeFlushWalMarker); + HRegion.FlushResultImpl flushResult = + region.flushcache(true, writeFlushWalMarker, FlushLifeCycleTracker.DUMMY); boolean compactionNeeded = flushResult.isCompactionNeeded(); if (compactionNeeded) { regionServer.compactSplitThread.requestSystemCompaction(region, diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 0c93ed13cd4..c0827cbf04e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -468,4 +468,9 @@ public interface Region extends ConfigurationObserver { */ void requestCompaction(byte[] family, String why, int priority, boolean major, CompactionLifeCycleTracker tracker) throws IOException; + + /** + * Request flush on this region. + */ + void requestFlush(FlushLifeCycleTracker tracker) throws IOException; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 42d7ac99161..e25b0905cea 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -669,13 +669,13 @@ public class RegionCoprocessorHost * Invoked before a memstore flush * @throws IOException */ - public InternalScanner preFlush(HStore store, final InternalScanner scanner) - throws IOException { - return execOperationWithResult(false, scanner, coprocEnvironments.isEmpty() ? null : - new ObserverOperationWithResult(regionObserverGetter) { + public InternalScanner preFlush(HStore store, InternalScanner scanner, + FlushLifeCycleTracker tracker) throws IOException { + return execOperationWithResult(false, scanner, coprocEnvironments.isEmpty() ? null + : new ObserverOperationWithResult(regionObserverGetter) { @Override public InternalScanner call(RegionObserver observer) throws IOException { - return observer.preFlush(this, store, getResult()); + return observer.preFlush(this, store, getResult(), tracker); } }); } @@ -684,11 +684,11 @@ public class RegionCoprocessorHost * Invoked before a memstore flush * @throws IOException */ - public void preFlush() throws IOException { + public void preFlush(FlushLifeCycleTracker tracker) throws IOException { execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override public void call(RegionObserver observer) throws IOException { - observer.preFlush(this); + observer.preFlush(this, tracker); } }); } @@ -697,11 +697,11 @@ public class RegionCoprocessorHost * Invoked after a memstore flush * @throws IOException */ - public void postFlush() throws IOException { + public void postFlush(FlushLifeCycleTracker tracker) throws IOException { execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override public void call(RegionObserver observer) throws IOException { - observer.postFlush(this); + observer.postFlush(this, tracker); } }); } @@ -710,11 +710,12 @@ public class RegionCoprocessorHost * Invoked after a memstore flush * @throws IOException */ - public void postFlush(final HStore store, final HStoreFile storeFile) throws IOException { + public void postFlush(HStore store, HStoreFile storeFile, FlushLifeCycleTracker tracker) + throws IOException { execOperation(coprocEnvironments.isEmpty() ? null : new RegionObserverOperation() { @Override public void call(RegionObserver observer) throws IOException { - observer.postFlush(this, store, storeFile); + observer.postFlush(this, store, storeFile, tracker); } }); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java index 8fde7d5e047..b0bff106785 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java @@ -57,7 +57,8 @@ abstract class StoreFlusher { * @return List of files written. Can be empty; must not be null. */ public abstract List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum, - MonitoredTask status, ThroughputController throughputController) throws IOException; + MonitoredTask status, ThroughputController throughputController, + FlushLifeCycleTracker tracker) throws IOException; protected void finalizeWriter(StoreFileWriter writer, long cacheFlushSeqNum, MonitoredTask status) throws IOException { @@ -77,15 +78,15 @@ abstract class StoreFlusher { * @param smallestReadPoint * @return The scanner; null if coprocessor is canceling the flush. */ - protected InternalScanner createScanner(List snapshotScanners, - long smallestReadPoint) throws IOException { + protected final InternalScanner createScanner(List snapshotScanners, + long smallestReadPoint, FlushLifeCycleTracker tracker) throws IOException { InternalScanner scanner = new StoreScanner(store, store.getScanInfo(), OptionalInt.empty(), snapshotScanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint, HConstants.OLDEST_TIMESTAMP); assert scanner != null; if (store.getCoprocessorHost() != null) { try { - return store.getCoprocessorHost().preFlush(store, scanner); + return store.getCoprocessorHost().preFlush(store, scanner, tracker); } catch (IOException ioe) { scanner.close(); throw ioe; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java index c858f8f6d49..259b3334fec 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java @@ -56,13 +56,14 @@ public class StripeStoreFlusher extends StoreFlusher { @Override public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushSeqNum, - MonitoredTask status, ThroughputController throughputController) throws IOException { + MonitoredTask status, ThroughputController throughputController, + FlushLifeCycleTracker tracker) throws IOException { List result = new ArrayList<>(); int cellsCount = snapshot.getCellsCount(); if (cellsCount == 0) return result; // don't flush if there are no entries long smallestReadPoint = store.getSmallestReadPoint(); - InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint); + InternalScanner scanner = createScanner(snapshot.getScanners(), smallestReadPoint, tracker); if (scanner == null) { return result; // NULL scanner returned from coprocessor hooks means skip normal processing } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java index be027c563bc..1d1cf5b4bad 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/AccessController.java @@ -110,6 +110,7 @@ import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService; import org.apache.hadoop.hbase.quotas.GlobalQuotaSettings; import org.apache.hadoop.hbase.regionserver.BloomType; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.Region; @@ -1592,9 +1593,10 @@ public class AccessController implements MasterCoprocessor, RegionCoprocessor, } @Override - public void preFlush(ObserverContext c) throws IOException { + public void preFlush(ObserverContext c, + FlushLifeCycleTracker tracker) throws IOException { requirePermission(getActiveUser(c), "flush", getTableName(c.getEnvironment()), null, null, - Action.ADMIN, Action.CREATE); + Action.ADMIN, Action.CREATE); } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java index 1745c8201ae..8cbd2a5cbd3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMobCloneSnapshotFromClient.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.master.cleaner.TimeToLiveHFileCleaner; import org.apache.hadoop.hbase.mob.MobConstants; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.snapshot.MobSnapshotTestingUtils; import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils; import org.apache.hadoop.hbase.testclassification.ClientTests; @@ -147,7 +148,8 @@ public class TestMobCloneSnapshotFromClient extends TestCloneSnapshotFromClient } @Override - public void preFlush(ObserverContext e) throws IOException { + public void preFlush(ObserverContext e, + FlushLifeCycleTracker tracker) throws IOException { if (delayFlush) { try { if (Bytes.compareTo(e.getEnvironment().getRegionInfo().getStartKey(), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java index 71ef48456fe..611d53b29a5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper; import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.hfile.CacheConfig; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.KeyValueScanner; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; @@ -171,14 +172,14 @@ public class SimpleRegionObserver implements RegionCoprocessor, RegionObserver { @Override public InternalScanner preFlush(ObserverContext c, - Store store, InternalScanner scanner) throws IOException { + Store store, InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { ctPreFlush.incrementAndGet(); return scanner; } @Override public void postFlush(ObserverContext c, - Store store, StoreFile resultFile) throws IOException { + Store store, StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException { ctPostFlush.incrementAndGet(); if (throwOnPostFlush.get()){ throw new IOException("throwOnPostFlush is true in postFlush"); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java index 22ecd2fe120..e0d9f5f98b9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.regionserver.ChunkCreator; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl; @@ -214,12 +215,16 @@ public class TestCoprocessorInterface { CompactionRequest request) { postCompactCalled = true; } + @Override - public void preFlush(ObserverContext e) { + public void preFlush(ObserverContext e, + FlushLifeCycleTracker tracker) { preFlushCalled = true; } + @Override - public void postFlush(ObserverContext e) { + public void postFlush(ObserverContext e, + FlushLifeCycleTracker tracker) { postFlushCalled = true; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java index 32058995884..49fc3fd5470 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverInterface.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.HFile; import org.apache.hadoop.hbase.io.hfile.HFileContext; import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.NoLimitScannerContext; @@ -465,7 +466,8 @@ public class TestRegionObserverInterface { } @Override - public void postFlush(ObserverContext e) { + public void postFlush(ObserverContext e, + FlushLifeCycleTracker tracker) { lastFlush = EnvironmentEdgeManager.currentTime(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java index 4d6bfec1abc..9e7c1847fda 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverScannerOpenHook.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.filter.FilterBase; import org.apache.hadoop.hbase.regionserver.ChunkCreator; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.HStore; @@ -157,7 +158,7 @@ public class TestRegionObserverScannerOpenHook { @Override public InternalScanner preFlush(ObserverContext c, Store store, - InternalScanner scanner) throws IOException { + InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { return NO_DATA; } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java index cdad8501dec..f0d9f1a0e99 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/NoOpScanPolicyObserver.java @@ -45,7 +45,7 @@ public class NoOpScanPolicyObserver implements RegionCoprocessor, RegionObserver @Override public InternalScanner preFlush(ObserverContext c, Store store, - InternalScanner scanner) throws IOException { + InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { return new DelegatingInternalScanner(scanner); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushLifeCycleTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushLifeCycleTracker.java new file mode 100644 index 00000000000..6677c18bb5e --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushLifeCycleTracker.java @@ -0,0 +1,240 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Optional; +import java.util.concurrent.CountDownLatch; + +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.coprocessor.RegionObserver; +import org.apache.hadoop.hbase.testclassification.CoprocessorTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +/** + * Confirm that the function of FlushLifeCycleTracker is OK as we do not use it in our own code. + */ +@Category({ CoprocessorTests.class, MediumTests.class }) +public class TestFlushLifeCycleTracker { + + private static final HBaseTestingUtility UTIL = new HBaseTestingUtility(); + + private static final TableName NAME = + TableName.valueOf(TestCompactionLifeCycleTracker.class.getSimpleName()); + + private static final byte[] CF = Bytes.toBytes("CF"); + + private static final byte[] QUALIFIER = Bytes.toBytes("CQ"); + + private HRegion region; + + private static FlushLifeCycleTracker TRACKER; + + private static volatile CountDownLatch ARRIVE; + + private static volatile CountDownLatch BLOCK; + + public static final class FlushObserver implements RegionObserver, RegionCoprocessor { + + @Override + public Optional getRegionObserver() { + return Optional.of(this); + } + + @Override + public void preFlush(ObserverContext c, + FlushLifeCycleTracker tracker) throws IOException { + if (TRACKER != null) { + assertSame(tracker, TRACKER); + } + } + + @Override + public InternalScanner preFlush(ObserverContext c, Store store, + InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { + if (TRACKER != null) { + assertSame(tracker, TRACKER); + } + return scanner; + } + + @Override + public void postFlush(ObserverContext c, + FlushLifeCycleTracker tracker) throws IOException { + if (TRACKER != null) { + assertSame(tracker, TRACKER); + } + } + + @Override + public void postFlush(ObserverContext c, Store store, + StoreFile resultFile, FlushLifeCycleTracker tracker) throws IOException { + if (TRACKER != null) { + assertSame(tracker, TRACKER); + } + // inject here so we can make a flush request to fail because of we already have a flush + // ongoing. + CountDownLatch arrive = ARRIVE; + if (arrive != null) { + arrive.countDown(); + } + try { + BLOCK.await(); + } catch (InterruptedException e) { + throw new InterruptedIOException(); + } + } + } + + private static final class Tracker implements FlushLifeCycleTracker { + + private String reason; + + private boolean beforeExecutionCalled; + + private boolean afterExecutionCalled; + + private boolean completed = false; + + @Override + public synchronized void notExecuted(String reason) { + this.reason = reason; + completed = true; + notifyAll(); + } + + @Override + public void beforeExecution() { + this.beforeExecutionCalled = true; + } + + @Override + public synchronized void afterExecution() { + this.afterExecutionCalled = true; + completed = true; + notifyAll(); + } + + public synchronized void await() throws InterruptedException { + while (!completed) { + wait(); + } + } + } + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + UTIL.startMiniCluster(3); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws IOException { + UTIL.getAdmin() + .createTable(TableDescriptorBuilder.newBuilder(NAME) + .addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF)) + .addCoprocessor(FlushObserver.class.getName()).build()); + region = UTIL.getHBaseCluster().getRegions(NAME).get(0); + } + + @After + public void tearDown() throws IOException { + region = null; + TRACKER = null; + UTIL.deleteTable(NAME); + } + + @Test + public void test() throws IOException, InterruptedException { + try (Table table = UTIL.getConnection().getTable(NAME)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addImmutable(CF, QUALIFIER, Bytes.toBytes(i))); + } + } + Tracker tracker = new Tracker(); + TRACKER = tracker; + region.requestFlush(tracker); + tracker.await(); + assertNull(tracker.reason); + assertTrue(tracker.beforeExecutionCalled); + assertTrue(tracker.afterExecutionCalled); + + // request flush on a region with empty memstore should still success + tracker = new Tracker(); + TRACKER = tracker; + region.requestFlush(tracker); + tracker.await(); + assertNull(tracker.reason); + assertTrue(tracker.beforeExecutionCalled); + assertTrue(tracker.afterExecutionCalled); + } + + @Test + public void testNotExecuted() throws IOException, InterruptedException { + try (Table table = UTIL.getConnection().getTable(NAME)) { + for (int i = 0; i < 100; i++) { + table.put(new Put(Bytes.toBytes(i)).addImmutable(CF, QUALIFIER, Bytes.toBytes(i))); + } + } + // here we may have overlap when calling the CP hooks so we do not assert on TRACKER + Tracker tracker1 = new Tracker(); + ARRIVE = new CountDownLatch(1); + BLOCK = new CountDownLatch(1); + region.requestFlush(tracker1); + ARRIVE.await(); + + Tracker tracker2 = new Tracker(); + region.requestFlush(tracker2); + tracker2.await(); + assertNotNull(tracker2.reason); + assertFalse(tracker2.beforeExecutionCalled); + assertFalse(tracker2.afterExecutionCalled); + + BLOCK.countDown(); + tracker1.await(); + assertNull(tracker1.reason); + assertTrue(tracker1.beforeExecutionCalled); + assertTrue(tracker1.afterExecutionCalled); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java index 0eed44974bd..12fdb77ed73 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlushRegionEntry.java @@ -1,25 +1,34 @@ /** - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable - * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" - * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License - * for the specific language governing permissions and limitations under the License. + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. */ package org.apache.hadoop.hbase.regionserver; import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; -import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionInfoBuilder; +import org.apache.hadoop.hbase.regionserver.MemStoreFlusher.FlushRegionEntry; import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; -import org.apache.hadoop.hbase.regionserver.MemStoreFlusher.FlushRegionEntry; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -27,7 +36,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.rules.TestName; -@Category({RegionServerTests.class, MediumTests.class}) +@Category({ RegionServerTests.class, MediumTests.class }) public class TestFlushRegionEntry { @Rule public TestName name = new TestName(); @@ -46,15 +55,15 @@ public class TestFlushRegionEntry { @Test public void testFlushRegionEntryEquality() { - HRegionInfo hri = new HRegionInfo(1, TableName.valueOf(name.getMethodName()), 0); + RegionInfo hri = RegionInfoBuilder.newBuilder(TableName.valueOf(name.getMethodName())) + .setRegionId(1).setReplicaId(0).build(); HRegion r = mock(HRegion.class); doReturn(hri).when(r).getRegionInfo(); - FlushRegionEntry entry = new FlushRegionEntry(r, true); - FlushRegionEntry other = new FlushRegionEntry(r, true); + FlushRegionEntry entry = new FlushRegionEntry(r, true, FlushLifeCycleTracker.DUMMY); + FlushRegionEntry other = new FlushRegionEntry(r, true, FlushLifeCycleTracker.DUMMY); assertEquals(entry.hashCode(), other.hashCode()); assertEquals(entry, other); } - } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java index 38d038f3de1..95efa806258 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHMobStore.java @@ -482,7 +482,7 @@ public class TestHMobStore { * @throws IOException */ private static void flushStore(HMobStore store, long id) throws IOException { - StoreFlushContext storeFlushCtx = store.createFlushContext(id); + StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY); storeFlushCtx.prepare(); storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index 24e42bbadc0..41bd99780b5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -293,7 +293,7 @@ public class TestHRegion { put.addColumn(COLUMN_FAMILY_BYTES, null, value); // First put something in current memstore, which will be in snapshot after flusher.prepare() region.put(put); - StoreFlushContext storeFlushCtx = store.createFlushContext(12345); + StoreFlushContext storeFlushCtx = store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY); storeFlushCtx.prepare(); // Second put something in current memstore put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); @@ -337,7 +337,7 @@ public class TestHRegion { HStore store = region.getStore(COLUMN_FAMILY_BYTES); // Get some random bytes. byte [] value = Bytes.toBytes(method); - faultyLog.setStoreFlushCtx(store.createFlushContext(12345)); + faultyLog.setStoreFlushCtx(store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY)); Put put = new Put(value); put.addColumn(COLUMN_FAMILY_BYTES, Bytes.toBytes("abc"), value); @@ -400,8 +400,8 @@ public class TestHRegion { // save normalCPHost and replaced by mockedCPHost, which will cancel flush requests RegionCoprocessorHost normalCPHost = region.getCoprocessorHost(); RegionCoprocessorHost mockedCPHost = Mockito.mock(RegionCoprocessorHost.class); - when(mockedCPHost.preFlush(Mockito.isA(HStore.class), Mockito.isA(InternalScanner.class))). - thenReturn(null); + when(mockedCPHost.preFlush(Mockito.isA(HStore.class), Mockito.isA(InternalScanner.class), + Mockito.isA(FlushLifeCycleTracker.class))).thenReturn(null); region.setCoprocessorHost(mockedCPHost); region.put(put); region.flush(true); @@ -567,7 +567,8 @@ public class TestHRegion { region.put(p1); // Manufacture an outstanding snapshot -- fake a failed flush by doing prepare step only. HStore store = region.getStore(COLUMN_FAMILY_BYTES); - StoreFlushContext storeFlushCtx = store.createFlushContext(12345); + StoreFlushContext storeFlushCtx = + store.createFlushContext(12345, FlushLifeCycleTracker.DUMMY); storeFlushCtx.prepare(); // Now add two entries to the foreground memstore. Put p2 = new Put(row); @@ -5626,7 +5627,7 @@ public class TestHRegion { Put put = new Put(Bytes.toBytes("19998")); put.addColumn(cf1, col, Bytes.toBytes("val")); region.put(put); - region.flushcache(true, true); + region.flushcache(true, true, FlushLifeCycleTracker.DUMMY); Put put2 = new Put(Bytes.toBytes("19997")); put2.addColumn(cf1, col, Bytes.toBytes("val")); region.put(put2); @@ -5642,7 +5643,7 @@ public class TestHRegion { p.addColumn(cf1, col, Bytes.toBytes("" + i)); region.put(p); } - region.flushcache(true, true); + region.flushcache(true, true, FlushLifeCycleTracker.DUMMY); // create one memstore contains many rows will be skipped // to check MemStoreScanner.seekToPreviousRow @@ -5689,7 +5690,7 @@ public class TestHRegion { RegionScanner scanner = region.getScanner(scan); // flush the cache. This will reset the store scanner - region.flushcache(true, true); + region.flushcache(true, true, FlushLifeCycleTracker.DUMMY); // create one memstore contains many rows will be skipped // to check MemStoreScanner.seekToPreviousRow diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index f1d9475fa19..bab5b2673be 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -1207,13 +1207,15 @@ public class TestHRegionReplayEvents { @Test public void testWriteFlushRequestMarker() throws IOException { // primary region is empty at this point. Request a flush with writeFlushRequestWalMarker=false - FlushResultImpl result = (FlushResultImpl)((HRegion)primaryRegion).flushcache(true, false); + FlushResultImpl result = (FlushResultImpl) ((HRegion) primaryRegion).flushcache(true, false, + FlushLifeCycleTracker.DUMMY); assertNotNull(result); assertEquals(result.result, FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY); assertFalse(result.wroteFlushWalMarker); // request flush again, but this time with writeFlushRequestWalMarker = true - result = (FlushResultImpl)((HRegion)primaryRegion).flushcache(true, true); + result = (FlushResultImpl) ((HRegion) primaryRegion).flushcache(true, true, + FlushLifeCycleTracker.DUMMY); assertNotNull(result); assertEquals(result.result, FlushResultImpl.Result.CANNOT_FLUSH_MEMSTORE_EMPTY); assertTrue(result.wroteFlushWalMarker); @@ -1248,7 +1250,7 @@ public class TestHRegionReplayEvents { // Test case 1: Test that replaying CANNOT_FLUSH request marker assuming this came from // triggered flush restores readsEnabled - primaryRegion.flushcache(true, true); + primaryRegion.flushcache(true, true, FlushLifeCycleTracker.DUMMY); reader = createWALReaderForPrimary(); while (true) { WAL.Entry entry = reader.next(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java index b9054f4b53d..138260358e9 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java @@ -769,7 +769,7 @@ public class TestHStore { } private static void flushStore(HStore store, long id) throws IOException { - StoreFlushContext storeFlushCtx = store.createFlushContext(id); + StoreFlushContext storeFlushCtx = store.createFlushContext(id, FlushLifeCycleTracker.DUMMY); storeFlushCtx.prepare(); storeFlushCtx.flushCache(Mockito.mock(MonitoredTask.class)); storeFlushCtx.commit(Mockito.mock(MonitoredTask.class)); @@ -1081,7 +1081,7 @@ public class TestHStore { seqId = Math.max(seqId, c.getSequenceId()); } inputCellsBeforeSnapshot.forEach(c -> store.add(c, null)); - StoreFlushContext storeFlushCtx = store.createFlushContext(id++); + StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); storeFlushCtx.prepare(); inputCellsAfterSnapshot.forEach(c -> store.add(c, null)); int numberOfMemScannersBeforeFlush = inputCellsAfterSnapshot.isEmpty() ? 1 : 2; @@ -1287,7 +1287,7 @@ public class TestHStore { quals.add(qf1); quals.add(qf2); quals.add(qf3); - StoreFlushContext storeFlushCtx = store.createFlushContext(id++); + StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); MyCompactingMemStore.START_TEST.set(true); Runnable flush = () -> { // this is blocked until we create first scanner from pipeline and snapshot -- phase (1/5) @@ -1363,7 +1363,7 @@ public class TestHStore { myStore.add(createCell(qf3, ts, seqId, oldValue), memStoreSizing); long snapshotId = id++; // push older data into snapshot -- phase (1/4) - StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId); + StoreFlushContext storeFlushCtx = store.createFlushContext(snapshotId, FlushLifeCycleTracker.DUMMY); storeFlushCtx.prepare(); // insert current data into active -- phase (2/4) @@ -1475,7 +1475,7 @@ public class TestHStore { store.add(createCell(qf2, ts, seqId, value), memStoreSizing); store.add(createCell(qf3, ts, seqId, value), memStoreSizing); assertEquals(1, MyCompactingMemStoreWithCustomCompactor.RUNNER_COUNT.get()); - StoreFlushContext storeFlushCtx = store.createFlushContext(id++); + StoreFlushContext storeFlushCtx = store.createFlushContext(id++, FlushLifeCycleTracker.DUMMY); storeFlushCtx.prepare(); // This shouldn't invoke another in-memory flush because the first compactor thread // hasn't accomplished the in-memory compaction. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java index 739519f0de8..aae04dfe164 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHeapMemoryManager.java @@ -135,11 +135,11 @@ public class TestHeapMemoryManager { final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK; - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; - memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); // Allow the tuner to run once and do necessary memory up Thread.sleep(1500); // No changes should be made by tuner as we already have lot of empty space @@ -178,10 +178,10 @@ public class TestHeapMemoryManager { // do some offheap flushes also. So there should be decrease in memstore but // not as that when we don't have offheap flushes memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK; - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); // Allow the tuner to run once and do necessary memory up waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); @@ -226,10 +226,10 @@ public class TestHeapMemoryManager { // do some offheap flushes also. So there should be decrease in memstore but // not as that when we don't have offheap flushes memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK; - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); // Allow the tuner to run once and do necessary memory up waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); assertHeapSpaceDelta(-maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); @@ -242,10 +242,10 @@ public class TestHeapMemoryManager { // flushes are due to onheap overhead. This should once again call for increase in // memstore size but that increase should be to the safe size memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK; - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); // Allow the tuner to run once and do necessary memory up waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); assertHeapSpaceDelta(maxStepValue, oldMemstoreHeapSize, memStoreFlusher.memstoreSize); @@ -308,10 +308,10 @@ public class TestHeapMemoryManager { final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); // Allow the tuner to run once and do necessary memory up waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize, @@ -322,8 +322,8 @@ public class TestHeapMemoryManager { oldBlockCacheSize = blockCache.maxSize; // Do some more flushes before the next run of HeapMemoryTuner memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); // Allow the tuner to run once and do necessary memory up waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize, @@ -357,10 +357,10 @@ public class TestHeapMemoryManager { heapMemoryManager.start(choreService); // this should not change anything with onheap memstore memStoreFlusher.flushType = FlushType.ABOVE_OFFHEAP_HIGHER_MARK; - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); // Allow the tuner to run once and do necessary memory up Thread.sleep(1500); // No changes should be made by tuner as we already have lot of empty space @@ -444,9 +444,9 @@ public class TestHeapMemoryManager { final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); blockCache.evictBlock(null); // Allow the tuner to run once and do necessary memory up Thread.sleep(1500); @@ -455,9 +455,9 @@ public class TestHeapMemoryManager { assertEquals(oldBlockCacheSize, blockCache.maxSize); // Do some more flushes before the next run of HeapMemoryTuner memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); // Allow the tuner to run once and do necessary memory up waitForTune(memStoreFlusher, memStoreFlusher.memstoreSize); assertHeapSpaceDelta(DefaultHeapMemoryTuner.DEFAULT_MAX_STEP_VALUE, oldMemstoreHeapSize, @@ -490,9 +490,9 @@ public class TestHeapMemoryManager { final ChoreService choreService = new ChoreService("TEST_SERVER_NAME"); heapMemoryManager.start(choreService); memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_LOWER_MARK; - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); - memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); blockCache.evictBlock(null); blockCache.evictBlock(null); // Allow the tuner to run once and do necessary memory up @@ -502,7 +502,7 @@ public class TestHeapMemoryManager { assertEquals(oldBlockCacheSize, blockCache.maxSize); // Flushes that block updates memStoreFlusher.flushType = FlushType.ABOVE_ONHEAP_HIGHER_MARK; - memStoreFlusher.requestFlush(null, false); + memStoreFlusher.requestFlush(null, false, FlushLifeCycleTracker.DUMMY); blockCache.evictBlock(null); blockCache.evictBlock(null); blockCache.evictBlock(null); @@ -805,7 +805,8 @@ public class TestHeapMemoryManager { } @Override - public void requestFlush(HRegion region, boolean forceFlushAllStores) { + public void requestFlush(HRegion region, boolean forceFlushAllStores, + FlushLifeCycleTracker tracker) { this.listener.flushRequested(flushType, region); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java index 40abf79d65d..99cf91d5da4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitWalDataLoss.java @@ -31,20 +31,20 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.monitoring.MonitoredTask; -import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult; import org.apache.hadoop.hbase.regionserver.HRegion.FlushResult; +import org.apache.hadoop.hbase.regionserver.HRegion.PrepareFlushResult; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -83,7 +83,8 @@ public class TestSplitWalDataLoss { testUtil.startMiniCluster(2); Admin admin = testUtil.getAdmin(); admin.createNamespace(namespace); - admin.createTable(new HTableDescriptor(tableName).addFamily(new HColumnDescriptor(family))); + admin.createTable(TableDescriptorBuilder.newBuilder(tableName) + .addColumnFamily(ColumnFamilyDescriptorBuilder.of(family)).build()); testUtil.waitTableAvailable(tableName); } @@ -136,7 +137,7 @@ public class TestSplitWalDataLoss { long oldestSeqIdOfStore = region.getOldestSeqIdOfStore(family); LOG.info("CHANGE OLDEST " + oldestSeqIdOfStore); assertTrue(oldestSeqIdOfStore > HConstants.NO_SEQNUM); - rs.cacheFlusher.requestFlush(spiedRegion, false); + rs.cacheFlusher.requestFlush(spiedRegion, false, FlushLifeCycleTracker.DUMMY); synchronized (flushed) { while (!flushed.booleanValue()) { flushed.wait(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index e42095930bf..3f6fa3bb013 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine; import org.apache.hadoop.hbase.regionserver.DefaultStoreFlusher; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.FlushRequestListener; import org.apache.hadoop.hbase.regionserver.FlushRequester; import org.apache.hadoop.hbase.regionserver.HRegion; @@ -650,15 +651,16 @@ public abstract class AbstractTestWALReplay { public CustomStoreFlusher(Configuration conf, HStore store) { super(conf, store); } + @Override public List flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId, - MonitoredTask status, ThroughputController throughputController) throws IOException { + MonitoredTask status, ThroughputController throughputController, + FlushLifeCycleTracker tracker) throws IOException { if (throwExceptionWhenFlushing.get()) { throw new IOException("Simulated exception by tests"); } - return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController); + return super.flushSnapshot(snapshot, cacheFlushId, status, throughputController, tracker); } - }; /** @@ -831,16 +833,14 @@ public abstract class AbstractTestWALReplay { WAL newWal = createWAL(newConf, hbaseRootDir, logName); final AtomicInteger flushcount = new AtomicInteger(0); try { - final HRegion region = - new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) { + final HRegion region = new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) { @Override protected FlushResultImpl internalFlushcache(final WAL wal, final long myseqid, final Collection storesToFlush, MonitoredTask status, - boolean writeFlushWalMarker) - throws IOException { + boolean writeFlushWalMarker, FlushLifeCycleTracker tracker) throws IOException { LOG.info("InternalFlushCache Invoked"); FlushResultImpl fs = super.internalFlushcache(wal, myseqid, storesToFlush, - Mockito.mock(MonitoredTask.class), writeFlushWalMarker); + Mockito.mock(MonitoredTask.class), writeFlushWalMarker, tracker); flushcount.incrementAndGet(); return fs; } @@ -1117,7 +1117,7 @@ public abstract class AbstractTestWALReplay { private HRegion r; @Override - public void requestFlush(HRegion region, boolean force) { + public void requestFlush(HRegion region, boolean force, FlushLifeCycleTracker tracker) { try { r.flush(force); } catch (IOException e) { @@ -1127,8 +1127,6 @@ public abstract class AbstractTestWALReplay { @Override public void requestDelayedFlush(HRegion region, long when, boolean forceFlushAllStores) { - // TODO Auto-generated method stub - } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java index 276487abbb9..d196b64ae74 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestAccessController.java @@ -112,6 +112,7 @@ import org.apache.hadoop.hbase.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.AccessControlService; import org.apache.hadoop.hbase.protobuf.generated.AccessControlProtos.CheckPermissionsRequest; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; @@ -807,7 +808,8 @@ public class TestAccessController extends SecureTestUtil { AccessTestAction action = new AccessTestAction() { @Override public Object run() throws Exception { - ACCESS_CONTROLLER.preFlush(ObserverContextImpl.createAndPrepare(RCP_ENV)); + ACCESS_CONTROLLER.preFlush(ObserverContextImpl.createAndPrepare(RCP_ENV), + FlushLifeCycleTracker.DUMMY); return null; } }; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestWithDisabledAuthorization.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestWithDisabledAuthorization.java index 724641f2648..3cab57281ec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestWithDisabledAuthorization.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/access/TestWithDisabledAuthorization.java @@ -58,6 +58,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionServerCoprocessorEnvironment; import org.apache.hadoop.hbase.filter.BinaryComparator; import org.apache.hadoop.hbase.master.MasterCoprocessorHost; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress; import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost; @@ -814,7 +815,8 @@ public class TestWithDisabledAuthorization extends SecureTestUtil { verifyAllowed(new AccessTestAction() { @Override public Object run() throws Exception { - ACCESS_CONTROLLER.preFlush(ObserverContextImpl.createAndPrepare(RCP_ENV)); + ACCESS_CONTROLLER.preFlush(ObserverContextImpl.createAndPrepare(RCP_ENV), + FlushLifeCycleTracker.DUMMY); return null; } }, SUPERUSER, USER_ADMIN, USER_RW, USER_RO, USER_OWNER, USER_CREATE, USER_QUAL, USER_NONE); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java index c67d7bf15c0..62244954aec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner; +import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; @@ -311,7 +312,7 @@ public class TestCoprocessorScanPolicy { @Override public InternalScanner preFlush(ObserverContext c, Store store, - InternalScanner scanner) throws IOException { + InternalScanner scanner, FlushLifeCycleTracker tracker) throws IOException { return wrap(store, scanner); }