HBASE-25924 Re-compute size of WAL file while removing from WALEntryStream (#3316)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Rushabh Shah 2021-05-26 13:42:52 -04:00 committed by GitHub
parent 4bcec1776b
commit 782e24bd9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 43 additions and 0 deletions

View File

@ -70,6 +70,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
void decrSizeOfHFileRefsQueue(long size);
void incrUnknownFileLengthForClosedWAL();
void incrUncleanlyClosedWALs();
long getUncleanlyClosedWALs();
void incrBytesSkippedInUncleanlyClosedWALs(final long bytes);
void incrRestartedWALReading();
void incrRepeatedFileBytes(final long bytes);

View File

@ -175,6 +175,12 @@ public class MetricsReplicationGlobalSourceSource implements MetricsReplicationS
public void incrUncleanlyClosedWALs() {
uncleanlyClosedWAL.incr(1L);
}
@Override
public long getUncleanlyClosedWALs() {
return uncleanlyClosedWAL.value();
}
@Override
public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
uncleanlyClosedSkippedBytes.incr(bytes);

View File

@ -241,6 +241,11 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
uncleanlyClosedWAL.incr(1L);
}
@Override
public long getUncleanlyClosedWALs() {
return uncleanlyClosedWAL.value();
}
@Override
public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
uncleanlyClosedSkippedBytes.incr(bytes);

View File

@ -203,6 +203,10 @@ public class ProtobufLogReader extends ReaderBase {
private String initInternal(FSDataInputStream stream, boolean isFirst)
throws IOException {
close();
if (!isFirst) {
// Re-compute the file length.
this.fileLength = fs.getFileStatus(path).getLen();
}
long expectedPos = PB_WAL_MAGIC.length;
if (stream == null) {
stream = fs.open(path);

View File

@ -306,6 +306,15 @@ public class MetricsSource implements BaseSource {
return lastTimestamp;
}
/**
* Get the value of uncleanlyClosedWAL counter
* @return uncleanlyClosedWAL
*/
public long getUncleanlyClosedWALs() {
return singleSourceSource.getUncleanlyClosedWALs();
}
/**
* Get the source initializing counts
* @return number of replication sources getting initialized

View File

@ -899,4 +899,22 @@ public class TestWALEntryStream {
// After removing one wal, size of log queue will be 1 again.
assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue());
}
/**
* Tests that wals are closed cleanly and we read the trailer when we remove wal
* from WALEntryStream.
*/
@Test
public void testCleanClosedWALs() throws Exception {
try (WALEntryStream entryStream = new WALEntryStream(
logQueue, fs, conf, logQueue.getMetrics(), fakeWalGroupId)) {
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
appendToLog();
assertNotNull(entryStream.next());
log.rollWriter();
appendToLog();
assertNotNull(entryStream.next());
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
}
}
}