HBASE-21626 : log the regions blocking WAL from being archived
This commit is contained in:
parent
63a1fcbe46
commit
e69ab24552
|
@ -109,9 +109,13 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class);
|
private static final Logger LOG = LoggerFactory.getLogger(AbstractFSWAL.class);
|
||||||
|
|
||||||
|
private static final String SURVIVED_TOO_LONG_SEC_KEY = "hbase.regionserver.wal.too.old.sec";
|
||||||
|
private static final int SURVIVED_TOO_LONG_SEC_DEFAULT = 900;
|
||||||
|
/** Don't log blocking regions more frequently than this. */
|
||||||
|
private static final long SURVIVED_TOO_LONG_LOG_INTERVAL_NS = TimeUnit.MINUTES.toNanos(5);
|
||||||
|
|
||||||
protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
|
protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms
|
||||||
|
|
||||||
private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
|
private static final int DEFAULT_WAL_SYNC_TIMEOUT_MS = 5 * 60 * 1000; // in ms, 5min
|
||||||
|
@ -172,6 +176,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
|
|
||||||
private final long walSyncTimeoutNs;
|
private final long walSyncTimeoutNs;
|
||||||
|
|
||||||
|
private final long walTooOldNs;
|
||||||
|
|
||||||
// If > than this size, roll the log.
|
// If > than this size, roll the log.
|
||||||
protected final long logrollsize;
|
protected final long logrollsize;
|
||||||
|
|
||||||
|
@ -230,6 +236,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
protected volatile boolean closed = false;
|
protected volatile boolean closed = false;
|
||||||
|
|
||||||
protected final AtomicBoolean shutdown = new AtomicBoolean(false);
|
protected final AtomicBoolean shutdown = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
private long nextLogTooOldNs = System.nanoTime();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
|
* WAL Comparator; it compares the timestamp (log filenum), present in the log file name. Throws
|
||||||
* an IllegalArgumentException if used to compare paths from different wals.
|
* an IllegalArgumentException if used to compare paths from different wals.
|
||||||
|
@ -251,9 +260,15 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
*/
|
*/
|
||||||
public final long logSize;
|
public final long logSize;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The nanoTime of the log rolling, used to determine the time interval that has passed since.
|
||||||
|
*/
|
||||||
|
public final long rollTimeNs;
|
||||||
|
|
||||||
public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
|
public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
|
||||||
this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
|
this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
|
||||||
this.logSize = logSize;
|
this.logSize = logSize;
|
||||||
|
this.rollTimeNs = System.nanoTime();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -425,6 +440,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
this.implClassName = getClass().getSimpleName();
|
this.implClassName = getClass().getSimpleName();
|
||||||
|
this.walTooOldNs = TimeUnit.SECONDS.toNanos(conf.getInt(
|
||||||
|
SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -613,12 +630,23 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
*/
|
*/
|
||||||
private void cleanOldLogs() throws IOException {
|
private void cleanOldLogs() throws IOException {
|
||||||
List<Pair<Path, Long>> logsToArchive = null;
|
List<Pair<Path, Long>> logsToArchive = null;
|
||||||
|
long now = System.nanoTime();
|
||||||
|
boolean mayLogTooOld = nextLogTooOldNs > now;
|
||||||
|
ArrayList<byte[]> regionsBlockingWal = null;
|
||||||
// For each log file, look at its Map of regions to highest sequence id; if all sequence ids
|
// For each log file, look at its Map of regions to highest sequence id; if all sequence ids
|
||||||
// are older than what is currently in memory, the WAL can be GC'd.
|
// are older than what is currently in memory, the WAL can be GC'd.
|
||||||
for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) {
|
for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) {
|
||||||
Path log = e.getKey();
|
Path log = e.getKey();
|
||||||
|
ArrayList<byte[]> regionsBlockingThisWal = null;
|
||||||
|
long ageNs = now - e.getValue().rollTimeNs;
|
||||||
|
if (ageNs > walTooOldNs) {
|
||||||
|
if (mayLogTooOld && regionsBlockingWal == null) {
|
||||||
|
regionsBlockingWal = new ArrayList<>();
|
||||||
|
}
|
||||||
|
regionsBlockingThisWal = regionsBlockingWal;
|
||||||
|
}
|
||||||
Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
|
Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
|
||||||
if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
|
if (this.sequenceIdAccounting.areAllLower(sequenceNums, regionsBlockingThisWal)) {
|
||||||
if (logsToArchive == null) {
|
if (logsToArchive == null) {
|
||||||
logsToArchive = new ArrayList<>();
|
logsToArchive = new ArrayList<>();
|
||||||
}
|
}
|
||||||
|
@ -626,6 +654,20 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("WAL file ready for archiving " + log);
|
LOG.trace("WAL file ready for archiving " + log);
|
||||||
}
|
}
|
||||||
|
} else if (regionsBlockingThisWal != null) {
|
||||||
|
StringBuilder sb = new StringBuilder(log.toString()).append(" has not been archived for ")
|
||||||
|
.append(TimeUnit.NANOSECONDS.toSeconds(ageNs)).append(" seconds; blocked by: ");
|
||||||
|
boolean isFirst = true;
|
||||||
|
for (byte[] region : regionsBlockingThisWal) {
|
||||||
|
if (!isFirst) {
|
||||||
|
sb.append("; ");
|
||||||
|
}
|
||||||
|
isFirst = false;
|
||||||
|
sb.append(Bytes.toString(region));
|
||||||
|
}
|
||||||
|
LOG.warn(sb.toString());
|
||||||
|
nextLogTooOldNs = now + SURVIVED_TOO_LONG_LOG_INTERVAL_NS;
|
||||||
|
regionsBlockingThisWal.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (logsToArchive != null) {
|
if (logsToArchive != null) {
|
||||||
|
|
|
@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
|
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -377,9 +378,11 @@ class SequenceIdAccounting {
|
||||||
* sequenceids, sequenceids we are holding on to in this accounting instance.
|
* sequenceids, sequenceids we are holding on to in this accounting instance.
|
||||||
* @param sequenceids Keyed by encoded region name. Cannot be null (doesn't make sense for it to
|
* @param sequenceids Keyed by encoded region name. Cannot be null (doesn't make sense for it to
|
||||||
* be null).
|
* be null).
|
||||||
|
* @param keysBlocking An optional collection that is used to return the specific keys that are
|
||||||
|
* causing this method to return false.
|
||||||
* @return true if all sequenceids are lower, older than, the old sequenceids in this instance.
|
* @return true if all sequenceids are lower, older than, the old sequenceids in this instance.
|
||||||
*/
|
*/
|
||||||
boolean areAllLower(Map<byte[], Long> sequenceids) {
|
boolean areAllLower(Map<byte[], Long> sequenceids, Collection<byte[]> keysBlocking) {
|
||||||
Map<byte[], Long> flushing = null;
|
Map<byte[], Long> flushing = null;
|
||||||
Map<byte[], Long> unflushed = null;
|
Map<byte[], Long> unflushed = null;
|
||||||
synchronized (this.tieLock) {
|
synchronized (this.tieLock) {
|
||||||
|
@ -388,6 +391,7 @@ class SequenceIdAccounting {
|
||||||
flushing = flattenToLowestSequenceId(this.flushingSequenceIds);
|
flushing = flattenToLowestSequenceId(this.flushingSequenceIds);
|
||||||
unflushed = flattenToLowestSequenceId(this.lowestUnflushedSequenceIds);
|
unflushed = flattenToLowestSequenceId(this.lowestUnflushedSequenceIds);
|
||||||
}
|
}
|
||||||
|
boolean result = true;
|
||||||
for (Map.Entry<byte[], Long> e : sequenceids.entrySet()) {
|
for (Map.Entry<byte[], Long> e : sequenceids.entrySet()) {
|
||||||
long oldestFlushing = Long.MAX_VALUE;
|
long oldestFlushing = Long.MAX_VALUE;
|
||||||
long oldestUnflushed = Long.MAX_VALUE;
|
long oldestUnflushed = Long.MAX_VALUE;
|
||||||
|
@ -399,10 +403,15 @@ class SequenceIdAccounting {
|
||||||
}
|
}
|
||||||
long min = Math.min(oldestFlushing, oldestUnflushed);
|
long min = Math.min(oldestFlushing, oldestUnflushed);
|
||||||
if (min <= e.getValue()) {
|
if (min <= e.getValue()) {
|
||||||
|
if (keysBlocking == null) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
result = false;
|
||||||
|
keysBlocking.add(e.getKey());
|
||||||
|
// Continue examining the map so we could log all regions blocking this WAL.
|
||||||
}
|
}
|
||||||
return true;
|
}
|
||||||
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -17,10 +17,12 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.regionserver.wal;
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -77,34 +79,38 @@ public class TestSequenceIdAccounting {
|
||||||
sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME);
|
sida.getOrCreateLowestSequenceIds(ENCODED_REGION_NAME);
|
||||||
Map<byte[], Long> m = new HashMap<>();
|
Map<byte[], Long> m = new HashMap<>();
|
||||||
m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
|
m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
|
||||||
assertTrue(sida.areAllLower(m));
|
assertTrue(sida.areAllLower(m, null));
|
||||||
long sequenceid = 1;
|
long sequenceid = 1;
|
||||||
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
|
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
|
||||||
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
|
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
|
||||||
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
|
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
|
||||||
assertTrue(sida.areAllLower(m));
|
assertTrue(sida.areAllLower(m, null));
|
||||||
m.put(ENCODED_REGION_NAME, sequenceid);
|
m.put(ENCODED_REGION_NAME, sequenceid);
|
||||||
assertFalse(sida.areAllLower(m));
|
assertFalse(sida.areAllLower(m, null));
|
||||||
|
ArrayList<byte[]> regions = new ArrayList<>();
|
||||||
|
assertFalse(sida.areAllLower(m, regions));
|
||||||
|
assertEquals(1, regions.size());
|
||||||
|
assertArrayEquals(ENCODED_REGION_NAME, regions.get(0));
|
||||||
long lowest = sida.getLowestSequenceId(ENCODED_REGION_NAME);
|
long lowest = sida.getLowestSequenceId(ENCODED_REGION_NAME);
|
||||||
assertEquals("Lowest should be first sequence id inserted", 1, lowest);
|
assertEquals("Lowest should be first sequence id inserted", 1, lowest);
|
||||||
m.put(ENCODED_REGION_NAME, lowest);
|
m.put(ENCODED_REGION_NAME, lowest);
|
||||||
assertFalse(sida.areAllLower(m));
|
assertFalse(sida.areAllLower(m, null));
|
||||||
// Now make sure above works when flushing.
|
// Now make sure above works when flushing.
|
||||||
sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES);
|
sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES);
|
||||||
assertFalse(sida.areAllLower(m));
|
assertFalse(sida.areAllLower(m, null));
|
||||||
m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
|
m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
|
||||||
assertTrue(sida.areAllLower(m));
|
assertTrue(sida.areAllLower(m, null));
|
||||||
// Let the flush complete and if we ask if the sequenceid is lower, should be yes since no edits
|
// Let the flush complete and if we ask if the sequenceid is lower, should be yes since no edits
|
||||||
sida.completeCacheFlush(ENCODED_REGION_NAME);
|
sida.completeCacheFlush(ENCODED_REGION_NAME);
|
||||||
m.put(ENCODED_REGION_NAME, sequenceid);
|
m.put(ENCODED_REGION_NAME, sequenceid);
|
||||||
assertTrue(sida.areAllLower(m));
|
assertTrue(sida.areAllLower(m, null));
|
||||||
// Flush again but add sequenceids while we are flushing.
|
// Flush again but add sequenceids while we are flushing.
|
||||||
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
|
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
|
||||||
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
|
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
|
||||||
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
|
sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid++, true);
|
||||||
lowest = sida.getLowestSequenceId(ENCODED_REGION_NAME);
|
lowest = sida.getLowestSequenceId(ENCODED_REGION_NAME);
|
||||||
m.put(ENCODED_REGION_NAME, lowest);
|
m.put(ENCODED_REGION_NAME, lowest);
|
||||||
assertFalse(sida.areAllLower(m));
|
assertFalse(sida.areAllLower(m, null));
|
||||||
sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES);
|
sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES);
|
||||||
// The cache flush will clear out all sequenceid accounting by region.
|
// The cache flush will clear out all sequenceid accounting by region.
|
||||||
assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME));
|
assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME));
|
||||||
|
@ -116,7 +122,7 @@ public class TestSequenceIdAccounting {
|
||||||
sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
|
sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
|
||||||
sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
|
sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
|
||||||
sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
|
sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
|
||||||
assertTrue(sida.areAllLower(m));
|
assertTrue(sida.areAllLower(m, null));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
Loading…
Reference in New Issue