HBASE-25924 Re-compute size of WAL file while removing from WALEntryStream (#3314)

Signed-off-by: Andrew Purtell <apurtell@apache.org>
Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
Rushabh Shah 2021-05-26 13:40:44 -04:00 committed by GitHub
parent 19fd42b1dc
commit a22e418cf6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 43 additions and 0 deletions

View File

@ -165,6 +165,12 @@ public class MetricsReplicationGlobalSourceSourceImpl
public void incrUncleanlyClosedWALs() { public void incrUncleanlyClosedWALs() {
uncleanlyClosedWAL.incr(1L); uncleanlyClosedWAL.incr(1L);
} }
@Override
public long getUncleanlyClosedWALs() {
return uncleanlyClosedWAL.value();
}
@Override @Override
public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
uncleanlyClosedSkippedBytes.incr(bytes); uncleanlyClosedSkippedBytes.incr(bytes);

View File

@ -69,6 +69,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
void decrSizeOfHFileRefsQueue(long size); void decrSizeOfHFileRefsQueue(long size);
void incrUnknownFileLengthForClosedWAL(); void incrUnknownFileLengthForClosedWAL();
void incrUncleanlyClosedWALs(); void incrUncleanlyClosedWALs();
long getUncleanlyClosedWALs();
void incrBytesSkippedInUncleanlyClosedWALs(final long bytes); void incrBytesSkippedInUncleanlyClosedWALs(final long bytes);
void incrRestartedWALReading(); void incrRestartedWALReading();
void incrRepeatedFileBytes(final long bytes); void incrRepeatedFileBytes(final long bytes);

View File

@ -233,6 +233,11 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
uncleanlyClosedWAL.incr(1L); uncleanlyClosedWAL.incr(1L);
} }
@Override
public long getUncleanlyClosedWALs() {
return uncleanlyClosedWAL.value();
}
@Override @Override
public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) { public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
uncleanlyClosedSkippedBytes.incr(bytes); uncleanlyClosedSkippedBytes.incr(bytes);

View File

@ -208,6 +208,10 @@ public class ProtobufLogReader extends ReaderBase {
private String initInternal(FSDataInputStream stream, boolean isFirst) private String initInternal(FSDataInputStream stream, boolean isFirst)
throws IOException { throws IOException {
close(); close();
if (!isFirst) {
// Re-compute the file length.
this.fileLength = fs.getFileStatus(path).getLen();
}
long expectedPos = PB_WAL_MAGIC.length; long expectedPos = PB_WAL_MAGIC.length;
if (stream == null) { if (stream == null) {
stream = fs.open(path); stream = fs.open(path);

View File

@ -300,6 +300,15 @@ public class MetricsSource implements BaseSource {
return singleSourceSource.getSizeOfLogQueue(); return singleSourceSource.getSizeOfLogQueue();
} }
/**
* Get the value of uncleanlyClosedWAL counter
* @return uncleanlyClosedWAL
*/
public long getUncleanlyClosedWALs() {
return singleSourceSource.getUncleanlyClosedWALs();
}
/** /**
* Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one * Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one
* @return lastTimestampForAge * @return lastTimestampForAge

View File

@ -862,4 +862,22 @@ public class TestWALEntryStream {
// After removing one wal, size of log queue will be 1 again. // After removing one wal, size of log queue will be 1 again.
assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue()); assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue());
} }
/**
* Tests that wals are closed cleanly and we read the trailer when we remove wal
* from WALEntryStream.
*/
@Test
public void testCleanClosedWALs() throws Exception {
try (WALEntryStream entryStream = new WALEntryStream(
logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) {
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
appendToLogAndSync();
assertNotNull(entryStream.next());
log.rollWriter();
appendToLogAndSync();
assertNotNull(entryStream.next());
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
}
}
} }