diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java index 67153aef995..1c5a593b00a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/FileLink.java @@ -476,6 +476,9 @@ public class FileLink { @Override public boolean equals(Object obj) { + if (obj == null) { + return false; + } // Assumes that the ordering of locations between objects are the same. This is true for the // current subclasses already (HFileLink, WALLink). Otherwise, we may have to sort the locations // or keep them presorted diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 830907d5a36..5cfd618f7f4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -4469,7 +4469,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // * @param flush the flush descriptor * @throws IOException */ - private void dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException { + private long dropMemstoreContentsForSeqId(long seqId, Store store) throws IOException { + long totalFreedSize = 0; this.updatesLock.writeLock().lock(); try { mvcc.waitForPreviousTransactionsComplete(); @@ -4483,10 +4484,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // Prepare flush (take a snapshot) and then abort (drop the snapshot) if (store == null ) { for (Store s : stores.values()) { - dropStoreMemstoreContentsForSeqId(s, currentSeqId); + totalFreedSize += doDropStoreMemstoreContentsForSeqId(s, currentSeqId); } } else { - dropStoreMemstoreContentsForSeqId(store, currentSeqId); + totalFreedSize += 
doDropStoreMemstoreContentsForSeqId(store, currentSeqId); } } else { LOG.info(getRegionInfo().getEncodedName() + " : " @@ -4496,13 +4497,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } finally { this.updatesLock.writeLock().unlock(); } + return totalFreedSize; } - private void dropStoreMemstoreContentsForSeqId(Store s, long currentSeqId) throws IOException { - this.addAndGetGlobalMemstoreSize(-s.getFlushableSize()); + private long doDropStoreMemstoreContentsForSeqId(Store s, long currentSeqId) throws IOException { + long snapshotSize = s.getFlushableSize(); + this.addAndGetGlobalMemstoreSize(-snapshotSize); StoreFlushContext ctx = s.createFlushContext(currentSeqId); ctx.prepare(); ctx.abort(); + return snapshotSize; } private void replayWALFlushAbortMarker(FlushDescriptor flush) { @@ -4623,27 +4627,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // // if all stores ended up dropping their snapshots, we can safely drop the // prepareFlushResult - if (writestate.flushing) { - boolean canDrop = true; - for (Entry entry - : prepareFlushResult.storeFlushCtxs.entrySet()) { - Store store = getStore(entry.getKey()); - if (store == null) { - continue; - } - if (store.getSnapshotSize() > 0) { - canDrop = false; - } - } - - // this means that all the stores in the region has finished flushing, but the WAL marker - // may not have been written or we did not receive it yet. - if (canDrop) { - writestate.flushing = false; - this.prepareFlushResult = null; - } - } - + dropPrepareFlushIfPossible(); // advance the mvcc read point so that the new flushed file is visible. 
// there may be some in-flight transactions, but they won't be made visible since they are @@ -4745,6 +4729,126 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { // } } + /** + * If all stores ended up dropping their snapshots, we can safely drop the prepareFlushResult + */ + private void dropPrepareFlushIfPossible() { + if (writestate.flushing) { + boolean canDrop = true; + if (prepareFlushResult.storeFlushCtxs != null) { + for (Entry entry + : prepareFlushResult.storeFlushCtxs.entrySet()) { + Store store = getStore(entry.getKey()); + if (store == null) { + continue; + } + if (store.getSnapshotSize() > 0) { + canDrop = false; + break; + } + } + } + + // this means that all the stores in the region have finished flushing, but the WAL marker + // may not have been written or we did not receive it yet. + if (canDrop) { + writestate.flushing = false; + this.prepareFlushResult = null; + } + } + } + + /** + * Checks the underlying store files, and opens the files that have not + * been opened, and removes the store file readers for store files no longer + * available. Mainly used by secondary region replicas to keep up to date with + * the primary region files or open new flushed files and drop their memstore snapshots in case + * of memory pressure. 
+ * @throws IOException + */ + boolean refreshStoreFiles() throws IOException { + if (ServerRegionReplicaUtil.isDefaultReplica(this.getRegionInfo())) { + return false; // if primary nothing to do + } + + if (LOG.isDebugEnabled()) { + LOG.debug(getRegionInfo().getEncodedName() + " : " + + "Refreshing store files to see whether we can free up memstore"); + } + + long totalFreedSize = 0; + + long smallestSeqIdInStores = Long.MAX_VALUE; + + startRegionOperation(); // obtain region close lock + try { + synchronized (writestate) { + for (Store store : getStores().values()) { + // TODO: some stores might see new data from flush, while others do not which + // MIGHT break atomic edits across column families. + long maxSeqIdBefore = store.getMaxSequenceId(); + + // refresh the store files. This is similar to observing a region open wal marker. + store.refreshStoreFiles(); + + long storeSeqId = store.getMaxSequenceId(); + if (storeSeqId < smallestSeqIdInStores) { + smallestSeqIdInStores = storeSeqId; + } + + // see whether we can drop the memstore or the snapshot + if (storeSeqId > maxSeqIdBefore) { + + if (writestate.flushing) { + // only drop memstore snapshots if they are smaller than last flush for the store + if (this.prepareFlushResult.flushOpSeqId <= storeSeqId) { + StoreFlushContext ctx = this.prepareFlushResult.storeFlushCtxs == null ? 
+ null : this.prepareFlushResult.storeFlushCtxs.get(store.getFamily().getName()); + if (ctx != null) { + long snapshotSize = store.getFlushableSize(); + ctx.abort(); + this.addAndGetGlobalMemstoreSize(-snapshotSize); + this.prepareFlushResult.storeFlushCtxs.remove(store.getFamily().getName()); + totalFreedSize += snapshotSize; + } + } + } + + // Drop the memstore contents if they are now smaller than the latest seen flushed file + totalFreedSize += dropMemstoreContentsForSeqId(storeSeqId, store); + } + } + + // if all stores ended up dropping their snapshots, we can safely drop the + // prepareFlushResult + dropPrepareFlushIfPossible(); + + // advance the mvcc read point so that the new flushed files are visible. + // there may be some in-flight transactions, but they won't be made visible since they are + // either greater than flush seq number or they were already picked up via flush. + for (Store s : getStores().values()) { + getMVCC().advanceMemstoreReadPointIfNeeded(s.getMaxMemstoreTS()); + } + + // smallestSeqIdInStores is the seqId that we have a corresponding hfile for. We can safely + // skip all edits that are to be replayed in the future that have a smaller seqId + // than this. We are updating lastReplayedOpenRegionSeqId so that we can skip all edits + // that we have picked the flush files for + if (this.lastReplayedOpenRegionSeqId < smallestSeqIdInStores) { + this.lastReplayedOpenRegionSeqId = smallestSeqIdInStores; + } + } + // C. Finally notify anyone waiting on memstore to clear: + // e.g. checkResources(). + synchronized (this) { + notifyAll(); // FindBugs NN_NAKED_NOTIFY + } + return totalFreedSize > 0; + } finally { + closeRegionOperation(); + } + } + /** Checks whether the given regionName is either equal to our region, or that * the regionName is the primary region to our corresponding range for the secondary replica. 
*/ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java index 6268b7824d9..6e8bbf2ca87 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver; import static org.apache.hadoop.util.StringUtils.humanReadableInt; + import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.lang.management.ManagementFactory; @@ -44,13 +45,16 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Counter; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HasThread; +import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix; import org.apache.htrace.Trace; import org.apache.htrace.TraceScope; @@ -70,6 +74,7 @@ import com.google.common.base.Preconditions; class MemStoreFlusher implements FlushRequester { static final Log LOG = LogFactory.getLog(MemStoreFlusher.class); + private Configuration conf; // These two data members go together. Any entry in the one must have // a corresponding entry in the other. 
private final BlockingQueue flushQueue = @@ -100,6 +105,7 @@ class MemStoreFlusher implements FlushRequester { public MemStoreFlusher(final Configuration conf, final HRegionServer server) { super(); + this.conf = conf; this.server = server; this.threadWakeFrequency = conf.getLong(HConstants.THREAD_WAKE_FREQUENCY, 10 * 1000); @@ -138,6 +144,9 @@ class MemStoreFlusher implements FlushRequester { Set excludedRegions = new HashSet(); + double secondaryMultiplier + = ServerRegionReplicaUtil.getRegionReplicaStoreFileRefreshMultiplier(conf); + boolean flushedOne = false; while (!flushedOne) { // Find the biggest region that doesn't have too many storefiles @@ -147,8 +156,11 @@ class MemStoreFlusher implements FlushRequester { // Find the biggest region, total, even if it might have too many flushes. HRegion bestAnyRegion = getBiggestMemstoreRegion( regionsBySize, excludedRegions, false); + // Find the biggest region that is a secondary region + HRegion bestRegionReplica = getBiggestMemstoreOfRegionReplica(regionsBySize, + excludedRegions); - if (bestAnyRegion == null) { + if (bestAnyRegion == null && bestRegionReplica == null) { LOG.error("Above memory mark but there are no flushable regions!"); return false; } @@ -177,18 +189,37 @@ class MemStoreFlusher implements FlushRequester { } } - Preconditions.checkState(regionToFlush.memstoreSize.get() > 0); + Preconditions.checkState( + (regionToFlush != null && regionToFlush.memstoreSize.get() > 0) || + (bestRegionReplica != null && bestRegionReplica.memstoreSize.get() > 0)); - LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. 
" - + "Total Memstore size=" - + humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) - + ", Region memstore size=" - + humanReadableInt(regionToFlush.memstoreSize.get())); - flushedOne = flushRegion(regionToFlush, true, true); - if (!flushedOne) { - LOG.info("Excluding unflushable region " + regionToFlush + - " - trying to find a different region to flush."); - excludedRegions.add(regionToFlush); + if (regionToFlush == null || + (bestRegionReplica != null && + ServerRegionReplicaUtil.isRegionReplicaStoreFileRefreshEnabled(conf) && + (bestRegionReplica.memstoreSize.get() + > secondaryMultiplier * regionToFlush.memstoreSize.get()))) { + LOG.info("Refreshing storefiles of region " + bestRegionReplica + + " due to global heap pressure. memstore size=" + StringUtils.humanReadableInt( + server.getRegionServerAccounting().getGlobalMemstoreSize())); + flushedOne = refreshStoreFilesAndReclaimMemory(bestRegionReplica); + if (!flushedOne) { + LOG.info("Excluding secondary region " + bestRegionReplica + + " - trying to find a different region to refresh files."); + excludedRegions.add(bestRegionReplica); + } + } else { + LOG.info("Flush of region " + regionToFlush + " due to global heap pressure. 
" + + "Total Memstore size=" + + humanReadableInt(server.getRegionServerAccounting().getGlobalMemstoreSize()) + + ", Region memstore size=" + + humanReadableInt(regionToFlush.memstoreSize.get())); + flushedOne = flushRegion(regionToFlush, true, true); + + if (!flushedOne) { + LOG.info("Excluding unflushable region " + regionToFlush + + " - trying to find a different region to flush."); + excludedRegions.add(regionToFlush); + } } } return true; @@ -281,6 +312,33 @@ class MemStoreFlusher implements FlushRequester { return null; } + private HRegion getBiggestMemstoreOfRegionReplica(SortedMap regionsBySize, + Set excludedRegions) { + synchronized (regionsInQueue) { + for (HRegion region : regionsBySize.values()) { + if (excludedRegions.contains(region)) { + continue; + } + + if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo())) { + continue; + } + + return region; + } + } + return null; + } + + private boolean refreshStoreFilesAndReclaimMemory(HRegion region) { + try { + return region.refreshStoreFiles(); + } catch (IOException e) { + LOG.warn("Refreshing store files failed with exception", e); + } + return false; + } + /** * Return true if global memory usage is above the high watermark */ diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java index 710698b2bba..45517220575 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java @@ -68,6 +68,25 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil { = "hbase.region.replica.wait.for.primary.flush"; private static final boolean DEFAULT_REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH = true; + /** + * Enables or disables refreshing store files of secondary region replicas when the memory is + * above the global memstore lower limit. 
Refreshing the store files means that we will do a file + * list of the primary regions store files, and pick up new files. Also depending on the store + * files, we can drop some memstore contents which will free up memory. + */ + public static final String REGION_REPLICA_STORE_FILE_REFRESH + = "hbase.region.replica.storefile.refresh"; + private static final boolean DEFAULT_REGION_REPLICA_STORE_FILE_REFRESH = true; + + /** + * The multiplier to use when we want to refresh a secondary region instead of flushing a primary + * region. Default value assumes that for doing the file refresh, the biggest secondary should be + * 4 times bigger than the biggest primary. + */ + public static final String REGION_REPLICA_STORE_FILE_REFRESH_MEMSTORE_MULTIPLIER + = "hbase.region.replica.storefile.refresh.memstore.multiplier"; + private static final double DEFAULT_REGION_REPLICA_STORE_FILE_REFRESH_MEMSTORE_MULTIPLIER = 4; + /** * Returns the regionInfo object to use for interacting with the file system. 
* @return An HRegionInfo object to interact with the filesystem @@ -163,6 +182,16 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil { DEFAULT_REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH); } + public static boolean isRegionReplicaStoreFileRefreshEnabled(Configuration conf) { + return conf.getBoolean(REGION_REPLICA_STORE_FILE_REFRESH, + DEFAULT_REGION_REPLICA_STORE_FILE_REFRESH); + } + + public static double getRegionReplicaStoreFileRefreshMultiplier(Configuration conf) { + return conf.getDouble(REGION_REPLICA_STORE_FILE_REFRESH_MEMSTORE_MULTIPLIER, + DEFAULT_REGION_REPLICA_STORE_FILE_REFRESH_MEMSTORE_MULTIPLIER); + } + /** * Return the peer id used for replicating to secondary region replicas */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index dfa999b4357..b877d5a4a74 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; +import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.DefaultWALProvider; import org.apache.hadoop.hbase.wal.WAL; @@ -1326,6 +1327,104 @@ public class TestHRegionReplayEvents { secondaryRegion.get(new Get(Bytes.toBytes(0))); } + @Test + public void testRefreshStoreFiles() throws IOException { + assertEquals(0, primaryRegion.getStoreFileList(families).size()); + assertEquals(0, secondaryRegion.getStoreFileList(families).size()); + + // Test case 1: refresh with an empty region + secondaryRegion.refreshStoreFiles(); + 
assertEquals(0, secondaryRegion.getStoreFileList(families).size()); + + // do one flush + putDataWithFlushes(primaryRegion, 100, 100, 0); + int numRows = 100; + + // refresh the store file list, and ensure that the files are picked up. + secondaryRegion.refreshStoreFiles(); + assertPathListsEqual(primaryRegion.getStoreFileList(families), + secondaryRegion.getStoreFileList(families)); + assertEquals(families.length, secondaryRegion.getStoreFileList(families).size()); + + LOG.info("-- Verifying edits from secondary"); + verifyData(secondaryRegion, 0, numRows, cq, families); + + // Test case 2: 3 more flushes + putDataWithFlushes(primaryRegion, 100, 300, 0); + numRows = 300; + + // refresh the store file list, and ensure that the files are picked up. + secondaryRegion.refreshStoreFiles(); + assertPathListsEqual(primaryRegion.getStoreFileList(families), + secondaryRegion.getStoreFileList(families)); + assertEquals(families.length * 4, secondaryRegion.getStoreFileList(families).size()); + + LOG.info("-- Verifying edits from secondary"); + verifyData(secondaryRegion, 0, numRows, cq, families); + + if (FSUtils.WINDOWS) { + // compaction cannot move files while they are open in secondary on windows. Skip remaining. + return; + } + + // Test case 3: compact primary files + primaryRegion.compactStores(); + secondaryRegion.refreshStoreFiles(); + assertPathListsEqual(primaryRegion.getStoreFileList(families), + secondaryRegion.getStoreFileList(families)); + assertEquals(families.length, secondaryRegion.getStoreFileList(families).size()); + + LOG.info("-- Verifying edits from secondary"); + verifyData(secondaryRegion, 0, numRows, cq, families); + + LOG.info("-- Replaying edits in secondary"); + + // Test case 4: replay some edits, ensure that memstore is dropped. 
+ assertTrue(secondaryRegion.getMemstoreSize().get() == 0); + putDataWithFlushes(primaryRegion, 400, 400, 0); + numRows = 400; + + reader = createWALReaderForPrimary(); + while (true) { + WAL.Entry entry = reader.next(); + if (entry == null) { + break; + } + FlushDescriptor flush = WALEdit.getFlushDescriptor(entry.getEdit().getCells().get(0)); + if (flush != null) { + // do not replay flush + } else { + replayEdit(secondaryRegion, entry); + } + } + + assertTrue(secondaryRegion.getMemstoreSize().get() > 0); + + secondaryRegion.refreshStoreFiles(); + + assertTrue(secondaryRegion.getMemstoreSize().get() == 0); + + LOG.info("-- Verifying edits from primary"); + verifyData(primaryRegion, 0, numRows, cq, families); + LOG.info("-- Verifying edits from secondary"); + verifyData(secondaryRegion, 0, numRows, cq, families); + } + + /** + * Paths can be qualified or not. This does the assertion using String->Path conversion. + */ + private void assertPathListsEqual(List list1, List list2) { + List l1 = new ArrayList<>(list1.size()); + for (String path : list1) { + l1.add(Path.getPathWithoutSchemeAndAuthority(new Path(path))); + } + List l2 = new ArrayList<>(list2.size()); + for (String path : list2) { + l2.add(Path.getPathWithoutSchemeAndAuthority(new Path(path))); + } + assertEquals(l1, l2); + } + private void disableReads(HRegion region) { region.setReadsEnabled(false); try {