diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java index de022d35a3c..ff2722e3b33 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java @@ -2527,7 +2527,7 @@ public interface Admin extends Abortable, Closeable { * @throws IOException if a remote or network exception occurs */ void appendReplicationPeerTableCFs(String id, - Map> tableCfs) + Map> tableCfs) throws ReplicationException, IOException; /** @@ -2538,7 +2538,7 @@ public interface Admin extends Abortable, Closeable { * @throws IOException if a remote or network exception occurs */ void removeReplicationPeerTableCFs(String id, - Map> tableCfs) + Map> tableCfs) throws ReplicationException, IOException; /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index c01d5fa1415..e9760334f29 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -597,7 +597,7 @@ public interface AsyncAdmin { * @param tableCfs A map from tableName to column family names */ CompletableFuture appendReplicationPeerTableCFs(String peerId, - Map> tableCfs); + Map> tableCfs); /** * Remove some table-cfs from config of the specified peer @@ -605,7 +605,7 @@ public interface AsyncAdmin { * @param tableCfs A map from tableName to column family names */ CompletableFuture removeReplicationPeerTableCFs(String peerId, - Map> tableCfs); + Map> tableCfs); /** * Return a list of replication peers. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 09fdeffca23..e60e42221e5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.client; import com.google.protobuf.RpcChannel; -import java.util.Collection; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -409,13 +408,13 @@ class AsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture appendReplicationPeerTableCFs(String peerId, - Map> tableCfs) { + Map> tableCfs) { return wrap(rawAdmin.appendReplicationPeerTableCFs(peerId, tableCfs)); } @Override public CompletableFuture removeReplicationPeerTableCFs(String peerId, - Map> tableCfs) { + Map> tableCfs) { return wrap(rawAdmin.removeReplicationPeerTableCFs(peerId, tableCfs)); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index bbcc825a07e..af3916d2e3b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -26,7 +26,6 @@ import java.io.InterruptedIOException; import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; import java.util.Iterator; @@ -3926,26 +3925,28 @@ public class HBaseAdmin implements Admin { @Override public void appendReplicationPeerTableCFs(String id, - Map> tableCfs) throws ReplicationException, - IOException { + Map> tableCfs) + throws ReplicationException, IOException { if (tableCfs == null) { throw new ReplicationException("tableCfs is null"); } ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id); - ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); - updateReplicationPeerConfig(id, peerConfig); + ReplicationPeerConfig newPeerConfig = + ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); + updateReplicationPeerConfig(id, newPeerConfig); } @Override public void removeReplicationPeerTableCFs(String id, - Map> tableCfs) throws ReplicationException, - IOException { + Map> tableCfs) + throws ReplicationException, IOException { if (tableCfs == null) { throw new ReplicationException("tableCfs is null"); } ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id); - ReplicationPeerConfigUtil.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); - updateReplicationPeerConfig(id, peerConfig); + ReplicationPeerConfig newPeerConfig = + ReplicationPeerConfigUtil.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); + updateReplicationPeerConfig(id, newPeerConfig); } @Override diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index 5e1c65445cc..dac83f3183b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -26,7 +26,6 @@ import com.google.protobuf.RpcChannel; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashMap; @@ -1593,7 +1592,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture appendReplicationPeerTableCFs(String id, - Map> tableCfs) { + Map> tableCfs) { if (tableCfs == null) { return failedFuture(new ReplicationException("tableCfs is null")); } @@ -1601,8 +1600,9 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { CompletableFuture future = new CompletableFuture(); getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> { if (!completeExceptionally(future, error)) { - ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); - updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> { + ReplicationPeerConfig newPeerConfig = + ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); + updateReplicationPeerConfig(id, newPeerConfig).whenComplete((result, err) -> { if (!completeExceptionally(future, error)) { future.complete(result); } @@ -1614,7 +1614,7 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { @Override public CompletableFuture removeReplicationPeerTableCFs(String id, - Map> tableCfs) { + Map> tableCfs) { if (tableCfs == null) { return failedFuture(new ReplicationException("tableCfs is null")); } @@ -1623,14 +1623,15 @@ class RawAsyncHBaseAdmin implements AsyncAdmin { getReplicationPeerConfig(id).whenComplete( (peerConfig, error) -> { if (!completeExceptionally(future, error)) { + ReplicationPeerConfig newPeerConfig = null; try { - ReplicationPeerConfigUtil.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, - id); + newPeerConfig = ReplicationPeerConfigUtil + .removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); } catch (ReplicationException e) { future.completeExceptionally(e); return; } - updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> { + updateReplicationPeerConfig(id, newPeerConfig).whenComplete((result, err) -> { if (!completeExceptionally(future, error)) { future.complete(result); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 78d4fbbdf2c..ccfc4445e38 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.TreeMap; import java.util.regex.Pattern; +import org.apache.curator.shaded.com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ReplicationPeerNotFoundException; @@ -251,7 +252,7 @@ public class ReplicationAdmin implements Closeable { @Deprecated public void appendPeerTableCFs(String id, Map> tableCfs) throws ReplicationException, IOException { - this.admin.appendReplicationPeerTableCFs(id, tableCfs); + this.admin.appendReplicationPeerTableCFs(id, copyTableCFs(tableCfs)); } /** @@ -279,7 +280,17 @@ public class ReplicationAdmin implements Closeable { @Deprecated public void removePeerTableCFs(String id, Map> tableCfs) throws ReplicationException, IOException { - this.admin.removeReplicationPeerTableCFs(id, tableCfs); + this.admin.removeReplicationPeerTableCFs(id, copyTableCFs(tableCfs)); + } + + private Map> + copyTableCFs(Map> tableCfs) { + Map> newTableCfs = new HashMap<>(); + if (tableCfs != null) { + tableCfs.forEach( + (table, cfs) -> newTableCfs.put(table, cfs != null ? Lists.newArrayList(cfs) : null)); + } + return newTableCfs; } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index ec80eca0f57..a50d48fed1a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -26,6 +26,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; @@ -36,6 +37,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Strings; @@ -260,63 +262,66 @@ public final class ReplicationPeerConfigUtil { return convert(peer); } else { if (bytes.length > 0) { - return new ReplicationPeerConfig().setClusterKey(Bytes.toString(bytes)); + return ReplicationPeerConfig.newBuilder().setClusterKey(Bytes.toString(bytes)).build(); } - return new ReplicationPeerConfig().setClusterKey(""); + return ReplicationPeerConfig.newBuilder().setClusterKey("").build(); } } public static ReplicationPeerConfig convert(ReplicationProtos.ReplicationPeer peer) { - ReplicationPeerConfig peerConfig = new ReplicationPeerConfig(); + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(); if (peer.hasClusterkey()) { - peerConfig.setClusterKey(peer.getClusterkey()); + builder.setClusterKey(peer.getClusterkey()); } if (peer.hasReplicationEndpointImpl()) { - peerConfig.setReplicationEndpointImpl(peer.getReplicationEndpointImpl()); + builder.setReplicationEndpointImpl(peer.getReplicationEndpointImpl()); } + Map peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR); for (HBaseProtos.BytesBytesPair pair : peer.getDataList()) { - peerConfig.getPeerData().put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray()); + peerData.put(pair.getFirst().toByteArray(), pair.getSecond().toByteArray()); } + builder.setPeerData(peerData); + Map configuration = new HashMap<>(); for (HBaseProtos.NameStringPair pair : peer.getConfigurationList()) { - peerConfig.getConfiguration().put(pair.getName(), pair.getValue()); + configuration.put(pair.getName(), pair.getValue()); } + builder.setConfiguration(configuration); - Map> tableCFsMap = convert2Map( + Map> tableCFsMap = convert2Map( peer.getTableCfsList().toArray(new ReplicationProtos.TableCF[peer.getTableCfsCount()])); if (tableCFsMap != null) { - peerConfig.setTableCFsMap(tableCFsMap); + builder.setTableCFsMap(tableCFsMap); } List namespacesList = peer.getNamespacesList(); if (namespacesList != null && namespacesList.size() != 0) { - peerConfig.setNamespaces( + builder.setNamespaces( namespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet())); } if (peer.hasBandwidth()) { - peerConfig.setBandwidth(peer.getBandwidth()); + builder.setBandwidth(peer.getBandwidth()); } if (peer.hasReplicateAll()) { - peerConfig.setReplicateAllUserTables(peer.getReplicateAll()); + builder.setReplicateAllUserTables(peer.getReplicateAll()); } - Map> excludeTableCFsMap = - convert2Map(peer.getExcludeTableCfsList() - .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()])); + Map> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList() + .toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()])); if (excludeTableCFsMap != null) { - peerConfig.setExcludeTableCFsMap(excludeTableCFsMap); + builder.setExcludeTableCFsMap(excludeTableCFsMap); } List excludeNamespacesList = peer.getExcludeNamespacesList(); if (excludeNamespacesList != null && excludeNamespacesList.size() != 0) { - peerConfig.setExcludeNamespaces( + builder.setExcludeNamespaces( excludeNamespacesList.stream().map(ByteString::toStringUtf8).collect(Collectors.toSet())); } - return peerConfig; + return builder.build(); } public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) { @@ -408,56 +413,69 @@ public final class ReplicationPeerConfigUtil { return builder.build(); } - public static void appendTableCFsToReplicationPeerConfig( - Map> tableCfs, ReplicationPeerConfig peerConfig) { + public static ReplicationPeerConfig appendTableCFsToReplicationPeerConfig( + Map> tableCfs, ReplicationPeerConfig peerConfig) { + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig); Map> preTableCfs = peerConfig.getTableCFsMap(); if (preTableCfs == null) { - peerConfig.setTableCFsMap(tableCfs); + builder.setTableCFsMap(tableCfs); } else { + Map> newTableCfs = copyTableCFsMap(preTableCfs); for (Map.Entry> entry : tableCfs.entrySet()) { TableName table = entry.getKey(); Collection appendCfs = entry.getValue(); - if (preTableCfs.containsKey(table)) { - List cfs = preTableCfs.get(table); + if (newTableCfs.containsKey(table)) { + List cfs = newTableCfs.get(table); if (cfs == null || appendCfs == null || appendCfs.isEmpty()) { - preTableCfs.put(table, null); + newTableCfs.put(table, null); } else { Set cfSet = new HashSet(cfs); cfSet.addAll(appendCfs); - preTableCfs.put(table, Lists.newArrayList(cfSet)); + newTableCfs.put(table, Lists.newArrayList(cfSet)); } } else { if (appendCfs == null || appendCfs.isEmpty()) { - preTableCfs.put(table, null); + newTableCfs.put(table, null); } else { - preTableCfs.put(table, Lists.newArrayList(appendCfs)); + newTableCfs.put(table, Lists.newArrayList(appendCfs)); } } } + builder.setTableCFsMap(newTableCfs); } + return builder.build(); } - public static void removeTableCFsFromReplicationPeerConfig( - Map> tableCfs, ReplicationPeerConfig peerConfig, + private static Map> + copyTableCFsMap(Map> preTableCfs) { + Map> newTableCfs = new HashMap<>(); + preTableCfs.forEach( + (table, cfs) -> newTableCfs.put(table, cfs != null ? Lists.newArrayList(cfs) : null)); + return newTableCfs; + } + + public static ReplicationPeerConfig removeTableCFsFromReplicationPeerConfig( + Map> tableCfs, ReplicationPeerConfig peerConfig, String id) throws ReplicationException { Map> preTableCfs = peerConfig.getTableCFsMap(); if (preTableCfs == null) { throw new ReplicationException("Table-Cfs for peer: " + id + " is null"); } + Map> newTableCfs = copyTableCFsMap(preTableCfs); for (Map.Entry> entry : tableCfs.entrySet()) { TableName table = entry.getKey(); Collection removeCfs = entry.getValue(); - if (preTableCfs.containsKey(table)) { - List cfs = preTableCfs.get(table); + if (newTableCfs.containsKey(table)) { + List cfs = newTableCfs.get(table); if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) { - preTableCfs.remove(table); + newTableCfs.remove(table); } else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) { Set cfSet = new HashSet(cfs); cfSet.removeAll(removeCfs); if (cfSet.isEmpty()) { - preTableCfs.remove(table); + newTableCfs.remove(table); } else { - preTableCfs.put(table, Lists.newArrayList(cfSet)); + newTableCfs.put(table, Lists.newArrayList(cfSet)); } } else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) { throw new ReplicationException("Cannot remove cf of table: " + table @@ -467,10 +485,13 @@ public final class ReplicationPeerConfigUtil { + " which has specified cfs from table-cfs config in peer: " + id); } } else { - throw new ReplicationException("No table: " - + table + " in table-cfs config of peer: " + id); + throw new ReplicationException( + "No table: " + table + " in table-cfs config of peer: " + id); } } + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig); + builder.setTableCFsMap(newTableCfs); + return builder.build(); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index 52a5fe9761b..8f6b93823fe 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -41,12 +42,44 @@ public class ReplicationPeerConfig { private final Map configuration; private Map> tableCFsMap = null; private Set namespaces = null; - private long bandwidth = 0; // Default value is true, means replicate all user tables to peer cluster. private boolean replicateAllUserTables = true; private Map> excludeTableCFsMap = null; private Set excludeNamespaces = null; + private long bandwidth = 0; + private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) { + this.clusterKey = builder.clusterKey; + this.replicationEndpointImpl = builder.replicationEndpointImpl; + this.peerData = Collections.unmodifiableMap(builder.peerData); + this.configuration = Collections.unmodifiableMap(builder.configuration); + this.tableCFsMap = + builder.tableCFsMap != null ? unmodifiableTableCFsMap(builder.tableCFsMap) : null; + this.namespaces = + builder.namespaces != null ? Collections.unmodifiableSet(builder.namespaces) : null; + this.replicateAllUserTables = builder.replicateAllUserTables; + this.excludeTableCFsMap = + builder.excludeTableCFsMap != null ? unmodifiableTableCFsMap(builder.excludeTableCFsMap) + : null; + this.excludeNamespaces = + builder.excludeNamespaces != null ? Collections.unmodifiableSet(builder.excludeNamespaces) + : null; + this.bandwidth = builder.bandwidth; + } + + private Map> + unmodifiableTableCFsMap(Map> tableCFsMap) { + Map> newTableCFsMap = new HashMap<>(); + tableCFsMap.forEach((table, cfs) -> newTableCFsMap.put(table, + cfs != null ? Collections.unmodifiableList(cfs) : null)); + return Collections.unmodifiableMap(newTableCFsMap); + } + + /** + * @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use + * {@link ReplicationPeerConfigBuilder} to create new ReplicationPeerConfig. + */ + @Deprecated public ReplicationPeerConfig() { this.peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR); this.configuration = new HashMap<>(0); @@ -54,8 +87,11 @@ public class ReplicationPeerConfig { /** * Set the clusterKey which is the concatenation of the slave cluster's: - * hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent + * hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent + * @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use + * {@link ReplicationPeerConfigBuilder#setClusterKey(String)} instead. */ + @Deprecated public ReplicationPeerConfig setClusterKey(String clusterKey) { this.clusterKey = clusterKey; return this; @@ -64,7 +100,10 @@ public class ReplicationPeerConfig { /** * Sets the ReplicationEndpoint plugin class for this peer. * @param replicationEndpointImpl a class implementing ReplicationEndpoint + * @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use + * {@link ReplicationPeerConfigBuilder#setReplicationEndpointImpl(String)} instead. */ + @Deprecated public ReplicationPeerConfig setReplicationEndpointImpl(String replicationEndpointImpl) { this.replicationEndpointImpl = replicationEndpointImpl; return this; @@ -90,6 +129,11 @@ public class ReplicationPeerConfig { return (Map>) tableCFsMap; } + /** + * @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use + * {@link ReplicationPeerConfigBuilder#setTableCFsMap(Map)} instead. + */ + @Deprecated public ReplicationPeerConfig setTableCFsMap(Map> tableCFsMap) { this.tableCFsMap = tableCFsMap; @@ -100,6 +144,11 @@ public class ReplicationPeerConfig { return this.namespaces; } + /** + * @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use + * {@link ReplicationPeerConfigBuilder#setNamespaces(Set)} instead. + */ + @Deprecated public ReplicationPeerConfig setNamespaces(Set namespaces) { this.namespaces = namespaces; return this; @@ -109,6 +158,11 @@ public class ReplicationPeerConfig { return this.bandwidth; } + /** + * @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use + * {@link ReplicationPeerConfigBuilder#setBandwidth(long)} instead. + */ + @Deprecated public ReplicationPeerConfig setBandwidth(long bandwidth) { this.bandwidth = bandwidth; return this; @@ -118,6 +172,11 @@ public class ReplicationPeerConfig { return this.replicateAllUserTables; } + /** + * @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use + * {@link ReplicationPeerConfigBuilder#setReplicateAllUserTables(boolean)} instead. + */ + @Deprecated public ReplicationPeerConfig setReplicateAllUserTables(boolean replicateAllUserTables) { this.replicateAllUserTables = replicateAllUserTables; return this; @@ -127,6 +186,11 @@ public class ReplicationPeerConfig { return (Map>) excludeTableCFsMap; } + /** + * @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use + * {@link ReplicationPeerConfigBuilder#setExcludeTableCFsMap(Map)} instead. + */ + @Deprecated public ReplicationPeerConfig setExcludeTableCFsMap(Map> tableCFsMap) { this.excludeTableCFsMap = tableCFsMap; @@ -137,11 +201,124 @@ public class ReplicationPeerConfig { return this.excludeNamespaces; } + /** + * @deprecated as release of 2.0.0, and it will be removed in 3.0.0. Use + * {@link ReplicationPeerConfigBuilder#setExcludeNamespaces(Set)} instead. + */ + @Deprecated public ReplicationPeerConfig setExcludeNamespaces(Set namespaces) { this.excludeNamespaces = namespaces; return this; } + public static ReplicationPeerConfigBuilder newBuilder() { + return new ReplicationPeerConfigBuilderImpl(); + } + + public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peerConfig) { + ReplicationPeerConfigBuilderImpl builder = new ReplicationPeerConfigBuilderImpl(); + builder.setClusterKey(peerConfig.getClusterKey()) + .setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl()) + .setPeerData(peerConfig.getPeerData()).setConfiguration(peerConfig.getConfiguration()) + .setTableCFsMap(peerConfig.getTableCFsMap()).setNamespaces(peerConfig.getNamespaces()) + .setReplicateAllUserTables(peerConfig.replicateAllUserTables()) + .setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap()) + .setExcludeNamespaces(peerConfig.getExcludeNamespaces()) + .setBandwidth(peerConfig.getBandwidth()); + return builder; + } + + static class ReplicationPeerConfigBuilderImpl implements ReplicationPeerConfigBuilder { + + private String clusterKey; + + private String replicationEndpointImpl; + + private Map peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR); + + private Map configuration = new HashMap<>(); + + private Map> tableCFsMap = null; + + private Set namespaces = null; + + // Default value is true, means replicate all user tables to peer cluster. + private boolean replicateAllUserTables = true; + + private Map> excludeTableCFsMap = null; + + private Set excludeNamespaces = null; + + private long bandwidth = 0; + + @Override + public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) { + this.clusterKey = clusterKey; + return this; + } + + @Override + public ReplicationPeerConfigBuilder setReplicationEndpointImpl(String replicationEndpointImpl) { + this.replicationEndpointImpl = replicationEndpointImpl; + return this; + } + + @Override + public ReplicationPeerConfigBuilder setPeerData(Map peerData) { + this.peerData = peerData; + return this; + } + + @Override + public ReplicationPeerConfigBuilder setConfiguration(Map configuration) { + this.configuration = configuration; + return this; + } + + @Override + public ReplicationPeerConfigBuilder + setTableCFsMap(Map> tableCFsMap) { + this.tableCFsMap = tableCFsMap; + return this; + } + + @Override + public ReplicationPeerConfigBuilder setNamespaces(Set namespaces) { + this.namespaces = namespaces; + return this; + } + + @Override + public ReplicationPeerConfigBuilder setReplicateAllUserTables(boolean replicateAllUserTables) { + this.replicateAllUserTables = replicateAllUserTables; + return this; + } + + @Override + public ReplicationPeerConfigBuilder + setExcludeTableCFsMap(Map> excludeTableCFsMap) { + this.excludeTableCFsMap = excludeTableCFsMap; + return this; + } + + @Override + public ReplicationPeerConfigBuilder setExcludeNamespaces(Set excludeNamespaces) { + this.excludeNamespaces = excludeNamespaces; + return this; + } + + @Override + public ReplicationPeerConfigBuilder setBandwidth(long bandwidth) { + this.bandwidth = bandwidth; + return this; + } + + @Override + public ReplicationPeerConfig build() { + return new ReplicationPeerConfig(this); + } + } + @Override public String toString() { StringBuilder builder = new StringBuilder("clusterKey=").append(clusterKey).append(","); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java new file mode 100644 index 00000000000..b642acf73f9 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.replication; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.hbase.TableName; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * For creating {@link ReplicationPeerConfig}. + */ +@InterfaceAudience.Public +public interface ReplicationPeerConfigBuilder { + + /** + * Set the clusterKey which is the concatenation of the slave cluster's: + * hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent + */ + ReplicationPeerConfigBuilder setClusterKey(String clusterKey); + + /** + * Sets the ReplicationEndpoint plugin class for this peer. + * @param replicationEndpointImpl a class implementing ReplicationEndpoint + */ + ReplicationPeerConfigBuilder setReplicationEndpointImpl(String replicationEndpointImpl); + + ReplicationPeerConfigBuilder setPeerData(Map peerData); + + ReplicationPeerConfigBuilder setConfiguration(Map configuration); + + ReplicationPeerConfigBuilder + setTableCFsMap(Map> tableCFsMap); + + ReplicationPeerConfigBuilder setNamespaces(Set namespaces); + + ReplicationPeerConfigBuilder setBandwidth(long bandwidth); + + ReplicationPeerConfigBuilder setReplicateAllUserTables(boolean replicateAllUserTables); + + ReplicationPeerConfigBuilder setExcludeTableCFsMap(Map> tableCFsMap); + + ReplicationPeerConfigBuilder setExcludeNamespaces(Set namespaces); + + ReplicationPeerConfig build(); +} \ No newline at end of file diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 8f5e8d57e91..ff6c07b12e9 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.replication; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -37,6 +38,7 @@ import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKConfig; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -363,18 +365,19 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } //Update existingConfig's peer config and peer data with the new values, but don't touch config // or data that weren't explicitly changed - existingConfig.getConfiguration().putAll(newConfig.getConfiguration()); - existingConfig.getPeerData().putAll(newConfig.getPeerData()); - existingConfig.setTableCFsMap(newConfig.getTableCFsMap()); - existingConfig.setNamespaces(newConfig.getNamespaces()); - existingConfig.setBandwidth(newConfig.getBandwidth()); - existingConfig.setReplicateAllUserTables(newConfig.replicateAllUserTables()); - existingConfig.setExcludeNamespaces(newConfig.getExcludeNamespaces()); - existingConfig.setExcludeTableCFsMap(newConfig.getExcludeTableCFsMap()); + ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(newConfig); + Map peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR); + existingConfig.getPeerData().forEach(peerData::put); + newConfig.getPeerData().forEach(peerData::put); + builder.setPeerData(peerData); + Map configuration = new HashMap<>(); + existingConfig.getConfiguration().forEach(configuration::put); + newConfig.getConfiguration().forEach(configuration::put); + builder.setConfiguration(configuration); try { ZKUtil.setData(this.zookeeper, getPeerNode(id), - ReplicationPeerConfigUtil.toByteArray(existingConfig)); + ReplicationPeerConfigUtil.toByteArray(builder.build())); } catch(KeeperException ke){ throw new ReplicationException("There was a problem trying to save changes to the " + diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java index 6591826f5d6..28a7562d054 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java @@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.junit.After; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -68,6 +69,20 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); } + @After + public void cleanupPeer() { + try { + admin.removeReplicationPeer(ID_ONE).join(); + } catch (Exception e) { + LOG.debug("Replication peer " + ID_ONE + " may already be removed"); + } + try { + admin.removeReplicationPeer(ID_SECOND).join(); + } catch (Exception e) { + LOG.debug("Replication peer " + ID_SECOND + " may already be removed"); + } + } + @Test public void testAddRemovePeer() throws Exception { ReplicationPeerConfig rpc1 = new ReplicationPeerConfig(); @@ -350,7 +365,7 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase { // update peer config only contains ns1 rpc = admin.getReplicationPeerConfig(ID_ONE).get(); - namespaces.clear(); + namespaces = new HashSet<>(); namespaces.add(ns1); rpc.setNamespaces(namespaces); admin.updateReplicationPeerConfig(ID_ONE, rpc).join(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index 87702992627..dd33564e0fe 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.junit.After; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Rule; @@ -99,6 +100,20 @@ public class TestReplicationAdmin { TEST_UTIL.shutdownMiniCluster(); } + @After + public void cleanupPeer() { + try { + hbaseAdmin.removeReplicationPeer(ID_ONE); + } catch (Exception e) { + LOG.debug("Replication peer " + ID_ONE + " may already be removed"); + } + try { + hbaseAdmin.removeReplicationPeer(ID_SECOND); + } catch (Exception e) { + LOG.debug("Replication peer " + ID_SECOND + " may already be removed"); + } + } + /** * Simple testing of adding and removing peers, basically shows that * all interactions with ZK work @@ -449,7 +464,7 @@ public class TestReplicationAdmin { assertTrue(namespaces.contains(ns2)); rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); - namespaces.clear(); + namespaces = new HashSet<>(); namespaces.add(ns1); rpc.setNamespaces(namespaces); hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc); @@ -505,7 +520,7 @@ public class TestReplicationAdmin { assertTrue(namespaces.contains(ns2)); rpc = hbaseAdmin.getReplicationPeerConfig(ID_ONE); - namespaces.clear(); + namespaces = new HashSet(); namespaces.add(ns1); rpc.setExcludeNamespaces(namespaces); hbaseAdmin.updateReplicationPeerConfig(ID_ONE, rpc);