HBASE-25679 Size of log queue metric is incorrect (#3071)
Co-authored-by: Rushabh <rushabh.shah@salesforce.com> Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
3c815bef41
commit
27e9c281da
|
@ -186,4 +186,8 @@ public class ReplicationSourceLogQueue {
|
||||||
}
|
}
|
||||||
return oldestWalTimestamp;
|
return oldestWalTimestamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public MetricsSource getMetrics() {
|
||||||
|
return metrics;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -256,7 +256,6 @@ class WALEntryStream implements Closeable {
|
||||||
logQueue.remove(walGroupId);
|
logQueue.remove(walGroupId);
|
||||||
setCurrentPath(null);
|
setCurrentPath(null);
|
||||||
setPosition(0);
|
setPosition(0);
|
||||||
metrics.decrSizeOfLogQueue();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -135,7 +135,10 @@ public class TestWALEntryStream {
|
||||||
@Before
|
@Before
|
||||||
public void setUp() throws Exception {
|
public void setUp() throws Exception {
|
||||||
ReplicationSource source = mock(ReplicationSource.class);
|
ReplicationSource source = mock(ReplicationSource.class);
|
||||||
logQueue = new ReplicationSourceLogQueue(CONF, new MetricsSource("2"), source);
|
MetricsSource metricsSource = new MetricsSource("2");
|
||||||
|
// Source with the same id is shared and carries values from the last run
|
||||||
|
metricsSource.clear();
|
||||||
|
logQueue = new ReplicationSourceLogQueue(CONF, metricsSource, source);
|
||||||
pathWatcher = new PathWatcher();
|
pathWatcher = new PathWatcher();
|
||||||
final WALFactory wals = new WALFactory(CONF, tn.getMethodName());
|
final WALFactory wals = new WALFactory(CONF, tn.getMethodName());
|
||||||
wals.getWALProvider().addWALActionsListener(pathWatcher);
|
wals.getWALProvider().addWALActionsListener(pathWatcher);
|
||||||
|
@ -766,4 +769,29 @@ public class TestWALEntryStream {
|
||||||
}
|
}
|
||||||
writer.close();
|
writer.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests size of log queue is incremented and decremented properly.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSizeOfLogQueue() throws Exception {
|
||||||
|
// There should be always 1 log which is current wal.
|
||||||
|
assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue());
|
||||||
|
appendToLogAndSync();
|
||||||
|
|
||||||
|
log.rollWriter();
|
||||||
|
// After rolling there will be 2 wals in the queue
|
||||||
|
assertEquals(2, logQueue.getMetrics().getSizeOfLogQueue());
|
||||||
|
|
||||||
|
try (WALEntryStream entryStream = new WALEntryStream(
|
||||||
|
logQueue, CONF, 0, log, null, logQueue.getMetrics(), fakeWalGroupId)) {
|
||||||
|
// There's one edit in the log, read it.
|
||||||
|
assertTrue(entryStream.hasNext());
|
||||||
|
WAL.Entry entry = entryStream.next();
|
||||||
|
assertNotNull(entry);
|
||||||
|
assertFalse(entryStream.hasNext());
|
||||||
|
}
|
||||||
|
// After removing one wal, size of log queue will be 1 again.
|
||||||
|
assertEquals(1, logQueue.getMetrics().getSizeOfLogQueue());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue