HBASE-19590 Remove the duplicate code in deprecated ReplicationAdmin
This commit is contained in:
parent
11ea19a101
commit
3a210d514c
|
@ -23,10 +23,8 @@ import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
@ -44,7 +42,6 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* <p>
|
* <p>
|
||||||
|
@ -131,8 +128,6 @@ public class ReplicationAdmin implements Closeable {
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException,
|
public void addPeer(String id, ReplicationPeerConfig peerConfig) throws ReplicationException,
|
||||||
IOException {
|
IOException {
|
||||||
checkNamespacesAndTableCfsConfigConflict(peerConfig.getNamespaces(),
|
|
||||||
peerConfig.getTableCFsMap());
|
|
||||||
this.admin.addReplicationPeer(id, peerConfig);
|
this.admin.addReplicationPeer(id, peerConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -256,36 +251,7 @@ public class ReplicationAdmin implements Closeable {
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
|
public void appendPeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
|
||||||
throws ReplicationException, IOException {
|
throws ReplicationException, IOException {
|
||||||
if (tableCfs == null) {
|
this.admin.appendReplicationPeerTableCFs(id, tableCfs);
|
||||||
throw new ReplicationException("tableCfs is null");
|
|
||||||
}
|
|
||||||
ReplicationPeerConfig peerConfig = admin.getReplicationPeerConfig(id);
|
|
||||||
Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
|
|
||||||
if (preTableCfs == null) {
|
|
||||||
setPeerTableCFs(id, tableCfs);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
|
|
||||||
TableName table = entry.getKey();
|
|
||||||
Collection<String> appendCfs = entry.getValue();
|
|
||||||
if (preTableCfs.containsKey(table)) {
|
|
||||||
List<String> cfs = preTableCfs.get(table);
|
|
||||||
if (cfs == null || appendCfs == null || appendCfs.isEmpty()) {
|
|
||||||
preTableCfs.put(table, null);
|
|
||||||
} else {
|
|
||||||
Set<String> cfSet = new HashSet<>(cfs);
|
|
||||||
cfSet.addAll(appendCfs);
|
|
||||||
preTableCfs.put(table, Lists.newArrayList(cfSet));
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
if (appendCfs == null || appendCfs.isEmpty()) {
|
|
||||||
preTableCfs.put(table, null);
|
|
||||||
} else {
|
|
||||||
preTableCfs.put(table, Lists.newArrayList(appendCfs));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
updatePeerConfig(id, peerConfig);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -313,42 +279,7 @@ public class ReplicationAdmin implements Closeable {
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
|
public void removePeerTableCFs(String id, Map<TableName, ? extends Collection<String>> tableCfs)
|
||||||
throws ReplicationException, IOException {
|
throws ReplicationException, IOException {
|
||||||
if (tableCfs == null) {
|
this.admin.removeReplicationPeerTableCFs(id, tableCfs);
|
||||||
throw new ReplicationException("tableCfs is null");
|
|
||||||
}
|
|
||||||
ReplicationPeerConfig peerConfig = admin.getReplicationPeerConfig(id);
|
|
||||||
Map<TableName, List<String>> preTableCfs = peerConfig.getTableCFsMap();
|
|
||||||
if (preTableCfs == null) {
|
|
||||||
throw new ReplicationException("Table-Cfs for peer" + id + " is null");
|
|
||||||
}
|
|
||||||
for (Map.Entry<TableName, ? extends Collection<String>> entry: tableCfs.entrySet()) {
|
|
||||||
|
|
||||||
TableName table = entry.getKey();
|
|
||||||
Collection<String> removeCfs = entry.getValue();
|
|
||||||
if (preTableCfs.containsKey(table)) {
|
|
||||||
List<String> cfs = preTableCfs.get(table);
|
|
||||||
if (cfs == null && (removeCfs == null || removeCfs.isEmpty())) {
|
|
||||||
preTableCfs.remove(table);
|
|
||||||
} else if (cfs != null && (removeCfs != null && !removeCfs.isEmpty())) {
|
|
||||||
Set<String> cfSet = new HashSet<>(cfs);
|
|
||||||
cfSet.removeAll(removeCfs);
|
|
||||||
if (cfSet.isEmpty()) {
|
|
||||||
preTableCfs.remove(table);
|
|
||||||
} else {
|
|
||||||
preTableCfs.put(table, Lists.newArrayList(cfSet));
|
|
||||||
}
|
|
||||||
} else if (cfs == null && (removeCfs != null && !removeCfs.isEmpty())) {
|
|
||||||
throw new ReplicationException("Cannot remove cf of table: " + table
|
|
||||||
+ " which doesn't specify cfs from table-cfs config in peer: " + id);
|
|
||||||
} else if (cfs != null && (removeCfs == null || removeCfs.isEmpty())) {
|
|
||||||
throw new ReplicationException("Cannot remove table: " + table
|
|
||||||
+ " which has specified cfs from table-cfs config in peer: " + id);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
updatePeerConfig(id, peerConfig);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -455,34 +386,4 @@ public class ReplicationAdmin implements Closeable {
|
||||||
List<ReplicationPeerDescription> listReplicationPeers() throws IOException {
|
List<ReplicationPeerDescription> listReplicationPeers() throws IOException {
|
||||||
return admin.listReplicationPeers();
|
return admin.listReplicationPeers();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Set a namespace in the peer config means that all tables in this namespace
|
|
||||||
* will be replicated to the peer cluster.
|
|
||||||
*
|
|
||||||
* 1. If you already have set a namespace in the peer config, then you can't set any table
|
|
||||||
* of this namespace to the peer config.
|
|
||||||
* 2. If you already have set a table in the peer config, then you can't set this table's
|
|
||||||
* namespace to the peer config.
|
|
||||||
*
|
|
||||||
* @param namespaces
|
|
||||||
* @param tableCfs
|
|
||||||
* @throws ReplicationException
|
|
||||||
*/
|
|
||||||
private void checkNamespacesAndTableCfsConfigConflict(Set<String> namespaces,
|
|
||||||
Map<TableName, ? extends Collection<String>> tableCfs) throws ReplicationException {
|
|
||||||
if (namespaces == null || namespaces.isEmpty()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (tableCfs == null || tableCfs.isEmpty()) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
for (Map.Entry<TableName, ? extends Collection<String>> entry : tableCfs.entrySet()) {
|
|
||||||
TableName table = entry.getKey();
|
|
||||||
if (namespaces.contains(table.getNamespaceAsString())) {
|
|
||||||
throw new ReplicationException(
|
|
||||||
"Table-cfs config conflict with namespaces config in peer");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue