HBASE-15507 Online modification of enabled ReplicationPeerConfig (Geoffrey Jacoby)

This commit is contained in:
tedyu 2016-04-08 21:26:31 -07:00
parent 6ea4994569
commit e0f31ba6e6
15 changed files with 301 additions and 12 deletions

View File

@ -47,7 +47,6 @@ import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
@ -201,7 +200,11 @@ public class ReplicationAdmin implements Closeable {
public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
  // The actual parsing is implemented by ReplicationSerDeHelper; this method only forwards to it.
  Map<TableName, List<String>> parsedTableCFs =
      ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig);
  return parsedTableCFs;
}
/**
 * Updates the configuration of an existing replication peer by handing the request to the
 * underlying {@code replicationPeers} instance.
 * @param id a short name that identifies the peer to update
 * @param peerConfig the configuration values to apply to that peer
 * @throws ReplicationException if the underlying update fails
 */
public void updatePeerConfig(String id, ReplicationPeerConfig peerConfig)
    throws ReplicationException {
  replicationPeers.updatePeerConfig(id, peerConfig);
}
/**
* Removes a peer cluster and stops the replication to it.
* @param id a short name that identifies the cluster
@ -549,6 +552,11 @@ public class ReplicationAdmin implements Closeable {
}
}
/**
 * Test-only hook that forwards to {@code replicationPeers.peerAdded(id)} so a test can trigger
 * the peer-added handling for a peer itself.
 * @param id a short name that identifies the peer
 * @throws ReplicationException if the underlying call fails
 */
@VisibleForTesting
public void peerAdded(String id) throws ReplicationException {
  replicationPeers.peerAdded(id);
}
@VisibleForTesting
List<ReplicationPeer> listReplicationPeers() {
Map<String, ReplicationPeerConfig> peers = listPeerConfigs();

View File

@ -71,4 +71,6 @@ public interface ReplicationPeer {
*/
public Map<TableName, List<String>> getTableCFs();
/**
 * Registers a listener that wants to be told about changes to this peer's configuration.
 * @param listener the listener to register for peer config changes
 */
void trackPeerConfigChanges(ReplicationPeerConfigListener listener);
}

View File

@ -0,0 +1,33 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
 * Callback interface for components that need to react when the
 * {@link ReplicationPeerConfig} of a replication peer is updated.
 */
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public interface ReplicationPeerConfigListener {
/**
 * Callback method for when users update the ReplicationPeerConfig for this peer.
 *
 * @param rpc The updated ReplicationPeerConfig
 */
void peerConfigUpdated(ReplicationPeerConfig rpc);
}

View File

@ -109,7 +109,7 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
this.readPeerConfig();
}
private void readPeerConfig() {
private ReplicationPeerConfig readPeerConfig() {
try {
byte[] data = peerConfigTracker.getData(false);
if (data != null) {
@ -118,6 +118,7 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
} catch (DeserializationException e) {
LOG.error("", e);
}
return this.peerConfig;
}
@Override
@ -162,6 +163,13 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
return this.tableCFs;
}
/**
 * Attaches the given listener to this peer's config tracker. Does nothing when no config
 * tracker has been created for this peer.
 * @param listener the listener to hand to the peer config tracker
 */
@Override
public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
  if (this.peerConfigTracker == null) {
    // No tracker to attach to; the listener is silently not registered.
    return;
  }
  this.peerConfigTracker.setListener(listener);
}
@Override
public void abort(String why, Throwable e) {
LOG.fatal("The ReplicationPeer coresponding to peer " + peerConfig
@ -260,24 +268,36 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
*/
public class PeerConfigTracker extends ZooKeeperNodeTracker {
// Listener to call after the peer config has been re-read; stays null until setListener is called.
ReplicationPeerConfigListener listener;
public PeerConfigTracker(String peerConfigNode, ZooKeeperWatcher watcher,
Abortable abortable) {
super(watcher, peerConfigNode, abortable);
}
/**
 * Sets (or replaces) the single listener that is called back from {@link #nodeCreated(String)}.
 * @param listener the listener to notify; may be null, in which case no callback is made
 */
public synchronized void setListener(ReplicationPeerConfigListener listener){
this.listener = listener;
}
/**
 * Handles an event for the tracked znode: lets the superclass process it, re-reads the peer
 * config and, if a listener is set, passes the re-read config to it.
 * NOTE(review): the listener is invoked while this tracker's monitor is held - confirm that
 * listener implementations are expected to return quickly.
 */
@Override
public synchronized void nodeCreated(String path) {
// Ignore events for any znode other than the one this tracker was created for.
if (path.equals(node)) {
super.nodeCreated(path);
readPeerConfig();
// readPeerConfig() returns the enclosing peer's peerConfig field after the read attempt.
ReplicationPeerConfig config = readPeerConfig();
if (listener != null){
listener.peerConfigUpdated(config);
}
}
}
/**
 * Handles a data-change event for the tracked znode by deferring to the superclass.
 */
@Override
public synchronized void nodeDataChanged(String path) {
//superclass calls nodeCreated
// (per the note above, the config re-read and listener callback happen via nodeCreated)
if (path.equals(node)) {
super.nodeDataChanged(path);
}
}
}
}

View File

@ -151,4 +151,6 @@ public interface ReplicationPeers {
* @return the configuration for the peer cluster, null if it was unable to get the configuration
*/
Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) throws ReplicationException;
/**
 * Updates the configuration of an existing replication peer.
 * @param id the id of the peer whose configuration should be updated
 * @param peerConfig the configuration values to apply
 * @throws ReplicationException if the update cannot be applied
 */
void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException;
}

