diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index 7915ac3cef8..68ba7047633 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -109,9 +109,13 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti */ @InterfaceAudience.Private public abstract class AbstractFSWAL implements WAL { - private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class); + private static final String SURVIVED_TOO_LONG_SEC_KEY = "hbase.regionserver.wal.too.old.sec"; + private static final int SURVIVED_TOO_LONG_SEC_DEFAULT = 900; + /** Don't log blocking regions more frequently than this. */ + private static final long SURVIVED_TOO_LONG_LOG_INTERVAL_NS = TimeUnit.MINUTES.toNanos(5); + protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min @@ -172,6 +176,8 @@ public abstract class AbstractFSWAL implements WAL { private final long walSyncTimeoutNs; + private final long walTooOldNs; + // If > than this size, roll the log. protected final long logrollsize; @@ -230,6 +236,9 @@ public abstract class AbstractFSWAL implements WAL { protected volatile boolean closed = false; protected final AtomicBoolean shutdown = new AtomicBoolean(false); + + private long nextLogTooOldNs = System.nanoTime(); + /** * WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws * an IllegalArgumentException if used to compare paths from different wals. @@ -251,9 +260,15 @@ public abstract class AbstractFSWAL implements WAL { */ public final long logSize; + /** + * The nanoTime of the log rolling, used to determine the time interval that has passed since. 
+ */ + public final long rollTimeNs; + public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) { this.encodedName2HighestSequenceId = encodedName2HighestSequenceId; this.logSize = logSize; + this.rollTimeNs = System.nanoTime(); } } @@ -425,6 +440,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL { } }; this.implClassName = getClass().getSimpleName(); + this.walTooOldNs = TimeUnit.SECONDS.toNanos(conf.getInt( + SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT)); } /** @@ -613,12 +630,23 @@ */ private void cleanOldLogs() throws IOException { List<Pair<Path, Long>> logsToArchive = null; + long now = System.nanoTime(); + boolean mayLogTooOld = nextLogTooOldNs <= now; + ArrayList<byte[]> regionsBlockingWal = null; // For each log file, look at its Map of regions to highest sequence id; if all sequence ids // are older than what is currently in memory, the WAL can be GC'd. for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) { Path log = e.getKey(); + ArrayList<byte[]> regionsBlockingThisWal = null; + long ageNs = now - e.getValue().rollTimeNs; + if (ageNs > walTooOldNs) { + if (mayLogTooOld && regionsBlockingWal == null) { + regionsBlockingWal = new ArrayList<>(); + } + regionsBlockingThisWal = regionsBlockingWal; + } Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId; - if (this.sequenceIdAccounting.areAllLower(sequenceNums)) { + if (this.sequenceIdAccounting.areAllLower(sequenceNums, regionsBlockingThisWal)) { if (logsToArchive == null) { logsToArchive = new ArrayList<>(); } @@ -626,6 +654,20 @@ if (LOG.isTraceEnabled()) { LOG.trace("WAL file ready for archiving " + log); } + } else if (regionsBlockingThisWal != null) { + StringBuilder sb = new StringBuilder(log.toString()).append(" has not been archived for ") + .append(TimeUnit.NANOSECONDS.toSeconds(ageNs)).append(" seconds; blocked by: "); + boolean isFirst = true; + for (byte[] region : regionsBlockingThisWal) {
+ if (!isFirst) { + sb.append("; "); + } + isFirst = false; + sb.append(Bytes.toString(region)); + } + LOG.warn(sb.toString()); + nextLogTooOldNs = now + SURVIVED_TOO_LONG_LOG_INTERVAL_NS; + regionsBlockingThisWal.clear(); } } if (logsToArchive != null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java index e14ce0c9274..4e3ca176ce0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -377,9 +378,11 @@ class SequenceIdAccounting { * sequenceids, sequenceids we are holding on to in this accounting instance. * @param sequenceids Keyed by encoded region name. Cannot be null (doesn't make sense for it to * be null). + * @param keysBlocking An optional collection that is used to return the specific keys that are + * causing this method to return false. * @return true if all sequenceids are lower, older than, the old sequenceids in this instance. 
*/ - boolean areAllLower(Map sequenceids) { + boolean areAllLower(Map sequenceids, Collection keysBlocking) { Map flushing = null; Map unflushed = null; synchronized (this.tieLock) { @@ -388,6 +391,7 @@ class SequenceIdAccounting { flushing = flattenToLowestSequenceId(this.flushingSequenceIds); unflushed = flattenToLowestSequenceId(this.lowestUnflushedSequenceIds); } + boolean result = true; for (Map.Entry e : sequenceids.entrySet()) { long oldestFlushing = Long.MAX_VALUE; long oldestUnflushed = Long.MAX_VALUE; @@ -399,10 +403,15 @@ class SequenceIdAccounting { } long min = Math.min(oldestFlushing, oldestUnflushed); if (min <= e.getValue()) { - return false; + if (keysBlocking == null) { + return false; + } + result = false; + keysBlocking.add(e.getKey()); + // Continue examining the map so we could log all regions blocking this WAL. } } - return true; + return result; } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java index 754aedb2c22..ff20ab5849f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java @@ -17,10 +17,12 @@ */ package org.apache.hadoop.hbase.regionserver.wal; +import static org.junit.Assert.assertArrayEquals; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -77,34 +79,38 @@ public class TestSequenceIdAccounting { sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME); Map m = new HashMap<>(); m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM); - assertTrue(sida.areAllLower(m)); + assertTrue(sida.areAllLower(m, null)); long sequenceid = 1; 
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true); sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); - assertTrue(sida.areAllLower(m)); + assertTrue(sida.areAllLower(m, null)); m.put(ENCODED_REGION_NAME, sequenceid); - assertFalse(sida.areAllLower(m)); + assertFalse(sida.areAllLower(m, null)); + ArrayList regions = new ArrayList<>(); + assertFalse(sida.areAllLower(m, regions)); + assertEquals(1, regions.size()); + assertArrayEquals(ENCODED_REGION_NAME, regions.get(0)); long lowest = sida.getLowestSequenceId(ENCODED_REGION_NAME); assertEquals("Lowest should be first sequence id inserted", 1, lowest); m.put(ENCODED_REGION_NAME, lowest); - assertFalse(sida.areAllLower(m)); + assertFalse(sida.areAllLower(m, null)); // Now make sure above works when flushing. sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES); - assertFalse(sida.areAllLower(m)); + assertFalse(sida.areAllLower(m, null)); m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM); - assertTrue(sida.areAllLower(m)); + assertTrue(sida.areAllLower(m, null)); // Let the flush complete and if we ask if the sequenceid is lower, should be yes since no edits sida.completeCacheFlush(ENCODED_REGION_NAME); m.put(ENCODED_REGION_NAME, sequenceid); - assertTrue(sida.areAllLower(m)); + assertTrue(sida.areAllLower(m, null)); // Flush again but add sequenceids while we are flushing. sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true); lowest = sida.getLowestSequenceId(ENCODED_REGION_NAME); m.put(ENCODED_REGION_NAME, lowest); - assertFalse(sida.areAllLower(m)); + assertFalse(sida.areAllLower(m, null)); sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES); // The cache flush will clear out all sequenceid accounting by region. 
assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME)); @@ -116,7 +122,7 @@ public class TestSequenceIdAccounting { sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true); sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true); sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true); - assertTrue(sida.areAllLower(m)); + assertTrue(sida.areAllLower(m, null)); } @Test