From 600fdee8449aa1de80c8a78d3bb5e8551d3a0261 Mon Sep 17 00:00:00 2001 From: Guanghao Zhang Date: Sun, 12 Nov 2017 20:16:20 +0800 Subject: [PATCH] HBASE-19009 implement modifyTable and enable/disableTableReplication for AsyncAdmin --- .../hadoop/hbase/client/AsyncAdmin.java | 18 + .../hadoop/hbase/client/AsyncHBaseAdmin.java | 17 +- .../hbase/client/ColumnFamilyDescriptor.java | 27 ++ .../hadoop/hbase/client/HBaseAdmin.java | 220 ++---------- .../hbase/client/RawAsyncHBaseAdmin.java | 313 ++++++++++++++++-- .../hadoop/hbase/client/TableDescriptor.java | 51 ++- .../hbase/client/TableDescriptorBuilder.java | 21 +- .../client/replication/ReplicationAdmin.java | 8 +- ...er.java => ReplicationPeerConfigUtil.java} | 55 ++- .../replication/ReplicationPeerConfig.java | 20 ++ .../shaded/protobuf/RequestConverter.java | 6 +- .../replication/ReplicationPeerZKImpl.java | 6 +- .../replication/ReplicationPeersZKImpl.java | 14 +- .../hbase/master/MasterRpcServices.java | 10 +- .../replication/master/TableCFsUpdater.java | 14 +- .../client/TestAsyncReplicationAdminApi.java | 2 - ...tAsyncReplicationAdminApiWithClusters.java | 242 ++++++++++++++ .../replication/TestReplicationAdmin.java | 16 +- .../replication/TestMasterReplication.java | 4 +- .../TestPerTableCFReplication.java | 62 ++-- .../master/TestTableCFsUpdater.java | 27 +- 21 files changed, 836 insertions(+), 317 deletions(-) rename hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/{ReplicationSerDeHelper.java => ReplicationPeerConfigUtil.java} (90%) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java index f251a8f5dd8..722e8b5d4dd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java @@ -141,6 +141,12 @@ public interface AsyncAdmin { */ CompletableFuture createTable(TableDescriptor desc, byte[][] splitKeys); + /* + * Modify an existing table, more IRB friendly version. + * @param desc modified description of the table + */ + CompletableFuture modifyTable(TableDescriptor desc); + /** * Deletes a table. * @param tableName name of table to delete @@ -552,6 +558,18 @@ public interface AsyncAdmin { */ CompletableFuture> listReplicatedTableCFs(); + /** + * Enable a table's replication switch. + * @param tableName name of the table + */ + CompletableFuture enableTableReplication(TableName tableName); + + /** + * Disable a table's replication switch. + * @param tableName name of the table + */ + CompletableFuture disableTableReplication(TableName tableName); + /** * Take a snapshot for the given table. If the table is enabled, a FLUSH-type snapshot will be * taken. If the table is disabled, an offline snapshot is taken. Snapshots are considered unique diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java index 250a38c7ad0..5a20291c3fe 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java @@ -127,6 +127,11 @@ public class AsyncHBaseAdmin implements AsyncAdmin { return wrap(rawAdmin.createTable(desc, splitKeys)); } + @Override + public CompletableFuture modifyTable(TableDescriptor desc) { + return wrap(rawAdmin.modifyTable(desc)); + } + @Override public CompletableFuture deleteTable(TableName tableName) { return wrap(rawAdmin.deleteTable(tableName)); @@ -419,6 +424,16 @@ public class AsyncHBaseAdmin implements AsyncAdmin { return wrap(rawAdmin.listReplicatedTableCFs()); } + @Override + public CompletableFuture enableTableReplication(TableName tableName) { + return wrap(rawAdmin.enableTableReplication(tableName)); + } + + @Override + public CompletableFuture disableTableReplication(TableName tableName) { + return wrap(rawAdmin.disableTableReplication(tableName)); + } + @Override public CompletableFuture snapshot(SnapshotDescription snapshot) { return wrap(rawAdmin.snapshot(snapshot)); @@ -709,4 +724,4 @@ public class AsyncHBaseAdmin implements AsyncAdmin { public CompletableFuture> clearDeadServers(List servers) { return wrap(rawAdmin.clearDeadServers(servers)); } -} \ No newline at end of file +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java index c232271b2c5..03f45822e14 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ColumnFamilyDescriptor.java @@ -18,7 +18,9 @@ package org.apache.hadoop.hbase.client; import java.util.Comparator; +import java.util.HashMap; import java.util.Map; + import org.apache.hadoop.hbase.KeepDeletedCells; import org.apache.hadoop.hbase.MemoryCompactionPolicy; import org.apache.yetus.audience.InterfaceAudience; @@ -54,6 +56,31 @@ public interface ColumnFamilyDescriptor { return lhs.getConfiguration().hashCode() - rhs.getConfiguration().hashCode(); }; + static final Bytes REPLICATION_SCOPE_BYTES = new Bytes( + Bytes.toBytes(ColumnFamilyDescriptorBuilder.REPLICATION_SCOPE)); + + @InterfaceAudience.Private + static final Comparator COMPARATOR_IGNORE_REPLICATION = ( + ColumnFamilyDescriptor lcf, ColumnFamilyDescriptor rcf) -> { + int result = Bytes.compareTo(lcf.getName(), rcf.getName()); + if (result != 0) { + return result; + } + // ColumnFamilyDescriptor.getValues is a immutable map, so copy it and remove + // REPLICATION_SCOPE_BYTES + Map lValues = new HashMap<>(); + lValues.putAll(lcf.getValues()); + lValues.remove(REPLICATION_SCOPE_BYTES); + Map rValues = new HashMap<>(); + rValues.putAll(rcf.getValues()); + rValues.remove(REPLICATION_SCOPE_BYTES); + result = lValues.hashCode() - rValues.hashCode(); + if (result != 0) { + return result; + } + return lcf.getConfiguration().hashCode() - rcf.getConfiguration().hashCode(); + }; + /** * @return The storefile/hfile blocksize for this column family. */ diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 80f9d163b39..e153381c6c5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -30,7 +30,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.EnumSet; import java.util.HashMap; -import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -45,6 +44,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -54,10 +54,8 @@ import org.apache.hadoop.hbase.CacheEvictionStats; import org.apache.hadoop.hbase.CacheEvictionStatsBuilder; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.ClusterStatus.Option; -import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionLocation; @@ -76,7 +74,7 @@ import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.UnknownRegionException; import org.apache.hadoop.hbase.ZooKeeperConnectionException; -import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.client.replication.TableCFs; import org.apache.hadoop.hbase.client.security.SecurityCapability; import org.apache.hadoop.hbase.exceptions.TimeoutIOException; @@ -3893,7 +3891,7 @@ public class HBaseAdmin implements Admin { protected ReplicationPeerConfig rpcCall() throws Exception { GetReplicationPeerConfigResponse response = master.getReplicationPeerConfig( getRpcController(), RequestConverter.buildGetReplicationPeerConfigRequest(peerId)); - return ReplicationSerDeHelper.convert(response.getPeerConfig()); + return ReplicationPeerConfigUtil.convert(response.getPeerConfig()); } }); } @@ -3919,7 +3917,7 @@ public class HBaseAdmin implements Admin { throw new ReplicationException("tableCfs is null"); } ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id); - ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); + ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); updateReplicationPeerConfig(id, peerConfig); } @@ -3931,7 +3929,7 @@ public class HBaseAdmin implements Admin { throw new ReplicationException("tableCfs is null"); } ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id); - ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); + ReplicationPeerConfigUtil.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); updateReplicationPeerConfig(id, peerConfig); } @@ -3957,7 +3955,7 @@ public class HBaseAdmin implements Admin { .getPeerDescList(); List result = new ArrayList<>(peersList.size()); for (ReplicationProtos.ReplicationPeerDescription peer : peersList) { - result.add(ReplicationSerDeHelper.toReplicationPeerDescription(peer)); + result.add(ReplicationPeerConfigUtil.toReplicationPeerDescription(peer)); } return result; } @@ -4010,19 +4008,18 @@ public class HBaseAdmin implements Admin { @Override public List listReplicatedTableCFs() throws IOException { List replicatedTableCFs = new ArrayList<>(); - HTableDescriptor[] tables = listTables(); - for (HTableDescriptor table : tables) { - HColumnDescriptor[] columns = table.getColumnFamilies(); + List tables = listTableDescriptors(); + tables.forEach(table -> { Map cfs = new HashMap<>(); - for (HColumnDescriptor column : columns) { - if (column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) { - cfs.put(column.getNameAsString(), column.getScope()); - } - } + Stream.of(table.getColumnFamilies()) + .filter(column -> column.getScope() != HConstants.REPLICATION_SCOPE_LOCAL) + .forEach(column -> { + cfs.put(column.getNameAsString(), column.getScope()); + }); if (!cfs.isEmpty()) { replicatedTableCFs.add(new TableCFs(table.getTableName(), cfs)); } - } + }); return replicatedTableCFs; } @@ -4046,83 +4043,12 @@ public class HBaseAdmin implements Admin { throw new IllegalArgumentException("Table name is null"); } if (!tableExists(tableName)) { - throw new TableNotFoundException("Table '" + tableName.getNamespaceAsString() + throw new TableNotFoundException("Table '" + tableName.getNameAsString() + "' does not exists."); } setTableRep(tableName, false); } - /** - * Copies the REPLICATION_SCOPE of table descriptor passed as an argument. Before copy, the method - * ensures that the name of table and column-families should match. - * @param peerHtd descriptor on peer cluster - * @param localHtd - The HTableDescriptor of table from source cluster. - * @return true If the name of table and column families match and REPLICATION_SCOPE copied - * successfully. false If there is any mismatch in the names. - */ - private boolean copyReplicationScope(final HTableDescriptor peerHtd, - final HTableDescriptor localHtd) { - // Copy the REPLICATION_SCOPE only when table names and the names of - // Column-Families are same. - int result = peerHtd.getTableName().compareTo(localHtd.getTableName()); - - if (result == 0) { - Iterator remoteHCDIter = peerHtd.getFamilies().iterator(); - Iterator localHCDIter = localHtd.getFamilies().iterator(); - - while (remoteHCDIter.hasNext() && localHCDIter.hasNext()) { - HColumnDescriptor remoteHCD = remoteHCDIter.next(); - HColumnDescriptor localHCD = localHCDIter.next(); - - String remoteHCDName = remoteHCD.getNameAsString(); - String localHCDName = localHCD.getNameAsString(); - - if (remoteHCDName.equals(localHCDName)) { - remoteHCD.setScope(localHCD.getScope()); - } else { - result = -1; - break; - } - } - if (remoteHCDIter.hasNext() || localHCDIter.hasNext()) { - return false; - } - } - - return result == 0; - } - - /** - * Compare the contents of the descriptor with another one passed as a parameter for replication - * purpose. The REPLICATION_SCOPE field is ignored during comparison. - * @param peerHtd descriptor on peer cluster - * @param localHtd descriptor on source cluster which needs to be replicated. - * @return true if the contents of the two descriptors match (ignoring just REPLICATION_SCOPE). - * @see java.lang.Object#equals(java.lang.Object) - */ - private boolean compareForReplication(HTableDescriptor peerHtd, HTableDescriptor localHtd) { - if (peerHtd == localHtd) { - return true; - } - if (peerHtd == null) { - return false; - } - boolean result = false; - - // Create a copy of peer HTD as we need to change its replication - // scope to match with the local HTD. - HTableDescriptor peerHtdCopy = new HTableDescriptor(peerHtd); - - result = copyReplicationScope(peerHtdCopy, localHtd); - - // If copy was successful, compare the two tables now. - if (result) { - result = (peerHtdCopy.compareTo(localHtd) == 0); - } - - return result; - } - /** * Connect to peer and check the table descriptor on peer: *
    @@ -4143,21 +4069,23 @@ public class HBaseAdmin implements Admin { } for (ReplicationPeerDescription peerDesc : peers) { - if (needToReplicate(tableName, peerDesc)) { - Configuration peerConf = getPeerClusterConfiguration(peerDesc); + if (peerDesc.getPeerConfig().needToReplicate(tableName)) { + Configuration peerConf = + ReplicationPeerConfigUtil.getPeerClusterConfiguration(this.conf, peerDesc); try (Connection conn = ConnectionFactory.createConnection(peerConf); Admin repHBaseAdmin = conn.getAdmin()) { - HTableDescriptor localHtd = getTableDescriptor(tableName); - HTableDescriptor peerHtd = null; + TableDescriptor tableDesc = getDescriptor(tableName); + TableDescriptor peerTableDesc = null; if (!repHBaseAdmin.tableExists(tableName)) { - repHBaseAdmin.createTable(localHtd, splits); + repHBaseAdmin.createTable(tableDesc, splits); } else { - peerHtd = repHBaseAdmin.getTableDescriptor(tableName); - if (peerHtd == null) { + peerTableDesc = repHBaseAdmin.getDescriptor(tableName); + if (peerTableDesc == null) { throw new IllegalArgumentException("Failed to get table descriptor for table " + tableName.getNameAsString() + " from peer cluster " + peerDesc.getPeerId()); } - if (!compareForReplication(peerHtd, localHtd)) { + if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, + tableDesc) != 0) { throw new IllegalArgumentException("Table " + tableName.getNameAsString() + " exists in peer cluster " + peerDesc.getPeerId() + ", but the table descriptors are not same when compared with source cluster." @@ -4169,33 +4097,6 @@ public class HBaseAdmin implements Admin { } } - /** - * Decide whether the table need replicate to the peer cluster according to the peer config - * @param table name of the table - * @param peer config for the peer - * @return true if the table need replicate to the peer cluster - */ - private boolean needToReplicate(TableName table, ReplicationPeerDescription peer) { - ReplicationPeerConfig peerConfig = peer.getPeerConfig(); - Set namespaces = peerConfig.getNamespaces(); - Map> tableCFsMap = peerConfig.getTableCFsMap(); - // If null means user has explicitly not configured any namespaces and table CFs - // so all the tables data are applicable for replication - if (namespaces == null && tableCFsMap == null) { - return true; - } - if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) { - return true; - } - if (tableCFsMap != null && tableCFsMap.containsKey(table)) { - return true; - } - LOG.debug("Table " + table.getNameAsString() - + " doesn't need replicate to peer cluster, peerId=" + peer.getPeerId() + ", clusterKey=" - + peerConfig.getClusterKey()); - return false; - } - /** * Set the table's replication switch if the table's replication switch is already not set. * @param tableName name of the table @@ -4203,75 +4104,14 @@ public class HBaseAdmin implements Admin { * @throws IOException if a remote or network exception occurs */ private void setTableRep(final TableName tableName, boolean enableRep) throws IOException { - HTableDescriptor htd = new HTableDescriptor(getTableDescriptor(tableName)); - ReplicationState currentReplicationState = getTableReplicationState(htd); - if (enableRep && currentReplicationState != ReplicationState.ENABLED - || !enableRep && currentReplicationState != ReplicationState.DISABLED) { - for (HColumnDescriptor hcd : htd.getFamilies()) { - hcd.setScope(enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL - : HConstants.REPLICATION_SCOPE_LOCAL); - } - modifyTable(tableName, htd); + TableDescriptor tableDesc = getDescriptor(tableName); + if (!tableDesc.matchReplicationScope(enableRep)) { + int scope = + enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL; + modifyTable(TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build()); } } - /** - * This enum indicates the current state of the replication for a given table. - */ - private enum ReplicationState { - ENABLED, // all column families enabled - MIXED, // some column families enabled, some disabled - DISABLED // all column families disabled - } - - /** - * @param htd table descriptor details for the table to check - * @return ReplicationState the current state of the table. - */ - private ReplicationState getTableReplicationState(HTableDescriptor htd) { - boolean hasEnabled = false; - boolean hasDisabled = false; - - for (HColumnDescriptor hcd : htd.getFamilies()) { - if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL - && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) { - hasDisabled = true; - } else { - hasEnabled = true; - } - } - - if (hasEnabled && hasDisabled) return ReplicationState.MIXED; - if (hasEnabled) return ReplicationState.ENABLED; - return ReplicationState.DISABLED; - } - - /** - * Returns the configuration needed to talk to the remote slave cluster. - * @param peer the description of replication peer - * @return the configuration for the peer cluster, null if it was unable to get the configuration - * @throws IOException - */ - private Configuration getPeerClusterConfiguration(ReplicationPeerDescription peer) - throws IOException { - ReplicationPeerConfig peerConfig = peer.getPeerConfig(); - Configuration otherConf; - try { - otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey()); - } catch (IOException e) { - throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e); - } - - if (!peerConfig.getConfiguration().isEmpty()) { - CompoundConfiguration compound = new CompoundConfiguration(); - compound.add(otherConf); - compound.addStringMap(peerConfig.getConfiguration()); - return compound; - } - - return otherConf; - } - @Override public void clearCompactionQueues(final ServerName sn, final Set queues) throws IOException, InterruptedException { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java index d77cd156895..bcf581b1363 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java @@ -42,6 +42,7 @@ import java.util.stream.Stream; import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.AsyncMetaTableAccessor; import org.apache.hadoop.hbase.ClusterStatus; import org.apache.hadoop.hbase.ClusterStatus.Option; @@ -64,7 +65,7 @@ import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.MasterReques import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.ServerRequestCallerBuilder; import org.apache.hadoop.hbase.client.RawAsyncTable.CoprocessorCallable; import org.apache.hadoop.hbase.client.Scan.ReadType; -import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.client.replication.TableCFs; import org.apache.hadoop.hbase.client.security.SecurityCapability; import org.apache.hadoop.hbase.exceptions.DeserializationException; @@ -188,6 +189,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColu import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyColumnResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyNamespaceResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ModifyTableResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MoveRegionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.NormalizeRequest; @@ -505,6 +508,14 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { new CreateTableProcedureBiConsumer(tableName)); } + @Override + public CompletableFuture modifyTable(TableDescriptor desc) { + return this. procedureCall( + RequestConverter.buildModifyTableRequest(desc.getTableName(), desc, ng.getNonceGroup(), + ng.newNonce()), (s, c, req, done) -> s.modifyTable(c, req, done), + (resp) -> resp.getProcId(), new ModifyTableProcedureBiConsumer(this, desc.getTableName())); + } + @Override public CompletableFuture deleteTable(TableName tableName) { return this. procedureCall(RequestConverter @@ -1515,7 +1526,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { . call( controller, stub, RequestConverter.buildGetReplicationPeerConfigRequest(peerId), ( s, c, req, done) -> s.getReplicationPeerConfig(c, req, done), - (resp) -> ReplicationSerDeHelper.convert(resp.getPeerConfig()))).call(); + (resp) -> ReplicationPeerConfigUtil.convert(resp.getPeerConfig()))).call(); } @Override @@ -1541,7 +1552,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { CompletableFuture future = new CompletableFuture(); getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> { if (!completeExceptionally(future, error)) { - ReplicationSerDeHelper.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); + ReplicationPeerConfigUtil.appendTableCFsToReplicationPeerConfig(tableCfs, peerConfig); updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> { if (!completeExceptionally(future, error)) { future.complete(result); @@ -1560,21 +1571,23 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { } CompletableFuture future = new CompletableFuture(); - getReplicationPeerConfig(id).whenComplete((peerConfig, error) -> { - if (!completeExceptionally(future, error)) { - try { - ReplicationSerDeHelper.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, id); - } catch (ReplicationException e) { - future.completeExceptionally(e); - return; - } - updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> { - if (!completeExceptionally(future, error)) { - future.complete(result); + getReplicationPeerConfig(id).whenComplete( + (peerConfig, error) -> { + if (!completeExceptionally(future, error)) { + try { + ReplicationPeerConfigUtil.removeTableCFsFromReplicationPeerConfig(tableCfs, peerConfig, + id); + } catch (ReplicationException e) { + future.completeExceptionally(e); + return; } - }); - } - }); + updateReplicationPeerConfig(id, peerConfig).whenComplete((result, err) -> { + if (!completeExceptionally(future, error)) { + future.complete(result); + } + }); + } + }); return future; } @@ -1602,7 +1615,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { request, (s, c, req, done) -> s.listReplicationPeers(c, req, done), (resp) -> resp.getPeerDescList().stream() - .map(ReplicationSerDeHelper::toReplicationPeerDescription) + .map(ReplicationPeerConfigUtil::toReplicationPeerDescription) .collect(Collectors.toList()))).call(); } @@ -2168,9 +2181,7 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { returnedFuture.completeExceptionally(err); return; } - LOG.info("location is " + location); if (!location.isPresent() || location.get().getRegion() == null) { - LOG.info("unknown location is " + location); returnedFuture.completeExceptionally(new UnknownRegionException( "Invalid region name or encoded region name: " + Bytes.toStringBinary(regionNameOrEncodedRegionName))); @@ -2323,6 +2334,18 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { } } + private class ModifyTableProcedureBiConsumer extends TableProcedureBiConsumer { + + ModifyTableProcedureBiConsumer(AsyncAdmin admin, TableName tableName) { + super(tableName); + } + + @Override + String getOperationType() { + return "ENABLE"; + } + } + private class DeleteTableProcedureBiConsumer extends TableProcedureBiConsumer { DeleteTableProcedureBiConsumer(TableName tableName) { @@ -3031,4 +3054,254 @@ public class RawAsyncHBaseAdmin implements AsyncAdmin { .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts) .startLogErrorsCnt(startLogErrorsCnt); } + + @Override + public CompletableFuture enableTableReplication(TableName tableName) { + if (tableName == null) { + return failedFuture(new IllegalArgumentException("Table name is null")); + } + CompletableFuture future = new CompletableFuture<>(); + tableExists(tableName).whenComplete( + (exist, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (!exist) { + future.completeExceptionally(new TableNotFoundException("Table '" + + tableName.getNameAsString() + "' does not exists.")); + return; + } + getTableSplits(tableName).whenComplete((splits, err1) -> { + if (err1 != null) { + future.completeExceptionally(err1); + } else { + checkAndSyncTableToPeerClusters(tableName, splits).whenComplete((result, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + setTableReplication(tableName, true).whenComplete((result3, err3) -> { + if (err3 != null) { + future.completeExceptionally(err3); + } else { + future.complete(result3); + } + }); + } + }); + } + }); + }); + return future; + } + + @Override + public CompletableFuture disableTableReplication(TableName tableName) { + if (tableName == null) { + return failedFuture(new IllegalArgumentException("Table name is null")); + } + CompletableFuture future = new CompletableFuture<>(); + tableExists(tableName).whenComplete( + (exist, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (!exist) { + future.completeExceptionally(new TableNotFoundException("Table '" + + tableName.getNameAsString() + "' does not exists.")); + return; + } + setTableReplication(tableName, false).whenComplete((result, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(result); + } + }); + }); + return future; + } + + private CompletableFuture getTableSplits(TableName tableName) { + CompletableFuture future = new CompletableFuture<>(); + getTableRegions(tableName).whenComplete((regions, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + return; + } + if (regions.size() == 1) { + future.complete(null); + } else { + byte[][] splits = new byte[regions.size() - 1][]; + for (int i = 1; i < regions.size(); i++) { + splits[i - 1] = regions.get(i).getStartKey(); + } + future.complete(splits); + } + }); + return future; + } + + /** + * Connect to peer and check the table descriptor on peer: + *
      + *
    1. Create the same table on peer when not exist.
    2. + *
    3. Throw an exception if the table already has replication enabled on any of the column + * families.
    4. + *
    5. Throw an exception if the table exists on peer cluster but descriptors are not same.
    6. + *
    + * @param tableName name of the table to sync to the peer + * @param splits table split keys + */ + private CompletableFuture checkAndSyncTableToPeerClusters(TableName tableName, + byte[][] splits) { + CompletableFuture future = new CompletableFuture<>(); + listReplicationPeers().whenComplete( + (peers, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (peers == null || peers.size() <= 0) { + future.completeExceptionally(new IllegalArgumentException( + "Found no peer cluster for replication.")); + return; + } + List> futures = new ArrayList<>(); + peers.stream().filter(peer -> peer.getPeerConfig().needToReplicate(tableName)) + .forEach(peer -> { + futures.add(trySyncTableToPeerCluster(tableName, splits, peer)); + }); + CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])) + .whenComplete((result, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(result); + } + }); + }); + return future; + } + + private CompletableFuture trySyncTableToPeerCluster(TableName tableName, byte[][] splits, + ReplicationPeerDescription peer) { + Configuration peerConf = null; + try { + peerConf = + ReplicationPeerConfigUtil + .getPeerClusterConfiguration(connection.getConfiguration(), peer); + } catch (IOException e) { + return failedFuture(e); + } + CompletableFuture future = new CompletableFuture<>(); + ConnectionFactory.createAsyncConnection(peerConf).whenComplete( + (conn, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + getTableDescriptor(tableName).whenComplete( + (tableDesc, err1) -> { + if (err1 != null) { + future.completeExceptionally(err1); + return; + } + AsyncAdmin peerAdmin = conn.getAdmin(); + peerAdmin.tableExists(tableName).whenComplete( + (exist, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + return; + } + if (!exist) { + CompletableFuture createTableFuture = null; + if (splits == null) { + createTableFuture = peerAdmin.createTable(tableDesc); + } else { + createTableFuture = peerAdmin.createTable(tableDesc, splits); + } + createTableFuture.whenComplete( + (result, err3) -> { + if (err3 != null) { + future.completeExceptionally(err3); + } else { + future.complete(result); + } + }); + } else { + compareTableWithPeerCluster(tableName, tableDesc, peer, peerAdmin).whenComplete( + (result, err4) -> { + if (err4 != null) { + future.completeExceptionally(err4); + } else { + future.complete(result); + } + }); + } + }); + }); + }); + return future; + } + + private CompletableFuture compareTableWithPeerCluster(TableName tableName, + TableDescriptor tableDesc, ReplicationPeerDescription peer, AsyncAdmin peerAdmin) { + CompletableFuture future = new CompletableFuture<>(); + peerAdmin.getTableDescriptor(tableName).whenComplete( + (peerTableDesc, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (peerTableDesc == null) { + future.completeExceptionally(new IllegalArgumentException( + "Failed to get table descriptor for table " + tableName.getNameAsString() + + " from peer cluster " + peer.getPeerId())); + return; + } + if (TableDescriptor.COMPARATOR_IGNORE_REPLICATION.compare(peerTableDesc, tableDesc) != 0) { + future.completeExceptionally(new IllegalArgumentException("Table " + + tableName.getNameAsString() + " exists in peer cluster " + peer.getPeerId() + + ", but the table descriptors are not same when compared with source cluster." + + " Thus can not enable the table's replication switch.")); + return; + } + future.complete(null); + }); + return future; + } + + /** + * Set the table's replication switch if the table's replication switch is already not set. + * @param tableName name of the table + * @param enableRep is replication switch enable or disable + */ + private CompletableFuture setTableReplication(TableName tableName, boolean enableRep) { + CompletableFuture future = new CompletableFuture<>(); + getTableDescriptor(tableName).whenComplete( + (tableDesc, err) -> { + if (err != null) { + future.completeExceptionally(err); + return; + } + if (!tableDesc.matchReplicationScope(enableRep)) { + int scope = + enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL : HConstants.REPLICATION_SCOPE_LOCAL; + TableDescriptor newTableDesc = + TableDescriptorBuilder.newBuilder(tableDesc).setReplicationScope(scope).build(); + modifyTable(newTableDesc).whenComplete((result, err2) -> { + if (err2 != null) { + future.completeExceptionally(err2); + } else { + future.complete(result); + } + }); + } else { + future.complete(null); + } + }); + return future; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java index 4e2deedd73a..f485c4e93db 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java @@ -24,10 +24,11 @@ import java.util.Comparator; import java.util.Iterator; import java.util.Map; import java.util.Set; -import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Bytes; - +import org.apache.yetus.audience.InterfaceAudience; /** * TableDescriptor contains the details about an HBase table such as the descriptors of @@ -39,8 +40,15 @@ import org.apache.hadoop.hbase.util.Bytes; public interface TableDescriptor { @InterfaceAudience.Private - static final Comparator COMPARATOR - = (TableDescriptor lhs, TableDescriptor rhs) -> { + Comparator COMPARATOR = getComparator(ColumnFamilyDescriptor.COMPARATOR); + + @InterfaceAudience.Private + Comparator COMPARATOR_IGNORE_REPLICATION = + getComparator(ColumnFamilyDescriptor.COMPARATOR_IGNORE_REPLICATION); + + static Comparator + getComparator(Comparator cfComparator) { + return (TableDescriptor lhs, TableDescriptor rhs) -> { int result = lhs.getTableName().compareTo(rhs.getTableName()); if (result != 0) { return result; @@ -52,16 +60,17 @@ public interface TableDescriptor { return result; } - for (Iterator it = lhsFamilies.iterator(), - it2 = rhsFamilies.iterator(); it.hasNext();) { - result = ColumnFamilyDescriptor.COMPARATOR.compare(it.next(), it2.next()); + for (Iterator it = lhsFamilies.iterator(), it2 = + rhsFamilies.iterator(); it.hasNext();) { + result = cfComparator.compare(it.next(), it2.next()); if (result != 0) { return result; } } // punt on comparison for ordering, just calculate difference return Integer.compare(lhs.getValues().hashCode(), rhs.getValues().hashCode()); - }; + }; + } /** * Returns the count of the column families of the table. @@ -266,4 +275,30 @@ public interface TableDescriptor { */ boolean isReadOnly(); + /** + * Check if the table's cfs' replication scope matched with the replication state + * @param enabled replication state + * @return true if matched, otherwise false + */ + default boolean matchReplicationScope(boolean enabled) { + boolean hasEnabled = false; + boolean hasDisabled = false; + + for (ColumnFamilyDescriptor cf : getColumnFamilies()) { + if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL + && cf.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) { + hasDisabled = true; + } else { + hasEnabled = true; + } + } + + if (hasEnabled && hasDisabled) { + return false; + } + if (hasEnabled) { + return enabled; + } + return !enabled; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java index 7bde1c11bcf..ef593113245 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java @@ -33,18 +33,19 @@ import java.util.TreeSet; import java.util.function.Function; import java.util.regex.Matcher; import java.util.stream.Stream; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.Coprocessor; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.yetus.audience.InterfaceAudience; /** * @since 2.0.0 @@ -409,6 +410,24 @@ public class TableDescriptorBuilder { return this; } + /** + * Sets replication scope all & only the columns already in the builder. Columns added later won't + * be backfilled with replication scope. + * @param scope replication scope + * @return a TableDescriptorBuilder + */ + public TableDescriptorBuilder setReplicationScope(int scope) { + Map newFamilies = new TreeMap<>(Bytes.BYTES_RAWCOMPARATOR); + newFamilies.putAll(desc.families); + newFamilies + .forEach((cf, cfDesc) -> { + desc.removeColumnFamily(cf); + desc.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cfDesc).setScope(scope) + .build()); + }); + return this; + } + public TableDescriptor build() { return new ModifyableTableDescriptor(desc); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java index 39f2045935c..5a5913c06a6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java @@ -141,7 +141,7 @@ public class ReplicationAdmin implements Closeable { * */ @Deprecated public static Map> parseTableCFsFromConfig(String tableCFsConfig) { - return ReplicationSerDeHelper.parseTableCFsFromConfig(tableCFsConfig); + return ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCFsConfig); } /** @@ -228,7 +228,7 @@ public class ReplicationAdmin implements Closeable { @Deprecated public String getPeerTableCFs(String id) throws IOException { ReplicationPeerConfig peerConfig = admin.getReplicationPeerConfig(id); - return ReplicationSerDeHelper.convertToString(peerConfig.getTableCFsMap()); + return ReplicationPeerConfigUtil.convertToString(peerConfig.getTableCFsMap()); } /** @@ -243,7 +243,7 @@ public class ReplicationAdmin implements Closeable { @Deprecated public void appendPeerTableCFs(String id, String tableCfs) throws ReplicationException, IOException { - appendPeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs)); + appendPeerTableCFs(id, ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs)); } /** @@ -300,7 +300,7 @@ public class ReplicationAdmin implements Closeable { @Deprecated public void removePeerTableCFs(String id, String tableCf) throws ReplicationException, IOException { - removePeerTableCFs(id, ReplicationSerDeHelper.parseTableCFsFromConfig(tableCf)); + removePeerTableCFs(id, ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCf)); } /** diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java similarity index 90% rename from hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java rename to hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java index 986a09f9d53..be468ae1007 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationSerDeHelper.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java @@ -19,13 +19,14 @@ package org.apache.hadoop.hbase.client.replication; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString; import org.apache.hadoop.hbase.shaded.com.google.protobuf.UnsafeByteOperations; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.CompoundConfiguration; +import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; @@ -35,8 +36,9 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Strings; - import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; import java.io.IOException; import java.util.Collection; @@ -52,11 +54,11 @@ import java.util.Set; */ @InterfaceAudience.Private @InterfaceStability.Stable -public final class ReplicationSerDeHelper { +public final class ReplicationPeerConfigUtil { - private static final Log LOG = LogFactory.getLog(ReplicationSerDeHelper.class); + private static final Log LOG = LogFactory.getLog(ReplicationPeerConfigUtil.class); - private ReplicationSerDeHelper() {} + private ReplicationPeerConfigUtil() {} public static String convertToString(Set namespaces) { if (namespaces == null) { @@ -200,7 +202,7 @@ public final class ReplicationSerDeHelper { if (bytes == null) { return null; } - return ReplicationSerDeHelper.convert(Bytes.toString(bytes)); + return ReplicationPeerConfigUtil.convert(Bytes.toString(bytes)); } /** @@ -299,7 +301,8 @@ public final class ReplicationSerDeHelper { } public static ReplicationProtos.ReplicationPeer convert(ReplicationPeerConfig peerConfig) { - ReplicationProtos.ReplicationPeer.Builder builder = ReplicationProtos.ReplicationPeer.newBuilder(); + ReplicationProtos.ReplicationPeer.Builder builder = + ReplicationProtos.ReplicationPeer.newBuilder(); if (peerConfig.getClusterKey() != null) { builder.setClusterkey(peerConfig.getClusterKey()); } @@ -359,8 +362,8 @@ public final class ReplicationSerDeHelper { public static ReplicationProtos.ReplicationPeerDescription toProtoReplicationPeerDescription( ReplicationPeerDescription desc) { - ReplicationProtos.ReplicationPeerDescription.Builder builder = ReplicationProtos.ReplicationPeerDescription - .newBuilder(); + ReplicationProtos.ReplicationPeerDescription.Builder builder = + ReplicationProtos.ReplicationPeerDescription.newBuilder(); builder.setId(desc.getPeerId()); ReplicationProtos.ReplicationState.Builder stateBuilder = ReplicationProtos.ReplicationState .newBuilder(); @@ -430,8 +433,36 @@ public final class ReplicationSerDeHelper { + " which has specified cfs from table-cfs config in peer: " + id); } } else { - throw new ReplicationException("No table: " + table + " in table-cfs config of peer: " + id); + throw new ReplicationException("No table: " + + table + " in table-cfs config of peer: " + id); } } } -} + + /** + * Returns the configuration needed to talk to the remote slave cluster. + * @param conf the base configuration + * @param peer the description of replication peer + * @return the configuration for the peer cluster, null if it was unable to get the configuration + * @throws IOException when create peer cluster configuration failed + */ + public static Configuration getPeerClusterConfiguration(Configuration conf, + ReplicationPeerDescription peer) throws IOException { + ReplicationPeerConfig peerConfig = peer.getPeerConfig(); + Configuration otherConf; + try { + otherConf = HBaseConfiguration.createClusterConf(conf, peerConfig.getClusterKey()); + } catch (IOException e) { + throw new IOException("Can't get peer configuration for peerId=" + peer.getPeerId(), e); + } + + if (!peerConfig.getConfiguration().isEmpty()) { + CompoundConfiguration compound = new CompoundConfiguration(); + compound.add(otherConf); + compound.addStringMap(peerConfig.getConfiguration()); + return compound; + } + + return otherConf; + } +} \ No newline at end of file diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java index bdd6e747a51..4d429c9896e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java @@ -123,4 +123,24 @@ public class ReplicationPeerConfig { builder.append("bandwidth=").append(bandwidth); return builder.toString(); } + + /** + * Decide whether the table need replicate to the peer cluster + * @param table name of the table + * @return true if the table need replicate to the peer cluster + */ + public boolean needToReplicate(TableName table) { + // If null means user has explicitly not configured any namespaces and table CFs + // so all the tables data are applicable for replication + if (namespaces == null && tableCFsMap == null) { + return true; + } + if (namespaces != null && namespaces.contains(table.getNamespaceAsString())) { + return true; + } + if (tableCFsMap != null && tableCFsMap.containsKey(table)) { + return true; + } + return false; + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java index 4558deb3054..9eff1143cba 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java @@ -48,7 +48,7 @@ import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.RowMutations; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.TableDescriptor; -import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.filter.ByteArrayComparable; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -1621,7 +1621,7 @@ public final class RequestConverter { String peerId, ReplicationPeerConfig peerConfig) { AddReplicationPeerRequest.Builder builder = AddReplicationPeerRequest.newBuilder(); builder.setPeerId(peerId); - builder.setPeerConfig(ReplicationSerDeHelper.convert(peerConfig)); + builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)); return builder.build(); } @@ -1658,7 +1658,7 @@ public final class RequestConverter { UpdateReplicationPeerConfigRequest.Builder builder = UpdateReplicationPeerConfigRequest .newBuilder(); builder.setPeerId(peerId); - builder.setPeerConfig(ReplicationSerDeHelper.convert(peerConfig)); + builder.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)); return builder.build(); } diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java index 2de61cba2ba..8f09479aa71 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java @@ -30,14 +30,14 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NodeExistsException; @@ -114,7 +114,7 @@ public class ReplicationPeerZKImpl extends ReplicationStateZKBase try { byte[] data = peerConfigTracker.getData(false); if (data != null) { - this.peerConfig = ReplicationSerDeHelper.parsePeerFrom(data); + this.peerConfig = ReplicationPeerConfigUtil.parsePeerFrom(data); } } catch (DeserializationException e) { LOG.error("", e); diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java index 0f39b2afa44..cc84c1d89ce 100644 --- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java +++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java @@ -35,8 +35,7 @@ import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.CompoundConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState; @@ -46,6 +45,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp; import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; /** @@ -131,7 +131,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re List listOfOps = new ArrayList<>(2); ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id), - ReplicationSerDeHelper.toByteArray(peerConfig)); + ReplicationPeerConfigUtil.toByteArray(peerConfig)); // b/w PeerWatcher and ReplicationZookeeper#add method to create the // peer-state znode. This happens while adding a peer // The peer state data is set as "ENABLED" by default. @@ -206,9 +206,9 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } rpc.setTableCFsMap(tableCFs); ZKUtil.setData(this.zookeeper, getPeerNode(id), - ReplicationSerDeHelper.toByteArray(rpc)); + ReplicationPeerConfigUtil.toByteArray(rpc)); LOG.info("Peer tableCFs with id= " + id + " is now " + - ReplicationSerDeHelper.convertToString(tableCFs)); + ReplicationPeerConfigUtil.convertToString(tableCFs)); } catch (KeeperException e) { throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e); } @@ -303,7 +303,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re } try { - return ReplicationSerDeHelper.parsePeerFrom(data); + return ReplicationPeerConfigUtil.parsePeerFrom(data); } catch (DeserializationException e) { LOG.warn("Failed to parse cluster key from peerId=" + peerId + ", specifically the content from the following znode: " + znode); @@ -372,7 +372,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re try { ZKUtil.setData(this.zookeeper, getPeerNode(id), - ReplicationSerDeHelper.toByteArray(existingConfig)); + ReplicationPeerConfigUtil.toByteArray(existingConfig)); } catch(KeeperException ke){ throw new ReplicationException("There was a problem trying to save changes to the " + diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index 3c751f7fc57..2e3df2d3fab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.TableDescriptor; import org.apache.hadoop.hbase.client.TableState; import org.apache.hadoop.hbase.client.VersionInfoUtil; -import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.errorhandling.ForeignException; import org.apache.hadoop.hbase.exceptions.UnknownProtocolException; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; @@ -1809,7 +1809,7 @@ public class MasterRpcServices extends RSRpcServices AddReplicationPeerRequest request) throws ServiceException { try { master.addReplicationPeer(request.getPeerId(), - ReplicationSerDeHelper.convert(request.getPeerConfig())); + ReplicationPeerConfigUtil.convert(request.getPeerConfig())); return AddReplicationPeerResponse.newBuilder().build(); } catch (ReplicationException | IOException e) { throw new ServiceException(e); @@ -1858,7 +1858,7 @@ public class MasterRpcServices extends RSRpcServices String peerId = request.getPeerId(); ReplicationPeerConfig peerConfig = master.getReplicationPeerConfig(peerId); response.setPeerId(peerId); - response.setPeerConfig(ReplicationSerDeHelper.convert(peerConfig)); + response.setPeerConfig(ReplicationPeerConfigUtil.convert(peerConfig)); } catch (ReplicationException | IOException e) { throw new ServiceException(e); } @@ -1870,7 +1870,7 @@ public class MasterRpcServices extends RSRpcServices UpdateReplicationPeerConfigRequest request) throws ServiceException { try { master.updateReplicationPeerConfig(request.getPeerId(), - ReplicationSerDeHelper.convert(request.getPeerConfig())); + ReplicationPeerConfigUtil.convert(request.getPeerConfig())); return UpdateReplicationPeerConfigResponse.newBuilder().build(); } catch (ReplicationException | IOException e) { throw new ServiceException(e); @@ -1885,7 +1885,7 @@ public class MasterRpcServices extends RSRpcServices List peers = master .listReplicationPeers(request.hasRegex() ? request.getRegex() : null); for (ReplicationPeerDescription peer : peers) { - response.addPeerDesc(ReplicationSerDeHelper.toProtoReplicationPeerDescription(peer)); + response.addPeerDesc(ReplicationPeerConfigUtil.toProtoReplicationPeerDescription(peer)); } } catch (ReplicationException | IOException e) { throw new ServiceException(e); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java index 0585c97c3f1..d094d1cd77f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/TableCFsUpdater.java @@ -23,15 +23,15 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.yetus.audience.InterfaceAudience; -import org.apache.yetus.audience.InterfaceStability; -import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; import org.apache.zookeeper.KeeperException; import java.io.IOException; @@ -79,12 +79,12 @@ public class TableCFsUpdater extends ReplicationStateZKBase { // we copy TableCFs node into PeerNode LOG.info("copy tableCFs into peerNode:" + peerId); ReplicationProtos.TableCF[] tableCFs = - ReplicationSerDeHelper.parseTableCFs( + ReplicationPeerConfigUtil.parseTableCFs( ZKUtil.getData(this.zookeeper, tableCFsNode)); if (tableCFs != null && tableCFs.length > 0) { - rpc.setTableCFsMap(ReplicationSerDeHelper.convert2Map(tableCFs)); + rpc.setTableCFsMap(ReplicationPeerConfigUtil.convert2Map(tableCFs)); ZKUtil.setData(this.zookeeper, peerNode, - ReplicationSerDeHelper.toByteArray(rpc)); + ReplicationPeerConfigUtil.toByteArray(rpc)); } } else { LOG.info("No tableCFs in peerNode:" + peerId); @@ -113,7 +113,7 @@ public class TableCFsUpdater extends ReplicationStateZKBase { return null; } try { - return ReplicationSerDeHelper.parsePeerFrom(data); + return ReplicationPeerConfigUtil.parsePeerFrom(data); } catch (DeserializationException e) { LOG.warn("Failed to parse cluster key from peer=" + peerNode); return null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java index 3e577bcf556..e48907849d7 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java @@ -41,10 +41,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.junit.BeforeClass; -import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java new file mode 100644 index 00000000000..bf6005366fd --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ForkJoinPool; +import java.util.regex.Pattern; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.TableNotFoundException; +import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * Class to test asynchronous replication admin operations when more than 1 cluster + */ +@RunWith(Parameterized.class) +@Category({LargeTests.class, ClientTests.class}) +public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase { + + private final static String ID_SECOND = "2"; + + private static HBaseTestingUtility TEST_UTIL2; + private static Configuration conf2; + private static AsyncAdmin admin2; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0); + TEST_UTIL.startMiniCluster(); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + + conf2 = HBaseConfiguration.create(TEST_UTIL.getConfiguration()); + conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2"); + TEST_UTIL2 = new HBaseTestingUtility(conf2); + TEST_UTIL2.startMiniCluster(); + admin2 = + ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get().getAdmin(); + + ReplicationPeerConfig rpc = new ReplicationPeerConfig(); + rpc.setClusterKey(TEST_UTIL2.getClusterKey()); + ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join(); + } + + @After + public void tearDown() throws Exception { + Pattern pattern = Pattern.compile(tableName.getNameAsString() + ".*"); + cleanupTables(admin, pattern); + cleanupTables(admin2, pattern); + } + + private void cleanupTables(AsyncAdmin admin, Pattern pattern) { + admin.listTableNames(pattern, false).whenCompleteAsync((tables, err) -> { + if (tables != null) { + tables.forEach(table -> { + try { + admin.disableTable(table).join(); + } catch (Exception e) { + LOG.debug("Table: " + tableName + " already disabled, so just deleting it."); + } + admin.deleteTable(table).join(); + }); + } + }, ForkJoinPool.commonPool()).join(); + } + + private void createTableWithDefaultConf(AsyncAdmin admin, TableName tableName) { + TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName); + builder.addColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)); + admin.createTable(builder.build()).join(); + } + + @Test + public void testEnableAndDisableTableReplication() throws Exception { + // default replication scope is local + createTableWithDefaultConf(tableName); + admin.enableTableReplication(tableName).join(); + TableDescriptor tableDesc = admin.getTableDescriptor(tableName).get(); + for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) { + assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope()); + } + + admin.disableTableReplication(tableName).join(); + tableDesc = admin.getTableDescriptor(tableName).get(); + for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) { + assertEquals(HConstants.REPLICATION_SCOPE_LOCAL, fam.getScope()); + } + } + + @Test + public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception { + // Only create table in source cluster + createTableWithDefaultConf(tableName); + assertFalse(admin2.tableExists(tableName).get()); + admin.enableTableReplication(tableName).join(); + assertTrue(admin2.tableExists(tableName).get()); + } + + @Test + public void testEnableReplicationWhenTableDescriptorIsNotSameInClusters() throws Exception { + createTableWithDefaultConf(admin, tableName); + createTableWithDefaultConf(admin2, tableName); + TableDescriptorBuilder builder = + TableDescriptorBuilder.newBuilder(admin.getTableDescriptor(tableName).get()); + builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("newFamily")) + .build()); + admin2.disableTable(tableName).join(); + admin2.modifyTable(builder.build()).join(); + admin2.enableTable(tableName).join(); + + try { + admin.enableTableReplication(tableName).join(); + fail("Exception should be thrown if table descriptors in the clusters are not same."); + } catch (Exception ignored) { + // ok + } + + admin.disableTable(tableName).join(); + admin.modifyTable(builder.build()).join(); + admin.enableTable(tableName).join(); + admin.enableTableReplication(tableName).join(); + TableDescriptor tableDesc = admin.getTableDescriptor(tableName).get(); + for (ColumnFamilyDescriptor fam : tableDesc.getColumnFamilies()) { + assertEquals(HConstants.REPLICATION_SCOPE_GLOBAL, fam.getScope()); + } + } + + @Test + public void testDisableReplicationForNonExistingTable() throws Exception { + try { + admin.disableTableReplication(tableName).join(); + } catch (CompletionException e) { + assertTrue(e.getCause() instanceof TableNotFoundException); + } + } + + @Test + public void testEnableReplicationForNonExistingTable() throws Exception { + try { + admin.enableTableReplication(tableName).join(); + } catch (CompletionException e) { + assertTrue(e.getCause() instanceof TableNotFoundException); + } + } + + @Test + public void testDisableReplicationWhenTableNameAsNull() throws Exception { + try { + admin.disableTableReplication(null).join(); + } catch (CompletionException e) { + assertTrue(e.getCause() instanceof IllegalArgumentException); + } + } + + @Test + public void testEnableReplicationWhenTableNameAsNull() throws Exception { + try { + admin.enableTableReplication(null).join(); + } catch (CompletionException e) { + assertTrue(e.getCause() instanceof IllegalArgumentException); + } + } + + /* + * Test enable table replication should create table only in user explicit specified table-cfs. + * HBASE-14717 + */ + @Test + public void testEnableReplicationForExplicitSetTableCfs() throws Exception { + TableName tableName2 = TableName.valueOf(tableName.getNameAsString() + "2"); + // Only create table in source cluster + createTableWithDefaultConf(tableName); + createTableWithDefaultConf(tableName2); + assertFalse("Table should not exists in the peer cluster", + admin2.tableExists(tableName).get()); + assertFalse("Table should not exists in the peer cluster", + admin2.tableExists(tableName2).get()); + + Map> tableCfs = new HashMap<>(); + tableCfs.put(tableName, null); + ReplicationPeerConfig rpc = admin.getReplicationPeerConfig(ID_SECOND).get(); + rpc.setTableCFsMap(tableCfs); + try { + // Only add tableName to replication peer config + admin.updateReplicationPeerConfig(ID_SECOND, rpc).join(); + admin.enableTableReplication(tableName2).join(); + assertFalse("Table should not be created if user has set table cfs explicitly for the " + + "peer and this is not part of that collection", admin2.tableExists(tableName2).get()); + + // Add tableName2 to replication peer config, too + tableCfs.put(tableName2, null); + rpc.setTableCFsMap(tableCfs); + admin.updateReplicationPeerConfig(ID_SECOND, rpc).join(); + admin.enableTableReplication(tableName2).join(); + assertTrue( + "Table should be created if user has explicitly added table into table cfs collection", + admin2.tableExists(tableName2).get()); + } finally { + rpc.setTableCFsMap(null); + admin.updateReplicationPeerConfig(ID_SECOND, rpc).join(); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java index a23b76ae0fc..62951ef4fa6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java @@ -241,7 +241,7 @@ public class TestReplicationAdmin { tableCFs.put(tableName1, null); admin.appendPeerTableCFs(ID_ONE, tableCFs); Map> result = - ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); + ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); assertEquals(1, result.size()); assertEquals(true, result.containsKey(tableName1)); assertNull(result.get(tableName1)); @@ -250,7 +250,7 @@ public class TestReplicationAdmin { tableCFs.clear(); tableCFs.put(tableName2, null); admin.appendPeerTableCFs(ID_ONE, tableCFs); - result = ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); + result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); assertEquals(2, result.size()); assertTrue("Should contain t1", result.containsKey(tableName1)); assertTrue("Should contain t2", result.containsKey(tableName2)); @@ -262,7 +262,7 @@ public class TestReplicationAdmin { tableCFs.put(tableName3, new ArrayList<>()); tableCFs.get(tableName3).add("f1"); admin.appendPeerTableCFs(ID_ONE, tableCFs); - result = ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); + result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); assertEquals(3, result.size()); assertTrue("Should contain t1", result.containsKey(tableName1)); assertTrue("Should contain t2", result.containsKey(tableName2)); @@ -277,7 +277,7 @@ public class TestReplicationAdmin { tableCFs.get(tableName4).add("f1"); tableCFs.get(tableName4).add("f2"); admin.appendPeerTableCFs(ID_ONE, tableCFs); - result = ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); + result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); assertEquals(4, result.size()); assertTrue("Should contain t1", result.containsKey(tableName1)); assertTrue("Should contain t2", result.containsKey(tableName2)); @@ -299,7 +299,7 @@ public class TestReplicationAdmin { tableCFs.put(tableName5, new ArrayList<>()); tableCFs.get(tableName5).add("f1"); admin.appendPeerTableCFs(ID_ONE, tableCFs); - result = ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); + result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); assertEquals(5, result.size()); assertTrue("Should contain t5", result.containsKey(tableName5)); // null means replication all cfs of tab5 @@ -313,7 +313,7 @@ public class TestReplicationAdmin { tableCFs.clear(); tableCFs.put(tableName6, new ArrayList<>()); admin.appendPeerTableCFs(ID_ONE, tableCFs); - result = ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); + result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); assertEquals(6, result.size()); assertTrue("Should contain t6", result.containsKey(tableName6)); // null means replication all cfs of tab6 @@ -354,7 +354,7 @@ public class TestReplicationAdmin { } catch (ReplicationException e) { } Map> result = - ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); + ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); assertEquals(2, result.size()); assertTrue("Should contain t1", result.containsKey(tableName1)); assertTrue("Should contain t2", result.containsKey(tableName2)); @@ -373,7 +373,7 @@ public class TestReplicationAdmin { tableCFs.clear(); tableCFs.put(tableName1, null); admin.removePeerTableCFs(ID_ONE, tableCFs); - result = ReplicationSerDeHelper.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); + result = ReplicationPeerConfigUtil.parseTableCFsFromConfig(admin.getPeerTableCFs(ID_ONE)); assertEquals(1, result.size()); assertEquals(1, result.get(tableName2).size()); assertEquals("cf1", result.get(tableName2).get(0)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java index 3e499b2b306..6b7d36b682e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java @@ -58,7 +58,7 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor; @@ -528,7 +528,7 @@ public class TestMasterReplication { .getAdmin()) { admin.addReplicationPeer(id, new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey()) - .setTableCFsMap(ReplicationSerDeHelper.parseTableCFsFromConfig(tableCfs))); + .setTableCFsMap(ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableCfs))); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java index abf2db3a1ac..6572404326b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestPerTableCFReplication.java @@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.replication.ReplicationAdmin; -import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; import org.apache.hadoop.hbase.testclassification.FlakeyTests; @@ -187,13 +187,13 @@ public class TestPerTableCFReplication { Map> tabCFsMap = null; // 1. null or empty string, result should be null - tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(null); + tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(null); assertEquals(null, tabCFsMap); - tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(""); + tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(""); assertEquals(null, tabCFsMap); - tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(" "); + tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(" "); assertEquals(null, tabCFsMap); final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); @@ -201,20 +201,20 @@ public class TestPerTableCFReplication { final TableName tableName3 = TableName.valueOf(name.getMethodName() + "3"); // 2. single table: "tableName1" / "tableName2:cf1" / "tableName3:cf1,cf3" - tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(tableName1.getNameAsString()); + tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1.getNameAsString()); assertEquals(1, tabCFsMap.size()); // only one table assertTrue(tabCFsMap.containsKey(tableName1)); // its table name is "tableName1" assertFalse(tabCFsMap.containsKey(tableName2)); // not other table assertEquals(null, tabCFsMap.get(tableName1)); // null cf-list, - tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(tableName2 + ":cf1"); + tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName2 + ":cf1"); assertEquals(1, tabCFsMap.size()); // only one table assertTrue(tabCFsMap.containsKey(tableName2)); // its table name is "tableName2" assertFalse(tabCFsMap.containsKey(tableName1)); // not other table assertEquals(1, tabCFsMap.get(tableName2).size()); // cf-list contains only 1 cf assertEquals("cf1", tabCFsMap.get(tableName2).get(0));// the only cf is "cf1" - tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(tableName3 + " : cf1 , cf3"); + tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName3 + " : cf1 , cf3"); assertEquals(1, tabCFsMap.size()); // only one table assertTrue(tabCFsMap.containsKey(tableName3)); // its table name is "tableName2" assertFalse(tabCFsMap.containsKey(tableName1)); // not other table @@ -223,7 +223,7 @@ public class TestPerTableCFReplication { assertTrue(tabCFsMap.get(tableName3).contains("cf3"));// contains "cf3" // 3. multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3" - tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig(tableName1 + " ; " + tableName2 + tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig(tableName1 + " ; " + tableName2 + ":cf1 ; " + tableName3 + ":cf1,cf3"); // 3.1 contains 3 tables : "tableName1", "tableName2" and "tableName3" assertEquals(3, tabCFsMap.size()); @@ -242,7 +242,7 @@ public class TestPerTableCFReplication { // 4. contiguous or additional ";"(table delimiter) or ","(cf delimiter) can be tolerated // still use the example of multiple tables: "tableName1 ; tableName2:cf1 ; tableName3:cf1,cf3" - tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig( + tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig( tableName1 + " ; ; " + tableName2 + ":cf1 ; " + tableName3 + ":cf1,,cf3 ;"); // 4.1 contains 3 tables : "tableName1", "tableName2" and "tableName3" assertEquals(3, tabCFsMap.size()); @@ -261,7 +261,7 @@ public class TestPerTableCFReplication { // 5. invalid format "tableName1:tt:cf1 ; tableName2::cf1 ; tableName3:cf1,cf3" // "tableName1:tt:cf1" and "tableName2::cf1" are invalid and will be ignored totally - tabCFsMap = ReplicationSerDeHelper.parseTableCFsFromConfig( + tabCFsMap = ReplicationPeerConfigUtil.parseTableCFsFromConfig( tableName1 + ":tt:cf1 ; " + tableName2 + "::cf1 ; " + tableName3 + ":cf1,cf3"); // 5.1 no "tableName1" and "tableName2", only "tableName3" assertEquals(1, tabCFsMap.size()); // only one table @@ -281,10 +281,10 @@ public class TestPerTableCFReplication { Map> tabCFsMap = null; // 1. null or empty string, result should be null - assertNull(ReplicationSerDeHelper.convert(tabCFsMap)); + assertNull(ReplicationPeerConfigUtil.convert(tabCFsMap)); tabCFsMap = new HashMap<>(); - tableCFs = ReplicationSerDeHelper.convert(tabCFsMap); + tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap); assertEquals(0, tableCFs.length); final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); @@ -294,7 +294,7 @@ public class TestPerTableCFReplication { // 2. single table: "tab1" / "tab2:cf1" / "tab3:cf1,cf3" tabCFsMap.clear(); tabCFsMap.put(tableName1, null); - tableCFs = ReplicationSerDeHelper.convert(tabCFsMap); + tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap); assertEquals(1, tableCFs.length); // only one table assertEquals(tableName1.toString(), tableCFs[0].getTableName().getQualifier().toStringUtf8()); @@ -303,7 +303,7 @@ public class TestPerTableCFReplication { tabCFsMap.clear(); tabCFsMap.put(tableName2, new ArrayList<>()); tabCFsMap.get(tableName2).add("cf1"); - tableCFs = ReplicationSerDeHelper.convert(tabCFsMap); + tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap); assertEquals(1, tableCFs.length); // only one table assertEquals(tableName2.toString(), tableCFs[0].getTableName().getQualifier().toStringUtf8()); @@ -314,7 +314,7 @@ public class TestPerTableCFReplication { tabCFsMap.put(tableName3, new ArrayList<>()); tabCFsMap.get(tableName3).add("cf1"); tabCFsMap.get(tableName3).add("cf3"); - tableCFs = ReplicationSerDeHelper.convert(tabCFsMap); + tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap); assertEquals(1, tableCFs.length); assertEquals(tableName3.toString(), tableCFs[0].getTableName().getQualifier().toStringUtf8()); @@ -330,28 +330,28 @@ public class TestPerTableCFReplication { tabCFsMap.get(tableName3).add("cf1"); tabCFsMap.get(tableName3).add("cf3"); - tableCFs = ReplicationSerDeHelper.convert(tabCFsMap); + tableCFs = ReplicationPeerConfigUtil.convert(tabCFsMap); assertEquals(3, tableCFs.length); - assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, tableName1.toString())); - assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, tableName2.toString())); - assertNotNull(ReplicationSerDeHelper.getTableCF(tableCFs, tableName3.toString())); + assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString())); + assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString())); + assertNotNull(ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString())); assertEquals(0, - ReplicationSerDeHelper.getTableCF(tableCFs, tableName1.toString()).getFamiliesCount()); + ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName1.toString()).getFamiliesCount()); - assertEquals(1, - ReplicationSerDeHelper.getTableCF(tableCFs, tableName2.toString()).getFamiliesCount()); - assertEquals("cf1", - ReplicationSerDeHelper.getTableCF(tableCFs, tableName2.toString()).getFamilies(0).toStringUtf8()); + assertEquals(1, ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString()) + .getFamiliesCount()); + assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName2.toString()) + .getFamilies(0).toStringUtf8()); - assertEquals(2, - ReplicationSerDeHelper.getTableCF(tableCFs, tableName3.toString()).getFamiliesCount()); - assertEquals("cf1", - ReplicationSerDeHelper.getTableCF(tableCFs, tableName3.toString()).getFamilies(0).toStringUtf8()); - assertEquals("cf3", - ReplicationSerDeHelper.getTableCF(tableCFs, tableName3.toString()).getFamilies(1).toStringUtf8()); + assertEquals(2, ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString()) + .getFamiliesCount()); + assertEquals("cf1", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString()) + .getFamilies(0).toStringUtf8()); + assertEquals("cf3", ReplicationPeerConfigUtil.getTableCF(tableCFs, tableName3.toString()) + .getFamilies(1).toStringUtf8()); - tabCFsMap = ReplicationSerDeHelper.convert2Map(tableCFs); + tabCFsMap = ReplicationPeerConfigUtil.convert2Map(tableCFs); assertEquals(3, tabCFsMap.size()); assertTrue(tabCFsMap.containsKey(tableName1)); assertTrue(tabCFsMap.containsKey(tableName2)); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java index 8c604f46a86..1a0231748fb 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java @@ -26,7 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper; +import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.testclassification.ReplicationTests; @@ -99,14 +99,15 @@ public class TestTableCFsUpdater extends TableCFsUpdater { ReplicationPeerConfig rpc = new ReplicationPeerConfig(); rpc.setClusterKey(zkw.getQuorum()); String peerNode = getPeerNode(peerId); - ZKUtil.createWithParents(zkw, peerNode, ReplicationSerDeHelper.toByteArray(rpc)); + ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); String tableCFs = tableName1 + ":cf1,cf2;" + tableName2 + ":cf3;" + tableName3; String tableCFsNode = getTableCFsNode(peerId); LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId); ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs)); - ReplicationPeerConfig actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); + ReplicationPeerConfig actualRpc = + ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); String actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); @@ -117,14 +118,14 @@ public class TestTableCFsUpdater extends TableCFsUpdater { rpc = new ReplicationPeerConfig(); rpc.setClusterKey(zkw.getQuorum()); peerNode = getPeerNode(peerId); - ZKUtil.createWithParents(zkw, peerNode, ReplicationSerDeHelper.toByteArray(rpc)); + ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); tableCFs = tableName1 + ":cf1,cf3;" + tableName2 + ":cf2"; tableCFsNode = getTableCFsNode(peerId); LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId); ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs)); - actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); + actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); @@ -135,14 +136,14 @@ public class TestTableCFsUpdater extends TableCFsUpdater { rpc = new ReplicationPeerConfig(); rpc.setClusterKey(zkw.getQuorum()); peerNode = getPeerNode(peerId); - ZKUtil.createWithParents(zkw, peerNode, ReplicationSerDeHelper.toByteArray(rpc)); + ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); tableCFs = ""; tableCFsNode = getTableCFsNode(peerId); LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId); ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs)); - actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); + actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); @@ -153,10 +154,10 @@ public class TestTableCFsUpdater extends TableCFsUpdater { rpc = new ReplicationPeerConfig(); rpc.setClusterKey(zkw.getQuorum()); peerNode = getPeerNode(peerId); - ZKUtil.createWithParents(zkw, peerNode, ReplicationSerDeHelper.toByteArray(rpc)); + ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); tableCFsNode = getTableCFsNode(peerId); - actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); + actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); @@ -167,7 +168,7 @@ public class TestTableCFsUpdater extends TableCFsUpdater { peerId = "1"; peerNode = getPeerNode(peerId); - actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); + actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); Map> tableNameListMap = actualRpc.getTableCFsMap(); assertEquals(3, tableNameListMap.size()); @@ -184,7 +185,7 @@ public class TestTableCFsUpdater extends TableCFsUpdater { peerId = "2"; peerNode = getPeerNode(peerId); - actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); + actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); tableNameListMap = actualRpc.getTableCFsMap(); assertEquals(2, tableNameListMap.size()); @@ -198,14 +199,14 @@ public class TestTableCFsUpdater extends TableCFsUpdater { peerId = "3"; peerNode = getPeerNode(peerId); - actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); + actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); tableNameListMap = actualRpc.getTableCFsMap(); assertNull(tableNameListMap); peerId = "4"; peerNode = getPeerNode(peerId); - actualRpc = ReplicationSerDeHelper.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); + actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); tableNameListMap = actualRpc.getTableCFsMap(); assertNull(tableNameListMap);