View File

@ -351,6 +351,45 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
return new Pair<ReplicationPeerConfig, Configuration>(peerConfig, otherConf);
}
/**
 * Merges the configuration and peer data of {@code newConfig} into the existing config of peer
 * {@code id} and writes the merged config to the peer's znode.
 * <p>
 * The cluster key and the replication endpoint implementation cannot be changed: if
 * {@code newConfig} carries a non-empty value for either that differs from the existing one, the
 * update is rejected. Existing configuration or peer data entries that are not present in
 * {@code newConfig} are left as they are.
 * @param id the id of the peer to update
 * @param newConfig the configuration and peer data entries to apply
 * @throws ReplicationException if the peer is unknown, if the cluster key or endpoint
 *   implementation would change, or if the merged config cannot be written to ZooKeeper
 */
@Override
public void updatePeerConfig(String id, ReplicationPeerConfig newConfig)
    throws ReplicationException {
  ReplicationPeer targetPeer = getPeer(id);
  if (targetPeer == null) {
    throw new ReplicationException("Could not find peer Id " + id);
  }
  ReplicationPeerConfig currentConfig = targetPeer.getPeerConfig();

  // Reject an attempt to point the peer at a different cluster.
  String requestedKey = newConfig.getClusterKey();
  boolean keyRequested = requestedKey != null && !requestedKey.isEmpty();
  if (keyRequested && !requestedKey.equals(currentConfig.getClusterKey())) {
    throw new ReplicationException("Changing the cluster key on an existing peer is not allowed."
        + " Existing key '" + currentConfig.getClusterKey() + "' does not match new key '"
        + requestedKey + "'");
  }

  // Reject an attempt to swap the endpoint implementation class.
  String currentEndpointImpl = currentConfig.getReplicationEndpointImpl();
  String requestedEndpointImpl = newConfig.getReplicationEndpointImpl();
  boolean endpointRequested = requestedEndpointImpl != null && !requestedEndpointImpl.isEmpty();
  if (endpointRequested && !requestedEndpointImpl.equals(currentEndpointImpl)) {
    throw new ReplicationException("Changing the replication endpoint implementation class "
        + "on an existing peer is not allowed. Existing class '"
        + currentConfig.getReplicationEndpointImpl()
        + "' does not match new class '" + requestedEndpointImpl + "'");
  }

  // Overlay only the entries that were supplied; anything not mentioned keeps its old value.
  currentConfig.getConfiguration().putAll(newConfig.getConfiguration());
  currentConfig.getPeerData().putAll(newConfig.getPeerData());

  try {
    byte[] serializedConfig = ReplicationSerDeHelper.toByteArray(currentConfig);
    ZKUtil.setData(this.zookeeper, getPeerNode(id), serializedConfig);
  } catch (KeeperException ke) {
    throw new ReplicationException("There was a problem trying to save changes to the "
        + "replication peer " + id, ke);
  }
}
/**
* List all registered peer clusters and set a watch on their znodes.
*/

