HBASE-20855 PeerConfigTracker only supporting one listener will cause problem when there is a recovered replication queue

Signed-off-by: tedyu <yuzhihong@gmail.com>
This commit is contained in:
jingyuntian 2018-07-19 11:51:54 +08:00 committed by tedyu
parent 61288f843c
commit 15ed2e86e1
7 changed files with 145 additions and 5 deletions

View File

@ -81,4 +81,10 @@ public interface ReplicationPeer {
* @param listener Listener for config changes, usually a replication endpoint * @param listener Listener for config changes, usually a replication endpoint
*/ */
void trackPeerConfigChanges(ReplicationPeerConfigListener listener); void trackPeerConfigChanges(ReplicationPeerConfigListener listener);
/**
* Remove a listener when it is closed or terminated
* @param listener Listener for config changes, usually a replication endpoint
*/
void removeListenerOfPeerConfig(ReplicationPeerConfigListener listener);
} }

View File

@ -21,8 +21,10 @@ package org.apache.hadoop.hbase.replication;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -172,10 +174,21 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase implements Rep
@Override @Override
public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) { public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
if (this.peerConfigTracker != null){ if (this.peerConfigTracker != null){
this.peerConfigTracker.setListener(listener); this.peerConfigTracker.addListener(listener);
} }
} }
@Override
public void removeListenerOfPeerConfig(ReplicationPeerConfigListener listener) {
if (this.peerConfigTracker != null){
this.peerConfigTracker.removeListener(listener);
}
}
PeerConfigTracker getPeerConfigTracker() {
return this.peerConfigTracker;
}
@Override @Override
public void abort(String why, Throwable e) { public void abort(String why, Throwable e) {
LOG.fatal("The ReplicationPeer corresponding to peer " + peerConfig LOG.fatal("The ReplicationPeer corresponding to peer " + peerConfig
@ -275,15 +288,24 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase implements Rep
*/ */
public class PeerConfigTracker extends ZooKeeperNodeTracker { public class PeerConfigTracker extends ZooKeeperNodeTracker {
private ReplicationPeerConfigListener listener; private Set<ReplicationPeerConfigListener> listeners;
public PeerConfigTracker(String peerConfigNode, ZooKeeperWatcher watcher, public PeerConfigTracker(String peerConfigNode, ZooKeeperWatcher watcher,
Abortable abortable) { Abortable abortable) {
super(watcher, peerConfigNode, abortable); super(watcher, peerConfigNode, abortable);
listeners = new HashSet<>();
} }
public synchronized void setListener(ReplicationPeerConfigListener listener){ public synchronized void addListener(ReplicationPeerConfigListener listener){
this.listener = listener; listeners.add(listener);
}
Set<ReplicationPeerConfigListener> getListeners(){
return this.listeners;
}
public synchronized void removeListener(ReplicationPeerConfigListener listenerToRemove) {
listeners.remove(listenerToRemove);
} }
@Override @Override
@ -291,7 +313,7 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase implements Rep
if (path.equals(node)) { if (path.equals(node)) {
super.nodeCreated(path); super.nodeCreated(path);
ReplicationPeerConfig config = readPeerConfig(); ReplicationPeerConfig config = readPeerConfig();
if (listener != null){ for (ReplicationPeerConfigListener listener : listeners) {
listener.peerConfigUpdated(config); listener.peerConfigUpdated(config);
} }
} }

View File

@ -111,4 +111,10 @@ public abstract class BaseReplicationEndpoint extends AbstractService
return false; return false;
} }
public void close(){
if(this.ctx != null) {
ReplicationPeer peer = this.ctx.getReplicationPeer();
peer.removeListenerOfPeerConfig(this);
}
}
} }

View File

@ -89,6 +89,7 @@ public abstract class HBaseReplicationEndpoint extends BaseReplicationEndpoint
@Override @Override
protected void doStop() { protected void doStop() {
disconnect(); disconnect();
close();
notifyStopped(); notifyStopped();
} }

View File

@ -446,6 +446,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
"Aborting to prevent Replication from deadlocking. See HBASE-16081."; "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
abortable.abort(errMsg, new IOException(errMsg)); abortable.abort(errMsg, new IOException(errMsg));
} }
close();
notifyStopped(); notifyStopped();
} }

