HBASE-25924 Re-compute size of WAL file while removing from WALEntryStream (#3315)
Signed-off-by: Andrew Purtell <apurtell@apache.org> Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
This commit is contained in:
parent
f7358cd618
commit
e265eccf20
|
@ -72,6 +72,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
|
|||
void decrSizeOfHFileRefsQueue(long size);
|
||||
void incrUnknownFileLengthForClosedWAL();
|
||||
void incrUncleanlyClosedWALs();
|
||||
long getUncleanlyClosedWALs();
|
||||
void incrBytesSkippedInUncleanlyClosedWALs(final long bytes);
|
||||
void incrRestartedWALReading();
|
||||
void incrRepeatedFileBytes(final long bytes);
|
||||
|
|
|
@ -189,6 +189,12 @@ public class MetricsReplicationGlobalSourceSourceImpl
|
|||
public void incrUncleanlyClosedWALs() {
|
||||
uncleanlyClosedWAL.incr(1L);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getUncleanlyClosedWALs() {
|
||||
return uncleanlyClosedWAL.value();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
|
||||
uncleanlyClosedSkippedBytes.incr(bytes);
|
||||
|
|
|
@ -245,6 +245,11 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
|
|||
uncleanlyClosedWAL.incr(1L);
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getUncleanlyClosedWALs() {
|
||||
return uncleanlyClosedWAL.value();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void incrBytesSkippedInUncleanlyClosedWALs(final long bytes) {
|
||||
uncleanlyClosedSkippedBytes.incr(bytes);
|
||||
|
|
|
@ -208,6 +208,10 @@ public class ProtobufLogReader extends ReaderBase {
|
|||
private String initInternal(FSDataInputStream stream, boolean isFirst)
|
||||
throws IOException {
|
||||
close();
|
||||
if (!isFirst) {
|
||||
// Re-compute the file length.
|
||||
this.fileLength = fs.getFileStatus(path).getLen();
|
||||
}
|
||||
long expectedPos = PB_WAL_MAGIC.length;
|
||||
if (stream == null) {
|
||||
stream = fs.open(path);
|
||||
|
|
|
@ -311,6 +311,15 @@ public class MetricsSource implements BaseSource {
|
|||
return getTimestampOfLastShippedOp();
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get the value of uncleanlyClosedWAL counter
|
||||
* @return uncleanlyClosedWAL
|
||||
*/
|
||||
public long getUncleanlyClosedWALs() {
|
||||
return singleSourceSource.getUncleanlyClosedWALs();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the timestampsOfLastShippedOp, if there are multiple groups, return the latest one
|
||||
* @return lastTimestampForAge
|
||||
|
|
|
@ -862,4 +862,22 @@ public class TestWALEntryStream {
|
|||
// After removing one wal, size of log queue will be 1 again.
|
||||
assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue());
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that wals are closed cleanly and we read the trailer when we remove wal
|
||||
* from WALEntryStream.
|
||||
*/
|
||||
@Test
|
||||
public void testCleanClosedWALs() throws Exception {
|
||||
try (WALEntryStream entryStream = new WALEntryStream(
|
||||
logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) {
|
||||
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
|
||||
appendToLogAndSync();
|
||||
assertNotNull(entryStream.next());
|
||||
log.rollWriter();
|
||||
appendToLogAndSync();
|
||||
assertNotNull(entryStream.next());
|
||||
assertEquals(0, logQueue.getMetrics().getUncleanlyClosedWALs());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue