HBASE-19293 Support add a disabled state replication peer directly
This commit is contained in:
parent
719a935b9d
commit
c978f8ab23
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.client;
|
|||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
|
@ -2466,111 +2465,97 @@ public interface Admin extends Abortable, Closeable {
|
|||
* Add a new replication peer for replicating data to slave cluster.
|
||||
* @param peerId a short name that identifies the peer
|
||||
* @param peerConfig configuration for the replication slave cluster
|
||||
* @throws IOException
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
default void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
|
||||
throws IOException {
|
||||
addReplicationPeer(peerId, peerConfig, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new replication peer for replicating data to slave cluster.
|
||||
* @param peerId a short name that identifies the peer
|
||||
* @param peerConfig configuration for the replication slave cluster
|
||||
* @param enabled peer state, true if ENABLED and false if DISABLED
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Remove a peer and stop the replication.
|
||||
* @param peerId a short name that identifies the peer
|
||||
* @throws IOException
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
default void removeReplicationPeer(String peerId) throws IOException {
|
||||
}
|
||||
void removeReplicationPeer(String peerId) throws IOException;
|
||||
|
||||
/**
|
||||
* Restart the replication stream to the specified peer.
|
||||
* @param peerId a short name that identifies the peer
|
||||
* @throws IOException
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
default void enableReplicationPeer(String peerId) throws IOException {
|
||||
}
|
||||
void enableReplicationPeer(String peerId) throws IOException;
|
||||
|
||||
/**
|
||||
* Stop the replication stream to the specified peer.
|
||||
* @param peerId a short name that identifies the peer
|
||||
* @throws IOException
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
default void disableReplicationPeer(String peerId) throws IOException {
|
||||
}
|
||||
void disableReplicationPeer(String peerId) throws IOException;
|
||||
|
||||
/**
|
||||
* Returns the configured ReplicationPeerConfig for the specified peer.
|
||||
* @param peerId a short name that identifies the peer
|
||||
* @return ReplicationPeerConfig for the peer
|
||||
* @throws IOException
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
default ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws IOException {
|
||||
return new ReplicationPeerConfig();
|
||||
}
|
||||
ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws IOException;
|
||||
|
||||
/**
|
||||
* Update the peerConfig for the specified peer.
|
||||
* @param peerId a short name that identifies the peer
|
||||
* @param peerConfig new config for the peer
|
||||
* @throws IOException
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
default void updateReplicationPeerConfig(String peerId,
|
||||
ReplicationPeerConfig peerConfig) throws IOException {
|
||||
}
|
||||
void updateReplicationPeerConfig(String peerId,
|
||||
ReplicationPeerConfig peerConfig) throws IOException;
|
||||
|
||||
/**
|
||||
* Append the replicable table column family config from the specified peer.
|
||||
* @param id a short that identifies the cluster
|
||||
* @param tableCfs A map from tableName to column family names
|
||||
* @throws ReplicationException
|
||||
* @throws IOException
|
||||
* @throws ReplicationException if tableCfs has conflict with existing config
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
default void appendReplicationPeerTableCFs(String id,
|
||||
Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException,
|
||||
IOException {
|
||||
}
|
||||
void appendReplicationPeerTableCFs(String id,
|
||||
Map<TableName, ? extends Collection<String>> tableCfs)
|
||||
throws ReplicationException, IOException;
|
||||
|
||||
/**
|
||||
* Remove some table-cfs from config of the specified peer.
|
||||
* @param id a short name that identifies the cluster
|
||||
* @param tableCfs A map from tableName to column family names
|
||||
* @throws ReplicationException
|
||||
* @throws IOException
|
||||
* @throws ReplicationException if tableCfs has conflict with existing config
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
default void removeReplicationPeerTableCFs(String id,
|
||||
Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException,
|
||||
IOException {
|
||||
}
|
||||
void removeReplicationPeerTableCFs(String id,
|
||||
Map<TableName, ? extends Collection<String>> tableCfs)
|
||||
throws ReplicationException, IOException;
|
||||
|
||||
/**
|
||||
* Return a list of replication peers.
|
||||
* @return a list of replication peers description
|
||||
* @throws IOException
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
default List<ReplicationPeerDescription> listReplicationPeers() throws IOException {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a list of replication peers.
|
||||
* @param regex The regular expression to match peer id
|
||||
* @return a list of replication peers description
|
||||
* @throws IOException
|
||||
* @deprecated since 2.0 version and will be removed in 3.0 version. Use
|
||||
* {@link #listReplicationPeers(Pattern)} instead.
|
||||
*/
|
||||
@Deprecated
|
||||
default List<ReplicationPeerDescription> listReplicationPeers(String regex) throws IOException {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
List<ReplicationPeerDescription> listReplicationPeers() throws IOException;
|
||||
|
||||
/**
|
||||
* Return a list of replication peers.
|
||||
* @param pattern The compiled regular expression to match peer id
|
||||
* @return a list of replication peers description
|
||||
* @throws IOException
|
||||
* @throws IOException if a remote or network exception occurs
|
||||
*/
|
||||
default List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern) throws IOException {
|
||||
return new ArrayList<>();
|
||||
}
|
||||
List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern) throws IOException;
|
||||
|
||||
/**
|
||||
* Mark region server(s) as decommissioned to prevent additional regions from getting
|
||||
|
|
|
@ -499,8 +499,19 @@ public interface AsyncAdmin {
|
|||
* @param peerId a short name that identifies the peer
|
||||
* @param peerConfig configuration for the replication slave cluster
|
||||
*/
|
||||
default CompletableFuture<Void> addReplicationPeer(String peerId,
|
||||
ReplicationPeerConfig peerConfig) {
|
||||
return addReplicationPeer(peerId, peerConfig, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new replication peer for replicating data to slave cluster
|
||||
* @param peerId a short name that identifies the peer
|
||||
* @param peerConfig configuration for the replication slave cluster
|
||||
* @param enabled peer state, true if ENABLED and false if DISABLED
|
||||
*/
|
||||
CompletableFuture<Void> addReplicationPeer(String peerId,
|
||||
ReplicationPeerConfig peerConfig);
|
||||
ReplicationPeerConfig peerConfig, boolean enabled);
|
||||
|
||||
/**
|
||||
* Remove a peer and stop the replication
|
||||
|
|
|
@ -375,9 +375,9 @@ class AsyncHBaseAdmin implements AsyncAdmin {
|
|||
}
|
||||
|
||||
@Override
|
||||
public CompletableFuture<Void>
|
||||
addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig) {
|
||||
return wrap(rawAdmin.addReplicationPeer(peerId, peerConfig));
|
||||
public CompletableFuture<Void> addReplicationPeer(String peerId,
|
||||
ReplicationPeerConfig peerConfig, boolean enabled) {
|
||||
return wrap(rawAdmin.addReplicationPeer(peerId, peerConfig, enabled));
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -103,7 +103,6 @@ import org.apache.hadoop.ipc.RemoteException;
|
|||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.yetus.audience.InterfaceStability;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||
|
@ -3850,13 +3849,13 @@ public class HBaseAdmin implements Admin {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
|
||||
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
throws IOException {
|
||||
executeCallable(new MasterCallable<Void>(getConnection(), getRpcControllerFactory()) {
|
||||
@Override
|
||||
protected Void rpcCall() throws Exception {
|
||||
master.addReplicationPeer(getRpcController(),
|
||||
RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig));
|
||||
RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled));
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
@ -3953,11 +3952,6 @@ public class HBaseAdmin implements Admin {
|
|||
return listReplicationPeers((Pattern)null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ReplicationPeerDescription> listReplicationPeers(String regex) throws IOException {
|
||||
return listReplicationPeers(Pattern.compile(regex));
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<ReplicationPeerDescription> listReplicationPeers(Pattern pattern)
|
||||
throws IOException {
|
||||
|
|
|
@ -1501,14 +1501,14 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
|
|||
|
||||
@Override
|
||||
public CompletableFuture<Void> addReplicationPeer(String peerId,
|
||||
ReplicationPeerConfig peerConfig) {
|
||||
ReplicationPeerConfig peerConfig, boolean enabled) {
|
||||
return this
|
||||
.<Void> newMasterCaller()
|
||||
.action(
|
||||
(controller, stub) -> this
|
||||
.<AddReplicationPeerRequest, AddReplicationPeerResponse, Void> call(controller, stub,
|
||||
RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig), (s, c, req,
|
||||
done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call();
|
||||
RequestConverter.buildAddReplicationPeerRequest(peerId, peerConfig, enabled), (s,
|
||||
c, req, done) -> s.addReplicationPeer(c, req, done), (resp) -> null)).call();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1618,10 +1618,15 @@ public final class RequestConverter {
|
|||
}
|
||||
|
||||
public static ReplicationProtos.AddReplicationPeerRequest buildAddReplicationPeerRequest(
|
||||
String peerId, ReplicationPeerConfig peerConfig) {
|
||||
String peerId, ReplicationPeerConfig peerConfig, boolean enabled) {
|
||||
AddReplicationPeerRequest.Builder builder = AddReplicationPeerRequest.newBuilder();
|
||||
builder.setPeerId(peerId);
|
||||
builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
|
||||
ReplicationProtos.ReplicationState.Builder stateBuilder =
|
||||
ReplicationProtos.ReplicationState.newBuilder();
|
||||
stateBuilder.setState(enabled ? ReplicationProtos.ReplicationState.State.ENABLED
|
||||
: ReplicationProtos.ReplicationState.State.DISABLED);
|
||||
builder.setPeerState(stateBuilder.build());
|
||||
return builder.build();
|
||||
}
|
||||
|
||||
|
|
|
@ -77,6 +77,7 @@ message ReplicationHLogPosition {
|
|||
message AddReplicationPeerRequest {
|
||||
required string peer_id = 1;
|
||||
required ReplicationPeer peer_config = 2;
|
||||
required ReplicationState peer_state = 3;
|
||||
}
|
||||
|
||||
message AddReplicationPeerResponse {
|
||||
|
|
|
@ -51,7 +51,18 @@ public interface ReplicationPeers {
|
|||
* @param peerId a short that identifies the cluster
|
||||
* @param peerConfig configuration for the replication slave cluster
|
||||
*/
|
||||
void registerPeer(String peerId, ReplicationPeerConfig peerConfig)
|
||||
default void registerPeer(String peerId, ReplicationPeerConfig peerConfig)
|
||||
throws ReplicationException {
|
||||
registerPeer(peerId, peerConfig, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a new remote slave cluster for replication.
|
||||
* @param peerId a short that identifies the cluster
|
||||
* @param peerConfig configuration for the replication slave cluster
|
||||
* @param enabled peer state, true if ENABLED and false if DISABLED
|
||||
*/
|
||||
void registerPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
throws ReplicationException;
|
||||
|
||||
/**
|
||||
|
|
|
@ -105,7 +105,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
|
|||
}
|
||||
|
||||
@Override
|
||||
public void registerPeer(String id, ReplicationPeerConfig peerConfig)
|
||||
public void registerPeer(String id, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
throws ReplicationException {
|
||||
try {
|
||||
if (peerExists(id)) {
|
||||
|
@ -130,19 +130,18 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
|
|||
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
|
||||
|
||||
List<ZKUtilOp> listOfOps = new ArrayList<>(2);
|
||||
ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id),
|
||||
ZKUtilOp op1 =
|
||||
ZKUtilOp.createAndFailSilent(getPeerNode(id),
|
||||
ReplicationPeerConfigUtil.toByteArray(peerConfig));
|
||||
// b/w PeerWatcher and ReplicationZookeeper#add method to create the
|
||||
// peer-state znode. This happens while adding a peer
|
||||
// The peer state data is set as "ENABLED" by default.
|
||||
ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES);
|
||||
ZKUtilOp op2 =
|
||||
ZKUtilOp.createAndFailSilent(getPeerStateNode(id), enabled ? ENABLED_ZNODE_BYTES
|
||||
: DISABLED_ZNODE_BYTES);
|
||||
listOfOps.add(op1);
|
||||
listOfOps.add(op2);
|
||||
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
|
||||
// A peer is enabled by default
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Could not add peer with id=" + id
|
||||
+ ", peerConfif=>" + peerConfig, e);
|
||||
throw new ReplicationException("Could not add peer with id=" + id + ", peerConfif=>"
|
||||
+ peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -3322,14 +3322,14 @@ public class HMaster extends HRegionServer implements MasterServices {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
|
||||
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
throws ReplicationException, IOException {
|
||||
if (cpHost != null) {
|
||||
cpHost.preAddReplicationPeer(peerId, peerConfig);
|
||||
}
|
||||
LOG.info(getClientIdAuditPrefix() + " creating replication peer, id=" + peerId + ", config="
|
||||
+ peerConfig);
|
||||
this.replicationManager.addReplicationPeer(peerId, peerConfig);
|
||||
+ peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"));
|
||||
this.replicationManager.addReplicationPeer(peerId, peerConfig, enabled);
|
||||
if (cpHost != null) {
|
||||
cpHost.postAddReplicationPeer(peerId, peerConfig);
|
||||
}
|
||||
|
|
|
@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.util.ForeignExceptionUtil;
|
|||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
|
||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
|
||||
|
@ -267,6 +266,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListR
|
|||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ListReplicationPeersResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.RemoveReplicationPeerResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.ReplicationState;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigRequest;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.UpdateReplicationPeerConfigResponse;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription;
|
||||
|
@ -1809,7 +1809,8 @@ public class MasterRpcServices extends RSRpcServices
|
|||
AddReplicationPeerRequest request) throws ServiceException {
|
||||
try {
|
||||
master.addReplicationPeer(request.getPeerId(),
|
||||
ReplicationPeerConfigUtil.convert(request.getPeerConfig()));
|
||||
ReplicationPeerConfigUtil.convert(request.getPeerConfig()), request.getPeerState()
|
||||
.getState().equals(ReplicationState.State.ENABLED));
|
||||
return AddReplicationPeerResponse.newBuilder().build();
|
||||
} catch (ReplicationException | IOException e) {
|
||||
throw new ServiceException(e);
|
||||
|
|
|
@ -428,8 +428,9 @@ public interface MasterServices extends Server {
|
|||
* Add a new replication peer for replicating data to slave cluster
|
||||
* @param peerId a short name that identifies the peer
|
||||
* @param peerConfig configuration for the replication slave cluster
|
||||
* @param enabled peer state, true if ENABLED and false if DISABLED
|
||||
*/
|
||||
void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
|
||||
void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
throws ReplicationException, IOException;
|
||||
|
||||
/**
|
||||
|
|
|
@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
|
|||
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
|
@ -40,6 +39,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
|||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Manages and performs all replication admin operations.
|
||||
|
@ -69,12 +69,12 @@ public class ReplicationManager {
|
|||
}
|
||||
}
|
||||
|
||||
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
|
||||
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
throws ReplicationException, IOException {
|
||||
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
|
||||
peerConfig.getTableCFsMap());
|
||||
checkConfiguredWALEntryFilters(peerConfig);
|
||||
replicationPeers.registerPeer(peerId, peerConfig);
|
||||
replicationPeers.registerPeer(peerId, peerConfig, enabled);
|
||||
replicationPeers.peerConnected(peerId);
|
||||
}
|
||||
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.util.HashSet;
|
|||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -141,6 +142,21 @@ public class TestReplicationAdmin {
|
|||
assertEquals(0, admin.getPeersCount());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAddPeerWithState() throws Exception {
|
||||
ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
|
||||
rpc1.setClusterKey(KEY_ONE);
|
||||
hbaseAdmin.addReplicationPeer(ID_ONE, rpc1, true);
|
||||
assertTrue(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_ONE)).get(0).isEnabled());
|
||||
hbaseAdmin.removeReplicationPeer(ID_ONE);
|
||||
|
||||
ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
|
||||
rpc2.setClusterKey(KEY_SECOND);
|
||||
hbaseAdmin.addReplicationPeer(ID_SECOND, rpc2, false);
|
||||
assertFalse(hbaseAdmin.listReplicationPeers(Pattern.compile(ID_SECOND)).get(0).isEnabled());
|
||||
hbaseAdmin.removeReplicationPeer(ID_SECOND);
|
||||
}
|
||||
|
||||
/**
|
||||
* Tests that the peer configuration used by ReplicationAdmin contains all
|
||||
* the peer's properties.
|
||||
|
|
|
@ -393,7 +393,7 @@ public class MockNoopMasterServices implements MasterServices, Server {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig)
|
||||
public void addReplicationPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
|
||||
throws ReplicationException {
|
||||
}
|
||||
|
||||
|
|
|
@ -65,6 +65,7 @@ module Hbase
|
|||
data = args.fetch(DATA, nil)
|
||||
table_cfs = args.fetch(TABLE_CFS, nil)
|
||||
namespaces = args.fetch(NAMESPACES, nil)
|
||||
peer_state = args.fetch(STATE, nil)
|
||||
|
||||
# Create and populate a ReplicationPeerConfig
|
||||
replication_peer_config = ReplicationPeerConfig.new
|
||||
|
@ -102,7 +103,12 @@ module Hbase
|
|||
end
|
||||
replication_peer_config.set_table_cfs_map(map)
|
||||
end
|
||||
@admin.addReplicationPeer(id, replication_peer_config)
|
||||
|
||||
enabled = true
|
||||
unless peer_state.nil?
|
||||
enabled = false if peer_state == 'DISABLED'
|
||||
end
|
||||
@admin.addReplicationPeer(id, replication_peer_config, enabled)
|
||||
else
|
||||
raise(ArgumentError, 'args must be a Hash')
|
||||
end
|
||||
|
|
|
@ -79,6 +79,7 @@ module HBaseConstants
|
|||
CLUSTER_KEY = 'CLUSTER_KEY'.freeze
|
||||
TABLE_CFS = 'TABLE_CFS'.freeze
|
||||
NAMESPACES = 'NAMESPACES'.freeze
|
||||
STATE = 'STATE'.freeze
|
||||
CONFIG = 'CONFIG'.freeze
|
||||
DATA = 'DATA'.freeze
|
||||
SERVER_NAME = 'SERVER_NAME'.freeze
|
||||
|
|
|
@ -28,6 +28,8 @@ must be specified to identify the peer.
|
|||
For a HBase cluster peer, a cluster key must be provided and is composed like this:
|
||||
hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
|
||||
This gives a full path for HBase to connect to another HBase cluster.
|
||||
An optional parameter for state identifies the replication peer's state is enabled or disabled.
|
||||
And the default state is enabled.
|
||||
An optional parameter for namespaces identifies which namespace's tables will be replicated
|
||||
to the peer cluster.
|
||||
An optional parameter for table column families identifies which tables and/or column families
|
||||
|
@ -40,6 +42,8 @@ then you can't set this namespace's tables in the peer config again.
|
|||
Examples:
|
||||
|
||||
hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase"
|
||||
hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", STATE => "ENABLED"
|
||||
hbase> add_peer '1', CLUSTER_KEY => "server1.cie.com:2181:/hbase", STATE => "DISABLED"
|
||||
hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
|
||||
TABLE_CFS => { "table1" => [], "table2" => ["cf1"], "table3" => ["cf1", "cf2"] }
|
||||
hbase> add_peer '2', CLUSTER_KEY => "zk1,zk2,zk3:2182:/hbase-prod",
|
||||
|
|
|
@ -93,7 +93,7 @@ module Hbase
|
|||
command(:remove_peer, @peer_id)
|
||||
end
|
||||
|
||||
define_test "add_peer: single zk cluster key - peer config" do
|
||||
define_test "add_peer: single zk cluster key with enabled/disabled state" do
|
||||
cluster_key = "server1.cie.com:2181:/hbase"
|
||||
|
||||
args = { CLUSTER_KEY => cluster_key }
|
||||
|
@ -101,9 +101,26 @@ module Hbase
|
|||
|
||||
assert_equal(1, command(:list_peers).length)
|
||||
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
|
||||
assert_equal(cluster_key, command(:list_peers).get(0).getPeerConfig.getClusterKey)
|
||||
assert_equal(true, command(:list_peers).get(0).isEnabled)
|
||||
|
||||
command(:remove_peer, @peer_id)
|
||||
|
||||
enable_args = { CLUSTER_KEY => cluster_key, STATE => 'ENABLED' }
|
||||
command(:add_peer, @peer_id, enable_args)
|
||||
|
||||
assert_equal(1, command(:list_peers).length)
|
||||
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
|
||||
assert_equal(true, command(:list_peers).get(0).isEnabled)
|
||||
|
||||
command(:remove_peer, @peer_id)
|
||||
|
||||
disable_args = { CLUSTER_KEY => cluster_key, STATE => 'DISABLED' }
|
||||
command(:add_peer, @peer_id, disable_args)
|
||||
|
||||
assert_equal(1, command(:list_peers).length)
|
||||
assert_equal(@peer_id, command(:list_peers).get(0).getPeerId)
|
||||
assert_equal(false, command(:list_peers).get(0).isEnabled)
|
||||
|
||||
# cleanup for future tests
|
||||
command(:remove_peer, @peer_id)
|
||||
end
|
||||
|
||||
|
|
|
@ -1371,6 +1371,7 @@ add_peer <ID> <CLUSTER_KEY>::
|
|||
Adds a replication relationship between two clusters. +
|
||||
* ID -- a unique string, which must not contain a hyphen.
|
||||
* CLUSTER_KEY: composed using the following template, with appropriate place-holders: `hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent`
|
||||
* STATE(optional): ENABLED or DISABLED, default value is ENABLED
|
||||
list_peers:: list all replication relationships known by this cluster
|
||||
enable_peer <ID>::
|
||||
Enable a previously-disabled replication relationship
|
||||
|
|
Loading…
Reference in New Issue