HBASE-19633 Clean up the replication queues in the postPeerModification stage when removing a peer

zhangduo 2018-01-02 09:57:23 +08:00
parent 1e36a84afc
commit 525fef572e
18 changed files with 124 additions and 117 deletions

View File

@@ -27,8 +27,8 @@ import java.util.Set;
 import java.util.TreeMap;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * A configuration for the replication peer cluster.

View File

@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.mapreduce.replication;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.UUID;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,13 +44,14 @@ import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -66,6 +66,7 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -333,19 +334,24 @@ public class VerifyReplication extends Configured implements Tool {
       final Configuration conf, String peerId) throws IOException {
     ZKWatcher localZKW = null;
     try {
-      localZKW = new ZKWatcher(conf, "VerifyReplication",
-        new Abortable() {
-          @Override public void abort(String why, Throwable e) {}
-          @Override public boolean isAborted() {return false;}
-        });
-      ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf);
-      rp.init();
-      return Pair.newPair(rp.getPeerConfig(peerId), rp.getPeerClusterConfiguration(peerId));
+      localZKW = new ZKWatcher(conf, "VerifyReplication", new Abortable() {
+        @Override
+        public void abort(String why, Throwable e) {
+        }
+
+        @Override
+        public boolean isAborted() {
+          return false;
+        }
+      });
+      ReplicationPeerStorage storage =
+          ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
+      ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
+      return Pair.newPair(peerConfig,
+          ReplicationPeers.getPeerClusterConfiguration(peerConfig, conf));
     } catch (ReplicationException e) {
-      throw new IOException(
-          "An error occurred while trying to connect to the remove peer cluster", e);
+      throw new IOException("An error occurred while trying to connect to the remove peer cluster",
+          e);
     } finally {
       if (localZKW != null) {
         localZKW.close();
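
For context, here is a minimal, self-contained sketch (not part of this commit; the class name PeerConfigLookup is illustrative) of the storage-based lookup the hunk above switches to, replacing the old ReplicationFactory.getReplicationPeers() plus rp.init() sequence with direct reads from ReplicationPeerStorage:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;

public class PeerConfigLookup {
  /** Fetch a peer's config plus the Configuration used to connect to the peer cluster. */
  public static Pair<ReplicationPeerConfig, Configuration> lookup(ZKWatcher zk, Configuration conf,
      String peerId) throws ReplicationException {
    // The storage object reads ZooKeeper directly; no peer cache needs to be initialized.
    ReplicationPeerStorage storage = ReplicationStorageFactory.getReplicationPeerStorage(zk, conf);
    ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
    // getPeerClusterConfiguration is now a static helper taking the base conf explicitly.
    return Pair.newPair(peerConfig,
        ReplicationPeers.getPeerClusterConfiguration(peerConfig, conf));
  }
}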

View File

@@ -27,8 +27,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
@@ -39,20 +37,22 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 @InterfaceAudience.Private
 public class ReplicationPeers {
 
-  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class);
-
   private final Configuration conf;
 
   // Map of peer clusters keyed by their id
   private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
   private final ReplicationPeerStorage peerStorage;
 
-  protected ReplicationPeers(ZKWatcher zookeeper, Configuration conf) {
+  ReplicationPeers(ZKWatcher zookeeper, Configuration conf) {
     this.conf = conf;
     this.peerCache = new ConcurrentHashMap<>();
     this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf);
   }
 
+  public Configuration getConf() {
+    return conf;
+  }
+
   public void init() throws ReplicationException {
     // Loading all existing peerIds into peer cache.
     for (String peerId : this.peerStorage.listPeerIds()) {
@@ -120,22 +120,13 @@ public class ReplicationPeers {
     return peerCache.keySet();
   }
 
-  public ReplicationPeerConfig getPeerConfig(String peerId) {
-    ReplicationPeer replicationPeer = this.peerCache.get(peerId);
-    if (replicationPeer == null) {
-      throw new IllegalArgumentException("Peer with id= " + peerId + " is not cached");
-    }
-    return replicationPeer.getPeerConfig();
-  }
-
-  public Configuration getPeerClusterConfiguration(String peerId) throws ReplicationException {
-    ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
+  public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,
+      Configuration baseConf) throws ReplicationException {
     Configuration otherConf;
     try {
-      otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
+      otherConf = HBaseConfiguration.createClusterConf(baseConf, peerConfig.getClusterKey());
     } catch (IOException e) {
-      throw new ReplicationException("Can't get peer configuration for peerId=" + peerId, e);
+      throw new ReplicationException("Can't get peer configuration for peer " + peerConfig, e);
     }
 
     if (!peerConfig.getConfiguration().isEmpty()) {
@@ -179,8 +170,9 @@ public class ReplicationPeers {
   >>>>>>> HBASE-19622 Reimplement ReplicationPeers with the new replication storage interface
   */
   private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
-    ReplicationPeerConfig peerConf = peerStorage.getPeerConfig(peerId);
+    ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
     boolean enabled = peerStorage.isPeerEnabled(peerId);
-    return new ReplicationPeerImpl(getPeerClusterConfiguration(peerId), peerId, enabled, peerConf);
+    return new ReplicationPeerImpl(getPeerClusterConfiguration(peerConfig, conf), peerId, enabled,
+        peerConfig);
   }
 }

View File

@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
@@ -50,7 +49,7 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
  * ZK based replication queue storage.

View File

@@ -19,13 +19,13 @@ package org.apache.hadoop.hbase.replication;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
+import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;

View File

@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -238,12 +237,6 @@ public abstract class TestReplicationStateBasic {
     } catch (ReplicationException e) {
     }
 
-    try {
-      assertNull(rp.getPeerClusterConfiguration("bogus"));
-      fail("Should have thrown an ReplicationException when passed a bogus peerId");
-    } catch (ReplicationException e) {
-    }
-
     assertNumberOfPeers(0);
 
     // Add some peers
@@ -258,7 +251,8 @@ public abstract class TestReplicationStateBasic {
       fail("There are no connected peers, should have thrown an IllegalArgumentException");
     } catch (IllegalArgumentException e) {
     }
-    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerClusterConfiguration(ID_ONE)));
+    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationPeers
+        .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf())));
     rp.getPeerStorage().removePeer(ID_ONE);
     rp.removePeer(ID_ONE);
     assertNumberOfPeers(1);

View File

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -74,8 +73,8 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
   @Override
   protected void postPeerModification(MasterProcedureEnv env) throws IOException {
-    LOG.info("Successfully added " + (enabled ? "ENABLED" : "DISABLED") + " peer " + peerId +
-      ", config " + peerConfig);
+    LOG.info("Successfully added {} peer {}, config {}", enabled ? "ENABLED" : "DISABLED", peerId,
+      peerConfig);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       env.getMasterCoprocessorHost().postAddReplicationPeer(peerId, peerConfig);

View File

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -62,7 +61,7 @@ public class DisablePeerProcedure extends ModifyPeerProcedure {
   @Override
   protected void postPeerModification(MasterProcedureEnv env) throws IOException {
-    LOG.info("Successfully disabled peer " + peerId);
+    LOG.info("Successfully disabled peer {}", peerId);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.postDisableReplicationPeer(peerId);

View File

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -62,7 +61,7 @@ public class EnablePeerProcedure extends ModifyPeerProcedure {
   @Override
   protected void postPeerModification(MasterProcedureEnv env) throws IOException {
-    LOG.info("Successfully enabled peer " + peerId);
+    LOG.info("Successfully enabled peer {}", peerId);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.postEnableReplicationPeer(peerId);

View File

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
@@ -84,10 +83,13 @@ public abstract class ModifyPeerProcedure
    * Called before we finish the procedure. The implementation can do some logging work, and also
    * call the coprocessor hook if any.
    * <p>
-   * Notice that, since we have already done the actual work, throwing exception here will not fail
-   * this procedure, we will just ignore it and finish the procedure as suceeded.
+   * Notice that, since we have already done the actual work, throwing {@code IOException} here will
+   * not fail this procedure, we will just ignore it and finish the procedure as suceeded. If
+   * {@code ReplicationException} is thrown we will retry since this usually means we fails to
+   * update the peer storage.
    */
-  protected abstract void postPeerModification(MasterProcedureEnv env) throws IOException;
+  protected abstract void postPeerModification(MasterProcedureEnv env)
+      throws IOException, ReplicationException;
 
   private void releaseLatch() {
     ProcedurePrepareLatch.releaseLatch(latch, this);
@@ -101,16 +103,14 @@ public abstract class ModifyPeerProcedure
         try {
           prePeerModification(env);
         } catch (IOException e) {
-          LOG.warn(
-            getClass().getName() + " failed to call CP hook or the pre check is failed for peer " +
-              peerId + ", mark the procedure as failure and give up",
-            e);
+          LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " +
+            "mark the procedure as failure and give up", getClass().getName(), peerId, e);
           setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e);
           releaseLatch();
           return Flow.NO_MORE_STATE;
         } catch (ReplicationException e) {
-          LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId +
-            ", retry", e);
+          LOG.warn("{} failed to call prePeerModification for peer {}, retry", getClass().getName(),
+            peerId, e);
           throw new ProcedureYieldException();
         }
         setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
@@ -119,8 +119,8 @@ public abstract class ModifyPeerProcedure
         try {
           updatePeerStorage(env);
         } catch (ReplicationException e) {
-          LOG.warn(
-            getClass().getName() + " update peer storage for peer " + peerId + " failed, retry", e);
+          LOG.warn("{} update peer storage for peer {} failed, retry", getClass().getName(), peerId,
+            e);
           throw new ProcedureYieldException();
         }
         setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
@@ -134,9 +134,13 @@ public abstract class ModifyPeerProcedure
       case POST_PEER_MODIFICATION:
         try {
           postPeerModification(env);
+        } catch (ReplicationException e) {
+          LOG.warn("{} failed to call postPeerModification for peer {}, retry",
+            getClass().getName(), peerId, e);
+          throw new ProcedureYieldException();
         } catch (IOException e) {
-          LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId +
-            ", ignore since the procedure has already done", e);
+          LOG.warn("{} failed to call post CP hook for peer {}, " +
+            "ignore since the procedure has already done", getClass().getName(), peerId, e);
         }
         releaseLatch();
         return Flow.NO_MORE_STATE;
@@ -175,7 +179,7 @@ public abstract class ModifyPeerProcedure
       throws IOException, InterruptedException {
     if (state == PeerModificationState.PRE_PEER_MODIFICATION) {
       // actually the peer related operations has no rollback, but if we haven't done any
-      // modifications on the peer storage, we can just return.
+      // modifications on the peer storage yet, we can just return.
       return;
     }
     throw new UnsupportedOperationException();
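
The net effect of the hunks above is a new error contract for postPeerModification: a ReplicationException makes the POST_PEER_MODIFICATION state yield and retry (a peer-storage cleanup must eventually succeed), while an IOException from a post CP hook is logged and ignored. A method-level sketch of a conforming override, the same shape RemovePeerProcedure adopts later in this diff (shown as a fragment, not a complete class):

  @Override
  protected void postPeerModification(MasterProcedureEnv env)
      throws IOException, ReplicationException {
    // May throw ReplicationException: the framework yields and retries this state.
    env.getReplicationPeerManager().removeAllQueuesAndHFileRefs(peerId);
    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
    if (cpHost != null) {
      // May throw IOException: logged and ignored, the procedure still succeeds.
      cpHost.postRemoveReplicationPeer(peerId);
    }
  }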

View File

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
@@ -122,17 +121,15 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
   private void complete(MasterProcedureEnv env, Throwable error) {
     if (event == null) {
-      LOG.warn("procedure event for " + getProcId() +
-          " is null, maybe the procedure is created when recovery",
-        new Exception());
+      LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery",
+        getProcId());
       return;
     }
     if (error != null) {
-      LOG.warn("Refresh peer " + peerId + " for " + type + " on " + targetServer + " failed",
-        error);
+      LOG.warn("Refresh peer {} for {} on {} failed", peerId, type, targetServer, error);
       this.succ = false;
     } else {
-      LOG.info("Refresh peer " + peerId + " for " + type + " on " + targetServer + " suceeded");
+      LOG.info("Refresh peer {} for {} on {} suceeded", peerId, type, targetServer);
       this.succ = true;
     }
@@ -168,9 +165,9 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
       dispatched = false;
     }
     if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
-      LOG.info("Can not add remote operation for refreshing peer " + peerId + " for " + type +
-          " to " + targetServer + ", this usually because the server is already dead," +
-          " give up and mark the procedure as complete");
+      LOG.info("Can not add remote operation for refreshing peer {} for {} to {}, " +
+        "this usually because the server is already dead, " +
+        "give up and mark the procedure as complete", peerId, type, targetServer);
       return null;
     }
     dispatched = true;

View File

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -61,8 +60,10 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
   }
 
   @Override
-  protected void postPeerModification(MasterProcedureEnv env) throws IOException {
-    LOG.info("Successfully removed peer " + peerId);
+  protected void postPeerModification(MasterProcedureEnv env)
+      throws IOException, ReplicationException {
+    env.getReplicationPeerManager().removeAllQueuesAndHFileRefs(peerId);
+    LOG.info("Successfully removed peer {}", peerId);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.postRemoveReplicationPeer(peerId);

View File

@@ -28,7 +28,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -217,6 +216,36 @@ public class ReplicationPeerManager {
     return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
   }
 
+  private void removeAllQueues0(String peerId) throws ReplicationException {
+    for (ServerName replicator : queueStorage.getListOfReplicators()) {
+      List<String> queueIds = queueStorage.getAllQueues(replicator);
+      for (String queueId : queueIds) {
+        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+        if (queueInfo.getPeerId().equals(peerId)) {
+          queueStorage.removeQueue(replicator, queueId);
+        }
+      }
+      queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
+    }
+  }
+
+  public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
+    // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
+    // on-going when the refresh peer config procedure is done, if a RS which has already been
+    // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in
+    // the scan here, and if the RS who has claimed the queue crashed before creating recovered
+    // source, then the queue will leave there until the another RS detects the crash and helps
+    // removing the queue.
+    // A two pass scan can solve the problem. Anyway, the queue will not disappear during the
+    // claiming, it will either under the old RS or under the new RS, and a queue can only be
+    // claimed once after the refresh peer procedure done(as the next claim queue will just delete
+    // it), so we can make sure that a two pass scan will finally find the queue and remove it,
+    // unless it has already been removed by others.
+    removeAllQueues0(peerId);
+    removeAllQueues0(peerId);
+    queueStorage.removePeerFromHFileRefs(peerId);
+  }
+
   private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
     checkClusterKey(peerConfig.getClusterKey());
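
To see why the comment above insists on two passes, here is a toy, self-contained illustration using plain Java collections (not the HBase API; all names are made up). A queue claimed by an already-scanned replicator survives pass one, but since a queue can only be claimed once after the refresh, pass two is guaranteed to find it in its final location:

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class TwoPassScanDemo {

  // One scan pass: drop every queue belonging to the given peer. Mirrors
  // removeAllQueues0 above, but over plain collections instead of ZK storage.
  static void removeQueuesOnePass(Map<String, Set<String>> queuesByReplicator, String peerId) {
    for (Set<String> queueIds : queuesByReplicator.values()) {
      // A queueId is "<peerId>" for a normal queue, "<peerId>-<deadRS>" once claimed.
      queueIds.removeIf(queueId -> queueId.equals(peerId) || queueId.startsWith(peerId + "-"));
    }
  }

  public static void main(String[] args) {
    Map<String, Set<String>> queues = new HashMap<>();
    queues.put("rs1", new HashSet<>(Arrays.asList("peer_1")));
    queues.put("rs2", new HashSet<>(Arrays.asList("peer_1", "peer_2")));

    removeQueuesOnePass(queues, "peer_1");
    // Simulate the race: rs1, already cleaned, claims peer_1's queue from a
    // dead rs3 while the first pass is still scanning other replicators.
    queues.get("rs1").add("peer_1-rs3");
    // The queue cannot be claimed a second time, so pass two finds and removes it.
    removeQueuesOnePass(queues, "peer_1");

    System.out.println(queues); // e.g. {rs1=[], rs2=[peer_2]} -> peer_1 fully gone
  }
}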

View File

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -70,7 +69,7 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
   @Override
   protected void postPeerModification(MasterProcedureEnv env) throws IOException {
-    LOG.info("Successfully updated peer config of " + peerId + " to " + peerConfig);
+    LOG.info("Successfully updated peer config of {} to {}", peerId, peerConfig);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.postUpdateReplicationPeerConfig(peerId, peerConfig);

View File

@@ -28,7 +28,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
+import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
+
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;

View File

@@ -20,11 +20,12 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType;
 import org.apache.log4j.Logger;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter;
 
 /**

View File

@@ -56,7 +56,6 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationListener;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
@@ -296,9 +295,8 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   @VisibleForTesting
   ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
-    ReplicationPeerConfig peerConfig = replicationPeers.getPeerConfig(id);
     ReplicationPeer peer = replicationPeers.getPeer(id);
-    ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer);
+    ReplicationSourceInterface src = getReplicationSource(id, peer);
     synchronized (this.walsById) {
       this.sources.add(src);
       Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
@@ -493,8 +491,8 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param peerId the id of the peer cluster
    * @return the created source
    */
-  private ReplicationSourceInterface getReplicationSource(String peerId,
-      ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer) throws IOException {
+  private ReplicationSourceInterface getReplicationSource(String peerId, ReplicationPeer peer)
+      throws IOException {
     RegionServerCoprocessorHost rsServerHost = null;
     TableDescriptors tableDescriptors = null;
     if (server instanceof HRegionServer) {
@@ -506,7 +504,7 @@ public class ReplicationSourceManager implements ReplicationListener {
     ReplicationEndpoint replicationEndpoint = null;
     try {
-      String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
+      String replicationEndpointImpl = peer.getPeerConfig().getReplicationEndpointImpl();
       if (replicationEndpointImpl == null) {
         // Default to HBase inter-cluster replication endpoint
         replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
@@ -514,16 +512,16 @@ public class ReplicationSourceManager implements ReplicationListener {
       replicationEndpoint = Class.forName(replicationEndpointImpl)
           .asSubclass(ReplicationEndpoint.class).newInstance();
       if (rsServerHost != null) {
-        ReplicationEndpoint newReplicationEndPoint = rsServerHost
-            .postCreateReplicationEndPoint(replicationEndpoint);
+        ReplicationEndpoint newReplicationEndPoint =
+            rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
         if (newReplicationEndPoint != null) {
           // Override the newly created endpoint from the hook with configured end point
           replicationEndpoint = newReplicationEndPoint;
         }
       }
     } catch (Exception e) {
-      LOG.warn("Passed replication endpoint implementation throws errors"
-          + " while initializing ReplicationSource for peer: " + peerId, e);
+      LOG.warn("Passed replication endpoint implementation throws errors" +
+        " while initializing ReplicationSource for peer: " + peerId, e);
       throw new IOException(e);
     }
@@ -533,8 +531,8 @@ public class ReplicationSourceManager implements ReplicationListener {
         replicationEndpoint, walFileLengthProvider, metrics);
 
     // init replication endpoint
-    replicationEndpoint.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(),
-      fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server));
+    replicationEndpoint.init(new ReplicationEndpoint.Context(conf, peer.getConfiguration(), fs,
+      peerId, clusterId, peer, metrics, tableDescriptors, server));
 
     return src;
   }
@@ -730,16 +728,6 @@ public class ReplicationSourceManager implements ReplicationListener {
           abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
           continue;
         }
-        ReplicationPeerConfig peerConfig = null;
-        try {
-          peerConfig = replicationPeers.getPeerConfig(actualPeerId);
-        } catch (Exception e) {
-          LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS
-              + ", failed to read peer config", e);
-          abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
-          continue;
-        }
-
         if (server instanceof ReplicationSyncUp.DummyServer
             && peer.getPeerState().equals(PeerState.DISABLED)) {
           LOG.warn("Peer {} is disbaled. ReplicationSyncUp tool will skip "
@@ -761,7 +749,7 @@ public class ReplicationSourceManager implements ReplicationListener {
         }
 
         // enqueue sources
-        ReplicationSourceInterface src = getReplicationSource(peerId, peerConfig, peer);
+        ReplicationSourceInterface src = getReplicationSource(peerId, peer);
         // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
         // see removePeer
         synchronized (oldsources) {

View File

@@ -15,19 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.client.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.TestReplicationBase;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -39,6 +35,9 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
 @Category({ MediumTests.class, ClientTests.class })
 public class TestReplicationAdminUsingProcedure extends TestReplicationBase {