HBASE-19009 implement modifyTable and enable/disableTableReplication for AsyncAdmin
This commit is contained in:
parent
fb79e9d4a7
commit
d885e2232d
|
@ -141,6 +141,12 @@ public interface AsyncAdmin {
|
||||||
*/
|
*/
|
||||||
CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys);
|
CompletableFuture<Void> createTable(TableDescriptor desc, byte[][] splitKeys);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Modify an existing table, more IRB friendly version.
|
||||||
|
* @param desc modified description of the table
|
||||||
|
*/
|
||||||
|
CompletableFuture<Void> modifyTable(TableDescriptor desc);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Deletes a table.
|
* Deletes a table.
|
||||||
* @param tableName name of table to delete
|
* @param tableName name of table to delete
|
||||||
|
@ -552,6 +558,18 @@ public interface AsyncAdmin {
|
||||||
*/
|
*/
|
||||||
CompletableFuture<List<TableCFs>> listReplicatedTableCFs();
|
CompletableFuture<List<TableCFs>> listReplicatedTableCFs();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Enable a table's replication switch.
|
||||||
|
* @param tableName name of the table
|
||||||
|
*/
|
||||||
|
CompletableFuture<Void> enableTableReplication(TableName tableName);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Disable a table's replication switch.
|
||||||
|
* @param tableName name of the table
|
||||||
|
*/
|
||||||
|
CompletableFuture<Void> disableTableReplication(TableName tableName);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be
|
* Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be
|
||||||
* taken. If the table is disabled, an offline snapshot is taken. Snapshots are considered unique
|
* taken. If the table is disabled, an offline snapshot is taken. Snapshots are considered unique
|
||||||
|
|
|
@ -127,6 +127,11 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
|
||||||
return wrap(rawAdmin.createTable(desc, splitKeys));
|
return wrap(rawAdmin.createTable(desc, splitKeys));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
|
||||||
|
return wrap(rawAdmin.modifyTable(desc));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Void> deleteTable(TableName tableName) {
|
public CompletableFuture<Void> deleteTable(TableName tableName) {
|
||||||
return wrap(rawAdmin.deleteTable(tableName));
|
return wrap(rawAdmin.deleteTable(tableName));
|
||||||
|
@ -419,6 +424,16 @@ public class AsyncHBaseAdmin implements AsyncAdmin {
|
||||||
return wrap(rawAdmin.listReplicatedTableCFs());
|
return wrap(rawAdmin.listReplicatedTableCFs());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Void> enableTableReplication(TableName tableName) {
|
||||||
|
return wrap(rawAdmin.enableTableReplication(tableName));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Void> disableTableReplication(TableName tableName) {
|
||||||
|
return wrap(rawAdmin.disableTableReplication(tableName));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
|
public CompletableFuture<Void> snapshot(SnapshotDescription snapshot) {
|
||||||
return wrap(rawAdmin.snapshot(snapshot));
|
return wrap(rawAdmin.snapshot(snapshot));
|
||||||
|
|
|
@ -18,7 +18,9 @@
|
||||||
package org.apache.hadoop.hbase.client;
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.KeepDeletedCells;
|
import org.apache.hadoop.hbase.KeepDeletedCells;
|
||||||
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
|
import org.apache.hadoop.hbase.MemoryCompactionPolicy;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
@ -54,6 +56,31 @@ public interface ColumnFamilyDescriptor {
|
||||||
return lhs.getConfiguration().hashCode() - rhs.getConfiguration().hashCode();
|
return lhs.getConfiguration().hashCode() - rhs.getConfiguration().hashCode();
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static final Bytes REPLICATION_SCOPE_BYTES = new Bytes(
|
||||||
|
Bytes.toBytes(ColumnFamilyDescriptorBuilder.REPLICATION_SCOPE));
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
static final Comparator<ColumnFamilyDescriptor> COMPARATOR_IGNORE_REPLICATION = (
|
||||||
|
ColumnFamilyDescriptor lcf, ColumnFamilyDescriptor rcf) -> {
|
||||||
|
int result = Bytes.compareTo(lcf.getName(), rcf.getName());
|
||||||
|
if (result != 0) {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
// ColumnFamilyDescriptor.getValues is a immutable map, so copy it and remove
|
||||||
|
// REPLICATION_SCOPE_BYTES
|
||||||
|
Map<Bytes, Bytes> lValues = new HashMap<>();
|
||||||
|
lValues.putAll(lcf.getValues());
|
||||||
|
lValues.remove(REPLICATION_SCOPE_BYTES);
|
||||||
|
Map<Bytes, Bytes> rValues = new HashMap<>();
|
||||||
|
rValues.putAll(rcf.getValues());
|
||||||
|
rValues.remove(REPLICATION_SCOPE_BYTES);
|
||||||
|
result = lValues.hashCode() - rValues.hashCode();
|
||||||
|
if (result != 0) {
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
return lcf.getConfiguration().hashCode() - rcf.getConfiguration().hashCode();
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return The storefile/hfile blocksize for this column family.
|
* @return The storefile/hfile blocksize for this column family.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -30,7 +30,6 @@ import java.util.Arrays;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -45,6 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -54,10 +54,8 @@ import org.apache.hadoop.hbase.CacheEvictionStats;
|
||||||
import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
|
import org.apache.hadoop.hbase.CacheEvictionStatsBuilder;
|
||||||
import org.apache.hadoop.hbase.ClusterStatus;
|
import org.apache.hadoop.hbase.ClusterStatus;
|
||||||
import org.apache.hadoop.hbase.ClusterStatus.Option;
|
import org.apache.hadoop.hbase.ClusterStatus.Option;
|
||||||
import org.apache.hadoop.hbase.CompoundConfiguration;
|
|
||||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.HRegionInfo;
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
import org.apache.hadoop.hbase.HRegionLocation;
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
|
@ -76,7 +74,7 @@ import org.apache.hadoop.hbase.TableNotDisabledException;
|
||||||
import org.apache.hadoop.hbase.TableNotFoundException;
|
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||||
import org.apache.hadoop.hbase.UnknownRegionException;
|
import org.apache.hadoop.hbase.UnknownRegionException;
|
||||||
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
||||||
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
|
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||||
import org.apache.hadoop.hbase.client.replication.TableCFs;
|
import org.apache.hadoop.hbase.client.replication.TableCFs;
|
||||||
import org.apache.hadoop.hbase.client.security.SecurityCapability;
|
import org.apache.hadoop.hbase.client.security.SecurityCapability;
|
||||||
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
|
||||||
|
@ -3893,7 +3891,7 @@ public class HBaseAdmin implements Admin {
|
||||||
protected ReplicationPeerConfig rpcCall() throws Exception {
|
protected ReplicationPeerConfig rpcCall() throws Exception {
|
||||||
GetReplicationPeerConfigResponse response = master.getReplicationPeerConfig(
|
GetReplicationPeerConfigResponse response = master.getReplicationPeerConfig(
|
||||||
getRpcController(), RequestConverter.buildGetReplicationPeerConfigRequest(peerId));
|
getRpcController(), RequestConverter.buildGetReplicationPeerConfigRequest(peerId));
|
||||||
return ReplicationSerDeHelper.convert(response.getPeerConfig());
|
return ReplicationPeerConfigUtil.convert(response.getPeerConfig());
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -3919,7 +3917,7 @@ public class HBaseAdmin implements Admin {
|
||||||
throw new ReplicationException("tableCfs is null");
|
throw new ReplicationException("tableCfs is null");
|
||||||
}
|
}
|
||||||
ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
|
ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
|
||||||
ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
|
ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
|
||||||
updateReplicationPeerConfig(id, peerConfig);
|
updateReplicationPeerConfig(id, peerConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3931,7 +3929,7 @@ public class HBaseAdmin implements Admin {
|
||||||
throw new ReplicationException("tableCfs is null");
|
throw new ReplicationException("tableCfs is null");
|
||||||
}
|
}
|
||||||
ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
|
ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
|
||||||
ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
|
ReplicationPeerConfigUtil.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
|
||||||
updateReplicationPeerConfig(id, peerConfig);
|
updateReplicationPeerConfig(id, peerConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3957,7 +3955,7 @@ public class HBaseAdmin implements Admin {
|
||||||
.getPeerDescList();
|
.getPeerDescList();
|
||||||
List<ReplicationPeerDescription> result = new ArrayList<>(peersList.size());
|
List<ReplicationPeerDescription> result = new ArrayList<>(peersList.size());
|
||||||
for (ReplicationProtos.ReplicationPeerDescription peer : peersList) {
|
for (ReplicationProtos.ReplicationPeerDescription peer : peersList) {
|
||||||
result.add(ReplicationSerDeHelper.toReplicationPeerDescription(peer));
|
result.add(ReplicationPeerConfigUtil.toReplicationPeerDescription(peer));
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -4010,19 +4008,18 @@ public class HBaseAdmin implements Admin {
|
||||||
@Override
|
@Override
|
||||||
public List<TableCFs> listReplicatedTableCFs() throws IOException {
|
public List<TableCFs> listReplicatedTableCFs() throws IOException {
|
||||||
List<TableCFs> replicatedTableCFs = new ArrayList<>();
|
List<TableCFs> replicatedTableCFs = new ArrayList<>();
|
||||||
HTableDescriptor[] tables = listTables();
|
List<TableDescriptor> tables = listTableDescriptors();
|
||||||
for (HTableDescriptor table : tables) {
|
tables.forEach(table -> {
|
||||||
HColumnDescriptor[] columns = table.getColumnFamilies();
|
|
||||||
Map<String, Integer> cfs = new HashMap<>();
|
Map<String, Integer> cfs = new HashMap<>();
|
||||||
for (HColumnDescriptor column : columns) {
|
Stream.of(table.getColumnFamilies())
|
||||||
if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) {
|
.filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL)
|
||||||
|
.forEach(column -> {
|
||||||
cfs.put(column.getNameAsString(), column.getScope());
|
cfs.put(column.getNameAsString(), column.getScope());
|
||||||
}
|
});
|
||||||
}
|
|
||||||
if (!cfs.isEmpty()) {
|
if (!cfs.isEmpty()) {
|
||||||
replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
|
replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs));
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
return replicatedTableCFs;
|
return replicatedTableCFs;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4046,83 +4043,12 @@ public class HBaseAdmin implements Admin {
|
||||||
throw new IllegalArgumentException("Table name is null");
|
throw new IllegalArgumentException("Table name is null");
|
||||||
}
|
}
|
||||||
if (!tableExists(tableName)) {
|
if (!tableExists(tableName)) {
|
||||||
throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString()
|
throw new TableNotFoundException("Table '" + tableName.getNameAsString()
|
||||||
+ "' does not exists.");
|
+ "' does not exists.");
|
||||||
}
|
}
|
||||||
setTableRep(tableName, false);
|
setTableRep(tableName, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Copies the REPLICATION_SCOPE of table descriptor passed as an argument. Before copy, the method
|
|
||||||
* ensures that the name of table and column-families should match.
|
|
||||||
* @param peerHtd descriptor on peer cluster
|
|
||||||
* @param localHtd - The HTableDescriptor of table from source cluster.
|
|
||||||
* @return true If the name of table and column families match and REPLICATION_SCOPE copied
|
|
||||||
* successfully. false If there is any mismatch in the names.
|
|
||||||
*/
|
|
||||||
private boolean copyReplicationScope(final HTableDescriptor peerHtd,
|
|
||||||
final HTableDescriptor localHtd) {
|
|
||||||
// Copy the REPLICATION_SCOPE only when table names and the names of
|
|
||||||
// Column-Families are same.
|
|
||||||
int result = peerHtd.getTableName().compareTo(localHtd.getTableName());
|
|
||||||
|
|
||||||
if (result == 0) {
|
|
||||||
Iterator<HColumnDescriptor> remoteHCDIter = peerHtd.getFamilies().iterator();
|
|
||||||
Iterator<HColumnDescriptor> localHCDIter = localHtd.getFamilies().iterator();
|
|
||||||
|
|
||||||
while (remoteHCDIter.hasNext() && localHCDIter.hasNext()) {
|
|
||||||
HColumnDescriptor remoteHCD = remoteHCDIter.next();
|
|
||||||
HColumnDescriptor localHCD = localHCDIter.next();
|
|
||||||
|
|
||||||
String remoteHCDName = remoteHCD.getNameAsString();
|
|
||||||
String localHCDName = localHCD.getNameAsString();
|
|
||||||
|
|
||||||
if (remoteHCDName.equals(localHCDName)) {
|
|
||||||
remoteHCD.setScope(localHCD.getScope());
|
|
||||||
} else {
|
|
||||||
result = -1;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (remoteHCDIter.hasNext() || localHCDIter.hasNext()) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return result == 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Compare the contents of the descriptor with another one passed as a parameter for replication
|
|
||||||
* purpose. The REPLICATION_SCOPE field is ignored during comparison.
|
|
||||||
* @param peerHtd descriptor on peer cluster
|
|
||||||
* @param localHtd descriptor on source cluster which needs to be replicated.
|
|
||||||
* @return true if the contents of the two descriptors match (ignoring just REPLICATION_SCOPE).
|
|
||||||
* @see java.lang.Object#equals(java.lang.Object)
|
|
||||||
*/
|
|
||||||
private boolean compareForReplication(HTableDescriptor peerHtd, HTableDescriptor localHtd) {
|
|
||||||
if (peerHtd == localHtd) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (peerHtd == null) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
boolean result = false;
|
|
||||||
|
|
||||||
// Create a copy of peer HTD as we need to change its replication
|
|
||||||
// scope to match with the local HTD.
|
|
||||||
HTableDescriptor peerHtdCopy = new HTableDescriptor(peerHtd);
|
|
||||||
|
|
||||||
result = copyReplicationScope(peerHtdCopy, localHtd);
|
|
||||||
|
|
||||||
// If copy was successful, compare the two tables now.
|
|
||||||
if (result) {
|
|
||||||
result = (peerHtdCopy.compareTo(localHtd) == 0);
|
|
||||||
}
|
|
||||||
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Connect to peer and check the table descriptor on peer:
|
* Connect to peer and check the table descriptor on peer:
|
||||||
* <ol>
|
* <ol>
|
||||||
|
@ -4143,21 +4069,23 @@ public class HBaseAdmin implements Admin {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (ReplicationPeerDescription peerDesc : peers) {
|
for (ReplicationPeerDescription peerDesc : peers) {
|
||||||
if (needToReplicate(tableName, peerDesc)) {
|
if (peerDesc.getPeerConfig().needToReplicate(tableName)) {
|
||||||
Configuration peerConf = getPeerClusterConfiguration(peerDesc);
|
Configuration peerConf =
|
||||||
|
ReplicationPeerConfigUtil.getPeerClusterConfiguration(this.conf, peerDesc);
|
||||||
try (Connection conn = ConnectionFactory.createConnection(peerConf);
|
try (Connection conn = ConnectionFactory.createConnection(peerConf);
|
||||||
Admin repHBaseAdmin = conn.getAdmin()) {
|
Admin repHBaseAdmin = conn.getAdmin()) {
|
||||||
HTableDescriptor localHtd = getTableDescriptor(tableName);
|
TableDescriptor tableDesc = getDescriptor(tableName);
|
||||||
HTableDescriptor peerHtd = null;
|
TableDescriptor peerTableDesc = null;
|
||||||
if (!repHBaseAdmin.tableExists(tableName)) {
|
if (!repHBaseAdmin.tableExists(tableName)) {
|
||||||
repHBaseAdmin.createTable(localHtd, splits);
|
repHBaseAdmin.createTable(tableDesc, splits);
|
||||||
} else {
|
} else {
|
||||||
peerHtd = repHBaseAdmin.getTableDescriptor(tableName);
|
peerTableDesc = repHBaseAdmin.getDescriptor(tableName);
|
||||||
if (peerHtd == null) {
|
if (peerTableDesc == null) {
|
||||||
throw new IllegalArgumentException("Failed to get table descriptor for table "
|
throw new IllegalArgumentException("Failed to get table descriptor for table "
|
||||||
+ tableName.getNameAsString() + " from peer cluster " + peerDesc.getPeerId());
|
+ tableName.getNameAsString() + " from peer cluster " + peerDesc.getPeerId());
|
||||||
}
|
}
|
||||||
if (!compareForReplication(peerHtd, localHtd)) {
|
if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc,
|
||||||
|
tableDesc) != 0) {
|
||||||
throw new IllegalArgumentException("Table " + tableName.getNameAsString()
|
throw new IllegalArgumentException("Table " + tableName.getNameAsString()
|
||||||
+ " exists in peer cluster " + peerDesc.getPeerId()
|
+ " exists in peer cluster " + peerDesc.getPeerId()
|
||||||
+ ", but the table descriptors are not same when compared with source cluster."
|
+ ", but the table descriptors are not same when compared with source cluster."
|
||||||
|
@ -4169,33 +4097,6 @@ public class HBaseAdmin implements Admin {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Decide whether the table need replicate to the peer cluster according to the peer config
|
|
||||||
* @param table name of the table
|
|
||||||
* @param peer config for the peer
|
|
||||||
* @return true if the table need replicate to the peer cluster
|
|
||||||
*/
|
|
||||||
private boolean needToReplicate(TableName table, ReplicationPeerDescription peer) {
|
|
||||||
ReplicationPeerConfig peerConfig = peer.getPeerConfig();
|
|
||||||
Set<String> namespaces = peerConfig.getNamespaces();
|
|
||||||
Map<TableName, List<String>> tableCFsMap = peerConfig.getTableCFsMap();
|
|
||||||
// If null means user has explicitly not configured any namespaces and table CFs
|
|
||||||
// so all the tables data are applicable for replication
|
|
||||||
if (namespaces == null && tableCFsMap == null) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
if (tableCFsMap != null && tableCFsMap.containsKey(table)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
LOG.debug("Table " + table.getNameAsString()
|
|
||||||
+ " doesn't need replicate to peer cluster, peerId=" + peer.getPeerId() + ", clusterKey="
|
|
||||||
+ peerConfig.getClusterKey());
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the table's replication switch if the table's replication switch is already not set.
|
* Set the table's replication switch if the table's replication switch is already not set.
|
||||||
* @param tableName name of the table
|
* @param tableName name of the table
|
||||||
|
@ -4203,73 +4104,12 @@ public class HBaseAdmin implements Admin {
|
||||||
* @throws IOException if a remote or network exception occurs
|
* @throws IOException if a remote or network exception occurs
|
||||||
*/
|
*/
|
||||||
private void setTableRep(final TableName tableName, boolean enableRep) throws IOException {
|
private void setTableRep(final TableName tableName, boolean enableRep) throws IOException {
|
||||||
HTableDescriptor htd = new HTableDescriptor(getTableDescriptor(tableName));
|
TableDescriptor tableDesc = getDescriptor(tableName);
|
||||||
ReplicationState currentReplicationState = getTableReplicationState(htd);
|
if (!tableDesc.matchReplicationScope(enableRep)) {
|
||||||
if (enableRep && currentReplicationState != ReplicationState.ENABLED
|
int scope =
|
||||||
|| !enableRep && currentReplicationState != ReplicationState.DISABLED) {
|
enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
|
||||||
for (HColumnDescriptor hcd : htd.getFamilies()) {
|
modifyTable(TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build());
|
||||||
hcd.setScope(enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL
|
|
||||||
: HConstants.REPLICATION_SCOPE_LOCAL);
|
|
||||||
}
|
}
|
||||||
modifyTable(tableName, htd);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This enum indicates the current state of the replication for a given table.
|
|
||||||
*/
|
|
||||||
private enum ReplicationState {
|
|
||||||
ENABLED, // all column families enabled
|
|
||||||
MIXED, // some column families enabled, some disabled
|
|
||||||
DISABLED // all column families disabled
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @param htd table descriptor details for the table to check
|
|
||||||
* @return ReplicationState the current state of the table.
|
|
||||||
*/
|
|
||||||
private ReplicationState getTableReplicationState(HTableDescriptor htd) {
|
|
||||||
boolean hasEnabled = false;
|
|
||||||
boolean hasDisabled = false;
|
|
||||||
|
|
||||||
for (HColumnDescriptor hcd : htd.getFamilies()) {
|
|
||||||
if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
|
|
||||||
&& hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
|
|
||||||
hasDisabled = true;
|
|
||||||
} else {
|
|
||||||
hasEnabled = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (hasEnabled && hasDisabled) return ReplicationState.MIXED;
|
|
||||||
if (hasEnabled) return ReplicationState.ENABLED;
|
|
||||||
return ReplicationState.DISABLED;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns the configuration needed to talk to the remote slave cluster.
|
|
||||||
* @param peer the description of replication peer
|
|
||||||
* @return the configuration for the peer cluster, null if it was unable to get the configuration
|
|
||||||
* @throws IOException
|
|
||||||
*/
|
|
||||||
private Configuration getPeerClusterConfiguration(ReplicationPeerDescription peer)
|
|
||||||
throws IOException {
|
|
||||||
ReplicationPeerConfig peerConfig = peer.getPeerConfig();
|
|
||||||
Configuration otherConf;
|
|
||||||
try {
|
|
||||||
otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
|
|
||||||
} catch (IOException e) {
|
|
||||||
throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!peerConfig.getConfiguration().isEmpty()) {
|
|
||||||
CompoundConfiguration compound = new CompoundConfiguration();
|
|
||||||
compound.add(otherConf);
|
|
||||||
compound.addStringMap(peerConfig.getConfiguration());
|
|
||||||
return compound;
|
|
||||||
}
|
|
||||||
|
|
||||||
return otherConf;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -42,6 +42,7 @@ import java.util.stream.Stream;
|
||||||
import org.apache.commons.io.IOUtils;
|
import org.apache.commons.io.IOUtils;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
|
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
|
||||||
import org.apache.hadoop.hbase.ClusterStatus;
|
import org.apache.hadoop.hbase.ClusterStatus;
|
||||||
import org.apache.hadoop.hbase.ClusterStatus.Option;
|
import org.apache.hadoop.hbase.ClusterStatus.Option;
|
||||||
|
@ -64,7 +65,7 @@ import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterReques
|
||||||
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
|
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder;
|
||||||
import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable;
|
import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable;
|
||||||
import org.apache.hadoop.hbase.client.Scan.ReadType;
|
import org.apache.hadoop.hbase.client.Scan.ReadType;
|
||||||
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
|
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||||
import org.apache.hadoop.hbase.client.replication.TableCFs;
|
import org.apache.hadoop.hbase.client.replication.TableCFs;
|
||||||
import org.apache.hadoop.hbase.client.security.SecurityCapability;
|
import org.apache.hadoop.hbase.client.security.SecurityCapability;
|
||||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||||
|
@ -188,6 +189,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColu
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest;
|
||||||
|
@ -505,6 +508,14 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
|
||||||
new CreateTableProcedureBiConsumer(tableName));
|
new CreateTableProcedureBiConsumer(tableName));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Void> modifyTable(TableDescriptor desc) {
|
||||||
|
return this.<ModifyTableRequest, ModifyTableResponse> procedureCall(
|
||||||
|
RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(),
|
||||||
|
ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done),
|
||||||
|
(resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName()));
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Void> deleteTable(TableName tableName) {
|
public CompletableFuture<Void> deleteTable(TableName tableName) {
|
||||||
return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter
|
return this.<DeleteTableRequest, DeleteTableResponse> procedureCall(RequestConverter
|
||||||
|
@ -1515,7 +1526,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
|
||||||
.<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
|
.<GetReplicationPeerConfigRequest, GetReplicationPeerConfigResponse, ReplicationPeerConfig> call(
|
||||||
controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), (
|
controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), (
|
||||||
s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
|
s, c, req, done) -> s.getReplicationPeerConfig(c, req, done),
|
||||||
(resp) -> ReplicationSerDeHelper.convert(resp.getPeerConfig()))).call();
|
(resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1541,7 +1552,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
|
||||||
CompletableFuture<Void> future = new CompletableFuture<Void>();
|
CompletableFuture<Void> future = new CompletableFuture<Void>();
|
||||||
getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
|
getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
|
||||||
if (!completeExceptionally(future, error)) {
|
if (!completeExceptionally(future, error)) {
|
||||||
ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
|
ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig);
|
||||||
updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
|
updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> {
|
||||||
if (!completeExceptionally(future, error)) {
|
if (!completeExceptionally(future, error)) {
|
||||||
future.complete(result);
|
future.complete(result);
|
||||||
|
@ -1560,10 +1571,12 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
|
||||||
}
|
}
|
||||||
|
|
||||||
CompletableFuture<Void> future = new CompletableFuture<Void>();
|
CompletableFuture<Void> future = new CompletableFuture<Void>();
|
||||||
getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> {
|
getReplicationPeerConfig(id).whenComplete(
|
||||||
|
(peerConfig, error) -> {
|
||||||
if (!completeExceptionally(future, error)) {
|
if (!completeExceptionally(future, error)) {
|
||||||
try {
|
try {
|
||||||
ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id);
|
ReplicationPeerConfigUtil.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig,
|
||||||
|
id);
|
||||||
} catch (ReplicationException e) {
|
} catch (ReplicationException e) {
|
||||||
future.completeExceptionally(e);
|
future.completeExceptionally(e);
|
||||||
return;
|
return;
|
||||||
|
@ -1602,7 +1615,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
|
||||||
request,
|
request,
|
||||||
(s, c, req, done) -> s.listReplicationPeers(c, req, done),
|
(s, c, req, done) -> s.listReplicationPeers(c, req, done),
|
||||||
(resp) -> resp.getPeerDescList().stream()
|
(resp) -> resp.getPeerDescList().stream()
|
||||||
.map(ReplicationSerDeHelper::toReplicationPeerDescription)
|
.map(ReplicationPeerConfigUtil::toReplicationPeerDescription)
|
||||||
.collect(Collectors.toList()))).call();
|
.collect(Collectors.toList()))).call();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2168,9 +2181,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
|
||||||
returnedFuture.completeExceptionally(err);
|
returnedFuture.completeExceptionally(err);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
LOG.info("location is " + location);
|
|
||||||
if (!location.isPresent() || location.get().getRegion() == null) {
|
if (!location.isPresent() || location.get().getRegion() == null) {
|
||||||
LOG.info("unknown location is " + location);
|
|
||||||
returnedFuture.completeExceptionally(new UnknownRegionException(
|
returnedFuture.completeExceptionally(new UnknownRegionException(
|
||||||
"Invalid region name or encoded region name: "
|
"Invalid region name or encoded region name: "
|
||||||
+ Bytes.toStringBinary(regionNameOrEncodedRegionName)));
|
+ Bytes.toStringBinary(regionNameOrEncodedRegionName)));
|
||||||
|
@ -2323,6 +2334,18 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer {
|
||||||
|
|
||||||
|
ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) {
|
||||||
|
super(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
String getOperationType() {
|
||||||
|
return "ENABLE";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
|
private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer {
|
||||||
|
|
||||||
DeleteTableProcedureBiConsumer(TableName tableName) {
|
DeleteTableProcedureBiConsumer(TableName tableName) {
|
||||||
|
@ -3031,4 +3054,254 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin {
|
||||||
.pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
|
.pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
|
||||||
.startLogErrorsCnt(startLogErrorsCnt);
|
.startLogErrorsCnt(startLogErrorsCnt);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Void> enableTableReplication(TableName tableName) {
|
||||||
|
if (tableName == null) {
|
||||||
|
return failedFuture(new IllegalArgumentException("Table name is null"));
|
||||||
|
}
|
||||||
|
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||||
|
tableExists(tableName).whenComplete(
|
||||||
|
(exist, err) -> {
|
||||||
|
if (err != null) {
|
||||||
|
future.completeExceptionally(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!exist) {
|
||||||
|
future.completeExceptionally(new TableNotFoundException("Table '"
|
||||||
|
+ tableName.getNameAsString() + "' does not exists."));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
getTableSplits(tableName).whenComplete((splits, err1) -> {
|
||||||
|
if (err1 != null) {
|
||||||
|
future.completeExceptionally(err1);
|
||||||
|
} else {
|
||||||
|
checkAndSyncTableToPeerClusters(tableName, splits).whenComplete((result, err2) -> {
|
||||||
|
if (err2 != null) {
|
||||||
|
future.completeExceptionally(err2);
|
||||||
|
} else {
|
||||||
|
setTableReplication(tableName, true).whenComplete((result3, err3) -> {
|
||||||
|
if (err3 != null) {
|
||||||
|
future.completeExceptionally(err3);
|
||||||
|
} else {
|
||||||
|
future.complete(result3);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CompletableFuture<Void> disableTableReplication(TableName tableName) {
|
||||||
|
if (tableName == null) {
|
||||||
|
return failedFuture(new IllegalArgumentException("Table name is null"));
|
||||||
|
}
|
||||||
|
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||||
|
tableExists(tableName).whenComplete(
|
||||||
|
(exist, err) -> {
|
||||||
|
if (err != null) {
|
||||||
|
future.completeExceptionally(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!exist) {
|
||||||
|
future.completeExceptionally(new TableNotFoundException("Table '"
|
||||||
|
+ tableName.getNameAsString() + "' does not exists."));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
setTableReplication(tableName, false).whenComplete((result, err2) -> {
|
||||||
|
if (err2 != null) {
|
||||||
|
future.completeExceptionally(err2);
|
||||||
|
} else {
|
||||||
|
future.complete(result);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
private CompletableFuture<byte[][]> getTableSplits(TableName tableName) {
|
||||||
|
CompletableFuture<byte[][]> future = new CompletableFuture<>();
|
||||||
|
getTableRegions(tableName).whenComplete((regions, err2) -> {
|
||||||
|
if (err2 != null) {
|
||||||
|
future.completeExceptionally(err2);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (regions.size() == 1) {
|
||||||
|
future.complete(null);
|
||||||
|
} else {
|
||||||
|
byte[][] splits = new byte[regions.size() - 1][];
|
||||||
|
for (int i = 1; i < regions.size(); i++) {
|
||||||
|
splits[i - 1] = regions.get(i).getStartKey();
|
||||||
|
}
|
||||||
|
future.complete(splits);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Connect to peer and check the table descriptor on peer:
|
||||||
|
* <ol>
|
||||||
|
* <li>Create the same table on peer when not exist.</li>
|
||||||
|
* <li>Throw an exception if the table already has replication enabled on any of the column
|
||||||
|
* families.</li>
|
||||||
|
* <li>Throw an exception if the table exists on peer cluster but descriptors are not same.</li>
|
||||||
|
* </ol>
|
||||||
|
* @param tableName name of the table to sync to the peer
|
||||||
|
* @param splits table split keys
|
||||||
|
*/
|
||||||
|
private CompletableFuture<Void> checkAndSyncTableToPeerClusters(TableName tableName,
|
||||||
|
byte[][] splits) {
|
||||||
|
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||||
|
listReplicationPeers().whenComplete(
|
||||||
|
(peers, err) -> {
|
||||||
|
if (err != null) {
|
||||||
|
future.completeExceptionally(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (peers == null || peers.size() <= 0) {
|
||||||
|
future.completeExceptionally(new IllegalArgumentException(
|
||||||
|
"Found no peer cluster for replication."));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
List<CompletableFuture<Void>> futures = new ArrayList<>();
|
||||||
|
peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName))
|
||||||
|
.forEach(peer -> {
|
||||||
|
futures.add(trySyncTableToPeerCluster(tableName, splits, peer));
|
||||||
|
});
|
||||||
|
CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[futures.size()]))
|
||||||
|
.whenComplete((result, err2) -> {
|
||||||
|
if (err2 != null) {
|
||||||
|
future.completeExceptionally(err2);
|
||||||
|
} else {
|
||||||
|
future.complete(result);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
private CompletableFuture<Void> trySyncTableToPeerCluster(TableName tableName, byte[][] splits,
|
||||||
|
ReplicationPeerDescription peer) {
|
||||||
|
Configuration peerConf = null;
|
||||||
|
try {
|
||||||
|
peerConf =
|
||||||
|
ReplicationPeerConfigUtil
|
||||||
|
.getPeerClusterConfiguration(connection.getConfiguration(), peer);
|
||||||
|
} catch (IOException e) {
|
||||||
|
return failedFuture(e);
|
||||||
|
}
|
||||||
|
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||||
|
ConnectionFactory.createAsyncConnection(peerConf).whenComplete(
|
||||||
|
(conn, err) -> {
|
||||||
|
if (err != null) {
|
||||||
|
future.completeExceptionally(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
getTableDescriptor(tableName).whenComplete(
|
||||||
|
(tableDesc, err1) -> {
|
||||||
|
if (err1 != null) {
|
||||||
|
future.completeExceptionally(err1);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
AsyncAdmin peerAdmin = conn.getAdmin();
|
||||||
|
peerAdmin.tableExists(tableName).whenComplete(
|
||||||
|
(exist, err2) -> {
|
||||||
|
if (err2 != null) {
|
||||||
|
future.completeExceptionally(err2);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!exist) {
|
||||||
|
CompletableFuture<Void> createTableFuture = null;
|
||||||
|
if (splits == null) {
|
||||||
|
createTableFuture = peerAdmin.createTable(tableDesc);
|
||||||
|
} else {
|
||||||
|
createTableFuture = peerAdmin.createTable(tableDesc, splits);
|
||||||
|
}
|
||||||
|
createTableFuture.whenComplete(
|
||||||
|
(result, err3) -> {
|
||||||
|
if (err3 != null) {
|
||||||
|
future.completeExceptionally(err3);
|
||||||
|
} else {
|
||||||
|
future.complete(result);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin).whenComplete(
|
||||||
|
(result, err4) -> {
|
||||||
|
if (err4 != null) {
|
||||||
|
future.completeExceptionally(err4);
|
||||||
|
} else {
|
||||||
|
future.complete(result);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
private CompletableFuture<Void> compareTableWithPeerCluster(TableName tableName,
|
||||||
|
TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) {
|
||||||
|
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||||
|
peerAdmin.getTableDescriptor(tableName).whenComplete(
|
||||||
|
(peerTableDesc, err) -> {
|
||||||
|
if (err != null) {
|
||||||
|
future.completeExceptionally(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (peerTableDesc == null) {
|
||||||
|
future.completeExceptionally(new IllegalArgumentException(
|
||||||
|
"Failed to get table descriptor for table " + tableName.getNameAsString()
|
||||||
|
+ " from peer cluster " + peer.getPeerId()));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) {
|
||||||
|
future.completeExceptionally(new IllegalArgumentException("Table "
|
||||||
|
+ tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId()
|
||||||
|
+ ", but the table descriptors are not same when compared with source cluster."
|
||||||
|
+ " Thus can not enable the table's replication switch."));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
future.complete(null);
|
||||||
|
});
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set the table's replication switch if the table's replication switch is already not set.
|
||||||
|
* @param tableName name of the table
|
||||||
|
* @param enableRep is replication switch enable or disable
|
||||||
|
*/
|
||||||
|
private CompletableFuture<Void> setTableReplication(TableName tableName, boolean enableRep) {
|
||||||
|
CompletableFuture<Void> future = new CompletableFuture<>();
|
||||||
|
getTableDescriptor(tableName).whenComplete(
|
||||||
|
(tableDesc, err) -> {
|
||||||
|
if (err != null) {
|
||||||
|
future.completeExceptionally(err);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (!tableDesc.matchReplicationScope(enableRep)) {
|
||||||
|
int scope =
|
||||||
|
enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL;
|
||||||
|
TableDescriptor newTableDesc =
|
||||||
|
TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build();
|
||||||
|
modifyTable(newTableDesc).whenComplete((result, err2) -> {
|
||||||
|
if (err2 != null) {
|
||||||
|
future.completeExceptionally(err2);
|
||||||
|
} else {
|
||||||
|
future.complete(result);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
future.complete(null);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
return future;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,10 +24,11 @@ import java.util.Comparator;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* TableDescriptor contains the details about an HBase table such as the descriptors of
|
* TableDescriptor contains the details about an HBase table such as the descriptors of
|
||||||
|
@ -39,8 +40,15 @@ import org.apache.hadoop.hbase.util.Bytes;
|
||||||
public interface TableDescriptor {
|
public interface TableDescriptor {
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
static final Comparator<TableDescriptor> COMPARATOR
|
Comparator<TableDescriptor> COMPARATOR = getComparator(ColumnFamilyDescriptor.COMPARATOR);
|
||||||
= (TableDescriptor lhs, TableDescriptor rhs) -> {
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
Comparator<TableDescriptor> COMPARATOR_IGNORE_REPLICATION =
|
||||||
|
getComparator(ColumnFamilyDescriptor.COMPARATOR_IGNORE_REPLICATION);
|
||||||
|
|
||||||
|
static Comparator<TableDescriptor>
|
||||||
|
getComparator(Comparator<ColumnFamilyDescriptor> cfComparator) {
|
||||||
|
return (TableDescriptor lhs, TableDescriptor rhs) -> {
|
||||||
int result = lhs.getTableName().compareTo(rhs.getTableName());
|
int result = lhs.getTableName().compareTo(rhs.getTableName());
|
||||||
if (result != 0) {
|
if (result != 0) {
|
||||||
return result;
|
return result;
|
||||||
|
@ -52,9 +60,9 @@ public interface TableDescriptor {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (Iterator<ColumnFamilyDescriptor> it = lhsFamilies.iterator(),
|
for (Iterator<ColumnFamilyDescriptor> it = lhsFamilies.iterator(), it2 =
|
||||||
it2 = rhsFamilies.iterator(); it.hasNext();) {
|
rhsFamilies.iterator(); it.hasNext();) {
|
||||||
result = ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next());
|
result = cfComparator.compare(it.next(), it2.next());
|
||||||
if (result != 0) {
|
if (result != 0) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
@ -62,6 +70,7 @@ public interface TableDescriptor {
|
||||||
// punt on comparison for ordering, just calculate difference
|
// punt on comparison for ordering, just calculate difference
|
||||||
return Integer.compare(lhs.getValues().hashCode(), rhs.getValues().hashCode());
|
return Integer.compare(lhs.getValues().hashCode(), rhs.getValues().hashCode());
|
||||||
};
|
};
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the count of the column families of the table.
|
* Returns the count of the column families of the table.
|
||||||
|
@ -266,4 +275,30 @@ public interface TableDescriptor {
|
||||||
*/
|
*/
|
||||||
boolean isReadOnly();
|
boolean isReadOnly();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if the table's cfs' replication scope matched with the replication state
|
||||||
|
* @param enabled replication state
|
||||||
|
* @return true if matched, otherwise false
|
||||||
|
*/
|
||||||
|
default boolean matchReplicationScope(boolean enabled) {
|
||||||
|
boolean hasEnabled = false;
|
||||||
|
boolean hasDisabled = false;
|
||||||
|
|
||||||
|
for (ColumnFamilyDescriptor cf : getColumnFamilies()) {
|
||||||
|
if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
|
||||||
|
&& cf.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
|
||||||
|
hasDisabled = true;
|
||||||
|
} else {
|
||||||
|
hasEnabled = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hasEnabled && hasDisabled) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (hasEnabled) {
|
||||||
|
return enabled;
|
||||||
|
}
|
||||||
|
return !enabled;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,18 +33,19 @@ import java.util.TreeSet;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hbase.Coprocessor;
|
import org.apache.hadoop.hbase.Coprocessor;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||||
import org.apache.hadoop.hbase.security.User;
|
import org.apache.hadoop.hbase.security.User;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @since 2.0.0
|
* @since 2.0.0
|
||||||
|
@ -409,6 +410,24 @@ public class TableDescriptorBuilder {
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets replication scope all & only the columns already in the builder. Columns added later won't
|
||||||
|
* be backfilled with replication scope.
|
||||||
|
* @param scope replication scope
|
||||||
|
* @return a TableDescriptorBuilder
|
||||||
|
*/
|
||||||
|
public TableDescriptorBuilder setReplicationScope(int scope) {
|
||||||
|
Map<byte[], ColumnFamilyDescriptor> newFamilies = new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR);
|
||||||
|
newFamilies.putAll(desc.families);
|
||||||
|
newFamilies
|
||||||
|
.forEach((cf, cfDesc) -> {
|
||||||
|
desc.removeColumnFamily(cf);
|
||||||
|
desc.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cfDesc).setScope(scope)
|
||||||
|
.build());
|
||||||
|
});
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
public TableDescriptor build() {
|
public TableDescriptor build() {
|
||||||
return new ModifyableTableDescriptor(desc);
|
return new ModifyableTableDescriptor(desc);
|
||||||
}
|
}
|
||||||
|
|
|
@ -141,7 +141,7 @@ public class ReplicationAdmin implements Closeable {
|
||||||
* */
|
* */
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
|
public static Map<TableName, List<String>> parseTableCFsFromConfig(String tableCFsConfig) {
|
||||||
return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig);
|
return ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCFsConfig);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -228,7 +228,7 @@ public class ReplicationAdmin implements Closeable {
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public String getPeerTableCFs(String id) throws IOException {
|
public String getPeerTableCFs(String id) throws IOException {
|
||||||
ReplicationPeerConfig peerConfig = admin.getReplicationPeerConfig(id);
|
ReplicationPeerConfig peerConfig = admin.getReplicationPeerConfig(id);
|
||||||
return ReplicationSerDeHelper.convertToString(peerConfig.getTableCFsMap());
|
return ReplicationPeerConfigUtil.convertToString(peerConfig.getTableCFsMap());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -243,7 +243,7 @@ public class ReplicationAdmin implements Closeable {
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException,
|
public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException,
|
||||||
IOException {
|
IOException {
|
||||||
appendPeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs));
|
appendPeerTableCFs(id, ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -300,7 +300,7 @@ public class ReplicationAdmin implements Closeable {
|
||||||
@Deprecated
|
@Deprecated
|
||||||
public void removePeerTableCFs(String id, String tableCf) throws ReplicationException,
|
public void removePeerTableCFs(String id, String tableCf) throws ReplicationException,
|
||||||
IOException {
|
IOException {
|
||||||
removePeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCf));
|
removePeerTableCFs(id, ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCf));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -19,13 +19,14 @@
|
||||||
package org.apache.hadoop.hbase.client.replication;
|
package org.apache.hadoop.hbase.client.replication;
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
|
import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
|
import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations;
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.CompoundConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
|
||||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||||
|
@ -35,8 +36,9 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.Strings;
|
import org.apache.hadoop.hbase.util.Strings;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
|
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
@ -52,11 +54,11 @@ import java.util.Set;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Stable
|
@InterfaceStability.Stable
|
||||||
public final class ReplicationSerDeHelper {
|
public final class ReplicationPeerConfigUtil {
|
||||||
|
|
||||||
private static final Log LOG = LogFactory.getLog(ReplicationSerDeHelper.class);
|
private static final Log LOG = LogFactory.getLog(ReplicationPeerConfigUtil.class);
|
||||||
|
|
||||||
private ReplicationSerDeHelper() {}
|
private ReplicationPeerConfigUtil() {}
|
||||||
|
|
||||||
public static String convertToString(Set<String> namespaces) {
|
public static String convertToString(Set<String> namespaces) {
|
||||||
if (namespaces == null) {
|
if (namespaces == null) {
|
||||||
|
@ -200,7 +202,7 @@ public final class ReplicationSerDeHelper {
|
||||||
if (bytes == null) {
|
if (bytes == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
return ReplicationSerDeHelper.convert(Bytes.toString(bytes));
|
return ReplicationPeerConfigUtil.convert(Bytes.toString(bytes));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -299,7 +301,8 @@ public final class ReplicationSerDeHelper {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
|
public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) {
|
||||||
ReplicationProtos.ReplicationPeer.Builder builder = ReplicationProtos.ReplicationPeer.newBuilder();
|
ReplicationProtos.ReplicationPeer.Builder builder =
|
||||||
|
ReplicationProtos.ReplicationPeer.newBuilder();
|
||||||
if (peerConfig.getClusterKey() != null) {
|
if (peerConfig.getClusterKey() != null) {
|
||||||
builder.setClusterkey(peerConfig.getClusterKey());
|
builder.setClusterkey(peerConfig.getClusterKey());
|
||||||
}
|
}
|
||||||
|
@ -359,8 +362,8 @@ public final class ReplicationSerDeHelper {
|
||||||
|
|
||||||
public static ReplicationProtos.ReplicationPeerDescription toProtoReplicationPeerDescription(
|
public static ReplicationProtos.ReplicationPeerDescription toProtoReplicationPeerDescription(
|
||||||
ReplicationPeerDescription desc) {
|
ReplicationPeerDescription desc) {
|
||||||
ReplicationProtos.ReplicationPeerDescription.Builder builder = ReplicationProtos.ReplicationPeerDescription
|
ReplicationProtos.ReplicationPeerDescription.Builder builder =
|
||||||
.newBuilder();
|
ReplicationProtos.ReplicationPeerDescription.newBuilder();
|
||||||
builder.setId(desc.getPeerId());
|
builder.setId(desc.getPeerId());
|
||||||
ReplicationProtos.ReplicationState.Builder stateBuilder = ReplicationProtos.ReplicationState
|
ReplicationProtos.ReplicationState.Builder stateBuilder = ReplicationProtos.ReplicationState
|
||||||
.newBuilder();
|
.newBuilder();
|
||||||
|
@ -430,8 +433,36 @@ public final class ReplicationSerDeHelper {
|
||||||
+ " which has specified cfs from table-cfs config in peer: " + id);
|
+ " which has specified cfs from table-cfs config in peer: " + id);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id);
|
throw new ReplicationException("No table: "
|
||||||
|
+ table + " in table-cfs config of peer: " + id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the configuration needed to talk to the remote slave cluster.
|
||||||
|
* @param conf the base configuration
|
||||||
|
* @param peer the description of replication peer
|
||||||
|
* @return the configuration for the peer cluster, null if it was unable to get the configuration
|
||||||
|
* @throws IOException when create peer cluster configuration failed
|
||||||
|
*/
|
||||||
|
public static Configuration getPeerClusterConfiguration(Configuration conf,
|
||||||
|
ReplicationPeerDescription peer) throws IOException {
|
||||||
|
ReplicationPeerConfig peerConfig = peer.getPeerConfig();
|
||||||
|
Configuration otherConf;
|
||||||
|
try {
|
||||||
|
otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey());
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!peerConfig.getConfiguration().isEmpty()) {
|
||||||
|
CompoundConfiguration compound = new CompoundConfiguration();
|
||||||
|
compound.add(otherConf);
|
||||||
|
compound.addStringMap(peerConfig.getConfiguration());
|
||||||
|
return compound;
|
||||||
|
}
|
||||||
|
|
||||||
|
return otherConf;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -123,4 +123,24 @@ public class ReplicationPeerConfig {
|
||||||
builder.append("bandwidth=").append(bandwidth);
|
builder.append("bandwidth=").append(bandwidth);
|
||||||
return builder.toString();
|
return builder.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Decide whether the table need replicate to the peer cluster
|
||||||
|
* @param table name of the table
|
||||||
|
* @return true if the table need replicate to the peer cluster
|
||||||
|
*/
|
||||||
|
public boolean needToReplicate(TableName table) {
|
||||||
|
// If null means user has explicitly not configured any namespaces and table CFs
|
||||||
|
// so all the tables data are applicable for replication
|
||||||
|
if (namespaces == null && tableCFsMap == null) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (tableCFsMap != null && tableCFsMap.containsKey(table)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.client.Row;
|
||||||
import org.apache.hadoop.hbase.client.RowMutations;
|
import org.apache.hadoop.hbase.client.RowMutations;
|
||||||
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
|
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||||
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
|
@ -1621,7 +1621,7 @@ public final class RequestConverter {
|
||||||
String peerId, ReplicationPeerConfig peerConfig) {
|
String peerId, ReplicationPeerConfig peerConfig) {
|
||||||
AddReplicationPeerRequest.Builder builder = AddReplicationPeerRequest.newBuilder();
|
AddReplicationPeerRequest.Builder builder = AddReplicationPeerRequest.newBuilder();
|
||||||
builder.setPeerId(peerId);
|
builder.setPeerId(peerId);
|
||||||
builder.setPeerConfig(ReplicationSerDeHelper.convert(peerConfig));
|
builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1658,7 +1658,7 @@ public final class RequestConverter {
|
||||||
UpdateReplicationPeerConfigRequest.Builder builder = UpdateReplicationPeerConfigRequest
|
UpdateReplicationPeerConfigRequest.Builder builder = UpdateReplicationPeerConfigRequest
|
||||||
.newBuilder();
|
.newBuilder();
|
||||||
builder.setPeerId(peerId);
|
builder.setPeerId(peerId);
|
||||||
builder.setPeerConfig(ReplicationSerDeHelper.convert(peerConfig));
|
builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,14 +30,14 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||||
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
|
|
||||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
import org.apache.zookeeper.KeeperException.NodeExistsException;
|
import org.apache.zookeeper.KeeperException.NodeExistsException;
|
||||||
|
|
||||||
|
@ -114,7 +114,7 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase
|
||||||
try {
|
try {
|
||||||
byte[] data = peerConfigTracker.getData(false);
|
byte[] data = peerConfigTracker.getData(false);
|
||||||
if (data != null) {
|
if (data != null) {
|
||||||
this.peerConfig = ReplicationSerDeHelper.parsePeerFrom(data);
|
this.peerConfig = ReplicationPeerConfigUtil.parsePeerFrom(data);
|
||||||
}
|
}
|
||||||
} catch (DeserializationException e) {
|
} catch (DeserializationException e) {
|
||||||
LOG.error("", e);
|
LOG.error("", e);
|
||||||
|
|
|
@ -35,8 +35,7 @@ import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.CompoundConfiguration;
|
import org.apache.hadoop.hbase.CompoundConfiguration;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||||
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
|
|
||||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
|
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
|
||||||
|
@ -46,6 +45,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -131,7 +131,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
|
||||||
|
|
||||||
List<ZKUtilOp> listOfOps = new ArrayList<>(2);
|
List<ZKUtilOp> listOfOps = new ArrayList<>(2);
|
||||||
ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id),
|
ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id),
|
||||||
ReplicationSerDeHelper.toByteArray(peerConfig));
|
ReplicationPeerConfigUtil.toByteArray(peerConfig));
|
||||||
// b/w PeerWatcher and ReplicationZookeeper#add method to create the
|
// b/w PeerWatcher and ReplicationZookeeper#add method to create the
|
||||||
// peer-state znode. This happens while adding a peer
|
// peer-state znode. This happens while adding a peer
|
||||||
// The peer state data is set as "ENABLED" by default.
|
// The peer state data is set as "ENABLED" by default.
|
||||||
|
@ -206,9 +206,9 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
|
||||||
}
|
}
|
||||||
rpc.setTableCFsMap(tableCFs);
|
rpc.setTableCFsMap(tableCFs);
|
||||||
ZKUtil.setData(this.zookeeper, getPeerNode(id),
|
ZKUtil.setData(this.zookeeper, getPeerNode(id),
|
||||||
ReplicationSerDeHelper.toByteArray(rpc));
|
ReplicationPeerConfigUtil.toByteArray(rpc));
|
||||||
LOG.info("Peer tableCFs with id= " + id + " is now " +
|
LOG.info("Peer tableCFs with id= " + id + " is now " +
|
||||||
ReplicationSerDeHelper.convertToString(tableCFs));
|
ReplicationPeerConfigUtil.convertToString(tableCFs));
|
||||||
} catch (KeeperException e) {
|
} catch (KeeperException e) {
|
||||||
throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
|
throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
|
||||||
}
|
}
|
||||||
|
@ -303,7 +303,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
return ReplicationSerDeHelper.parsePeerFrom(data);
|
return ReplicationPeerConfigUtil.parsePeerFrom(data);
|
||||||
} catch (DeserializationException e) {
|
} catch (DeserializationException e) {
|
||||||
LOG.warn("Failed to parse cluster key from peerId=" + peerId
|
LOG.warn("Failed to parse cluster key from peerId=" + peerId
|
||||||
+ ", specifically the content from the following znode: " + znode);
|
+ ", specifically the content from the following znode: " + znode);
|
||||||
|
@ -372,7 +372,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
|
||||||
|
|
||||||
try {
|
try {
|
||||||
ZKUtil.setData(this.zookeeper, getPeerNode(id),
|
ZKUtil.setData(this.zookeeper, getPeerNode(id),
|
||||||
ReplicationSerDeHelper.toByteArray(existingConfig));
|
ReplicationPeerConfigUtil.toByteArray(existingConfig));
|
||||||
}
|
}
|
||||||
catch(KeeperException ke){
|
catch(KeeperException ke){
|
||||||
throw new ReplicationException("There was a problem trying to save changes to the " +
|
throw new ReplicationException("There was a problem trying to save changes to the " +
|
||||||
|
|
|
@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder;
|
||||||
import org.apache.hadoop.hbase.client.TableDescriptor;
|
import org.apache.hadoop.hbase.client.TableDescriptor;
|
||||||
import org.apache.hadoop.hbase.client.TableState;
|
import org.apache.hadoop.hbase.client.TableState;
|
||||||
import org.apache.hadoop.hbase.client.VersionInfoUtil;
|
import org.apache.hadoop.hbase.client.VersionInfoUtil;
|
||||||
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
|
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||||
import org.apache.hadoop.hbase.errorhandling.ForeignException;
|
import org.apache.hadoop.hbase.errorhandling.ForeignException;
|
||||||
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
|
import org.apache.hadoop.hbase.exceptions.UnknownProtocolException;
|
||||||
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
|
||||||
|
@ -1809,7 +1809,7 @@ public class MasterRpcServices extends RSRpcServices
|
||||||
AddReplicationPeerRequest request) throws ServiceException {
|
AddReplicationPeerRequest request) throws ServiceException {
|
||||||
try {
|
try {
|
||||||
master.addReplicationPeer(request.getPeerId(),
|
master.addReplicationPeer(request.getPeerId(),
|
||||||
ReplicationSerDeHelper.convert(request.getPeerConfig()));
|
ReplicationPeerConfigUtil.convert(request.getPeerConfig()));
|
||||||
return AddReplicationPeerResponse.newBuilder().build();
|
return AddReplicationPeerResponse.newBuilder().build();
|
||||||
} catch (ReplicationException | IOException e) {
|
} catch (ReplicationException | IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
|
@ -1858,7 +1858,7 @@ public class MasterRpcServices extends RSRpcServices
|
||||||
String peerId = request.getPeerId();
|
String peerId = request.getPeerId();
|
||||||
ReplicationPeerConfig peerConfig = master.getReplicationPeerConfig(peerId);
|
ReplicationPeerConfig peerConfig = master.getReplicationPeerConfig(peerId);
|
||||||
response.setPeerId(peerId);
|
response.setPeerId(peerId);
|
||||||
response.setPeerConfig(ReplicationSerDeHelper.convert(peerConfig));
|
response.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig));
|
||||||
} catch (ReplicationException | IOException e) {
|
} catch (ReplicationException | IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
|
@ -1870,7 +1870,7 @@ public class MasterRpcServices extends RSRpcServices
|
||||||
UpdateReplicationPeerConfigRequest request) throws ServiceException {
|
UpdateReplicationPeerConfigRequest request) throws ServiceException {
|
||||||
try {
|
try {
|
||||||
master.updateReplicationPeerConfig(request.getPeerId(),
|
master.updateReplicationPeerConfig(request.getPeerId(),
|
||||||
ReplicationSerDeHelper.convert(request.getPeerConfig()));
|
ReplicationPeerConfigUtil.convert(request.getPeerConfig()));
|
||||||
return UpdateReplicationPeerConfigResponse.newBuilder().build();
|
return UpdateReplicationPeerConfigResponse.newBuilder().build();
|
||||||
} catch (ReplicationException | IOException e) {
|
} catch (ReplicationException | IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
|
@ -1885,7 +1885,7 @@ public class MasterRpcServices extends RSRpcServices
|
||||||
List<ReplicationPeerDescription> peers = master
|
List<ReplicationPeerDescription> peers = master
|
||||||
.listReplicationPeers(request.hasRegex() ? request.getRegex() : null);
|
.listReplicationPeers(request.hasRegex() ? request.getRegex() : null);
|
||||||
for (ReplicationPeerDescription peer : peers) {
|
for (ReplicationPeerDescription peer : peers) {
|
||||||
response.addPeerDesc(ReplicationSerDeHelper.toProtoReplicationPeerDescription(peer));
|
response.addPeerDesc(ReplicationPeerConfigUtil.toProtoReplicationPeerDescription(peer));
|
||||||
}
|
}
|
||||||
} catch (ReplicationException | IOException e) {
|
} catch (ReplicationException | IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
|
|
|
@ -23,15 +23,15 @@ import org.apache.commons.logging.LogFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
|
||||||
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
|
|
||||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
|
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -79,12 +79,12 @@ public class TableCFsUpdater extends ReplicationStateZKBase {
|
||||||
// we copy TableCFs node into PeerNode
|
// we copy TableCFs node into PeerNode
|
||||||
LOG.info("copy tableCFs into peerNode:" + peerId);
|
LOG.info("copy tableCFs into peerNode:" + peerId);
|
||||||
ReplicationProtos.TableCF[] tableCFs =
|
ReplicationProtos.TableCF[] tableCFs =
|
||||||
ReplicationSerDeHelper.parseTableCFs(
|
ReplicationPeerConfigUtil.parseTableCFs(
|
||||||
ZKUtil.getData(this.zookeeper, tableCFsNode));
|
ZKUtil.getData(this.zookeeper, tableCFsNode));
|
||||||
if (tableCFs != null && tableCFs.length > 0) {
|
if (tableCFs != null && tableCFs.length > 0) {
|
||||||
rpc.setTableCFsMap(ReplicationSerDeHelper.convert2Map(tableCFs));
|
rpc.setTableCFsMap(ReplicationPeerConfigUtil.convert2Map(tableCFs));
|
||||||
ZKUtil.setData(this.zookeeper, peerNode,
|
ZKUtil.setData(this.zookeeper, peerNode,
|
||||||
ReplicationSerDeHelper.toByteArray(rpc));
|
ReplicationPeerConfigUtil.toByteArray(rpc));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG.info("No tableCFs in peerNode:" + peerId);
|
LOG.info("No tableCFs in peerNode:" + peerId);
|
||||||
|
@ -113,7 +113,7 @@ public class TableCFsUpdater extends ReplicationStateZKBase {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
return ReplicationSerDeHelper.parsePeerFrom(data);
|
return ReplicationPeerConfigUtil.parsePeerFrom(data);
|
||||||
} catch (DeserializationException e) {
|
} catch (DeserializationException e) {
|
||||||
LOG.warn("Failed to parse cluster key from peer=" + peerNode);
|
LOG.warn("Failed to parse cluster key from peer=" + peerNode);
|
||||||
return null;
|
return null;
|
||||||
|
|
|
@ -41,10 +41,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
||||||
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Rule;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
import org.junit.rules.TestName;
|
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
import org.junit.runners.Parameterized;
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,242 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hbase.client;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.CompletionException;
|
||||||
|
import java.util.concurrent.ForkJoinPool;
|
||||||
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||||
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.ClientTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.runner.RunWith;
|
||||||
|
import org.junit.runners.Parameterized;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Class to test asynchronous replication admin operations when more than 1 cluster
|
||||||
|
*/
|
||||||
|
@RunWith(Parameterized.class)
|
||||||
|
@Category({LargeTests.class, ClientTests.class})
|
||||||
|
public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase {
|
||||||
|
|
||||||
|
private final static String ID_SECOND = "2";
|
||||||
|
|
||||||
|
private static HBaseTestingUtility TEST_UTIL2;
|
||||||
|
private static Configuration conf2;
|
||||||
|
private static AsyncAdmin admin2;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
|
||||||
|
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
|
||||||
|
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
|
||||||
|
TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
|
||||||
|
TEST_UTIL.startMiniCluster();
|
||||||
|
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
|
||||||
|
|
||||||
|
conf2 = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
|
||||||
|
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
|
||||||
|
TEST_UTIL2 = new HBaseTestingUtility(conf2);
|
||||||
|
TEST_UTIL2.startMiniCluster();
|
||||||
|
admin2 =
|
||||||
|
ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get().getAdmin();
|
||||||
|
|
||||||
|
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
|
||||||
|
rpc.setClusterKey(TEST_UTIL2.getClusterKey());
|
||||||
|
ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
Pattern pattern = Pattern.compile(tableName.getNameAsString() + ".*");
|
||||||
|
cleanupTables(admin, pattern);
|
||||||
|
cleanupTables(admin2, pattern);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void cleanupTables(AsyncAdmin admin, Pattern pattern) {
|
||||||
|
admin.listTableNames(pattern, false).whenCompleteAsync((tables, err) -> {
|
||||||
|
if (tables != null) {
|
||||||
|
tables.forEach(table -> {
|
||||||
|
try {
|
||||||
|
admin.disableTable(table).join();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.debug("Table: " + tableName + " already disabled, so just deleting it.");
|
||||||
|
}
|
||||||
|
admin.deleteTable(table).join();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}, ForkJoinPool.commonPool()).join();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createTableWithDefaultConf(AsyncAdmin admin, TableName tableName) {
|
||||||
|
TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
|
||||||
|
builder.addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY));
|
||||||
|
admin.createTable(builder.build()).join();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEnableAndDisableTableReplication() throws Exception {
|
||||||
|
// default replication scope is local
|
||||||
|
createTableWithDefaultConf(tableName);
|
||||||
|
admin.enableTableReplication(tableName).join();
|
||||||
|
TableDescriptor tableDesc = admin.getTableDescriptor(tableName).get();
|
||||||
|
for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
|
||||||
|
assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
|
||||||
|
}
|
||||||
|
|
||||||
|
admin.disableTableReplication(tableName).join();
|
||||||
|
tableDesc = admin.getTableDescriptor(tableName).get();
|
||||||
|
for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
|
||||||
|
assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception {
|
||||||
|
// Only create table in source cluster
|
||||||
|
createTableWithDefaultConf(tableName);
|
||||||
|
assertFalse(admin2.tableExists(tableName).get());
|
||||||
|
admin.enableTableReplication(tableName).join();
|
||||||
|
assertTrue(admin2.tableExists(tableName).get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception {
|
||||||
|
createTableWithDefaultConf(admin, tableName);
|
||||||
|
createTableWithDefaultConf(admin2, tableName);
|
||||||
|
TableDescriptorBuilder builder =
|
||||||
|
TableDescriptorBuilder.newBuilder(admin.getTableDescriptor(tableName).get());
|
||||||
|
builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("newFamily"))
|
||||||
|
.build());
|
||||||
|
admin2.disableTable(tableName).join();
|
||||||
|
admin2.modifyTable(builder.build()).join();
|
||||||
|
admin2.enableTable(tableName).join();
|
||||||
|
|
||||||
|
try {
|
||||||
|
admin.enableTableReplication(tableName).join();
|
||||||
|
fail("Exception should be thrown if table descriptors in the clusters are not same.");
|
||||||
|
} catch (Exception ignored) {
|
||||||
|
// ok
|
||||||
|
}
|
||||||
|
|
||||||
|
admin.disableTable(tableName).join();
|
||||||
|
admin.modifyTable(builder.build()).join();
|
||||||
|
admin.enableTable(tableName).join();
|
||||||
|
admin.enableTableReplication(tableName).join();
|
||||||
|
TableDescriptor tableDesc = admin.getTableDescriptor(tableName).get();
|
||||||
|
for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) {
|
||||||
|
assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDisableReplicationForNonExistingTable() throws Exception {
|
||||||
|
try {
|
||||||
|
admin.disableTableReplication(tableName).join();
|
||||||
|
} catch (CompletionException e) {
|
||||||
|
assertTrue(e.getCause() instanceof TableNotFoundException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEnableReplicationForNonExistingTable() throws Exception {
|
||||||
|
try {
|
||||||
|
admin.enableTableReplication(tableName).join();
|
||||||
|
} catch (CompletionException e) {
|
||||||
|
assertTrue(e.getCause() instanceof TableNotFoundException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDisableReplicationWhenTableNameAsNull() throws Exception {
|
||||||
|
try {
|
||||||
|
admin.disableTableReplication(null).join();
|
||||||
|
} catch (CompletionException e) {
|
||||||
|
assertTrue(e.getCause() instanceof IllegalArgumentException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEnableReplicationWhenTableNameAsNull() throws Exception {
|
||||||
|
try {
|
||||||
|
admin.enableTableReplication(null).join();
|
||||||
|
} catch (CompletionException e) {
|
||||||
|
assertTrue(e.getCause() instanceof IllegalArgumentException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Test enable table replication should create table only in user explicit specified table-cfs.
|
||||||
|
* HBASE-14717
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testEnableReplicationForExplicitSetTableCfs() throws Exception {
|
||||||
|
TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "2");
|
||||||
|
// Only create table in source cluster
|
||||||
|
createTableWithDefaultConf(tableName);
|
||||||
|
createTableWithDefaultConf(tableName2);
|
||||||
|
assertFalse("Table should not exists in the peer cluster",
|
||||||
|
admin2.tableExists(tableName).get());
|
||||||
|
assertFalse("Table should not exists in the peer cluster",
|
||||||
|
admin2.tableExists(tableName2).get());
|
||||||
|
|
||||||
|
Map<TableName, ? extends Collection<String>> tableCfs = new HashMap<>();
|
||||||
|
tableCfs.put(tableName, null);
|
||||||
|
ReplicationPeerConfig rpc = admin.getReplicationPeerConfig(ID_SECOND).get();
|
||||||
|
rpc.setTableCFsMap(tableCfs);
|
||||||
|
try {
|
||||||
|
// Only add tableName to replication peer config
|
||||||
|
admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
|
||||||
|
admin.enableTableReplication(tableName2).join();
|
||||||
|
assertFalse("Table should not be created if user has set table cfs explicitly for the "
|
||||||
|
+ "peer and this is not part of that collection", admin2.tableExists(tableName2).get());
|
||||||
|
|
||||||
|
// Add tableName2 to replication peer config, too
|
||||||
|
tableCfs.put(tableName2, null);
|
||||||
|
rpc.setTableCFsMap(tableCfs);
|
||||||
|
admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
|
||||||
|
admin.enableTableReplication(tableName2).join();
|
||||||
|
assertTrue(
|
||||||
|
"Table should be created if user has explicitly added table into table cfs collection",
|
||||||
|
admin2.tableExists(tableName2).get());
|
||||||
|
} finally {
|
||||||
|
rpc.setTableCFsMap(null);
|
||||||
|
admin.updateReplicationPeerConfig(ID_SECOND, rpc).join();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -241,7 +241,7 @@ public class TestReplicationAdmin {
|
||||||
tableCFs.put(tableName1, null);
|
tableCFs.put(tableName1, null);
|
||||||
admin.appendPeerTableCFs(ID_ONE, tableCFs);
|
admin.appendPeerTableCFs(ID_ONE, tableCFs);
|
||||||
Map<TableName, List<String>> result =
|
Map<TableName, List<String>> result =
|
||||||
ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
|
ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
|
||||||
assertEquals(1, result.size());
|
assertEquals(1, result.size());
|
||||||
assertEquals(true, result.containsKey(tableName1));
|
assertEquals(true, result.containsKey(tableName1));
|
||||||
assertNull(result.get(tableName1));
|
assertNull(result.get(tableName1));
|
||||||
|
@ -250,7 +250,7 @@ public class TestReplicationAdmin {
|
||||||
tableCFs.clear();
|
tableCFs.clear();
|
||||||
tableCFs.put(tableName2, null);
|
tableCFs.put(tableName2, null);
|
||||||
admin.appendPeerTableCFs(ID_ONE, tableCFs);
|
admin.appendPeerTableCFs(ID_ONE, tableCFs);
|
||||||
result = ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
|
result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
|
||||||
assertEquals(2, result.size());
|
assertEquals(2, result.size());
|
||||||
assertTrue("Should contain t1", result.containsKey(tableName1));
|
assertTrue("Should contain t1", result.containsKey(tableName1));
|
||||||
assertTrue("Should contain t2", result.containsKey(tableName2));
|
assertTrue("Should contain t2", result.containsKey(tableName2));
|
||||||
|
@ -262,7 +262,7 @@ public class TestReplicationAdmin {
|
||||||
tableCFs.put(tableName3, new ArrayList<>());
|
tableCFs.put(tableName3, new ArrayList<>());
|
||||||
tableCFs.get(tableName3).add("f1");
|
tableCFs.get(tableName3).add("f1");
|
||||||
admin.appendPeerTableCFs(ID_ONE, tableCFs);
|
admin.appendPeerTableCFs(ID_ONE, tableCFs);
|
||||||
result = ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
|
result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
|
||||||
assertEquals(3, result.size());
|
assertEquals(3, result.size());
|
||||||
assertTrue("Should contain t1", result.containsKey(tableName1));
|
assertTrue("Should contain t1", result.containsKey(tableName1));
|
||||||
assertTrue("Should contain t2", result.containsKey(tableName2));
|
assertTrue("Should contain t2", result.containsKey(tableName2));
|
||||||
|
@ -277,7 +277,7 @@ public class TestReplicationAdmin {
|
||||||
tableCFs.get(tableName4).add("f1");
|
tableCFs.get(tableName4).add("f1");
|
||||||
tableCFs.get(tableName4).add("f2");
|
tableCFs.get(tableName4).add("f2");
|
||||||
admin.appendPeerTableCFs(ID_ONE, tableCFs);
|
admin.appendPeerTableCFs(ID_ONE, tableCFs);
|
||||||
result = ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
|
result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
|
||||||
assertEquals(4, result.size());
|
assertEquals(4, result.size());
|
||||||
assertTrue("Should contain t1", result.containsKey(tableName1));
|
assertTrue("Should contain t1", result.containsKey(tableName1));
|
||||||
assertTrue("Should contain t2", result.containsKey(tableName2));
|
assertTrue("Should contain t2", result.containsKey(tableName2));
|
||||||
|
@ -299,7 +299,7 @@ public class TestReplicationAdmin {
|
||||||
tableCFs.put(tableName5, new ArrayList<>());
|
tableCFs.put(tableName5, new ArrayList<>());
|
||||||
tableCFs.get(tableName5).add("f1");
|
tableCFs.get(tableName5).add("f1");
|
||||||
admin.appendPeerTableCFs(ID_ONE, tableCFs);
|
admin.appendPeerTableCFs(ID_ONE, tableCFs);
|
||||||
result = ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
|
result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
|
||||||
assertEquals(5, result.size());
|
assertEquals(5, result.size());
|
||||||
assertTrue("Should contain t5", result.containsKey(tableName5));
|
assertTrue("Should contain t5", result.containsKey(tableName5));
|
||||||
// null means replication all cfs of tab5
|
// null means replication all cfs of tab5
|
||||||
|
@ -313,7 +313,7 @@ public class TestReplicationAdmin {
|
||||||
tableCFs.clear();
|
tableCFs.clear();
|
||||||
tableCFs.put(tableName6, new ArrayList<>());
|
tableCFs.put(tableName6, new ArrayList<>());
|
||||||
admin.appendPeerTableCFs(ID_ONE, tableCFs);
|
admin.appendPeerTableCFs(ID_ONE, tableCFs);
|
||||||
result = ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
|
result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
|
||||||
assertEquals(6, result.size());
|
assertEquals(6, result.size());
|
||||||
assertTrue("Should contain t6", result.containsKey(tableName6));
|
assertTrue("Should contain t6", result.containsKey(tableName6));
|
||||||
// null means replication all cfs of tab6
|
// null means replication all cfs of tab6
|
||||||
|
@ -354,7 +354,7 @@ public class TestReplicationAdmin {
|
||||||
} catch (ReplicationException e) {
|
} catch (ReplicationException e) {
|
||||||
}
|
}
|
||||||
Map<TableName, List<String>> result =
|
Map<TableName, List<String>> result =
|
||||||
ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
|
ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
|
||||||
assertEquals(2, result.size());
|
assertEquals(2, result.size());
|
||||||
assertTrue("Should contain t1", result.containsKey(tableName1));
|
assertTrue("Should contain t1", result.containsKey(tableName1));
|
||||||
assertTrue("Should contain t2", result.containsKey(tableName2));
|
assertTrue("Should contain t2", result.containsKey(tableName2));
|
||||||
|
@ -373,7 +373,7 @@ public class TestReplicationAdmin {
|
||||||
tableCFs.clear();
|
tableCFs.clear();
|
||||||
tableCFs.put(tableName1, null);
|
tableCFs.put(tableName1, null);
|
||||||
admin.removePeerTableCFs(ID_ONE, tableCFs);
|
admin.removePeerTableCFs(ID_ONE, tableCFs);
|
||||||
result = ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
|
result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE));
|
||||||
assertEquals(1, result.size());
|
assertEquals(1, result.size());
|
||||||
assertEquals(1, result.get(tableName2).size());
|
assertEquals(1, result.get(tableName2).size());
|
||||||
assertEquals("cf1", result.get(tableName2).get(0));
|
assertEquals("cf1", result.get(tableName2).get(0));
|
||||||
|
|
|
@ -58,7 +58,7 @@ import org.apache.hadoop.hbase.client.Get;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
|
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||||
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
|
||||||
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
|
||||||
|
@ -528,7 +528,7 @@ public class TestMasterReplication {
|
||||||
.getAdmin()) {
|
.getAdmin()) {
|
||||||
admin.addReplicationPeer(id,
|
admin.addReplicationPeer(id,
|
||||||
new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey())
|
new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey())
|
||||||
.setTableCFsMap(ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs)));
|
.setTableCFsMap(ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.client.Put;
|
||||||
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
import org.apache.hadoop.hbase.client.Table;
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
|
||||||
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
|
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||||
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
|
||||||
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
|
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
|
||||||
|
@ -187,13 +187,13 @@ public class TestPerTableCFReplication {
|
||||||
Map<TableName, List<String>> tabCFsMap = null;
|
Map<TableName, List<String>> tabCFsMap = null;
|
||||||
|
|
||||||
// 1. null or empty string, result should be null
|
// 1. null or empty string, result should be null
|
||||||
tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(null);
|
tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(null);
|
||||||
assertEquals(null, tabCFsMap);
|
assertEquals(null, tabCFsMap);
|
||||||
|
|
||||||
tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig("");
|
tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig("");
|
||||||
assertEquals(null, tabCFsMap);
|
assertEquals(null, tabCFsMap);
|
||||||
|
|
||||||
tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(" ");
|
tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(" ");
|
||||||
assertEquals(null, tabCFsMap);
|
assertEquals(null, tabCFsMap);
|
||||||
|
|
||||||
final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
|
final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
|
||||||
|
@ -201,20 +201,20 @@ public class TestPerTableCFReplication {
|
||||||
final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");
|
final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3");
|
||||||
|
|
||||||
// 2. single table: "tableName1" / "tableName2:cf1" / "tableName3:cf1,cf3"
|
// 2. single table: "tableName1" / "tableName2:cf1" / "tableName3:cf1,cf3"
|
||||||
tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(tableName1.getNameAsString());
|
tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1.getNameAsString());
|
||||||
assertEquals(1, tabCFsMap.size()); // only one table
|
assertEquals(1, tabCFsMap.size()); // only one table
|
||||||
assertTrue(tabCFsMap.containsKey(tableName1)); // its table name is "tableName1"
|
assertTrue(tabCFsMap.containsKey(tableName1)); // its table name is "tableName1"
|
||||||
assertFalse(tabCFsMap.containsKey(tableName2)); // not other table
|
assertFalse(tabCFsMap.containsKey(tableName2)); // not other table
|
||||||
assertEquals(null, tabCFsMap.get(tableName1)); // null cf-list,
|
assertEquals(null, tabCFsMap.get(tableName1)); // null cf-list,
|
||||||
|
|
||||||
tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(tableName2 + ":cf1");
|
tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName2 + ":cf1");
|
||||||
assertEquals(1, tabCFsMap.size()); // only one table
|
assertEquals(1, tabCFsMap.size()); // only one table
|
||||||
assertTrue(tabCFsMap.containsKey(tableName2)); // its table name is "tableName2"
|
assertTrue(tabCFsMap.containsKey(tableName2)); // its table name is "tableName2"
|
||||||
assertFalse(tabCFsMap.containsKey(tableName1)); // not other table
|
assertFalse(tabCFsMap.containsKey(tableName1)); // not other table
|
||||||
assertEquals(1, tabCFsMap.get(tableName2).size()); // cf-list contains only 1 cf
|
assertEquals(1, tabCFsMap.get(tableName2).size()); // cf-list contains only 1 cf
|
||||||
assertEquals("cf1", tabCFsMap.get(tableName2).get(0));// the only cf is "cf1"
|
assertEquals("cf1", tabCFsMap.get(tableName2).get(0));// the only cf is "cf1"
|
||||||
|
|
||||||
tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(tableName3 + " : cf1 , cf3");
|
tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName3 + " : cf1 , cf3");
|
||||||
assertEquals(1, tabCFsMap.size()); // only one table
|
assertEquals(1, tabCFsMap.size()); // only one table
|
||||||
assertTrue(tabCFsMap.containsKey(tableName3)); // its table name is "tableName2"
|
assertTrue(tabCFsMap.containsKey(tableName3)); // its table name is "tableName2"
|
||||||
assertFalse(tabCFsMap.containsKey(tableName1)); // not other table
|
assertFalse(tabCFsMap.containsKey(tableName1)); // not other table
|
||||||
|
@ -223,7 +223,7 @@ public class TestPerTableCFReplication {
|
||||||
assertTrue(tabCFsMap.get(tableName3).contains("cf3"));// contains "cf3"
|
assertTrue(tabCFsMap.get(tableName3).contains("cf3"));// contains "cf3"
|
||||||
|
|
||||||
// 3. multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
|
// 3. multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
|
||||||
tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(tableName1 + " ; " + tableName2
|
tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1 + " ; " + tableName2
|
||||||
+ ":cf1 ; " + tableName3 + ":cf1,cf3");
|
+ ":cf1 ; " + tableName3 + ":cf1,cf3");
|
||||||
// 3.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
|
// 3.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
|
||||||
assertEquals(3, tabCFsMap.size());
|
assertEquals(3, tabCFsMap.size());
|
||||||
|
@ -242,7 +242,7 @@ public class TestPerTableCFReplication {
|
||||||
|
|
||||||
// 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated
|
// 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated
|
||||||
// still use the example of multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
|
// still use the example of multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3"
|
||||||
tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(
|
tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
|
||||||
tableName1 + " ; ; " + tableName2 + ":cf1 ; " + tableName3 + ":cf1,,cf3 ;");
|
tableName1 + " ; ; " + tableName2 + ":cf1 ; " + tableName3 + ":cf1,,cf3 ;");
|
||||||
// 4.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
|
// 4.1 contains 3 tables : "tableName1", "tableName2" and "tableName3"
|
||||||
assertEquals(3, tabCFsMap.size());
|
assertEquals(3, tabCFsMap.size());
|
||||||
|
@ -261,7 +261,7 @@ public class TestPerTableCFReplication {
|
||||||
|
|
||||||
// 5. invalid format "tableName1:tt:cf1 ; tableName2::cf1 ; tableName3:cf1,cf3"
|
// 5. invalid format "tableName1:tt:cf1 ; tableName2::cf1 ; tableName3:cf1,cf3"
|
||||||
// "tableName1:tt:cf1" and "tableName2::cf1" are invalid and will be ignored totally
|
// "tableName1:tt:cf1" and "tableName2::cf1" are invalid and will be ignored totally
|
||||||
tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(
|
tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(
|
||||||
tableName1 + ":tt:cf1 ; " + tableName2 + "::cf1 ; " + tableName3 + ":cf1,cf3");
|
tableName1 + ":tt:cf1 ; " + tableName2 + "::cf1 ; " + tableName3 + ":cf1,cf3");
|
||||||
// 5.1 no "tableName1" and "tableName2", only "tableName3"
|
// 5.1 no "tableName1" and "tableName2", only "tableName3"
|
||||||
assertEquals(1, tabCFsMap.size()); // only one table
|
assertEquals(1, tabCFsMap.size()); // only one table
|
||||||
|
@ -281,10 +281,10 @@ public class TestPerTableCFReplication {
|
||||||
Map<TableName, List<String>> tabCFsMap = null;
|
Map<TableName, List<String>> tabCFsMap = null;
|
||||||
|
|
||||||
// 1. null or empty string, result should be null
|
// 1. null or empty string, result should be null
|
||||||
assertNull(ReplicationSerDeHelper.convert(tabCFsMap));
|
assertNull(ReplicationPeerConfigUtil.convert(tabCFsMap));
|
||||||
|
|
||||||
tabCFsMap = new HashMap<>();
|
tabCFsMap = new HashMap<>();
|
||||||
tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
|
tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
|
||||||
assertEquals(0, tableCFs.length);
|
assertEquals(0, tableCFs.length);
|
||||||
|
|
||||||
final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
|
final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
|
||||||
|
@ -294,7 +294,7 @@ public class TestPerTableCFReplication {
|
||||||
// 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
|
// 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3"
|
||||||
tabCFsMap.clear();
|
tabCFsMap.clear();
|
||||||
tabCFsMap.put(tableName1, null);
|
tabCFsMap.put(tableName1, null);
|
||||||
tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
|
tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
|
||||||
assertEquals(1, tableCFs.length); // only one table
|
assertEquals(1, tableCFs.length); // only one table
|
||||||
assertEquals(tableName1.toString(),
|
assertEquals(tableName1.toString(),
|
||||||
tableCFs[0].getTableName().getQualifier().toStringUtf8());
|
tableCFs[0].getTableName().getQualifier().toStringUtf8());
|
||||||
|
@ -303,7 +303,7 @@ public class TestPerTableCFReplication {
|
||||||
tabCFsMap.clear();
|
tabCFsMap.clear();
|
||||||
tabCFsMap.put(tableName2, new ArrayList<>());
|
tabCFsMap.put(tableName2, new ArrayList<>());
|
||||||
tabCFsMap.get(tableName2).add("cf1");
|
tabCFsMap.get(tableName2).add("cf1");
|
||||||
tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
|
tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
|
||||||
assertEquals(1, tableCFs.length); // only one table
|
assertEquals(1, tableCFs.length); // only one table
|
||||||
assertEquals(tableName2.toString(),
|
assertEquals(tableName2.toString(),
|
||||||
tableCFs[0].getTableName().getQualifier().toStringUtf8());
|
tableCFs[0].getTableName().getQualifier().toStringUtf8());
|
||||||
|
@ -314,7 +314,7 @@ public class TestPerTableCFReplication {
|
||||||
tabCFsMap.put(tableName3, new ArrayList<>());
|
tabCFsMap.put(tableName3, new ArrayList<>());
|
||||||
tabCFsMap.get(tableName3).add("cf1");
|
tabCFsMap.get(tableName3).add("cf1");
|
||||||
tabCFsMap.get(tableName3).add("cf3");
|
tabCFsMap.get(tableName3).add("cf3");
|
||||||
tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
|
tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
|
||||||
assertEquals(1, tableCFs.length);
|
assertEquals(1, tableCFs.length);
|
||||||
assertEquals(tableName3.toString(),
|
assertEquals(tableName3.toString(),
|
||||||
tableCFs[0].getTableName().getQualifier().toStringUtf8());
|
tableCFs[0].getTableName().getQualifier().toStringUtf8());
|
||||||
|
@ -330,28 +330,28 @@ public class TestPerTableCFReplication {
|
||||||
tabCFsMap.get(tableName3).add("cf1");
|
tabCFsMap.get(tableName3).add("cf1");
|
||||||
tabCFsMap.get(tableName3).add("cf3");
|
tabCFsMap.get(tableName3).add("cf3");
|
||||||
|
|
||||||
tableCFs = ReplicationSerDeHelper.convert(tabCFsMap);
|
tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap);
|
||||||
assertEquals(3, tableCFs.length);
|
assertEquals(3, tableCFs.length);
|
||||||
assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, tableName1.toString()));
|
assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()));
|
||||||
assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, tableName2.toString()));
|
assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString()));
|
||||||
assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, tableName3.toString()));
|
assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString()));
|
||||||
|
|
||||||
assertEquals(0,
|
assertEquals(0,
|
||||||
ReplicationSerDeHelper.getTableCF(tableCFs, tableName1.toString()).getFamiliesCount());
|
ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()).getFamiliesCount());
|
||||||
|
|
||||||
assertEquals(1,
|
assertEquals(1, ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString())
|
||||||
ReplicationSerDeHelper.getTableCF(tableCFs, tableName2.toString()).getFamiliesCount());
|
.getFamiliesCount());
|
||||||
assertEquals("cf1",
|
assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString())
|
||||||
ReplicationSerDeHelper.getTableCF(tableCFs, tableName2.toString()).getFamilies(0).toStringUtf8());
|
.getFamilies(0).toStringUtf8());
|
||||||
|
|
||||||
assertEquals(2,
|
assertEquals(2, ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
|
||||||
ReplicationSerDeHelper.getTableCF(tableCFs, tableName3.toString()).getFamiliesCount());
|
.getFamiliesCount());
|
||||||
assertEquals("cf1",
|
assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
|
||||||
ReplicationSerDeHelper.getTableCF(tableCFs, tableName3.toString()).getFamilies(0).toStringUtf8());
|
.getFamilies(0).toStringUtf8());
|
||||||
assertEquals("cf3",
|
assertEquals("cf3", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())
|
||||||
ReplicationSerDeHelper.getTableCF(tableCFs, tableName3.toString()).getFamilies(1).toStringUtf8());
|
.getFamilies(1).toStringUtf8());
|
||||||
|
|
||||||
tabCFsMap = ReplicationSerDeHelper.convert2Map(tableCFs);
|
tabCFsMap = ReplicationPeerConfigUtil.convert2Map(tableCFs);
|
||||||
assertEquals(3, tabCFsMap.size());
|
assertEquals(3, tabCFsMap.size());
|
||||||
assertTrue(tabCFsMap.containsKey(tableName1));
|
assertTrue(tabCFsMap.containsKey(tableName1));
|
||||||
assertTrue(tabCFsMap.containsKey(tableName2));
|
assertTrue(tabCFsMap.containsKey(tableName2));
|
||||||
|
|
|
@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.TableName;
|
||||||
import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
|
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
|
||||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||||
|
@ -99,14 +99,15 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
|
||||||
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
|
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
|
||||||
rpc.setClusterKey(zkw.getQuorum());
|
rpc.setClusterKey(zkw.getQuorum());
|
||||||
String peerNode = getPeerNode(peerId);
|
String peerNode = getPeerNode(peerId);
|
||||||
ZKUtil.createWithParents(zkw, peerNode, ReplicationSerDeHelper.toByteArray(rpc));
|
ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc));
|
||||||
|
|
||||||
String tableCFs = tableName1 + ":cf1,cf2;" + tableName2 + ":cf3;" + tableName3;
|
String tableCFs = tableName1 + ":cf1,cf2;" + tableName2 + ":cf3;" + tableName3;
|
||||||
String tableCFsNode = getTableCFsNode(peerId);
|
String tableCFsNode = getTableCFsNode(peerId);
|
||||||
LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
|
LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
|
||||||
ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs));
|
ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs));
|
||||||
|
|
||||||
ReplicationPeerConfig actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
ReplicationPeerConfig actualRpc =
|
||||||
|
ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
||||||
String actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
|
String actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
|
||||||
|
|
||||||
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
||||||
|
@ -117,14 +118,14 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
|
||||||
rpc = new ReplicationPeerConfig();
|
rpc = new ReplicationPeerConfig();
|
||||||
rpc.setClusterKey(zkw.getQuorum());
|
rpc.setClusterKey(zkw.getQuorum());
|
||||||
peerNode = getPeerNode(peerId);
|
peerNode = getPeerNode(peerId);
|
||||||
ZKUtil.createWithParents(zkw, peerNode, ReplicationSerDeHelper.toByteArray(rpc));
|
ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc));
|
||||||
|
|
||||||
tableCFs = tableName1 + ":cf1,cf3;" + tableName2 + ":cf2";
|
tableCFs = tableName1 + ":cf1,cf3;" + tableName2 + ":cf2";
|
||||||
tableCFsNode = getTableCFsNode(peerId);
|
tableCFsNode = getTableCFsNode(peerId);
|
||||||
LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
|
LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
|
||||||
ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs));
|
ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs));
|
||||||
|
|
||||||
actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
||||||
actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
|
actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
|
||||||
|
|
||||||
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
||||||
|
@ -135,14 +136,14 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
|
||||||
rpc = new ReplicationPeerConfig();
|
rpc = new ReplicationPeerConfig();
|
||||||
rpc.setClusterKey(zkw.getQuorum());
|
rpc.setClusterKey(zkw.getQuorum());
|
||||||
peerNode = getPeerNode(peerId);
|
peerNode = getPeerNode(peerId);
|
||||||
ZKUtil.createWithParents(zkw, peerNode, ReplicationSerDeHelper.toByteArray(rpc));
|
ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc));
|
||||||
|
|
||||||
tableCFs = "";
|
tableCFs = "";
|
||||||
tableCFsNode = getTableCFsNode(peerId);
|
tableCFsNode = getTableCFsNode(peerId);
|
||||||
LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
|
LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
|
||||||
ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs));
|
ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs));
|
||||||
|
|
||||||
actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
||||||
actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
|
actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
|
||||||
|
|
||||||
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
||||||
|
@ -153,10 +154,10 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
|
||||||
rpc = new ReplicationPeerConfig();
|
rpc = new ReplicationPeerConfig();
|
||||||
rpc.setClusterKey(zkw.getQuorum());
|
rpc.setClusterKey(zkw.getQuorum());
|
||||||
peerNode = getPeerNode(peerId);
|
peerNode = getPeerNode(peerId);
|
||||||
ZKUtil.createWithParents(zkw, peerNode, ReplicationSerDeHelper.toByteArray(rpc));
|
ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc));
|
||||||
|
|
||||||
tableCFsNode = getTableCFsNode(peerId);
|
tableCFsNode = getTableCFsNode(peerId);
|
||||||
actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
||||||
actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
|
actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
|
||||||
|
|
||||||
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
||||||
|
@ -167,7 +168,7 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
|
||||||
|
|
||||||
peerId = "1";
|
peerId = "1";
|
||||||
peerNode = getPeerNode(peerId);
|
peerNode = getPeerNode(peerId);
|
||||||
actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
||||||
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
||||||
Map<TableName, List<String>> tableNameListMap = actualRpc.getTableCFsMap();
|
Map<TableName, List<String>> tableNameListMap = actualRpc.getTableCFsMap();
|
||||||
assertEquals(3, tableNameListMap.size());
|
assertEquals(3, tableNameListMap.size());
|
||||||
|
@ -184,7 +185,7 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
|
||||||
|
|
||||||
peerId = "2";
|
peerId = "2";
|
||||||
peerNode = getPeerNode(peerId);
|
peerNode = getPeerNode(peerId);
|
||||||
actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
||||||
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
||||||
tableNameListMap = actualRpc.getTableCFsMap();
|
tableNameListMap = actualRpc.getTableCFsMap();
|
||||||
assertEquals(2, tableNameListMap.size());
|
assertEquals(2, tableNameListMap.size());
|
||||||
|
@ -198,14 +199,14 @@ public class TestTableCFsUpdater extends TableCFsUpdater {
|
||||||
|
|
||||||
peerId = "3";
|
peerId = "3";
|
||||||
peerNode = getPeerNode(peerId);
|
peerNode = getPeerNode(peerId);
|
||||||
actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
||||||
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
||||||
tableNameListMap = actualRpc.getTableCFsMap();
|
tableNameListMap = actualRpc.getTableCFsMap();
|
||||||
assertNull(tableNameListMap);
|
assertNull(tableNameListMap);
|
||||||
|
|
||||||
peerId = "4";
|
peerId = "4";
|
||||||
peerNode = getPeerNode(peerId);
|
peerNode = getPeerNode(peerId);
|
||||||
actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
|
||||||
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
|
||||||
tableNameListMap = actualRpc.getTableCFsMap();
|
tableNameListMap = actualRpc.getTableCFsMap();
|
||||||
assertNull(tableNameListMap);
|
assertNull(tableNameListMap);
|
||||||
|
|
Loading…
Reference in New Issue