View File

@ -21,10 +21,13 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractService;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
/**
* A Base implementation for {@link ReplicationEndpoint}s. Users should consider extending this
@ -35,11 +38,30 @@ import com.google.common.util.concurrent.AbstractService;
public abstract class BaseReplicationEndpoint extends AbstractService
implements ReplicationEndpoint {
private static final Log LOG = LogFactory.getLog(BaseReplicationEndpoint.class);
protected Context ctx;
/**
 * Stores the given context and, when the context exposes a replication peer, registers this
 * endpoint as a listener for that peer's config changes. When the context has no peer, a
 * warning is logged and no listener is registered.
 * @param context the context to initialise this endpoint with; may be null
 * @throws IOException declared by the interface; not thrown by this implementation itself
 */
@Override
public void init(Context context) throws IOException {
  this.ctx = context;
  if (context == null) {
    return;
  }
  ReplicationPeer replicationPeer = context.getReplicationPeer();
  if (replicationPeer == null) {
    LOG.warn("Not tracking replication peer config changes for Peer Id " + context.getPeerId() +
        " because there's no such peer");
    return;
  }
  replicationPeer.trackPeerConfigChanges(this);
}
/**
 * No-op implementation for subclasses to override if they wish to execute logic if their config
 * changes.
 * @param rpc the updated ReplicationPeerConfig
 */
@Override
public void peerConfigUpdated(ReplicationPeerConfig rpc){
}
/** Returns a default set of filters */

View File