View File

@ -579,6 +579,10 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
} }
if (allOtherTaskDone) { if (allOtherTaskDone) {
manager.closeRecoveredQueue(this.source); manager.closeRecoveredQueue(this.source);
// stop replication endpoint
if (source instanceof ReplicationSource) {
((ReplicationSource) source).replicationEndpoint.stop();
}
LOG.info("Finished recovering queue " + peerClusterZnode LOG.info("Finished recovering queue " + peerClusterZnode
+ " with the following stats: " + getStats()); + " with the following stats: " + getStats());
} }

View File

@ -0,0 +1,100 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Threads;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(MediumTests.class)
public class TestReplicationConfigTracker extends TestReplicationBase {
private static final Log LOG = LogFactory.getLog(TestReplicationKillRS.class);
@Test
public void testReplicationConfigTracker() throws Exception {
// killing the RS with hbase:meta can result into failed puts until we solve
// IO fencing
int rsToKill1 = utility1.getHBaseCluster().getServerWithMeta() == 0 ? 1 : 0;
int otherRs = rsToKill1 == 0 ? 1 : 0;
final HRegionServer regionServer = utility1.getHBaseCluster().getRegionServer(otherRs);
final Thread listenerTracker = trackListener(utility1, otherRs);
LOG.info("Start loading table");
utility1.loadTable(htable1, famName, true);
LOG.info("Done loading table");
utility1.getHBaseCluster().getRegionServer(rsToKill1).abort("Stopping as part of the test");
utility1.getHBaseCluster().waitOnRegionServer(rsToKill1);
while (utility1.getHBaseCluster().getMaster().getServerManager().areDeadServersInProgress()) {
LOG.info("Waiting on processing of crashed server before proceeding...");
Threads.sleep(1000);
}
Waiter.waitFor(utility1.getConfiguration(), 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
return !listenerTracker.isAlive();
}
});
final ReplicationPeerZKImpl.PeerConfigTracker tracker = getPeerConfigTracker(regionServer);
Waiter.waitFor(utility1.getConfiguration(), 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
return tracker.getListeners().size() == 1;
}
});
}
private static Thread trackListener(final HBaseTestingUtility utility, final int rs) {
Thread trackListener = new Thread() {
public void run() {
Replication replication = (Replication) utility.getHBaseCluster().getRegionServer(rs)
.getReplicationSourceService();
ReplicationSourceManager manager = replication.getReplicationManager();
ReplicationPeerZKImpl replicationPeerZK =
(ReplicationPeerZKImpl) manager.getReplicationPeers().getPeer(PEER_ID);
ReplicationPeerZKImpl.PeerConfigTracker peerConfigTracker =
replicationPeerZK.getPeerConfigTracker();
while (peerConfigTracker.getListeners().size() != 2) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
LOG.error("track config failed", e);
}
}
}
};
trackListener.setDaemon(true);
trackListener.start();
return trackListener;
}
private ReplicationPeerZKImpl.PeerConfigTracker getPeerConfigTracker(HRegionServer rs) {
Replication replication = (Replication) rs.getReplicationSourceService();
ReplicationSourceManager manager = replication.getReplicationManager();
ReplicationPeerZKImpl replicationPeerZK =
(ReplicationPeerZKImpl) manager.getReplicationPeers().getPeer(PEER_ID);
ReplicationPeerZKImpl.PeerConfigTracker peerConfigTracker =
replicationPeerZK.getPeerConfigTracker();
return peerConfigTracker;
}
}