HBASE-15684 Fix the broken log file size accounting
This commit is contained in:
parent
ac415f85f3
commit
2f4c91e41a
|
@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.DrainBarrier;
|
import org.apache.hadoop.hbase.util.DrainBarrier;
|
||||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
import org.apache.hadoop.hbase.util.FSUtils;
|
import org.apache.hadoop.hbase.util.FSUtils;
|
||||||
|
import org.apache.hadoop.hbase.util.Pair;
|
||||||
import org.apache.hadoop.hbase.wal.WAL;
|
import org.apache.hadoop.hbase.wal.WAL;
|
||||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||||
import org.apache.hadoop.hbase.wal.WALKey;
|
import org.apache.hadoop.hbase.wal.WALKey;
|
||||||
|
@ -222,12 +223,31 @@ public abstract class AbstractFSWAL<W> implements WAL {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
private static final class WalProps {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Map the encoded region name to the highest sequence id. Contain all the regions it has entries of
|
||||||
|
*/
|
||||||
|
public final Map<byte[], Long> encodedName2HighestSequenceId;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The log file size. Notice that the size may not be accurate if we do asynchronous close in
|
||||||
|
* sub classes.
|
||||||
|
*/
|
||||||
|
public final long logSize;
|
||||||
|
|
||||||
|
public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
|
||||||
|
this.encodedName2HighestSequenceId = encodedName2HighestSequenceId;
|
||||||
|
this.logSize = logSize;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Map of WAL log file to the latest sequence ids of all regions it has entries of. The map is
|
* Map of WAL log file to properties. The map is sorted by the log file creation timestamp
|
||||||
* sorted by the log file creation timestamp (contained in the log file name).
|
* (contained in the log file name).
|
||||||
*/
|
*/
|
||||||
protected ConcurrentNavigableMap<Path, Map<byte[], Long>> byWalRegionSequenceIds =
|
protected ConcurrentNavigableMap<Path, WalProps> walFile2Props = new ConcurrentSkipListMap<>(
|
||||||
new ConcurrentSkipListMap<Path, Map<byte[], Long>>(LOG_NAME_COMPARATOR);
|
LOG_NAME_COMPARATOR);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures.
|
* Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures.
|
||||||
|
@ -503,7 +523,7 @@ public abstract class AbstractFSWAL<W> implements WAL {
|
||||||
// public only until class moves to o.a.h.h.wal
|
// public only until class moves to o.a.h.h.wal
|
||||||
/** @return the number of rolled log files */
|
/** @return the number of rolled log files */
|
||||||
public int getNumRolledLogFiles() {
|
public int getNumRolledLogFiles() {
|
||||||
return byWalRegionSequenceIds.size();
|
return walFile2Props.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
// public only until class moves to o.a.h.h.wal
|
// public only until class moves to o.a.h.h.wal
|
||||||
|
@ -523,8 +543,9 @@ public abstract class AbstractFSWAL<W> implements WAL {
|
||||||
byte[][] regions = null;
|
byte[][] regions = null;
|
||||||
int logCount = getNumRolledLogFiles();
|
int logCount = getNumRolledLogFiles();
|
||||||
if (logCount > this.maxLogs && logCount > 0) {
|
if (logCount > this.maxLogs && logCount > 0) {
|
||||||
Map.Entry<Path, Map<byte[], Long>> firstWALEntry = this.byWalRegionSequenceIds.firstEntry();
|
Map.Entry<Path, WalProps> firstWALEntry = this.walFile2Props.firstEntry();
|
||||||
regions = this.sequenceIdAccounting.findLower(firstWALEntry.getValue());
|
regions = this.sequenceIdAccounting
|
||||||
|
.findLower(firstWALEntry.getValue().encodedName2HighestSequenceId);
|
||||||
}
|
}
|
||||||
if (regions != null) {
|
if (regions != null) {
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
|
@ -544,27 +565,27 @@ public abstract class AbstractFSWAL<W> implements WAL {
|
||||||
* Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
|
* Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed.
|
||||||
*/
|
*/
|
||||||
private void cleanOldLogs() throws IOException {
|
private void cleanOldLogs() throws IOException {
|
||||||
List<Path> logsToArchive = null;
|
List<Pair<Path, Long>> logsToArchive = null;
|
||||||
// For each log file, look at its Map of regions to highest sequence id; if all sequence ids
|
// For each log file, look at its Map of regions to highest sequence id; if all sequence ids
|
||||||
// are older than what is currently in memory, the WAL can be GC'd.
|
// are older than what is currently in memory, the WAL can be GC'd.
|
||||||
for (Map.Entry<Path, Map<byte[], Long>> e : this.byWalRegionSequenceIds.entrySet()) {
|
for (Map.Entry<Path, WalProps> e : this.walFile2Props.entrySet()) {
|
||||||
Path log = e.getKey();
|
Path log = e.getKey();
|
||||||
Map<byte[], Long> sequenceNums = e.getValue();
|
Map<byte[], Long> sequenceNums = e.getValue().encodedName2HighestSequenceId;
|
||||||
if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
|
if (this.sequenceIdAccounting.areAllLower(sequenceNums)) {
|
||||||
if (logsToArchive == null) {
|
if (logsToArchive == null) {
|
||||||
logsToArchive = new ArrayList<Path>();
|
logsToArchive = new ArrayList<>();
|
||||||
}
|
}
|
||||||
logsToArchive.add(log);
|
logsToArchive.add(Pair.newPair(log, e.getValue().logSize));
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
LOG.trace("WAL file ready for archiving " + log);
|
LOG.trace("WAL file ready for archiving " + log);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (logsToArchive != null) {
|
if (logsToArchive != null) {
|
||||||
for (Path p : logsToArchive) {
|
for (Pair<Path, Long> logAndSize : logsToArchive) {
|
||||||
this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen());
|
this.totalLogSize.addAndGet(-logAndSize.getSecond());
|
||||||
archiveLogFile(p);
|
archiveLogFile(logAndSize.getFirst());
|
||||||
this.byWalRegionSequenceIds.remove(p);
|
this.walFile2Props.remove(logAndSize.getFirst());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -617,12 +638,12 @@ public abstract class AbstractFSWAL<W> implements WAL {
|
||||||
Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
|
Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException {
|
||||||
TraceScope scope = Trace.startSpan("FSHFile.replaceWriter");
|
TraceScope scope = Trace.startSpan("FSHFile.replaceWriter");
|
||||||
try {
|
try {
|
||||||
long oldFileLen = 0L;
|
long oldFileLen = doReplaceWriter(oldPath, newPath, nextWriter);
|
||||||
doReplaceWriter(oldPath, newPath, nextWriter);
|
|
||||||
int oldNumEntries = this.numEntries.get();
|
int oldNumEntries = this.numEntries.get();
|
||||||
final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
|
final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath));
|
||||||
if (oldPath != null) {
|
if (oldPath != null) {
|
||||||
this.byWalRegionSequenceIds.put(oldPath, this.sequenceIdAccounting.resetHighest());
|
this.walFile2Props.put(oldPath,
|
||||||
|
new WalProps(this.sequenceIdAccounting.resetHighest(), oldFileLen));
|
||||||
this.totalLogSize.addAndGet(oldFileLen);
|
this.totalLogSize.addAndGet(oldFileLen);
|
||||||
LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries
|
LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries
|
||||||
+ ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString);
|
+ ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString);
|
||||||
|
|
|
@ -169,7 +169,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the
|
* iff the given WALFactory is using the DefaultWALProvider for meta and/or non-meta, count the
|
||||||
* size of files (rolled and active). if either of them aren't, count 0 for that provider.
|
* size of files (only rolled). if either of them aren't, count 0 for that provider.
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public long getLogFileSize() {
|
public long getLogFileSize() {
|
||||||
|
@ -185,6 +185,14 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen
|
||||||
return ((AbstractFSWAL<?>) wal).getNumRolledLogFiles();
|
return ((AbstractFSWAL<?>) wal).getNumRolledLogFiles();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* returns the size of rolled WAL files.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public static long getLogFileSize(WAL wal) {
|
||||||
|
return ((AbstractFSWAL<?>) wal).getLogFileSize();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* return the current filename from the current wal.
|
* return the current filename from the current wal.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.regionserver.Store;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||||
import org.apache.hadoop.hbase.wal.FSHLogProvider;
|
|
||||||
import org.apache.hadoop.hbase.wal.WAL;
|
import org.apache.hadoop.hbase.wal.WAL;
|
||||||
import org.apache.hadoop.hbase.wal.WALFactory;
|
import org.apache.hadoop.hbase.wal.WALFactory;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
@ -172,6 +171,14 @@ public abstract class AbstractTestLogRolling {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void assertLogFileSize(WAL log) {
|
||||||
|
if (AbstractFSWALProvider.getNumRolledLogFiles(log) > 0) {
|
||||||
|
assertTrue(AbstractFSWALProvider.getLogFileSize(log) > 0);
|
||||||
|
} else {
|
||||||
|
assertEquals(0, AbstractFSWALProvider.getLogFileSize(log));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests that logs are deleted
|
* Tests that logs are deleted
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
|
@ -182,23 +189,24 @@ public abstract class AbstractTestLogRolling {
|
||||||
this.tableName = getName();
|
this.tableName = getName();
|
||||||
// TODO: Why does this write data take for ever?
|
// TODO: Why does this write data take for ever?
|
||||||
startAndWriteData();
|
startAndWriteData();
|
||||||
HRegionInfo region =
|
HRegionInfo region = server.getOnlineRegions(TableName.valueOf(tableName)).get(0)
|
||||||
server.getOnlineRegions(TableName.valueOf(tableName)).get(0).getRegionInfo();
|
.getRegionInfo();
|
||||||
final WAL log = server.getWAL(region);
|
final WAL log = server.getWAL(region);
|
||||||
LOG.info("after writing there are " + AbstractFSWALProvider.getNumRolledLogFiles(log) +
|
LOG.info("after writing there are " + AbstractFSWALProvider.getNumRolledLogFiles(log) + " log files");
|
||||||
" log files");
|
assertLogFileSize(log);
|
||||||
|
|
||||||
// flush all regions
|
// flush all regions
|
||||||
for (Region r: server.getOnlineRegionsLocalContext()) {
|
for (Region r : server.getOnlineRegionsLocalContext()) {
|
||||||
r.flush(true);
|
r.flush(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Now roll the log
|
// Now roll the log
|
||||||
log.rollWriter();
|
log.rollWriter();
|
||||||
|
|
||||||
int count = AbstractFSWALProvider.getNumRolledLogFiles(log);
|
int count = AbstractFSWALProvider.getNumRolledLogFiles(log);
|
||||||
LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
|
LOG.info("after flushing all regions and rolling logs there are " + count + " log files");
|
||||||
assertTrue(("actual count: " + count), count <= 2);
|
assertTrue(("actual count: " + count), count <= 2);
|
||||||
|
assertLogFileSize(log);
|
||||||
}
|
}
|
||||||
|
|
||||||
protected String getName() {
|
protected String getName() {
|
||||||
|
|
Loading…
Reference in New Issue