@ -46,14 +46,13 @@ import com.google.common.util.concurrent.Service;
* and persisting of the WAL entries in the other cluster.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public interface ReplicationEndpoint extends Service {
public interface ReplicationEndpoint extends Service, ReplicationPeerConfigListener {
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
class Context {
private final Configuration conf;
private final FileSystem fs;
private final TableDescriptors tableDescriptors;
private final ReplicationPeerConfig peerConfig;
private final ReplicationPeer replicationPeer;
private final String peerId;
private final UUID clusterId;
@ -63,13 +62,11 @@ public interface ReplicationEndpoint extends Service {
public Context(
final Configuration conf,
final FileSystem fs,
final ReplicationPeerConfig peerConfig,
final String peerId,
final UUID clusterId,
final ReplicationPeer replicationPeer,
final MetricsSource metrics,
final TableDescriptors tableDescriptors) {
this.peerConfig = peerConfig;
this.conf = conf;
this.fs = fs;
this.clusterId = clusterId;
@ -91,7 +88,7 @@ public interface ReplicationEndpoint extends Service {
return peerId;
}
public ReplicationPeerConfig getPeerConfig() {
return peerConfig;
return replicationPeer.getPeerConfig();
}
public ReplicationPeer getReplicationPeer() {
return replicationPeer;

View File

@ -494,7 +494,7 @@ public class ReplicationSourceManager implements ReplicationListener {
// init replication endpoint
replicationEndpoint.init(new ReplicationEndpoint.Context(replicationPeer.getConfiguration(),
fs, peerConfig, peerId, clusterId, replicationPeer, metrics, tableDescriptors));
fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors));
return src;
}

View File

@ -33,6 +33,7 @@ import org.apache.hadoop.hbase.TagUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@ -56,6 +57,11 @@ public class VisibilityReplicationEndpoint implements ReplicationEndpoint {
delegator.init(context);
}
/**
 * Forwards the peer config change to the wrapped endpoint, in the same way the other
 * ReplicationEndpoint calls on this class are handed to {@code delegator}.
 * @param rpc the updated ReplicationPeerConfig
 */
@Override
public void peerConfigUpdated(ReplicationPeerConfig rpc){
  delegator.peerConfigUpdated(rpc);
}
@Override
public boolean replicate(ReplicateContext replicateContext) {
if (!delegator.canReplicateToSameCluster()) {

View File

@ -18,6 +18,7 @@ import static org.junit.Assert.fail;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@ -27,6 +28,8 @@ import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.TestReplicationBase;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@ -195,4 +198,59 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
adminExt.disableTableRep(tableName);
}
}
/**
 * Adds a peer that uses {@link TestUpdatableReplicationEndpoint}, updates its config and checks
 * that the endpoint's peerConfigUpdated callback is invoked within two seconds.
 */
@Test(timeout=300000)
public void testReplicationPeerConfigUpdateCallback() throws Exception {
  String peerId = "1";
  ReplicationPeerConfig rpc = new ReplicationPeerConfig();
  rpc.setClusterKey(utility2.getClusterKey());
  rpc.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName());
  rpc.getConfiguration().put("key1", "value1");

  admin.addPeer(peerId, rpc);
  admin.peerAdded(peerId);

  rpc.getConfiguration().put("key1", "value2");
  admin.updatePeerConfig(peerId, rpc);

  // Check the flag while holding the monitor we wait on, and re-check after every wakeup:
  // Object.wait may return spuriously, and a single unguarded wait could otherwise let the
  // assertion run before the callback has happened. Total waiting time is capped at 2 seconds.
  long deadline = System.currentTimeMillis() + 2000L;
  synchronized (TestUpdatableReplicationEndpoint.class) {
    while (!TestUpdatableReplicationEndpoint.hasCalledBack()) {
      long remaining = deadline - System.currentTimeMillis();
      if (remaining <= 0) {
        break;
      }
      TestUpdatableReplicationEndpoint.class.wait(remaining);
    }
  }

  assertEquals(true, TestUpdatableReplicationEndpoint.hasCalledBack());
}
/**
 * Minimal replication endpoint used to observe the peerConfigUpdated callback: it records that
 * the callback happened and wakes up any thread waiting on the class monitor.
 */
public static class TestUpdatableReplicationEndpoint extends BaseReplicationEndpoint {
  // volatile because the flag is written by the callback thread and read by the test thread
  // through hasCalledBack() without any common lock.
  private static volatile boolean calledBack = false;

  /** @return true once peerConfigUpdated has been invoked on any instance of this class */
  public static boolean hasCalledBack(){
    return calledBack;
  }

  /**
   * Records the callback and notifies waiters. The test thread waits on
   * {@code TestUpdatableReplicationEndpoint.class}, so the notification must be sent on that
   * same class monitor; notifying on the endpoint instance would never wake the waiter.
   */
  @Override
  public void peerConfigUpdated(ReplicationPeerConfig rpc){
    synchronized (TestUpdatableReplicationEndpoint.class) {
      calledBack = true;
      TestUpdatableReplicationEndpoint.class.notifyAll();
    }
  }

  @Override
  protected void doStart() {
    notifyStarted();
  }

  @Override
  protected void doStop() {
    notifyStopped();
  }

  @Override
  public UUID getPeerUUID() {
    return UUID.randomUUID();
  }

  /** Replicates nothing; always reports that the batch was not replicated. */
  @Override
  public boolean replicate(ReplicateContext replicateContext) {
    return false;
  }
}
}

View File

@ -92,8 +92,9 @@ module Hbase
table_cfs.each{|key, val|
map.put(org.apache.hadoop.hbase.TableName.valueOf(key), val)
}
replication_peer_config.set_table_cfs_map(map)
end
@replication_admin.add_peer(id, replication_peer_config, map)
@replication_admin.add_peer(id, replication_peer_config)
else
raise(ArgumentError, "args must be a Hash")
end
@ -202,5 +203,31 @@ module Hbase
# Look up the configuration of the replication peer with the given id.
# Returns whatever the underlying replication admin returns for that id.
def get_peer_config(id)
@replication_admin.get_peer_config(id)
end
# Forward a peer-added notification for the given peer id to the underlying replication admin.
def peer_added(id)
@replication_admin.peer_added(id)
end
# Update the CONFIG and/or DATA entries of an existing replication peer.
#
# id   - id of the peer to update
# args - Hash with the optional keys CONFIG (configuration key/values) and DATA
#        (peer data key/values); keys that are absent are simply not sent
#
# Raises ArgumentError when args is not a Hash, matching the check add_peer performs.
def update_peer_config(id, args={})
  raise(ArgumentError, "args must be a Hash") unless args.is_a?(Hash)

  # Optional parameters
  config = args.fetch(CONFIG, nil)
  data = args.fetch(DATA, nil)

  # Create and populate a ReplicationPeerConfig that carries only the supplied entries
  replication_peer_config = ReplicationPeerConfig.new
  unless config.nil?
    replication_peer_config.get_configuration.put_all(config)
  end
  unless data.nil?
    # Convert Strings to Bytes for peer_data
    peer_data = replication_peer_config.get_peer_data
    data.each{|key, val|
      peer_data.put(Bytes.to_bytes(key), Bytes.to_bytes(val))
    }
  end

  @replication_admin.update_peer_config(id, replication_peer_config)
end
end
end

View File

@ -365,6 +365,7 @@ Shell.load_command_group(
disable_table_replication
get_peer_config
list_peer_configs
update_peer_config
]
)

View File

@ -0,0 +1,49 @@
#
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Shell
  module Commands
    # Shell command that passes an id and an options Hash to
    # replication_admin.update_peer_config.
    class UpdatePeerConfig < Command
      # Help text shown for the command. The examples must be complete, runnable commands,
      # so none of them may end with a dangling comma.
      def help
        return <<-EOF
A peer can either be another HBase cluster or a custom replication endpoint. In either case an id
must be specified to identify the peer. This command does not interrupt processing on an enabled replication peer.
Two optional arguments are DATA and CONFIG which can be specified to set different values for either
the peer_data or configuration for a custom replication endpoint. Any existing values not updated by this command
are left unchanged.
CLUSTER_KEY, REPLICATION_ENDPOINT, and TABLE_CFs cannot be updated with this command.
To update TABLE_CFs, see the append_peer_tableCFs and remove_peer_tableCFs commands.
hbase> update_peer_config '1', DATA => { "key1" => 1 }
hbase> update_peer_config '2', CONFIG => { "config1" => "value1", "config2" => "value2" }
hbase> update_peer_config '3', DATA => { "key1" => 1 }, CONFIG => { "config1" => "value1", "config2" => "value2" }
        EOF
      end

      # id   - id of the peer to update
      # args - Hash of options (DATA and/or CONFIG) passed through to the replication admin
      def command(id, args = {})
        format_simple_command do
          replication_admin.update_peer_config(id, args)
        end
      end
    end
  end
end

View File

@ -200,6 +200,31 @@ module Hbase
replication_admin.remove_peer(peer_id_second)
end
define_test "update_peer_config: can update peer config and data" do
  # Start from a peer that has two config entries and two data entries.
  endpoint_class = "org.apache.hadoop.hbase.replication.ReplicationEndpointForTest"
  initial_config = { "config1" => "value1", "config2" => "value2" }
  initial_data = { "data1" => "value1", "data2" => "value2" }
  add_args = { ENDPOINT_CLASSNAME => endpoint_class, CONFIG => initial_config,
               DATA => initial_data }
  replication_admin.add_peer(@peer_id, add_args)

  # Normally the ReplicationSourceManager will call ReplicationPeer#peer_added,
  # but here we have to do it ourselves
  replication_admin.peer_added(@peer_id)

  # Change only the first entry of each map.
  update_args = { CONFIG => { "config1" => "new_value1" },
                  DATA => { "data1" => "new_value1" } }
  replication_admin.update_peer_config(@peer_id, update_args)

  # Read the result back, then remove the peer before asserting so that it is cleaned up
  # even when one of the assertions below fails.
  updated = replication_admin.get_peer_config(@peer_id)
  replication_admin.remove_peer(@peer_id)

  # The updated key/value pairs must carry the new values; the untouched ones must be unchanged.
  updated_configuration = updated.get_configuration
  assert_equal("new_value1", updated_configuration.get("config1"))
  assert_equal("value2", updated_configuration.get("config2"))

  updated_peer_data = updated.get_peer_data
  assert_equal("new_value1", Bytes.to_string(updated_peer_data.get(Bytes.toBytes("data1"))))
  assert_equal("value2", Bytes.to_string(updated_peer_data.get(Bytes.toBytes("data2"))))
end
# assert_raise fails on native exceptions - https://jira.codehaus.org/browse/JRUBY-5279
# Can't catch native Java exception with assert_raise in JRuby 1.6.8 as in the test below.
# define_test "add_peer: adding a second peer with same id should error" do