HBASE-19576 Introduce builder for ReplicationPeerConfig and make it immutable

This commit is contained in:
Guanghao Zhang 2017-12-22 11:42:40 +08:00
parent 96ae9add68
commit b552eb2ff9
12 changed files with 383 additions and 76 deletions

View File

@ -2527,7 +2527,7 @@ public interface Admin extends Abortable, Closeable {
* @throws IOException if a remote or network exception occurs
*/
void appendReplicationPeerTableCFs(String id,
Map<TableName, ? extends Collection<String>> tableCfs)
Map<TableName, List<String>> tableCfs)
throws ReplicationException, IOException;
/**
@ -2538,7 +2538,7 @@ public interface Admin extends Abortable, Closeable {
* @throws IOException if a remote or network exception occurs
*/
void removeReplicationPeerTableCFs(String id,
Map<TableName, ? extends Collection<String>> tableCfs)
Map<TableName, List<String>> tableCfs)
throws ReplicationException, IOException;
/**

View File

@ -597,7 +597,7 @@ public interface AsyncAdmin {
* @param tableCfs A map from tableName to column family names
*/
CompletableFuture<Void> appendReplicationPeerTableCFs(String peerId,
Map<TableName, ? extends Collection<String>> tableCfs);
Map<TableName, List<String>> tableCfs);
/**
* Remove some table-cfs from config of the specified peer
@ -605,7 +605,7 @@ public interface AsyncAdmin {
* @param tableCfs A map from tableName to column family names
*/
CompletableFuture<Void> removeReplicationPeerTableCFs(String peerId,
Map<TableName, ? extends Collection<String>> tableCfs);
Map<TableName, List<String>> tableCfs);
/**
* Return a list of replication peers.

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.client;
import com.google.protobuf.RpcChannel;
import java.util.Collection;
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@ -409,13 +408,13 @@ class AsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> appendReplicationPeerTableCFs(String peerId,
Map<TableName, ? extends Collection<String>> tableCfs) {
Map<TableName, List<String>> tableCfs) {
return wrap(rawAdmin.appendReplicationPeerTableCFs(peerId, tableCfs));
}
@Override
public CompletableFuture<Void> removeReplicationPeerTableCFs(String peerId,
Map<TableName, ? extends Collection<String>> tableCfs) {
Map<TableName, List<String>> tableCfs) {
return wrap(rawAdmin.removeReplicationPeerTableCFs(peerId, tableCfs));
}

View File

@ -26,7 +26,6 @@ import java.io.InterruptedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
@ -3926,26 +3925,28 @@ public class HBaseAdmin implements Admin {
@Override
public void appendReplicationPeerTableCFs(String id,
Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException,
IOException {
Map<TableName, List<String>> tableCfs)
throws ReplicationException, IOException {
if (tableCfs == null) {
throw new ReplicationException("tableCfs is null");
}
ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
ReplicationPeerConfig newPeerConfig =
ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
updateReplicationPeerConfig(id, peerConfig);
updateReplicationPeerConfig(id, newPeerConfig);
}
@Override
public void removeReplicationPeerTableCFs(String id,
Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException,
IOException {
Map<TableName, List<String>> tableCfs)
throws ReplicationException, IOException {
if (tableCfs == null) {
throw new ReplicationException("tableCfs is null");
}
ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
ReplicationPeerConfig newPeerConfig =
ReplicationPeerConfigUtil.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
updateReplicationPeerConfig(id, peerConfig);
updateReplicationPeerConfig(id, newPeerConfig);
}
@Override

View File

@ -26,7 +26,6 @@ import com.google.protobuf.RpcChannel;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@ -1593,7 +1592,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> appendReplicationPeerTableCFs(String id,
Map<TableName, ? extends Collection<String>> tableCfs) {
Map<TableName, List<String>> tableCfs) {
if (tableCfs == null) {
return failedFuture(new ReplicationException("tableCfs is null"));
}
@ -1601,8 +1600,9 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
CompletableFuture<Void> future = new CompletableFuture<Void>();
getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
if (!completeExceptionally(future, error)) {
ReplicationPeerConfig newPeerConfig =
ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
updateReplicationPeerConfig(id, newPeerConfig).whenComplete((result, err) -> {
if (!completeExceptionally(future, error)) {
future.complete(result);
}
@ -1614,7 +1614,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
@Override
public CompletableFuture<Void> removeReplicationPeerTableCFs(String id,
Map<TableName, ? extends Collection<String>> tableCfs) {
Map<TableName, List<String>> tableCfs) {
if (tableCfs == null) {
return failedFuture(new ReplicationException("tableCfs is null"));
}
@ -1623,14 +1623,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
getReplicationPeerConfig(id).whenComplete(
(peerConfig, error) -> {
if (!completeExceptionally(future, error)) {
ReplicationPeerConfig newPeerConfig = null;
try {
ReplicationPeerConfigUtil.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig,
id);
newPeerConfig = ReplicationPeerConfigUtil
.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
} catch (ReplicationException e) {
future.completeExceptionally(e);
return;
}
updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
updateReplicationPeerConfig(id, newPeerConfig).whenComplete((result, err) -> {
if (!completeExceptionally(future, error)) {
future.complete(result);
}

View File

@ -28,6 +28,7 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;
import org.apache.curator.shaded.com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
@ -251,7 +252,7 @@ public class ReplicationAdmin implements Closeable {
@Deprecated
public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
throws ReplicationException, IOException {
this.admin.appendReplicationPeerTableCFs(id, tableCfs);
this.admin.appendReplicationPeerTableCFs(id, copyTableCFs(tableCfs));
}
/**
@ -279,7 +280,17 @@ public class ReplicationAdmin implements Closeable {
@Deprecated
public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
throws ReplicationException, IOException {
this.admin.removeReplicationPeerTableCFs(id, tableCfs);
this.admin.removeReplicationPeerTableCFs(id, copyTableCFs(tableCfs));
}
private Map<TableName, List<String>>
copyTableCFs(Map<TableName, ? extends Collection<String>> tableCfs) {
Map<TableName, List<String>> newTableCfs = new HashMap<>();
if (tableCfs != null) {
tableCfs.forEach(
(table, cfs) -> newTableCfs.put(table, cfs != null ? Lists.newArrayList(cfs) : null));
}
return newTableCfs;
}
/**

View File

@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Strings;
@ -260,63 +262,66 @@ public final class ReplicationPeerConfigUtil {
return convert(peer);
} else {
if (bytes.length > 0) {
return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes));
return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build();
}
return new ReplicationPeerConfig().setClusterKey("");
return ReplicationPeerConfig.newBuilder().setClusterKey("").build();
}
}
public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) {
ReplicationPeerConfig peerConfig = new ReplicationPeerConfig();
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder();
if (peer.hasClusterkey()) {
peerConfig.setClusterKey(peer.getClusterkey());
builder.setClusterKey(peer.getClusterkey());
}
if (peer.hasReplicationEndpointImpl()) {
peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
builder.setReplicationEndpointImpl(peer.getReplicationEndpointImpl());
}
Map<byte[], byte[]> peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) {
peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
peerData.put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray());
}
builder.setPeerData(peerData);
Map<String, String> configuration = new HashMap<>();
for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) {
peerConfig.getConfiguration().put(pair.getName(), pair.getValue());
configuration.put(pair.getName(), pair.getValue());
}
builder.setConfiguration(configuration);
Map<TableName, ? extends Collection<String>> tableCFsMap = convert2Map(
Map<TableName, List<String>> tableCFsMap = convert2Map(
peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()]));
if (tableCFsMap != null) {
peerConfig.setTableCFsMap(tableCFsMap);
builder.setTableCFsMap(tableCFsMap);
}
List<ByteString> namespacesList = peer.getNamespacesList();
if (namespacesList != null && namespacesList.size() != 0) {
peerConfig.setNamespaces(
builder.setNamespaces(
namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
}
if (peer.hasBandwidth()) {
peerConfig.setBandwidth(peer.getBandwidth());
builder.setBandwidth(peer.getBandwidth());
}
if (peer.hasReplicateAll()) {
peerConfig.setReplicateAllUserTables(peer.getReplicateAll());
builder.setReplicateAllUserTables(peer.getReplicateAll());
}
Map<TableName, ? extends Collection<String>> excludeTableCFsMap =
convert2Map(peer.getExcludeTableCfsList()
Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList()
.toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
if (excludeTableCFsMap != null) {
peerConfig.setExcludeTableCFsMap(excludeTableCFsMap);
builder.setExcludeTableCFsMap(excludeTableCFsMap);
}
List<ByteString> excludeNamespacesList = peer.getExcludeNamespacesList();
if (excludeNamespacesList != null && excludeNamespacesList.size() != 0) {
peerConfig.setExcludeNamespaces(
builder.setExcludeNamespaces(
excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet()));
}
return peerConfig;
return builder.build();
}
public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
@ -408,56 +413,69 @@ public final class ReplicationPeerConfigUtil {
return builder.build();
}
public static void appendTableCFsToReplicationPeerConfig(
Map<TableName, ? extends Collection<String>> tableCfs, ReplicationPeerConfig peerConfig) {
public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig(
Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig) {
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
if (preTableCfs == null) {
peerConfig.setTableCFsMap(tableCfs);
builder.setTableCFsMap(tableCfs);
} else {
Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
TableName table = entry.getKey();
Collection<String> appendCfs = entry.getValue();
if (preTableCfs.containsKey(table)) {
List<String> cfs = preTableCfs.get(table);
if (newTableCfs.containsKey(table)) {
List<String> cfs = newTableCfs.get(table);
if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
preTableCfs.put(table, null);
newTableCfs.put(table, null);
} else {
Set<String> cfSet = new HashSet<String>(cfs);
cfSet.addAll(appendCfs);
preTableCfs.put(table, Lists.newArrayList(cfSet));
newTableCfs.put(table, Lists.newArrayList(cfSet));
}
} else {
if (appendCfs == null || appendCfs.isEmpty()) {
preTableCfs.put(table, null);
newTableCfs.put(table, null);
} else {
preTableCfs.put(table, Lists.newArrayList(appendCfs));
newTableCfs.put(table, Lists.newArrayList(appendCfs));
}
}
}
builder.setTableCFsMap(newTableCfs);
}
return builder.build();
}
public static void removeTableCFsFromReplicationPeerConfig(
Map<TableName, ? extends Collection<String>> tableCfs, ReplicationPeerConfig peerConfig,
private static Map<TableName, List<String>>
copyTableCFsMap(Map<TableName, List<String>> preTableCfs) {
Map<TableName, List<String>> newTableCfs = new HashMap<>();
preTableCfs.forEach(
(table, cfs) -> newTableCfs.put(table, cfs != null ? Lists.newArrayList(cfs) : null));
return newTableCfs;
}
public static ReplicationPeerConfig removeTableCFsFromReplicationPeerConfig(
Map<TableName, List<String>> tableCfs, ReplicationPeerConfig peerConfig,
String id) throws ReplicationException {
Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
if (preTableCfs == null) {
throw new ReplicationException("Table-Cfs for peer: " + id + " is null");
}
Map<TableName, List<String>> newTableCfs = copyTableCFsMap(preTableCfs);
for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
TableName table = entry.getKey();
Collection<String> removeCfs = entry.getValue();
if (preTableCfs.containsKey(table)) {
List<String> cfs = preTableCfs.get(table);
if (newTableCfs.containsKey(table)) {
List<String> cfs = newTableCfs.get(table);
if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
preTableCfs.remove(table);
newTableCfs.remove(table);
} else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
Set<String> cfSet = new HashSet<String>(cfs);
cfSet.removeAll(removeCfs);
if (cfSet.isEmpty()) {
preTableCfs.remove(table);
newTableCfs.remove(table);
} else {
preTableCfs.put(table, Lists.newArrayList(cfSet));
newTableCfs.put(table, Lists.newArrayList(cfSet));
}
} else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
throw new ReplicationException("Cannot remove cf of table: " + table
@ -467,10 +485,13 @@ public final class ReplicationPeerConfigUtil {
+ " which has specified cfs from table-cfs config in peer: " + id);
}
} else {
throw new ReplicationException("No table: "
+ table + " in table-cfs config of peer: " + id);
throw new ReplicationException(
"No table: " + table + " in table-cfs config of peer: " + id);
}
}
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
builder.setTableCFsMap(newTableCfs);
return builder.build();
}
/**

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.replication;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -41,12 +42,44 @@ public class ReplicationPeerConfig {
private final Map<String, String> configuration;
private Map<TableName, ? extends Collection<String>> tableCFsMap = null;
private Set<String> namespaces = null;
private long bandwidth = 0;
// Default value is true, means replicate all user tables to peer cluster.
private boolean replicateAllUserTables = true;
private Map<TableName, ? extends Collection<String>> excludeTableCFsMap = null;
private Set<String> excludeNamespaces = null;
private long bandwidth = 0;
private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) {
this.clusterKey = builder.clusterKey;
this.replicationEndpointImpl = builder.replicationEndpointImpl;
this.peerData = Collections.unmodifiableMap(builder.peerData);
this.configuration = Collections.unmodifiableMap(builder.configuration);
this.tableCFsMap =
builder.tableCFsMap != null ? unmodifiableTableCFsMap(builder.tableCFsMap) : null;
this.namespaces =
builder.namespaces != null ? Collections.unmodifiableSet(builder.namespaces) : null;
this.replicateAllUserTables = builder.replicateAllUserTables;
this.excludeTableCFsMap =
builder.excludeTableCFsMap != null ? unmodifiableTableCFsMap(builder.excludeTableCFsMap)
: null;
this.excludeNamespaces =
builder.excludeNamespaces != null ? Collections.unmodifiableSet(builder.excludeNamespaces)
: null;
this.bandwidth = builder.bandwidth;
}
private Map<TableName, List<String>>
unmodifiableTableCFsMap(Map<TableName, List<String>> tableCFsMap) {
Map<TableName, List<String>> newTableCFsMap = new HashMap<>();
tableCFsMap.forEach((table, cfs) -> newTableCFsMap.put(table,
cfs != null ? Collections.unmodifiableList(cfs) : null));
return Collections.unmodifiableMap(newTableCFsMap);
}
/**
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
* {@link ReplicationPeerConfigBuilder} to create new ReplicationPeerConfig.
*/
@Deprecated
public ReplicationPeerConfig() {
this.peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
this.configuration = new HashMap<>(0);
@ -55,7 +88,10 @@ public class ReplicationPeerConfig {
/**
* Set the clusterKey which is the concatenation of the slave cluster's:
* hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
* {@link ReplicationPeerConfigBuilder#setClusterKey(String)} instead.
*/
@Deprecated
public ReplicationPeerConfig setClusterKey(String clusterKey) {
this.clusterKey = clusterKey;
return this;
@ -64,7 +100,10 @@ public class ReplicationPeerConfig {
/**
* Sets the ReplicationEndpoint plugin class for this peer.
* @param replicationEndpointImpl a class implementing ReplicationEndpoint
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
* {@link ReplicationPeerConfigBuilder#setReplicationEndpointImpl(String)} instead.
*/
@Deprecated
public ReplicationPeerConfig setReplicationEndpointImpl(String replicationEndpointImpl) {
this.replicationEndpointImpl = replicationEndpointImpl;
return this;
@ -90,6 +129,11 @@ public class ReplicationPeerConfig {
return (Map<TableName, List<String>>) tableCFsMap;
}
/**
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
* {@link ReplicationPeerConfigBuilder#setTableCFsMap(Map)} instead.
*/
@Deprecated
public ReplicationPeerConfig setTableCFsMap(Map<TableName,
? extends Collection<String>> tableCFsMap) {
this.tableCFsMap = tableCFsMap;
@ -100,6 +144,11 @@ public class ReplicationPeerConfig {
return this.namespaces;
}
/**
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
* {@link ReplicationPeerConfigBuilder#setNamespaces(Set)} instead.
*/
@Deprecated
public ReplicationPeerConfig setNamespaces(Set<String> namespaces) {
this.namespaces = namespaces;
return this;
@ -109,6 +158,11 @@ public class ReplicationPeerConfig {
return this.bandwidth;
}
/**
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
* {@link ReplicationPeerConfigBuilder#setBandwidth(long)} instead.
*/
@Deprecated
public ReplicationPeerConfig setBandwidth(long bandwidth) {
this.bandwidth = bandwidth;
return this;
@ -118,6 +172,11 @@ public class ReplicationPeerConfig {
return this.replicateAllUserTables;
}
/**
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
* {@link ReplicationPeerConfigBuilder#setReplicateAllUserTables(boolean)} instead.
*/
@Deprecated
public ReplicationPeerConfig setReplicateAllUserTables(boolean replicateAllUserTables) {
this.replicateAllUserTables = replicateAllUserTables;
return this;
@ -127,6 +186,11 @@ public class ReplicationPeerConfig {
return (Map<TableName, List<String>>) excludeTableCFsMap;
}
/**
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
* {@link ReplicationPeerConfigBuilder#setExcludeTableCFsMap(Map)} instead.
*/
@Deprecated
public ReplicationPeerConfig setExcludeTableCFsMap(Map<TableName,
? extends Collection<String>> tableCFsMap) {
this.excludeTableCFsMap = tableCFsMap;
@ -137,11 +201,124 @@ public class ReplicationPeerConfig {
return this.excludeNamespaces;
}
/**
* @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use
* {@link ReplicationPeerConfigBuilder#setExcludeNamespaces(Set)} instead.
*/
@Deprecated
public ReplicationPeerConfig setExcludeNamespaces(Set<String> namespaces) {
this.excludeNamespaces = namespaces;
return this;
}
public static ReplicationPeerConfigBuilder newBuilder() {
return new ReplicationPeerConfigBuilderImpl();
}
public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peerConfig) {
ReplicationPeerConfigBuilderImpl builder = new ReplicationPeerConfigBuilderImpl();
builder.setClusterKey(peerConfig.getClusterKey())
.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl())
.setPeerData(peerConfig.getPeerData()).setConfiguration(peerConfig.getConfiguration())
.setTableCFsMap(peerConfig.getTableCFsMap()).setNamespaces(peerConfig.getNamespaces())
.setReplicateAllUserTables(peerConfig.replicateAllUserTables())
.setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap())
.setExcludeNamespaces(peerConfig.getExcludeNamespaces())
.setBandwidth(peerConfig.getBandwidth());
return builder;
}
static class ReplicationPeerConfigBuilderImpl implements ReplicationPeerConfigBuilder {
private String clusterKey;
private String replicationEndpointImpl;
private Map<byte[], byte[]> peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
private Map<String, String> configuration = new HashMap<>();
private Map<TableName, List<String>> tableCFsMap = null;
private Set<String> namespaces = null;
// Default value is true, means replicate all user tables to peer cluster.
private boolean replicateAllUserTables = true;
private Map<TableName, List<String>> excludeTableCFsMap = null;
private Set<String> excludeNamespaces = null;
private long bandwidth = 0;
@Override
public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) {
this.clusterKey = clusterKey;
return this;
}
@Override
public ReplicationPeerConfigBuilder setReplicationEndpointImpl(String replicationEndpointImpl) {
this.replicationEndpointImpl = replicationEndpointImpl;
return this;
}
@Override
public ReplicationPeerConfigBuilder setPeerData(Map<byte[], byte[]> peerData) {
this.peerData = peerData;
return this;
}
@Override
public ReplicationPeerConfigBuilder setConfiguration(Map<String, String> configuration) {
this.configuration = configuration;
return this;
}
@Override
public ReplicationPeerConfigBuilder
setTableCFsMap(Map<TableName, List<String>> tableCFsMap) {
this.tableCFsMap = tableCFsMap;
return this;
}
@Override
public ReplicationPeerConfigBuilder setNamespaces(Set<String> namespaces) {
this.namespaces = namespaces;
return this;
}
@Override
public ReplicationPeerConfigBuilder setReplicateAllUserTables(boolean replicateAllUserTables) {
this.replicateAllUserTables = replicateAllUserTables;
return this;
}
@Override
public ReplicationPeerConfigBuilder
setExcludeTableCFsMap(Map<TableName, List<String>> excludeTableCFsMap) {
this.excludeTableCFsMap = excludeTableCFsMap;
return this;
}
@Override
public ReplicationPeerConfigBuilder setExcludeNamespaces(Set<String> excludeNamespaces) {
this.excludeNamespaces = excludeNamespaces;
return this;
}
@Override
public ReplicationPeerConfigBuilder setBandwidth(long bandwidth) {
this.bandwidth = bandwidth;
return this;
}
@Override
public ReplicationPeerConfig build() {
return new ReplicationPeerConfig(this);
}
}
@Override
public String toString() {
StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(",");

View File

@ -0,0 +1,64 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
/**
* For creating {@link ReplicationPeerConfig}.
*/
@InterfaceAudience.Public
public interface ReplicationPeerConfigBuilder {
/**
* Set the clusterKey which is the concatenation of the slave cluster's:
* hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
*/
ReplicationPeerConfigBuilder setClusterKey(String clusterKey);
/**
* Sets the ReplicationEndpoint plugin class for this peer.
* @param replicationEndpointImpl a class implementing ReplicationEndpoint
*/
ReplicationPeerConfigBuilder setReplicationEndpointImpl(String replicationEndpointImpl);
ReplicationPeerConfigBuilder setPeerData(Map<byte[], byte[]> peerData);
ReplicationPeerConfigBuilder setConfiguration(Map<String, String> configuration);
ReplicationPeerConfigBuilder
setTableCFsMap(Map<TableName, List<String>> tableCFsMap);
ReplicationPeerConfigBuilder setNamespaces(Set<String> namespaces);
ReplicationPeerConfigBuilder setBandwidth(long bandwidth);
ReplicationPeerConfigBuilder setReplicateAllUserTables(boolean replicateAllUserTables);
ReplicationPeerConfigBuilder setExcludeTableCFsMap(Map<TableName, List<String>> tableCFsMap);
ReplicationPeerConfigBuilder setExcludeNamespaces(Set<String> namespaces);
ReplicationPeerConfig build();
}

View File

@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -363,18 +365,19 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
//Update existingConfig's peer config and peer data with the new values, but don't touch config
// or data that weren't explicitly changed
existingConfig.getConfiguration().putAll(newConfig.getConfiguration());
existingConfig.getPeerData().putAll(newConfig.getPeerData());
existingConfig.setTableCFsMap(newConfig.getTableCFsMap());
existingConfig.setNamespaces(newConfig.getNamespaces());
existingConfig.setBandwidth(newConfig.getBandwidth());
existingConfig.setReplicateAllUserTables(newConfig.replicateAllUserTables());
existingConfig.setExcludeNamespaces(newConfig.getExcludeNamespaces());
existingConfig.setExcludeTableCFsMap(newConfig.getExcludeTableCFsMap());
ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(newConfig);
Map<byte[], byte[]> peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
existingConfig.getPeerData().forEach(peerData::put);
newConfig.getPeerData().forEach(peerData::put);
builder.setPeerData(peerData);
Map<String, String> configuration = new HashMap<>();
existingConfig.getConfiguration().forEach(configuration::put);
newConfig.getConfiguration().forEach(configuration::put);
builder.setConfiguration(configuration);
try {
ZKUtil.setData(this.zookeeper, getPeerNode(id),
ReplicationPeerConfigUtil.toByteArray(existingConfig));
ReplicationPeerConfigUtil.toByteArray(builder.build()));
}
catch(KeeperException ke){
throw new ReplicationException("There was a problem trying to save changes to the " +

View File

@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -68,6 +69,20 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
}
@After
public void cleanupPeer() {
try {
admin.removeReplicationPeer(ID_ONE).join();
} catch (Exception e) {
LOG.debug("Replication peer " + ID_ONE + " may already be removed");
}
try {
admin.removeReplicationPeer(ID_SECOND).join();
} catch (Exception e) {
LOG.debug("Replication peer " + ID_SECOND + " may already be removed");
}
}
@Test
public void testAddRemovePeer() throws Exception {
ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
@ -350,7 +365,7 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
// update peer config only contains ns1
rpc = admin.getReplicationPeerConfig(ID_ONE).get();
namespaces.clear();
namespaces = new HashSet<>();
namespaces.add(ns1);
rpc.setNamespaces(namespaces);
admin.updateReplicationPeerConfig(ID_ONE, rpc).join();

View File

@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@ -99,6 +100,20 @@ public class TestReplicationAdmin {
TEST_UTIL.shutdownMiniCluster();
}
@After
public void cleanupPeer() {
try {
hbaseAdmin.removeReplicationPeer(ID_ONE);
} catch (Exception e) {
LOG.debug("Replication peer " + ID_ONE + " may already be removed");
}
try {
hbaseAdmin.removeReplicationPeer(ID_SECOND);
} catch (Exception e) {
LOG.debug("Replication peer " + ID_SECOND + " may already be removed");
}
}
/**
* Simple testing of adding and removing peers, basically shows that
* all interactions with ZK work
@ -449,7 +464,7 @@ public class TestReplicationAdmin {
assertTrue(namespaces.contains(ns2));
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
namespaces.clear();
namespaces = new HashSet<>();
namespaces.add(ns1);
rpc.setNamespaces(namespaces);
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);
@ -505,7 +520,7 @@ public class TestReplicationAdmin {
assertTrue(namespaces.contains(ns2));
rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE);
namespaces.clear();
namespaces = new HashSet<String>();
namespaces.add(ns1);
rpc.setExcludeNamespaces(namespaces);
hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);