HBASE-19633 Clean up the replication queues in the postPeerModification stage when removing a peer
parent f89920a60f
commit 19707a85dd

@@ -27,8 +27,8 @@ import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
 
 /**
  * A configuration for the replication peer cluster.

@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.mapreduce.replication;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.UUID;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
@@ -45,13 +44,14 @@ import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
-import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
 import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
@@ -66,6 +66,7 @@ import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
@@ -333,19 +334,24 @@ public class VerifyReplication extends Configured implements Tool {
       final Configuration conf, String peerId) throws IOException {
     ZKWatcher localZKW = null;
     try {
-      localZKW = new ZKWatcher(conf, "VerifyReplication",
-          new Abortable() {
-            @Override public void abort(String why, Throwable e) {}
-            @Override public boolean isAborted() {return false;}
-          });
-
-      ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf);
-      rp.init();
-
-      return Pair.newPair(rp.getPeerConfig(peerId), rp.getPeerClusterConfiguration(peerId));
+      localZKW = new ZKWatcher(conf, "VerifyReplication", new Abortable() {
+        @Override
+        public void abort(String why, Throwable e) {
+        }
+
+        @Override
+        public boolean isAborted() {
+          return false;
+        }
+      });
+      ReplicationPeerStorage storage =
+          ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
+      ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
+      return Pair.newPair(peerConfig,
+        ReplicationPeers.getPeerClusterConfiguration(peerConfig, conf));
     } catch (ReplicationException e) {
-      throw new IOException(
-          "An error occurred while trying to connect to the remove peer cluster", e);
+      throw new IOException("An error occurred while trying to connect to the remove peer cluster",
+        e);
     } finally {
       if (localZKW != null) {
         localZKW.close();

@@ -27,8 +27,6 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
@@ -39,20 +37,22 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 @InterfaceAudience.Private
 public class ReplicationPeers {
 
-  private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeers.class);
-
   private final Configuration conf;
 
   // Map of peer clusters keyed by their id
   private final ConcurrentMap<String, ReplicationPeerImpl> peerCache;
   private final ReplicationPeerStorage peerStorage;
 
-  protected ReplicationPeers(ZKWatcher zookeeper, Configuration conf) {
+  ReplicationPeers(ZKWatcher zookeeper, Configuration conf) {
     this.conf = conf;
     this.peerCache = new ConcurrentHashMap<>();
     this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf);
   }
 
+  public Configuration getConf() {
+    return conf;
+  }
+
   public void init() throws ReplicationException {
     // Loading all existing peerIds into peer cache.
     for (String peerId : this.peerStorage.listPeerIds()) {
@@ -120,22 +120,13 @@ public class ReplicationPeers {
     return peerCache.keySet();
   }
 
-  public ReplicationPeerConfig getPeerConfig(String peerId) {
-    ReplicationPeer replicationPeer = this.peerCache.get(peerId);
-    if (replicationPeer == null) {
-      throw new IllegalArgumentException("Peer with id= " + peerId + " is not cached");
-    }
-    return replicationPeer.getPeerConfig();
-  }
-
-  public Configuration getPeerClusterConfiguration(String peerId) throws ReplicationException {
-    ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
-
+  public static Configuration getPeerClusterConfiguration(ReplicationPeerConfig peerConfig,
+      Configuration baseConf) throws ReplicationException {
     Configuration otherConf;
     try {
-      otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
+      otherConf = HBaseConfiguration.createClusterConf(baseConf, peerConfig.getClusterKey());
     } catch (IOException e) {
-      throw new ReplicationException("Can't get peer configuration for peerId=" + peerId, e);
+      throw new ReplicationException("Can't get peer configuration for peer " + peerConfig, e);
     }
 
     if (!peerConfig.getConfiguration().isEmpty()) {
@@ -172,8 +163,9 @@ public class ReplicationPeers {
    * @return object representing the peer
    */
   private ReplicationPeerImpl createPeer(String peerId) throws ReplicationException {
-    ReplicationPeerConfig peerConf = peerStorage.getPeerConfig(peerId);
+    ReplicationPeerConfig peerConfig = peerStorage.getPeerConfig(peerId);
     boolean enabled = peerStorage.isPeerEnabled(peerId);
-    return new ReplicationPeerImpl(getPeerClusterConfiguration(peerId), peerId, enabled, peerConf);
+    return new ReplicationPeerImpl(getPeerClusterConfiguration(peerConfig, conf), peerId, enabled,
+      peerConfig);
   }
 }

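Note: getPeerClusterConfiguration moves from an instance method keyed by peer id to a static helper taking the peer config plus a base configuration. A minimal before/after sketch of a call site, assuming the localZKW, conf, and peerId variables from the VerifyReplication hunk above (this is an illustration, not code from the commit):

    // before: resolve through a ReplicationPeers instance
    ReplicationPeers rp = ReplicationFactory.getReplicationPeers(localZKW, conf);
    rp.init();
    Configuration peerConf = rp.getPeerClusterConfiguration(peerId);

    // after: read the stored peer config, then combine it with the base conf statically
    ReplicationPeerStorage storage =
        ReplicationStorageFactory.getReplicationPeerStorage(localZKW, conf);
    ReplicationPeerConfig peerConfig = storage.getPeerConfig(peerId);
    Configuration peerConf2 =
        ReplicationPeers.getPeerClusterConfiguration(peerConfig, conf);
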
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.SortedSet;
 import java.util.TreeSet;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
@@ -50,7 +49,7 @@ import org.apache.zookeeper.data.Stat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
  * ZK based replication queue storage.

@@ -19,13 +19,13 @@ package org.apache.hadoop.hbase.replication;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedOutputStream;
+import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;

@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -238,12 +237,6 @@ public abstract class TestReplicationStateBasic {
     } catch (ReplicationException e) {
     }
 
-    try {
-      assertNull(rp.getPeerClusterConfiguration("bogus"));
-      fail("Should have thrown an ReplicationException when passed a bogus peerId");
-    } catch (ReplicationException e) {
-    }
-
     assertNumberOfPeers(0);
 
     // Add some peers
@@ -258,7 +251,8 @@ public abstract class TestReplicationStateBasic {
       fail("There are no connected peers, should have thrown an IllegalArgumentException");
     } catch (IllegalArgumentException e) {
     }
-    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerClusterConfiguration(ID_ONE)));
+    assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationPeers
+        .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf())));
     rp.getPeerStorage().removePeer(ID_ONE);
     rp.removePeer(ID_ONE);
     assertNumberOfPeers(1);

@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.master;
 
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Service;
-
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.reflect.Constructor;
@@ -51,12 +50,10 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ClusterMetrics;
@@ -207,6 +204,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionServerInfo;

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -74,8 +73,8 @@ public class AddPeerProcedure extends ModifyPeerProcedure {
 
   @Override
   protected void postPeerModification(MasterProcedureEnv env) throws IOException {
-    LOG.info("Successfully added " + (enabled ? "ENABLED" : "DISABLED") + " peer " + peerId +
-      ", config " + peerConfig);
+    LOG.info("Successfully added {} peer {}, config {}", enabled ? "ENABLED" : "DISABLED", peerId,
+      peerConfig);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       env.getMasterCoprocessorHost().postAddReplicationPeer(peerId, peerConfig);

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -62,7 +61,7 @@ public class DisablePeerProcedure extends ModifyPeerProcedure {
 
   @Override
   protected void postPeerModification(MasterProcedureEnv env) throws IOException {
-    LOG.info("Successfully disabled peer " + peerId);
+    LOG.info("Successfully disabled peer {}", peerId);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.postDisableReplicationPeer(peerId);

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -62,7 +61,7 @@ public class EnablePeerProcedure extends ModifyPeerProcedure {
 
   @Override
   protected void postPeerModification(MasterProcedureEnv env) throws IOException {
-    LOG.info("Successfully enabled peer " + peerId);
+    LOG.info("Successfully enabled peer {}", peerId);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.postEnableReplicationPeer(peerId);

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
 import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
@@ -84,10 +83,13 @@ public abstract class ModifyPeerProcedure
    * Called before we finish the procedure. The implementation can do some logging work, and also
    * call the coprocessor hook if any.
    * <p>
-   * Notice that, since we have already done the actual work, throwing exception here will not fail
-   * this procedure, we will just ignore it and finish the procedure as suceeded.
+   * Notice that, since we have already done the actual work, throwing {@code IOException} here will
+   * not fail this procedure, we will just ignore it and finish the procedure as suceeded. If
+   * {@code ReplicationException} is thrown we will retry since this usually means we fails to
+   * update the peer storage.
    */
-  protected abstract void postPeerModification(MasterProcedureEnv env) throws IOException;
+  protected abstract void postPeerModification(MasterProcedureEnv env)
+      throws IOException, ReplicationException;
 
   private void releaseLatch() {
     ProcedurePrepareLatch.releaseLatch(latch, this);
@@ -101,16 +103,14 @@ public abstract class ModifyPeerProcedure
         try {
           prePeerModification(env);
         } catch (IOException e) {
-          LOG.warn(
-            getClass().getName() + " failed to call CP hook or the pre check is failed for peer " +
-              peerId + ", mark the procedure as failure and give up",
-            e);
+          LOG.warn("{} failed to call pre CP hook or the pre check is failed for peer {}, " +
+            "mark the procedure as failure and give up", getClass().getName(), peerId, e);
           setFailure("master-" + getPeerOperationType().name().toLowerCase() + "-peer", e);
           releaseLatch();
           return Flow.NO_MORE_STATE;
         } catch (ReplicationException e) {
-          LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId +
-            ", retry", e);
+          LOG.warn("{} failed to call prePeerModification for peer {}, retry", getClass().getName(),
+            peerId, e);
           throw new ProcedureYieldException();
         }
         setNextState(PeerModificationState.UPDATE_PEER_STORAGE);
@@ -119,8 +119,8 @@ public abstract class ModifyPeerProcedure
         try {
           updatePeerStorage(env);
         } catch (ReplicationException e) {
-          LOG.warn(
-            getClass().getName() + " update peer storage for peer " + peerId + " failed, retry", e);
+          LOG.warn("{} update peer storage for peer {} failed, retry", getClass().getName(), peerId,
+            e);
           throw new ProcedureYieldException();
         }
         setNextState(PeerModificationState.REFRESH_PEER_ON_RS);
@@ -134,9 +134,13 @@ public abstract class ModifyPeerProcedure
       case POST_PEER_MODIFICATION:
         try {
           postPeerModification(env);
+        } catch (ReplicationException e) {
+          LOG.warn("{} failed to call postPeerModification for peer {}, retry",
+            getClass().getName(), peerId, e);
+          throw new ProcedureYieldException();
         } catch (IOException e) {
-          LOG.warn(getClass().getName() + " failed to call prePeerModification for peer " + peerId +
-            ", ignore since the procedure has already done", e);
+          LOG.warn("{} failed to call post CP hook for peer {}, " +
+            "ignore since the procedure has already done", getClass().getName(), peerId, e);
         }
         releaseLatch();
         return Flow.NO_MORE_STATE;
@@ -175,7 +179,7 @@ public abstract class ModifyPeerProcedure
       throws IOException, InterruptedException {
     if (state == PeerModificationState.PRE_PEER_MODIFICATION) {
       // actually the peer related operations has no rollback, but if we haven't done any
-      // modifications on the peer storage, we can just return.
+      // modifications on the peer storage yet, we can just return.
       return;
     }
     throw new UnsupportedOperationException();

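Note: the widened throws clause above changes the retry semantics of the POST_PEER_MODIFICATION step: a ReplicationException makes executeFromState yield and retry, while an IOException from a coprocessor hook is only logged and ignored because the actual work is already done. A sketch of a conforming subclass override, mirroring the RemovePeerProcedure hunk further down (illustrative, not additional commit code):

  @Override
  protected void postPeerModification(MasterProcedureEnv env)
      throws IOException, ReplicationException {
    // Retried on ReplicationException: it usually means the queue storage update failed.
    env.getReplicationPeerManager().removeAllQueuesAndHFileRefs(peerId);
    // An IOException from the CP hook no longer fails the procedure; it is ignored.
    MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
    if (cpHost != null) {
      cpHost.postRemoveReplicationPeer(peerId);
    }
  }
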
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.master.procedure.PeerProcedureInterface;
@@ -122,17 +121,15 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
 
   private void complete(MasterProcedureEnv env, Throwable error) {
     if (event == null) {
-      LOG.warn("procedure event for " + getProcId() +
-          " is null, maybe the procedure is created when recovery",
-        new Exception());
+      LOG.warn("procedure event for {} is null, maybe the procedure is created when recovery",
+        getProcId());
       return;
     }
     if (error != null) {
-      LOG.warn("Refresh peer " + peerId + " for " + type + " on " + targetServer + " failed",
-        error);
+      LOG.warn("Refresh peer {} for {} on {} failed", peerId, type, targetServer, error);
       this.succ = false;
     } else {
-      LOG.info("Refresh peer " + peerId + " for " + type + " on " + targetServer + " suceeded");
+      LOG.info("Refresh peer {} for {} on {} suceeded", peerId, type, targetServer);
       this.succ = true;
     }
 
@@ -168,9 +165,9 @@ public class RefreshPeerProcedure extends Procedure<MasterProcedureEnv>
       dispatched = false;
     }
     if (!env.getRemoteDispatcher().addOperationToNode(targetServer, this)) {
-      LOG.info("Can not add remote operation for refreshing peer " + peerId + " for " + type +
-          " to " + targetServer + ", this usually because the server is already dead," +
-          " give up and mark the procedure as complete");
+      LOG.info("Can not add remote operation for refreshing peer {} for {} to {}, " +
+        "this usually because the server is already dead, " +
+        "give up and mark the procedure as complete", peerId, type, targetServer);
       return null;
     }
     dispatched = true;

@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
 import org.apache.hadoop.hbase.replication.ReplicationException;
@@ -61,8 +60,10 @@ public class RemovePeerProcedure extends ModifyPeerProcedure {
   }
 
   @Override
-  protected void postPeerModification(MasterProcedureEnv env) throws IOException {
-    LOG.info("Successfully removed peer " + peerId);
+  protected void postPeerModification(MasterProcedureEnv env)
+      throws IOException, ReplicationException {
+    env.getReplicationPeerManager().removeAllQueuesAndHFileRefs(peerId);
+    LOG.info("Successfully removed peer {}", peerId);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.postRemoveReplicationPeer(peerId);

@@ -28,7 +28,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
-
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -217,6 +216,36 @@ public class ReplicationPeerManager {
     return desc != null ? Optional.of(desc.getPeerConfig()) : Optional.empty();
   }
 
+  private void removeAllQueues0(String peerId) throws ReplicationException {
+    for (ServerName replicator : queueStorage.getListOfReplicators()) {
+      List<String> queueIds = queueStorage.getAllQueues(replicator);
+      for (String queueId : queueIds) {
+        ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+        if (queueInfo.getPeerId().equals(peerId)) {
+          queueStorage.removeQueue(replicator, queueId);
+        }
+      }
+      queueStorage.removeReplicatorIfQueueIsEmpty(replicator);
+    }
+  }
+
+  public void removeAllQueuesAndHFileRefs(String peerId) throws ReplicationException {
+    // Here we need two passes to address the problem of claimQueue. Maybe a claimQueue is still
+    // on-going when the refresh peer config procedure is done, if a RS which has already been
+    // scanned claims the queue of a RS which has not been scanned yet, we will miss that queue in
+    // the scan here, and if the RS who has claimed the queue crashed before creating recovered
+    // source, then the queue will leave there until the another RS detects the crash and helps
+    // removing the queue.
+    // A two pass scan can solve the problem. Anyway, the queue will not disappear during the
+    // claiming, it will either under the old RS or under the new RS, and a queue can only be
+    // claimed once after the refresh peer procedure done(as the next claim queue will just delete
+    // it), so we can make sure that a two pass scan will finally find the queue and remove it,
+    // unless it has already been removed by others.
+    removeAllQueues0(peerId);
+    removeAllQueues0(peerId);
+    queueStorage.removePeerFromHFileRefs(peerId);
+  }
+
   private void checkPeerConfig(ReplicationPeerConfig peerConfig) throws DoNotRetryIOException {
     checkClusterKey(peerConfig.getClusterKey());
 

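Note: the two-pass comment above carries the core reasoning of this commit. A self-contained toy simulation of the claimQueue race it describes (plain Java with in-memory maps standing in for ReplicationQueueStorage; all names here are illustrative, not HBase API):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TwoPassScanDemo {
  public static void main(String[] args) {
    // Queues per region server; the queue for "peer1" starts on rs2.
    Map<String, List<String>> queues = new HashMap<>();
    queues.put("rs1", new ArrayList<>());
    queues.put("rs2", new ArrayList<>(List.of("peer1")));

    // Pass 1: rs1 is scanned first (nothing to remove), then rs2. Before rs2 is
    // scanned, rs1 claims the queue, so a single pass misses it.
    for (String rs : List.of("rs1", "rs2")) {
      if (rs.equals("rs2")) {
        // claimQueue: the queue moves to the already-scanned rs1
        queues.get("rs2").remove("peer1");
        queues.get("rs1").add("peer1");
      }
      queues.get(rs).removeIf("peer1"::equals);
    }
    System.out.println("after pass 1: " + queues); // {rs1=[peer1], rs2=[]}

    // Pass 2: a queue can be claimed at most once after the refresh-peer step is
    // done, so a second scan is guaranteed to find and remove the survivor.
    for (String rs : List.of("rs1", "rs2")) {
      queues.get(rs).removeIf("peer1"::equals);
    }
    System.out.println("after pass 2: " + queues); // {rs1=[], rs2=[]}
  }
}
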
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hbase.master.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
 import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
 import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
@@ -70,7 +69,7 @@ public class UpdatePeerConfigProcedure extends ModifyPeerProcedure {
 
   @Override
   protected void postPeerModification(MasterProcedureEnv env) throws IOException {
-    LOG.info("Successfully updated peer config of " + peerId + " to " + peerConfig);
+    LOG.info("Successfully updated peer config of {} to {}", peerId, peerConfig);
     MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
     if (cpHost != null) {
       cpHost.postUpdateReplicationPeerConfig(peerId, peerConfig);

@@ -177,6 +177,15 @@ import org.slf4j.LoggerFactory;
 import sun.misc.Signal;
 import sun.misc.SignalHandler;
 
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
+import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
@@ -207,15 +216,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse;
-
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
-import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
-import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 /**
  * HRegionServer makes a set of HRegions available to clients. It checks in with
  * the HMaster. There are many HRegionServers in a single HBase deployment.

@@ -28,7 +28,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.TextFormat;
+import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RemoteProcedureResult;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportProcedureDoneRequest;
+

@@ -20,11 +20,12 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import org.apache.hadoop.hbase.executor.EventType;
 import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType;
 import org.apache.log4j.Logger;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hbase.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.PeerModificationType;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.RefreshPeerParameter;
 
 /**

@@ -59,7 +59,6 @@ import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationListener;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
@@ -306,9 +305,8 @@ public class ReplicationSourceManager implements ReplicationListener {
    */
   @VisibleForTesting
   ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
-    ReplicationPeerConfig peerConfig = replicationPeers.getPeerConfig(id);
     ReplicationPeer peer = replicationPeers.getPeer(id);
-    ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer);
+    ReplicationSourceInterface src = getReplicationSource(id, peer);
     synchronized (this.walsById) {
       this.sources.add(src);
       Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
@@ -499,8 +497,8 @@ public class ReplicationSourceManager implements ReplicationListener {
    * @param peerId the id of the peer cluster
    * @return the created source
    */
-  private ReplicationSourceInterface getReplicationSource(String peerId,
-      ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer) throws IOException {
+  private ReplicationSourceInterface getReplicationSource(String peerId, ReplicationPeer peer)
+      throws IOException {
     RegionServerCoprocessorHost rsServerHost = null;
     TableDescriptors tableDescriptors = null;
     if (server instanceof HRegionServer) {
@@ -512,24 +510,24 @@ public class ReplicationSourceManager implements ReplicationListener {
 
     ReplicationEndpoint replicationEndpoint = null;
     try {
-      String replicationEndpointImpl = peerConfig.getReplicationEndpointImpl();
+      String replicationEndpointImpl = peer.getPeerConfig().getReplicationEndpointImpl();
       if (replicationEndpointImpl == null) {
         // Default to HBase inter-cluster replication endpoint
         replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
       }
       replicationEndpoint = Class.forName(replicationEndpointImpl)
           .asSubclass(ReplicationEndpoint.class).newInstance();
-      if(rsServerHost != null) {
-        ReplicationEndpoint newReplicationEndPoint = rsServerHost
-            .postCreateReplicationEndPoint(replicationEndpoint);
-        if(newReplicationEndPoint != null) {
+      if (rsServerHost != null) {
+        ReplicationEndpoint newReplicationEndPoint =
+            rsServerHost.postCreateReplicationEndPoint(replicationEndpoint);
+        if (newReplicationEndPoint != null) {
           // Override the newly created endpoint from the hook with configured end point
           replicationEndpoint = newReplicationEndPoint;
         }
       }
     } catch (Exception e) {
-      LOG.warn("Passed replication endpoint implementation throws errors"
-          + " while initializing ReplicationSource for peer: " + peerId, e);
+      LOG.warn("Passed replication endpoint implementation throws errors" +
+          " while initializing ReplicationSource for peer: " + peerId, e);
       throw new IOException(e);
     }
 
@@ -539,8 +537,8 @@ public class ReplicationSourceManager implements ReplicationListener {
         replicationEndpoint, walFileLengthProvider, metrics);
 
     // init replication endpoint
-    replicationEndpoint.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(),
-      fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server));
+    replicationEndpoint.init(new ReplicationEndpoint.Context(conf, peer.getConfiguration(), fs,
+      peerId, clusterId, peer, metrics, tableDescriptors, server));
 
     return src;
   }
@@ -736,17 +734,6 @@ public class ReplicationSourceManager implements ReplicationListener {
           abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
           continue;
         }
-
-        ReplicationPeerConfig peerConfig = null;
-        try {
-          peerConfig = replicationPeers.getPeerConfig(actualPeerId);
-        } catch (Exception e) {
-          LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS
-              + ", failed to read peer config", e);
-          abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
-          continue;
-        }
-
         // track sources in walsByIdRecoveredQueues
         Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
         walsByIdRecoveredQueues.put(peerId, walsByGroup);
@@ -761,7 +748,7 @@ public class ReplicationSourceManager implements ReplicationListener {
       }
 
       // enqueue sources
-      ReplicationSourceInterface src = getReplicationSource(peerId, peerConfig, peer);
+      ReplicationSourceInterface src = getReplicationSource(peerId, peer);
       // synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
       // see removePeer
       synchronized (oldsources) {

@@ -15,19 +15,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.client.replication;
 
 import java.io.IOException;
-
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
 import org.apache.hadoop.hbase.replication.TestReplicationBase;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableList;
-import org.apache.hadoop.hbase.shaded.com.google.common.collect.ImmutableMap;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -39,6 +35,9 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
 @Category({ MediumTests.class, ClientTests.class })
 public class TestReplicationAdminUsingProcedure extends TestReplicationBase {
 