HBASE-27871 Meta replication stuck forever if wal it's still reading gets rolled and deleted

Wellington Ramos Chevreuil 2023-06-05 21:33:48 +01:00
parent 50a6249731
commit 7eae2c4707
3 changed files with 58 additions and 1 deletion

ReplicationSourceWALReader.java

@@ -25,6 +25,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -79,6 +80,8 @@ class ReplicationSourceWALReader extends Thread {
  private long totalBufferQuota;
  private final String walGroupId;

  AtomicBoolean waitingPeerEnabled = new AtomicBoolean(false);

  /**
   * Creates a reader worker for a given WAL queue. Reads WAL entries off a given queue, batches the
   * entries, and puts them on a batch queue.
@@ -130,8 +133,11 @@ class ReplicationSourceWALReader extends Thread {
        while (isReaderRunning()) { // loop here to keep reusing stream while we can
          batch = null;
          if (!source.isPeerEnabled()) {
            waitingPeerEnabled.set(true);
            Threads.sleep(sleepForRetries);
            continue;
          } else {
            waitingPeerEnabled.set(false);
          }
          if (!checkQuota()) {
            continue;

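The waitingPeerEnabled flag introduced above exists so another thread (in practice, a test) can observe when the reader loop is parked because the peer is disabled. Below is a minimal sketch of that observer pattern, assuming a hypothetical awaitReaderParked helper that is not part of this change:

import java.util.concurrent.atomic.AtomicBoolean;

final class ReaderGateSketch {
  // Stand-in for ReplicationSourceWALReader.waitingPeerEnabled: an AtomicBoolean gives the
  // observing thread an up-to-date view of a flag the reader thread keeps toggling.
  static final AtomicBoolean waitingPeerEnabled = new AtomicBoolean(false);

  // Hypothetical helper: block until the reader has noticed the disabled peer, or time out.
  static void awaitReaderParked(long timeoutMs) throws InterruptedException {
    long deadlineNanos = System.nanoTime() + timeoutMs * 1_000_000L;
    while (!waitingPeerEnabled.get()) {
      if (System.nanoTime() > deadlineNanos) {
        throw new IllegalStateException("reader never observed the disabled peer");
      }
      Thread.sleep(100);
    }
  }
}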
WALEntryStream.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.OptionalLong;
import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
@@ -386,7 +387,10 @@ class WALEntryStream implements Closeable {
      if (archivedLog != null) {
        openReader(archivedLog);
      } else {
        throw fnfe;
        // For now, this could happen only when reading meta wal for meta replicas.
        // In this case, raising UncheckedIOException will let the endpoint deal with resetting
        // the replication source. See HBASE-27871.
        throw new UncheckedIOException(fnfe);
      }
    } catch (NullPointerException npe) {
      throw new IOException("NPE resetting reader, likely HDFS-4380", npe);

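A FileNotFoundException for a WAL that was rolled and deleted with no archived copy cannot be usefully retried, so the change wraps it in an UncheckedIOException that escapes the read path and lets the owner of the meta replication source recreate the source. The following is a minimal, hypothetical sketch of that caller-side handling; the real reset logic lives in the region replica replication machinery and is not part of this diff:

import java.io.IOException;
import java.io.UncheckedIOException;

final class MetaSourceResetSketch {
  interface WalReaderLoop {
    // May throw UncheckedIOException when the WAL being read was rolled and deleted (HBASE-27871).
    void readWalEntries() throws IOException;
  }

  // Hypothetical owner: instead of looping forever on a missing WAL, tear down and
  // recreate the replication source when the unchecked wrapper surfaces.
  static void readOrReset(WalReaderLoop loop, Runnable recreateSource) throws IOException {
    try {
      loop.readWalEntries();
    } catch (UncheckedIOException walGone) {
      recreateSource.run();
    }
  }
}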
TestMetaRegionReplicaReplicationEndpoint.java

@@ -29,6 +29,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@@ -53,6 +55,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
@@ -225,6 +228,50 @@ public class TestMetaRegionReplicaReplicationEndpoint {
    }
  }

  @Test
  public void testCatalogReplicaReplicationWALRolledAndDeleted() throws Exception {
    TableName tableName = TableName.valueOf("hbase:meta");
    try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
      Table table = connection.getTable(tableName)) {
      MiniHBaseCluster cluster = HTU.getHBaseCluster();
      cluster.getMaster().balanceSwitch(false);
      HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
      ReplicationSource source = (ReplicationSource) hrs.getReplicationSourceService()
        .getReplicationManager().catalogReplicationSource.get();
      ((ReplicationPeerImpl) source.replicationPeer).setPeerState(false);
      // There's a small chance the source reader has passed the peer state check but not yet read
      // the wal, which could allow it to read some added entries before the wal gets deleted,
      // so make sure here that we only proceed once the reader loop has managed to detect the
      // peer is disabled.
      HTU.waitFor(2000, 100, true, () -> {
        MutableObject<Boolean> readerWaiting = new MutableObject<>(true);
        source.logQueue.getQueues().keySet()
          .forEach(w -> readerWaiting.setValue(readerWaiting.getValue()
            && source.workerThreads.get(w).entryReader.waitingPeerEnabled.get()));
        return readerWaiting.getValue();
      });
      // load the data to the table
      for (int i = 0; i < 5; i++) {
        LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
        LOG.info("flushing table");
        HTU.flush(tableName);
        LOG.info("compacting table");
        if (i < 4) {
          HTU.compact(tableName, false);
        }
      }
      HTU.getHBaseCluster().getMaster().getLogCleaner().runCleaner();
      ((ReplicationPeerImpl) source.replicationPeer).setPeerState(true);
      // now load more data without flushing or compacting
      for (int i = 5; i < 10; i++) {
        LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
        HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
      }
      verifyReplication(tableName, numOfMetaReplica, 0, 10000, HConstants.CATALOG_FAMILY);
    }
  }

  @Test
  public void testCatalogReplicaReplicationWithReplicaMoved() throws Exception {
    MiniHBaseCluster cluster = HTU.getMiniHBaseCluster();
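The HTU.waitFor predicate in the new test ANDs the waitingPeerEnabled flag of every WAL group's reader through a MutableObject accumulator. The same condition can also be phrased with a stream allMatch; a small self-contained sketch of that equivalent check, where the map stands in for the source's worker threads keyed by WAL group:

import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

final class AllReadersWaitingSketch {
  // True only once every per-WAL-group reader has reported that it saw the disabled peer.
  static boolean allReadersWaiting(Map<String, AtomicBoolean> readerFlagsByWalGroup) {
    return readerFlagsByWalGroup.values().stream().allMatch(AtomicBoolean::get);
  }
}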