HBASE-19293 Support add a disabled state replication peer directly

This commit is contained in:
Guanghao Zhang 2017-11-18 08:48:40 +08:00
parent 8f806ab486
commit e1133d5201
20 changed files with 147 additions and 94 deletions

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.client;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
@ -2466,111 +2465,97 @@ public interface Admin extends Abortable, Closeable {
* Add a new replication peer for replicating data to slave cluster.
* @param peerId a short name that identifies the peer
* @param peerConfig configuration for the replication slave cluster
* @throws IOException
* @throws IOException if a remote or network exception occurs
*/
default void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
throws IOException {
addReplicationPeer(peerId, peerConfig, true);
}
/**
* Add a new replication peer for replicating data to slave cluster.
* @param peerId a short name that identifies the peer
* @param peerConfig configuration for the replication slave cluster
* @param enabled peer state, true if ENABLED and false if DISABLED
* @throws IOException if a remote or network exception occurs
*/
void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws IOException;
/**
* Remove a peer and stop the replication.
* @param peerId a short name that identifies the peer
* @throws IOException
* @throws IOException if a remote or network exception occurs
*/
default void removeReplicationPeer(String peerId) throws IOException {
}
void removeReplicationPeer(String peerId) throws IOException;
/**
* Restart the replication stream to the specified peer.
* @param peerId a short name that identifies the peer
* @throws IOException
* @throws IOException if a remote or network exception occurs
*/
default void enableReplicationPeer(String peerId) throws IOException {
}
void enableReplicationPeer(String peerId) throws IOException;
/**
* Stop the replication stream to the specified peer.
* @param peerId a short name that identifies the peer
* @throws IOException
* @throws IOException if a remote or network exception occurs
*/
default void disableReplicationPeer(String peerId) throws IOException {
}
void disableReplicationPeer(String peerId) throws IOException;
/**
* Returns the configured ReplicationPeerConfig for the specified peer.
* @param peerId a short name that identifies the peer
* @return ReplicationPeerConfig for the peer
* @throws IOException
* @throws IOException if a remote or network exception occurs
*/
default ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws IOException {
return new ReplicationPeerConfig();
}
ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws IOException;
/**
* Update the peerConfig for the specified peer.
* @param peerId a short name that identifies the peer
* @param peerConfig new config for the peer
* @throws IOException
* @throws IOException if a remote or network exception occurs
*/
default void updateReplicationPeerConfig(String peerId,
ReplicationPeerConfig peerConfig) throws IOException {
}
void updateReplicationPeerConfig(String peerId,
ReplicationPeerConfig peerConfig) throws IOException;
/**
* Append the replicable table column family config from the specified peer.
* @param id a short that identifies the cluster
* @param tableCfs A map from tableName to column family names
* @throws ReplicationException
* @throws IOException
* @throws ReplicationException if tableCfs has conflict with existing config
* @throws IOException if a remote or network exception occurs
*/
default void appendReplicationPeerTableCFs(String id,
Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException,
IOException {
}
void appendReplicationPeerTableCFs(String id,
Map<TableName, ? extends Collection<String>> tableCfs)
throws ReplicationException, IOException;
/**
* Remove some table-cfs from config of the specified peer.
* @param id a short name that identifies the cluster
* @param tableCfs A map from tableName to column family names
* @throws ReplicationException
* @throws IOException
* @throws ReplicationException if tableCfs has conflict with existing config
* @throws IOException if a remote or network exception occurs
*/
default void removeReplicationPeerTableCFs(String id,
Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException,
IOException {
}
void removeReplicationPeerTableCFs(String id,
Map<TableName, ? extends Collection<String>> tableCfs)
throws ReplicationException, IOException;
/**
* Return a list of replication peers.
* @return a list of replication peers description
* @throws IOException
* @throws IOException if a remote or network exception occurs
*/
default List<ReplicationPeerDescription> listReplicationPeers() throws IOException {
return new ArrayList<>();
}
/**
* Return a list of replication peers.
* @param regex The regular expression to match peer id
* @return a list of replication peers description
* @throws IOException
* @deprecated since 2.0 version and will be removed in 3.0 version. Use
* {@link #listReplicationPeers(Pattern)} instead.
*/
@Deprecated
default List<ReplicationPeerDescription> listReplicationPeers(String regex) throws IOException {
return new ArrayList<>();
}
List<ReplicationPeerDescription> listReplicationPeers() throws IOException;
/**
* Return a list of replication peers.
* @param pattern The compiled regular expression to match peer id
* @return a list of replication peers description
* @throws IOException
* @throws IOException if a remote or network exception occurs
*/
default List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern) throws IOException {
return new ArrayList<>();
}
List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern) throws IOException;
/**
* Mark region server(s) as decommissioned to prevent additional regions from getting

View File

@ -499,8 +499,19 @@ public interface AsyncAdmin {
* @param peerId a short name that identifies the peer
* @param peerConfig configuration for the replication slave cluster
*/
default CompletableFuture<Void> addReplicationPeer(String peerId,
ReplicationPeerConfig peerConfig) {
return addReplicationPeer(peerId, peerConfig, true);
}
/**
* Add a new replication peer for replicating data to slave cluster
* @param peerId a short name that identifies the peer
* @param peerConfig configuration for the replication slave cluster
* @param enabled peer state, true if ENABLED and false if DISABLED
*/
CompletableFuture<Void> addReplicationPeer(String peerId,
ReplicationPeerConfig peerConfig);
ReplicationPeerConfig peerConfig, boolean enabled);
/**
* Remove a peer and stop the replication

View File

@ -375,9 +375,9 @@ class AsyncHBaseAdmin implements AsyncAdmin {
}
@Override
public CompletableFuture<Void>
addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) {
return wrap(rawAdmin.addReplicationPeer(peerId, peerConfig));
public CompletableFuture<Void> addReplicationPeer(String peerId,
ReplicationPeerConfig peerConfig, boolean enabled) {
return wrap(rawAdmin.addReplicationPeer(peerId, peerConfig, enabled));
}
@Override

View File

@ -103,7 +103,6 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
@ -3850,13 +3849,13 @@ public class HBaseAdmin implements Admin {
}
@Override
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws IOException {
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
@Override
protected Void rpcCall() throws Exception {
master.addReplicationPeer(getRpcController(),
RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig));
RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled));
return null;
}
});
@ -3953,11 +3952,6 @@ public class HBaseAdmin implements Admin {
return listReplicationPeers((Pattern)null);
}
@Override
public List<ReplicationPeerDescription> listReplicationPeers(String regex) throws IOException {
return listReplicationPeers(Pattern.compile(regex));
}
@Override
public List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern)
throws IOException {

View File

@ -1501,14 +1501,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> addReplicationPeer(String peerId,
ReplicationPeerConfig peerConfig) {
ReplicationPeerConfig peerConfig, boolean enabled) {
return this
.<Void> newMasterCaller()
.action(
(controller, stub) -> this
.<AddReplicationPeerRequest, AddReplicationPeerResponse, Void> call(controller, stub,
RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig), (s, c, req,
done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call();
RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), (s,
c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call();
}
@Override

View File

@ -1618,10 +1618,15 @@ public final class RequestConverter {
}
public static ReplicationProtos.AddReplicationPeerRequest buildAddReplicationPeerRequest(
String peerId, ReplicationPeerConfig peerConfig) {
String peerId, ReplicationPeerConfig peerConfig, boolean enabled) {
AddReplicationPeerRequest.Builder builder = AddReplicationPeerRequest.newBuilder();
builder.setPeerId(peerId);
builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
ReplicationProtos.ReplicationState.Builder stateBuilder =
ReplicationProtos.ReplicationState.newBuilder();
stateBuilder.setState(enabled ? ReplicationProtos.ReplicationState.State.ENABLED
: ReplicationProtos.ReplicationState.State.DISABLED);
builder.setPeerState(stateBuilder.build());
return builder.build();
}

View File

@ -77,6 +77,7 @@ message ReplicationHLogPosition {
message AddReplicationPeerRequest {
required string peer_id = 1;
required ReplicationPeer peer_config = 2;
required ReplicationState peer_state = 3;
}
message AddReplicationPeerResponse {

View File

@ -51,7 +51,18 @@ public interface ReplicationPeers {
* @param peerId a short that identifies the cluster
* @param peerConfig configuration for the replication slave cluster
*/
void registerPeer(String peerId, ReplicationPeerConfig peerConfig)
default void registerPeer(String peerId, ReplicationPeerConfig peerConfig)
throws ReplicationException {
registerPeer(peerId, peerConfig, true);
}
/**
* Add a new remote slave cluster for replication.
* @param peerId a short that identifies the cluster
* @param peerConfig configuration for the replication slave cluster
* @param enabled peer state, true if ENABLED and false if DISABLED
*/
void registerPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException;
/**

View File

@ -105,7 +105,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
@Override
public void registerPeer(String id, ReplicationPeerConfig peerConfig)
public void registerPeer(String id, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException {
try {
if (peerExists(id)) {
@ -130,19 +130,18 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
List<ZKUtilOp> listOfOps = new ArrayList<>(2);
ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id),
ReplicationPeerConfigUtil.toByteArray(peerConfig));
// b/w PeerWatcher and ReplicationZookeeper#add method to create the
// peer-state znode. This happens while adding a peer
// The peer state data is set as "ENABLED" by default.
ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES);
ZKUtilOp op1 =
ZKUtilOp.createAndFailSilent(getPeerNode(id),
ReplicationPeerConfigUtil.toByteArray(peerConfig));
ZKUtilOp op2 =
ZKUtilOp.createAndFailSilent(getPeerStateNode(id), enabled ? ENABLED_ZNODE_BYTES
: DISABLED_ZNODE_BYTES);
listOfOps.add(op1);
listOfOps.add(op2);
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
// A peer is enabled by default
} catch (KeeperException e) {
throw new ReplicationException("Could not add peer with id=" + id
+ ", peerConfif=>" + peerConfig, e);
throw new ReplicationException("Could not add peer with id=" + id + ", peerConfif=>"
+ peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
}
}

View File

@ -3322,14 +3322,14 @@ public class HMaster extends HRegionServer implements MasterServices {
}
@Override
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException, IOException {
if (cpHost != null) {
cpHost.preAddReplicationPeer(peerId, peerConfig);
}
LOG.info(getClientIdAuditPrefix() + " creating replication peer, id=" + peerId + ", config="
+ peerConfig);
this.replicationManager.addReplicationPeer(peerId, peerConfig);
+ peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"));
this.replicationManager.addReplicationPeer(peerId, peerConfig, enabled);
if (cpHost != null) {
cpHost.postAddReplicationPeer(peerId, peerConfig);
}

View File

@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
@ -267,6 +266,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationState;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
@ -1809,7 +1809,8 @@ public class MasterRpcServices extends RSRpcServices
AddReplicationPeerRequest request) throws ServiceException {
try {
master.addReplicationPeer(request.getPeerId(),
ReplicationPeerConfigUtil.convert(request.getPeerConfig()));
ReplicationPeerConfigUtil.convert(request.getPeerConfig()), request.getPeerState()
.getState().equals(ReplicationState.State.ENABLED));
return AddReplicationPeerResponse.newBuilder().build();
} catch (ReplicationException | IOException e) {
throw new ServiceException(e);

View File

@ -428,8 +428,9 @@ public interface MasterServices extends Server {
* Add a new replication peer for replicating data to slave cluster
* @param peerId a short name that identifies the peer
* @param peerConfig configuration for the replication slave cluster
* @param enabled peer state, true if ENABLED and false if DISABLED
*/
void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException, IOException;
/**

View File

@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
@ -40,6 +39,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Manages and performs all replication admin operations.
@ -69,12 +69,12 @@ public class ReplicationManager {
}
}
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException, IOException {
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
peerConfig.getTableCFsMap());
checkConfiguredWALEntryFilters(peerConfig);
replicationPeers.registerPeer(peerId, peerConfig);
replicationPeers.registerPeer(peerId, peerConfig, enabled);
replicationPeers.peerConnected(peerId);
}

View File

@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -141,6 +142,21 @@ public class TestReplicationAdmin {
assertEquals(0, admin.getPeersCount());
}
@Test
public void testAddPeerWithState() throws Exception {
ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
rpc1.setClusterKey(KEY_ONE);
hbaseAdmin.addReplicationPeer(ID_ONE, rpc1, true);
assertTrue(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_ONE)).get(0).isEnabled());
hbaseAdmin.removeReplicationPeer(ID_ONE);
ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
rpc2.setClusterKey(KEY_SECOND);
hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2, false);
assertFalse(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_SECOND)).get(0).isEnabled());
hbaseAdmin.removeReplicationPeer(ID_SECOND);
}
/**
* Tests that the peer configuration used by ReplicationAdmin contains all
* the peer's properties.

View File

@ -393,7 +393,7 @@ public class MockNoopMasterServices implements MasterServices, Server {
}
@Override
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException {
}
@ -460,4 +460,4 @@ public class MockNoopMasterServices implements MasterServices, Server {
public FileSystem getFileSystem() {
return null;
}
}
}

View File

@ -65,6 +65,7 @@ module Hbase
data = args.fetch(DATA, nil)
table_cfs = args.fetch(TABLE_CFS, nil)
namespaces = args.fetch(NAMESPACES, nil)
peer_state = args.fetch(STATE, nil)
# Create and populate a ReplicationPeerConfig
replication_peer_config = ReplicationPeerConfig.new
@ -102,7 +103,12 @@ module Hbase
end
replication_peer_config.set_table_cfs_map(map)
end
@admin.addReplicationPeer(id, replication_peer_config)
enabled = true
unless peer_state.nil?
enabled = false if peer_state == 'DISABLED'
end
@admin.addReplicationPeer(id, replication_peer_config, enabled)
else
raise(ArgumentError, 'args must be a Hash')
end

View File

@ -79,6 +79,7 @@ module HBaseConstants
CLUSTER_KEY = 'CLUSTER_KEY'.freeze
TABLE_CFS = 'TABLE_CFS'.freeze
NAMESPACES = 'NAMESPACES'.freeze
STATE = 'STATE'.freeze
CONFIG = 'CONFIG'.freeze
DATA = 'DATA'.freeze
SERVER_NAME = 'SERVER_NAME'.freeze

View File

@ -28,6 +28,8 @@ must be specified to identify the peer.
For a HBase cluster peer, a cluster key must be provided and is composed like this:
hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
This gives a full path for HBase to connect to another HBase cluster.
An optional parameter for state identifies the replication peer's state is enabled or disabled.
And the default state is enabled.
An optional parameter for namespaces identifies which namespace's tables will be replicated
to the peer cluster.
An optional parameter for table column families identifies which tables and/or column families
@ -40,6 +42,8 @@ then you can't set this namespace's tables in the peer config again.
Examples:
hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", STATE => "ENABLED"
hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", STATE => "DISABLED"
hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",

View File

@ -93,7 +93,7 @@ module Hbase
command(:remove_peer, @peer_id)
end
define_test "add_peer: single zk cluster key - peer config" do
define_test "add_peer: single zk cluster key with enabled/disabled state" do
cluster_key = "server1.cie.com:2181:/hbase"
args = { CLUSTER_KEY => cluster_key }
@ -101,9 +101,26 @@ module Hbase
assert_equal(1, command(:list_peers).length)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
assert_equal(true, command(:list_peers).get(0).isEnabled)
command(:remove_peer, @peer_id)
enable_args = { CLUSTER_KEY => cluster_key, STATE => 'ENABLED' }
command(:add_peer, @peer_id, enable_args)
assert_equal(1, command(:list_peers).length)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
assert_equal(true, command(:list_peers).get(0).isEnabled)
command(:remove_peer, @peer_id)
disable_args = { CLUSTER_KEY => cluster_key, STATE => 'DISABLED' }
command(:add_peer, @peer_id, disable_args)
assert_equal(1, command(:list_peers).length)
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
assert_equal(false, command(:list_peers).get(0).isEnabled)
# cleanup for future tests
command(:remove_peer, @peer_id)
end

View File

@ -1415,6 +1415,7 @@ add_peer <ID> <CLUSTER_KEY>::
Adds a replication relationship between two clusters. +
* ID -- a unique string, which must not contain a hyphen.
* CLUSTER_KEY: composed using the following template, with appropriate place-holders: `hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent`
* STATE(optional): ENABLED or DISABLED, default value is ENABLED
list_peers:: list all replication relationships known by this cluster
enable_peer <ID>::
Enable a previously-disabled replication relationship