HBASE-25583: NoNodeException of the peer should call the remove peer workflow (#2970)

Signed-off-by: Bharath Vissapragada <bharathv@apache.org>
Signed-off-by: Andrew Purtell <apurtell@apache.org>
This commit is contained in:
Sandeep Pal 2021-02-26 16:00:37 -08:00 committed by GitHub
parent a7574ec9a0
commit 4cfbf19791
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 122 additions and 140 deletions

View File

@ -61,7 +61,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
@ -784,13 +783,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
private void updateLogPosition(long lastReadPosition) {
try {
manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
this.replicationQueueInfo.isQueueRecovered(), false);
lastLoggedPosition = lastReadPosition;
} catch (ReplicationSourceWithoutPeerException re) {
source.terminate("Replication peer is removed and source should terminate", re);
}
manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
this.replicationQueueInfo.isQueueRecovered(), false);
lastLoggedPosition = lastReadPosition;
}
public void startup() {

View File

@ -189,12 +189,13 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param holdLogInZK if true then the log is retained in ZK
*/
public synchronized void logPositionAndCleanOldLogs(Path log, String id, long position,
boolean queueRecovered, boolean holdLogInZK) throws ReplicationSourceWithoutPeerException {
boolean queueRecovered, boolean holdLogInZK) {
String fileName = log.getName();
this.replicationQueues.setLogPosition(id, fileName, position);
if (holdLogInZK) {
return;
}
cleanOldLogs(fileName, id, queueRecovered);
}
@ -205,8 +206,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param id id of the peer cluster
* @param queueRecovered Whether this is a recovered queue
*/
public void cleanOldLogs(String key, String id, boolean queueRecovered)
throws ReplicationSourceWithoutPeerException {
public void cleanOldLogs(String key, String id, boolean queueRecovered) {
String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
if (queueRecovered) {
Map<String, SortedSet<String>> walsForPeer = walsByIdRecoveredQueues.get(id);
@ -218,7 +218,7 @@ public class ReplicationSourceManager implements ReplicationListener {
}
} else {
synchronized (this.walsById) {
SortedSet<String> wals = walsById.get(id).get(logPrefix);
SortedSet<String> wals = getLogsWithPrefix(id, logPrefix);
if (wals != null && !wals.first().equals(key)) {
cleanOldLogs(wals, key, id);
}
@ -226,16 +226,36 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
private void cleanOldLogs(SortedSet<String> wals, String key, String id)
throws ReplicationSourceWithoutPeerException {
private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
SortedSet<String> walSet = wals.headSet(key);
LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
for (String wal : walSet) {
this.replicationQueues.removeLog(id, wal);
try {
for (String wal : walSet) {
this.replicationQueues.removeLog(id, wal);
}
} catch (ReplicationSourceWithoutPeerException rspe) {
// This means the source is still running although the replication peer has been removed.
// We should call the removePeer workflow to terminate the source gracefully.
LOG.warn("Replication peer " + id + " has been removed and source is still running", rspe);
String peerId = id;
if (peerId.contains("-")) {
peerId = peerId.split("-")[0];
}
peerRemoved(peerId);
}
walSet.clear();
}
/**
 * Looks up the WAL names tracked for a normal (non-recovered) queue under the given log prefix.
 * <p>
 * The first argument is used as the outer key of {@code walsById}. Note that the caller in
 * {@code cleanOldLogs} passes the peer cluster id for it, and the log prefix then selects the
 * group of WALs inside that entry.
 * @param walGroupId outer key into {@code walsById} (the peer/queue id at the visible call site)
 * @param logPrefix WAL name prefix selecting the set of logs inside that entry
 * @return the sorted WAL names for the prefix, or {@code null} when nothing is tracked for the
 *   given id or prefix
 */
public SortedSet<String> getLogsWithPrefix(String walGroupId, String logPrefix) {
  Map<String, SortedSet<String>> walsByPrefix = walsById.get(walGroupId);
  // Guard the outer lookup: an id with no entry would otherwise surface as a
  // NullPointerException, while cleanOldLogs already treats a null result as "nothing to clean".
  return walsByPrefix == null ? null : walsByPrefix.get(logPrefix);
}
/**
* Adds a normal source per registered peer cluster and tries to process all
* old region server wal queues
@ -579,7 +599,7 @@ public class ReplicationSourceManager implements ReplicationListener {
}
/**
* Thie method first deletes all the recovered sources for the specified
* This method first deletes all the recovered sources for the specified
* id, then deletes the normal source (deleting all related data in ZK).
* @param id The id of the peer cluster
*/

View File

@ -16,11 +16,14 @@
package org.apache.hadoop.hbase.replication;
public class ReplicationSourceDummyWithNoTermination extends ReplicationSourceDummy {
volatile boolean firstTime = true;
@Override
public void terminate(String reason) {
// This blocks the zk listener from closing the queues,
// to simulate the znodes getting deleted without the zk listener being invoked
throw new RuntimeException(fakeExceptionMessage);
if (firstTime) {
firstTime = false;
throw new RuntimeException(fakeExceptionMessage);
}
}
}

View File

@ -28,20 +28,22 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@ -62,6 +64,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
@ -247,7 +250,6 @@ public class TestReplicationSource {
public boolean evaluate() throws Exception {
return future.isDone();
}
});
}
@ -277,7 +279,7 @@ public class TestReplicationSource {
}
private static final class Mocks {
private final ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
private ReplicationSourceManager manager = mock(ReplicationSourceManager.class);
private final ReplicationQueues queues = mock(ReplicationQueues.class);
private final ReplicationPeers peers = mock(ReplicationPeers.class);
private final MetricsSource metrics = mock(MetricsSource.class);
@ -291,12 +293,32 @@ public class TestReplicationSource {
when(manager.getTotalBufferUsed()).thenReturn(totalBufferUsed);
}
// source manager throws the exception while cleaning logs
private void setReplicationSourceWithoutPeerException()
throws ReplicationSourceWithoutPeerException {
doThrow(new ReplicationSourceWithoutPeerException("No peer")).when(manager)
/**
 * Builds a {@code ReplicationSource} wired to a spied, real {@code ReplicationSourceManager}
 * whose queue storage fails every {@code removeLog} call with
 * {@code ReplicationSourceWithoutPeerException}.
 * <p>
 * Side effect: the {@code manager} field of this fixture is replaced by the spy created here.
 * @param endpoint endpoint that is initialized here and handed to the source
 * @return the initialized source, already added to the manager's source list
 * @throws Exception if any of the set-up calls made here fails
 */
ReplicationSource createReplicationSourceAndManagerWithMocks(ReplicationEndpoint endpoint)
throws Exception {
ReplicationTracker tracker = mock(ReplicationTracker.class);
Server server = mock(Server.class);
FileSystem fs = mock(FileSystem.class);
UUID clusterId = UUID.randomUUID();
String peerId = "testPeerClusterZnode";
// Spy on a real manager (instead of a pure mock) so its clean-up logic actually runs.
manager = Mockito.spy(new ReplicationSourceManager(
queues, peers, tracker, conf, server, fs, logDir, oldLogDir, clusterId));
// NOTE(review): a Mockito spy delegates to the real method unless stubbed, so this stubbing
// (and the one for logPositionAndCleanOldLogs below) looks explicit rather than required.
doCallRealMethod().when(manager).removePeer(Mockito.anyString());
// Simulate the queue znode already being deleted: every removeLog call on the mocked queues
// fails with ReplicationSourceWithoutPeerException.
doThrow(new ReplicationSourceWithoutPeerException("Peer Removed")).when(queues)
.removeLog(anyString(), anyString());
doCallRealMethod().when(manager)
.logPositionAndCleanOldLogs(Mockito.<Path>anyObject(), Mockito.anyString(),
Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean());
final ReplicationSource source = new ReplicationSource();
endpoint.init(context);
// NOTE(review): the manager above was built with the mocked `fs`, while the source is
// initialized with `FS` -- confirm the two different file systems are intended.
source.init(conf, FS, manager, queues, peers, mock(Stoppable.class),
peerId, clusterId, endpoint, metrics);
// Register the source with the manager's source list.
manager.getSources().add(source);
// Stub the WAL lookup to return a single-element set containing "fake"; presumably this lets
// the clean-up path run without the manager tracking any real WALs -- verify against
// cleanOldLogs.
SortedSet<String> walsWithPrefix = Sets.newTreeSet(Collections.singletonList("fake"));
doReturn(walsWithPrefix).when(manager).getLogsWithPrefix(anyString(), anyString());
return source;
}
ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint,
@ -522,8 +544,7 @@ public class TestReplicationSource {
*/
@Test
public void testReplicationSourceTerminationWhenNoZnodeForPeerAndQueues() throws Exception {
Mocks mocks = new Mocks();
mocks.setReplicationSourceWithoutPeerException();
final Mocks mocks = new Mocks();
// set table cfs to filter all cells out
final TableName replicatedTable = TableName.valueOf("replicated_table");
final Map<TableName, List<String>> cfs =
@ -543,7 +564,7 @@ public class TestReplicationSource {
}
};
final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint, false);
final ReplicationSource source = mocks.createReplicationSourceAndManagerWithMocks(endpoint);
source.run();
source.enqueueLog(log1);
@ -561,10 +582,9 @@ public class TestReplicationSource {
}
});
// After that the source should be terminated
// And the source should be terminated
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() {
// wait until reader read all cells
return !source.isSourceActive();
}
});

View File

@ -35,7 +35,7 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummyWithNoTermination;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.regionserver.helper.DummyServer;
import org.apache.hadoop.hbase.util.Bytes;
@ -51,10 +51,10 @@ import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestName;
public abstract class TestReplicationSourceBase {
public abstract class TestReplicationSourceManagerBase {
private static final Log LOG =
LogFactory.getLog(TestReplicationSourceBase.class);
LogFactory.getLog(TestReplicationSourceManagerBase.class);
protected static Configuration conf;
protected static HBaseTestingUtility utility;
@ -75,10 +75,12 @@ public abstract class TestReplicationSourceBase {
protected static Path logDir;
protected static DummyServer server;
@BeforeClass public static void setUpBeforeClass() throws Exception {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf = HBaseConfiguration.create();
conf.set("replication.replicationsource.implementation",
ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
ReplicationSourceDummy.class.getCanonicalName());
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
conf.setLong("replication.sleep.before.failover", 2000);
conf.setInt("replication.source.maxretriesmultiplier", 10);

View File

@ -25,7 +25,6 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URLEncoder;
@ -47,17 +46,13 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
@ -72,91 +67,25 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.replication.regionserver.helper.DummyServer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestReplicationSourceManager extends TestReplicationSourceBase {
public class TestReplicationSourceManagerManager extends TestReplicationSourceManagerBase {
private static final Log LOG =
LogFactory.getLog(TestReplicationSourceManager.class);
private static final TableName test =
TableName.valueOf("test");
private static final String slaveId = "1";
private static CountDownLatch latch;
LogFactory.getLog(TestReplicationSourceManagerManager.class);
private static List<String> files = new ArrayList<>();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf = HBaseConfiguration.create();
conf.set("replication.replicationsource.implementation",
ReplicationSourceDummy.class.getCanonicalName());
conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
HConstants.REPLICATION_ENABLE_DEFAULT);
conf.setLong("replication.sleep.before.failover", 2000);
conf.setInt("replication.source.maxretriesmultiplier", 10);
utility = new HBaseTestingUtility(conf);
utility.startMiniZKCluster();
zkw = new ZooKeeperWatcher(conf, "test", null);
ZKUtil.createWithParents(zkw, "/hbase/replication");
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
ZKUtil.setData(zkw, "/hbase/replication/peers/1",
Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
+ conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
ZKUtil.createWithParents(zkw, "/hbase/replication/state");
ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
ZKClusterId.setClusterId(zkw, new ClusterId());
FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
fs = FileSystem.get(conf);
oldLogDir = new Path(utility.getDataTestDir(),
HConstants.HREGION_OLDLOGDIR_NAME);
logDir = new Path(utility.getDataTestDir(),
HConstants.HREGION_LOGDIR_NAME);
server = new DummyServer(conf, "example.hostname.com", zkw);
replication = new Replication(server, fs, logDir, oldLogDir);
manager = replication.getReplicationManager();
manager.addSource(slaveId);
htd = new HTableDescriptor(test);
HColumnDescriptor col = new HColumnDescriptor(f1);
col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
htd.addFamily(col);
col = new HColumnDescriptor(f2);
col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
htd.addFamily(col);
hri = new HRegionInfo(htd.getTableName(), r1, r2);
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
manager.join();
utility.shutdownMiniCluster();
}
private static CountDownLatch latch;
@Test
public void testLogRoll() throws Exception {

View File

@ -21,25 +21,30 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummyWithNoTermination;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestReplicationSourceWithoutReplicationZnodes extends TestReplicationSourceBase {
public class TestReplicationSourceWithoutReplicationZnodes
extends TestReplicationSourceManagerBase {
/**
 * Empties both source lists exposed by the manager ({@code getOldSources()} and
 * {@code getSources()}) before each test method runs.
 */
@Before
public void removeExistingSourcesFromSourceManager() {
  manager.getOldSources().clear();
  manager.getSources().clear();
}
/**
* When the peer is removed, HBase removes the peer znodes and there is a zk watcher
@ -47,39 +52,47 @@ public class TestReplicationSourceWithoutReplicationZnodes extends TestReplicati
* or a race condition between source deleting the log znode and zk watcher
* terminating the source, we might get the NoNode exception. In that case, the right
* thing is to terminate the replication source.
*
* @throws Exception throws exception
*/
@Test
public void testReplicationSourceRunningWithoutPeerZnodes() throws Exception {
String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
KeyValue kv = new KeyValue(r1, f1, r1);
WALEdit edit = new WALEdit();
edit.add(kv);
List<WALActionsListener> listeners = new ArrayList<>();
listeners.add(replication);
final WALFactory wals = new WALFactory(utility.getConfiguration(), listeners,
URLEncoder.encode("regionserver:60020", "UTF8"));
final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
manager.init();
final long txid = wal.append(htd, hri,
new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
edit, true);
wal.sync(txid);
wal.rollWriter();
ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1");
ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/rs/"+ server.getServerName() + "/1");
ReplicationException exceptionThrown = null;
try {
manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
"1", 0, false, false);
} catch (ReplicationException e) {
exceptionThrown = e;
}
conf.set("replication.replicationsource.implementation",
ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
List<WALActionsListener> listeners = new ArrayList<>();
listeners.add(replication);
final WALFactory wals = new WALFactory(utility.getConfiguration(), listeners,
URLEncoder.encode("regionserver:60020", "UTF8"));
final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
manager.init();
Assert.assertTrue(exceptionThrown instanceof ReplicationSourceWithoutPeerException);
final long txid = wal.append(htd, hri,
new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc), edit,
true);
wal.sync(txid);
wal.rollWriter();
ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1");
ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/rs/" + server.getServerName() + "/1");
Assert.assertEquals("There should be exactly one source",
1, manager.getSources().size());
Assert.assertEquals("Replication source is not correct",
ReplicationSourceDummyWithNoTermination.class,
manager.getSources().get(0).getClass());
manager
.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), "1", 0, false,
false);
Assert.assertTrue("Replication source should be terminated and removed",
manager.getSources().isEmpty());
} finally {
conf.set("replication.replicationsource.implementation", replicationSourceImplName);
}
}
}