HBASE-19661 Replace ReplicationStateZKBase with ZKReplicationStorageBase

This commit is contained in:
huzheng 2017-12-29 15:55:28 +08:00 committed by zhangduo
parent b84fbde175
commit 5655b3c0ca
13 changed files with 136 additions and 292 deletions

View File

@ -33,9 +33,8 @@ public class ReplicationFactory {
return new ReplicationPeers(zk, conf); return new ReplicationPeers(zk, conf);
} }
public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper, public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper, Abortable abortable,
final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable,
Stoppable stopper) { Stoppable stopper) {
return new ReplicationTrackerZKImpl(zookeeper, replicationPeers, conf, abortable, stopper); return new ReplicationTrackerZKImpl(zookeeper, abortable, stopper);
} }
} }

View File

@ -1,159 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
/**
* This is a base class for maintaining replication state in zookeeper.
*/
@InterfaceAudience.Private
public abstract class ReplicationStateZKBase {
/**
* The name of the znode that contains the replication status of a remote slave (i.e. peer)
* cluster.
*/
protected final String peerStateNodeName;
/** The name of the base znode that contains all replication state. */
protected final String replicationZNode;
/** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. */
protected final String peersZNode;
/** The name of the znode that contains all replication queues */
protected final String queuesZNode;
/** The name of the znode that contains queues of hfile references to be replicated */
protected final String hfileRefsZNode;
/** The cluster key of the local cluster */
protected final String ourClusterKey;
/** The name of the znode that contains tableCFs */
protected final String tableCFsNodeName;
protected final ZKWatcher zookeeper;
protected final Configuration conf;
protected final Abortable abortable;
public static final byte[] ENABLED_ZNODE_BYTES =
toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
public static final byte[] DISABLED_ZNODE_BYTES =
toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
"zookeeper.znode.replication.hfile.refs";
public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
public ReplicationStateZKBase(ZKWatcher zookeeper, Configuration conf,
Abortable abortable) {
this.zookeeper = zookeeper;
this.conf = conf;
this.abortable = abortable;
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);
this.replicationZNode = ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode,
replicationZNodeName);
this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName);
this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
}
/**
* Subclasses that use ZK explicitly can just call this directly while classes
* that are trying to hide internal details of storage can wrap the KeeperException
* into a ReplicationException or something else.
*/
protected List<String> getListOfReplicatorsZK() throws KeeperException {
List<String> result = null;
try {
result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode);
} catch (KeeperException e) {
this.abortable.abort("Failed to get list of replicators", e);
throw e;
}
return result;
}
/**
* @param state
* @return Serialized protobuf of <code>state</code> with pb magic prefix prepended suitable for
* use as content of a peer-state znode under a peer cluster id as in
* /hbase/replication/peers/PEER_ID/peer-state.
*/
protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.State state) {
ReplicationProtos.ReplicationState msg =
ReplicationProtos.ReplicationState.newBuilder().setState(state).build();
// There is no toByteArray on this pb Message?
// 32 bytes is default which seems fair enough here.
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
CodedOutputStream cos = CodedOutputStream.newInstance(baos, 16);
msg.writeTo(cos);
cos.flush();
baos.flush();
return ProtobufUtil.prependPBMagic(baos.toByteArray());
} catch (IOException e) {
throw new RuntimeException(e);
}
}
protected boolean peerExists(String id) throws KeeperException {
return ZKUtil.checkExists(this.zookeeper, ZNodePaths.joinZNode(this.peersZNode, id)) >= 0;
}
/**
* Determine if a ZK path points to a peer node.
* @param path path to be checked
* @return true if the path points to a peer node, otherwise false
*/
protected boolean isPeerPath(String path) {
return path.split("/").length == peersZNode.split("/").length + 1;
}
@VisibleForTesting
protected String getTableCFsNode(String id) {
return ZNodePaths.joinZNode(this.peersZNode, ZNodePaths.joinZNode(id, this.tableCFsNodeName));
}
@VisibleForTesting
protected String getPeerStateNode(String id) {
return ZNodePaths.joinZNode(this.peersZNode, ZNodePaths.joinZNode(id, this.peerStateNodeName));
}
@VisibleForTesting
protected String getPeerNode(String id) {
return ZNodePaths.joinZNode(this.peersZNode, id);
}
}

View File

@ -20,14 +20,12 @@ package org.apache.hadoop.hbase.replication;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Stoppable; import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -38,9 +36,14 @@ import org.slf4j.LoggerFactory;
* interface. * interface.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements ReplicationTracker { public class ReplicationTrackerZKImpl implements ReplicationTracker {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationTrackerZKImpl.class); private static final Logger LOG = LoggerFactory.getLogger(ReplicationTrackerZKImpl.class);
// Zookeeper
private final ZKWatcher zookeeper;
// Server to abort.
private final Abortable abortable;
// All about stopping // All about stopping
private final Stoppable stopper; private final Stoppable stopper;
// listeners to be notified // listeners to be notified
@ -48,9 +51,9 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
// List of all the other region servers in this cluster // List of all the other region servers in this cluster
private final ArrayList<String> otherRegionServers = new ArrayList<>(); private final ArrayList<String> otherRegionServers = new ArrayList<>();
public ReplicationTrackerZKImpl(ZKWatcher zookeeper, final ReplicationPeers replicationPeers, public ReplicationTrackerZKImpl(ZKWatcher zookeeper, Abortable abortable, Stoppable stopper) {
Configuration conf, Abortable abortable, Stoppable stopper) { this.zookeeper = zookeeper;
super(zookeeper, conf, abortable); this.abortable = abortable;
this.stopper = stopper; this.stopper = stopper;
this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper)); this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
// watch the changes // watch the changes

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
@ -36,7 +37,14 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
* ZK based replication peer storage. * ZK based replication peer storage.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements ReplicationPeerStorage { public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
implements ReplicationPeerStorage {
public static final String PEERS_ZNODE = "zookeeper.znode.replication.peers";
public static final String PEERS_ZNODE_DEFAULT = "peers";
public static final String PEERS_STATE_ZNODE = "zookeeper.znode.replication.peers.state";
public static final String PEERS_STATE_ZNODE_DEFAULT = "peer-state";
public static final byte[] ENABLED_ZNODE_BYTES = public static final byte[] ENABLED_ZNODE_BYTES =
toByteArray(ReplicationProtos.ReplicationState.State.ENABLED); toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
@ -56,16 +64,18 @@ class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements Repli
public ZKReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) { public ZKReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) {
super(zookeeper, conf); super(zookeeper, conf);
this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state"); this.peerStateNodeName = conf.get(PEERS_STATE_ZNODE, PEERS_STATE_ZNODE_DEFAULT);
String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers"); String peersZNodeName = conf.get(PEERS_ZNODE, PEERS_ZNODE_DEFAULT);
this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName); this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName);
} }
private String getPeerStateNode(String peerId) { @VisibleForTesting
public String getPeerStateNode(String peerId) {
return ZNodePaths.joinZNode(getPeerNode(peerId), peerStateNodeName); return ZNodePaths.joinZNode(getPeerNode(peerId), peerStateNodeName);
} }
private String getPeerNode(String peerId) { @VisibleForTesting
public String getPeerNode(String peerId) {
return ZNodePaths.joinZNode(peersZNode, peerId); return ZNodePaths.joinZNode(peersZNode, peerId);
} }
@ -82,8 +92,8 @@ class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements Repli
enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)), enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)),
false); false);
} catch (KeeperException e) { } catch (KeeperException e) {
throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>" + throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>"
peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e); + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
} }
} }

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication;
import java.io.ByteArrayOutputStream; import java.io.ByteArrayOutputStream;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths; import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
@ -34,7 +35,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
* zookeeper. * zookeeper.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class ZKReplicationStorageBase { public class ZKReplicationStorageBase {
public static final String REPLICATION_ZNODE = "zookeeper.znode.replication";
public static final String REPLICATION_ZNODE_DEFAULT = "replication";
/** The name of the base znode that contains all replication state. */ /** The name of the base znode that contains all replication state. */
protected final String replicationZNode; protected final String replicationZNode;
@ -45,10 +49,9 @@ class ZKReplicationStorageBase {
protected ZKReplicationStorageBase(ZKWatcher zookeeper, Configuration conf) { protected ZKReplicationStorageBase(ZKWatcher zookeeper, Configuration conf) {
this.zookeeper = zookeeper; this.zookeeper = zookeeper;
this.conf = conf; this.conf = conf;
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
this.replicationZNode = this.replicationZNode = ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode,
ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, replicationZNodeName); conf.get(REPLICATION_ZNODE, REPLICATION_ZNODE_DEFAULT));
} }
/** /**

View File

@ -834,8 +834,8 @@ public class HMaster extends HRegionServer implements MasterServices {
// This is for backwards compatibility // This is for backwards compatibility
// See HBASE-11393 // See HBASE-11393
status.setStatus("Update TableCFs node in ZNode"); status.setStatus("Update TableCFs node in ZNode");
ReplicationPeerConfigUpgrader tableCFsUpdater = new ReplicationPeerConfigUpgrader(zooKeeper, ReplicationPeerConfigUpgrader tableCFsUpdater =
conf, this.clusterConnection); new ReplicationPeerConfigUpgrader(zooKeeper, conf);
tableCFsUpdater.copyTableCFs(); tableCFsUpdater.copyTableCFs();
// Add the Observer to delete space quotas on table deletion before starting all CPs by // Add the Observer to delete space quotas on table deletion before starting all CPs by

View File

@ -18,48 +18,60 @@
*/ */
package org.apache.hadoop.hbase.replication.master; package org.apache.hadoop.hbase.replication.master;
import static org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage.PEERS_ZNODE;
import static org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage.PEERS_ZNODE_DEFAULT;
import static org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.REPLICATION_ZNODE;
import static org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase; import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability; import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/** /**
* This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x. * This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x. It will
* It will be removed in HBase 3.x. See HBASE-11393 * be removed in HBase 3.x. See HBASE-11393
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase { public class ReplicationPeerConfigUpgrader{
private static final String TABLE_CFS_ZNODE = "zookeeper.znode.replication.peers.tableCFs";
private static final String TABLE_CFS_ZNODE_DEFAULT = "tableCFs";
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUpgrader.class); private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUpgrader.class);
private final Configuration conf;
private final ZKWatcher zookeeper;
private final ReplicationPeerStorage peerStorage;
public ReplicationPeerConfigUpgrader(ZKWatcher zookeeper, public ReplicationPeerConfigUpgrader(ZKWatcher zookeeper, Configuration conf) {
Configuration conf, Abortable abortable) { this.zookeeper = zookeeper;
super(zookeeper, conf, abortable); this.conf = conf;
this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf);
} }
public void upgrade() throws Exception { public void upgrade() throws Exception {
try (Connection conn = ConnectionFactory.createConnection(conf)) { try (Connection conn = ConnectionFactory.createConnection(conf)) {
Admin admin = conn.getAdmin(); Admin admin = conn.getAdmin();
admin.listReplicationPeers().forEach( admin.listReplicationPeers().forEach((peerDesc) -> {
(peerDesc) -> {
String peerId = peerDesc.getPeerId(); String peerId = peerDesc.getPeerId();
ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig(); ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig();
if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty()) if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
@ -75,39 +87,38 @@ public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase {
} }
} }
public void copyTableCFs() { public void copyTableCFs() throws ReplicationException {
List<String> znodes = null; for (String peerId : peerStorage.listPeerIds()) {
try {
znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
} catch (KeeperException e) {
LOG.error("Failed to get peers znode", e);
}
if (znodes != null) {
for (String peerId : znodes) {
if (!copyTableCFs(peerId)) { if (!copyTableCFs(peerId)) {
LOG.error("upgrade tableCFs failed for peerId=" + peerId); LOG.error("upgrade tableCFs failed for peerId=" + peerId);
} }
} }
} }
@VisibleForTesting
protected String getTableCFsNode(String peerId) {
String replicationZNode = ZNodePaths.joinZNode(zookeeper.znodePaths.baseZNode,
conf.get(REPLICATION_ZNODE, REPLICATION_ZNODE_DEFAULT));
String peersZNode =
ZNodePaths.joinZNode(replicationZNode, conf.get(PEERS_ZNODE, PEERS_ZNODE_DEFAULT));
return ZNodePaths.joinZNode(peersZNode,
ZNodePaths.joinZNode(peerId, conf.get(TABLE_CFS_ZNODE, TABLE_CFS_ZNODE_DEFAULT)));
} }
public boolean copyTableCFs(String peerId) { public boolean copyTableCFs(String peerId) throws ReplicationException {
String tableCFsNode = getTableCFsNode(peerId); String tableCFsNode = getTableCFsNode(peerId);
try { try {
if (ZKUtil.checkExists(zookeeper, tableCFsNode) != -1) { if (ZKUtil.checkExists(zookeeper, tableCFsNode) != -1) {
String peerNode = getPeerNode(peerId); ReplicationPeerConfig rpc = peerStorage.getPeerConfig(peerId);
ReplicationPeerConfig rpc = getReplicationPeerConig(peerNode);
// We only need to copy data from tableCFs node to rpc Node the first time hmaster start. // We only need to copy data from tableCFs node to rpc Node the first time hmaster start.
if (rpc.getTableCFsMap() == null || rpc.getTableCFsMap().isEmpty()) { if (rpc.getTableCFsMap() == null || rpc.getTableCFsMap().isEmpty()) {
// we copy TableCFs node into PeerNode // we copy TableCFs node into PeerNode
LOG.info("Copy table ColumnFamilies into peer=" + peerId); LOG.info("Copy table ColumnFamilies into peer=" + peerId);
ReplicationProtos.TableCF[] tableCFs = ReplicationProtos.TableCF[] tableCFs =
ReplicationPeerConfigUtil.parseTableCFs( ReplicationPeerConfigUtil.parseTableCFs(ZKUtil.getData(this.zookeeper, tableCFsNode));
ZKUtil.getData(this.zookeeper, tableCFsNode));
if (tableCFs != null && tableCFs.length > 0) { if (tableCFs != null && tableCFs.length > 0) {
rpc.setTableCFsMap(ReplicationPeerConfigUtil.convert2Map(tableCFs)); rpc.setTableCFsMap(ReplicationPeerConfigUtil.convert2Map(tableCFs));
ZKUtil.setData(this.zookeeper, peerNode, peerStorage.updatePeerConfig(peerId, rpc);
ReplicationPeerConfigUtil.toByteArray(rpc));
} }
} else { } else {
LOG.info("No tableCFs in peerNode:" + peerId); LOG.info("No tableCFs in peerNode:" + peerId);
@ -126,23 +137,6 @@ public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase {
return true; return true;
} }
private ReplicationPeerConfig getReplicationPeerConig(String peerNode)
throws KeeperException, InterruptedException {
byte[] data = null;
data = ZKUtil.getData(this.zookeeper, peerNode);
if (data == null) {
LOG.error("Could not get configuration for " +
"peer because it doesn't exist. peer=" + peerNode);
return null;
}
try {
return ReplicationPeerConfigUtil.parsePeerFrom(data);
} catch (DeserializationException e) {
LOG.warn("Failed to parse cluster key from peer=" + peerNode);
return null;
}
}
private static void printUsageAndExit() { private static void printUsageAndExit() {
System.err.printf( System.err.printf(
"Usage: hbase org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader" "Usage: hbase org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader"
@ -163,19 +157,17 @@ public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase {
printUsageAndExit(); printUsageAndExit();
} else if (args[0].equals("copyTableCFs")) { } else if (args[0].equals("copyTableCFs")) {
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null); try (ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null)) {
try { ReplicationPeerConfigUpgrader tableCFsUpdater =
ReplicationPeerConfigUpgrader tableCFsUpdater = new ReplicationPeerConfigUpgrader(zkw, new ReplicationPeerConfigUpgrader(zkw, conf);
conf, null);
tableCFsUpdater.copyTableCFs(); tableCFsUpdater.copyTableCFs();
} finally {
zkw.close();
} }
} else if (args[0].equals("upgrade")) { } else if (args[0].equals("upgrade")) {
Configuration conf = HBaseConfiguration.create(); Configuration conf = HBaseConfiguration.create();
ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null); try (ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null)) {
ReplicationPeerConfigUpgrader upgrader = new ReplicationPeerConfigUpgrader(zkw, conf, null); ReplicationPeerConfigUpgrader upgrader = new ReplicationPeerConfigUpgrader(zkw, conf);
upgrader.upgrade(); upgrader.upgrade();
}
} else { } else {
printUsageAndExit(); printUsageAndExit();
} }

View File

@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.replication.ReplicationFactory; import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription; import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo; import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
@ -237,7 +236,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
LOG.info("Found [--distributed], will poll each RegionServer."); LOG.info("Found [--distributed], will poll each RegionServer.");
Set<String> peerIds = peers.stream().map((peer) -> peer.getPeerId()) Set<String> peerIds = peers.stream().map((peer) -> peer.getPeerId())
.collect(Collectors.toSet()); .collect(Collectors.toSet());
System.out.println(dumpQueues(connection, zkw, peerIds, opts.isHdfs())); System.out.println(dumpQueues(zkw, peerIds, opts.isHdfs()));
System.out.println(dumpReplicationSummary()); System.out.println(dumpReplicationSummary());
} else { } else {
// use ZK instead // use ZK instead
@ -301,18 +300,15 @@ public class DumpReplicationQueues extends Configured implements Tool {
return sb.toString(); return sb.toString();
} }
public String dumpQueues(ClusterConnection connection, ZKWatcher zkw, Set<String> peerIds, public String dumpQueues(ZKWatcher zkw, Set<String> peerIds,
boolean hdfs) throws Exception { boolean hdfs) throws Exception {
ReplicationQueueStorage queueStorage; ReplicationQueueStorage queueStorage;
ReplicationPeers replicationPeers;
ReplicationTracker replicationTracker; ReplicationTracker replicationTracker;
StringBuilder sb = new StringBuilder(); StringBuilder sb = new StringBuilder();
queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf()); queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
replicationPeers = replicationTracker = ReplicationFactory.getReplicationTracker(zkw, new WarnOnlyAbortable(),
ReplicationFactory.getReplicationPeers(zkw, getConf()); new WarnOnlyStoppable());
replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(),
new WarnOnlyAbortable(), new WarnOnlyStoppable());
Set<String> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers()); Set<String> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers());
// Loops each peer on each RS and dumps the queues // Loops each peer on each RS and dumps the queues
@ -330,11 +326,9 @@ public class DumpReplicationQueues extends Configured implements Tool {
List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId); List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
if (!peerIds.contains(queueInfo.getPeerId())) { if (!peerIds.contains(queueInfo.getPeerId())) {
deletedQueues.add(regionserver + "/" + queueId); deletedQueues.add(regionserver + "/" + queueId);
sb.append( sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs));
formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs));
} else { } else {
sb.append( sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs));
formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs));
} }
} }
} }

View File

@ -110,8 +110,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf); ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf);
this.replicationPeers.init(); this.replicationPeers.init();
this.replicationTracker = this.replicationTracker =
ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers, ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.server, this.server);
this.conf, this.server, this.server);
} catch (Exception e) { } catch (Exception e) {
throw new IOException("Failed replication handler create", e); throw new IOException("Failed replication handler create", e);
} }

View File

@ -328,8 +328,7 @@ public class HBaseFsck extends Configured implements Closeable {
* @throws MasterNotRunningException if the master is not running * @throws MasterNotRunningException if the master is not running
* @throws ZooKeeperConnectionException if unable to connect to ZooKeeper * @throws ZooKeeperConnectionException if unable to connect to ZooKeeper
*/ */
public HBaseFsck(Configuration conf) throws MasterNotRunningException, public HBaseFsck(Configuration conf) throws IOException, ClassNotFoundException {
ZooKeeperConnectionException, IOException, ClassNotFoundException {
this(conf, createThreadPool(conf)); this(conf, createThreadPool(conf));
} }

View File

@ -94,7 +94,8 @@ public class TestReplicationTrackerZKImpl {
ZKClusterId.setClusterId(zkw, new ClusterId()); ZKClusterId.setClusterId(zkw, new ClusterId());
rp = ReplicationFactory.getReplicationPeers(zkw, conf); rp = ReplicationFactory.getReplicationPeers(zkw, conf);
rp.init(); rp.init();
rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1)); rt = ReplicationFactory.getReplicationTracker(zkw, new DummyServer(fakeRs1),
new DummyServer(fakeRs1));
} catch (Exception e) { } catch (Exception e) {
fail("Exception during test setup: " + e); fail("Exception during test setup: " + e);
} }

View File

@ -29,14 +29,13 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil; import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.BeforeClass; import org.junit.BeforeClass;
import org.junit.ClassRule; import org.junit.ClassRule;
@ -59,12 +58,19 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
private static ZKWatcher zkw = null; private static ZKWatcher zkw = null;
private static Abortable abortable = null; private static Abortable abortable = null;
private static ZKStorageUtil zkStorageUtil = null;
private static class ZKStorageUtil extends ZKReplicationPeerStorage {
public ZKStorageUtil(ZKWatcher zookeeper, Configuration conf) {
super(zookeeper, conf);
}
}
@Rule @Rule
public TestName name = new TestName(); public TestName name = new TestName();
public TestTableCFsUpdater() { public TestTableCFsUpdater() {
super(zkw, TEST_UTIL.getConfiguration(), abortable); super(zkw, TEST_UTIL.getConfiguration());
} }
@BeforeClass @BeforeClass
@ -83,6 +89,7 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
} }
}; };
zkw = new ZKWatcher(conf, "TableCFs", abortable, true); zkw = new ZKWatcher(conf, "TableCFs", abortable, true);
zkStorageUtil = new ZKStorageUtil(zkw, conf);
} }
@AfterClass @AfterClass
@ -91,8 +98,7 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
} }
@Test @Test
public void testUpgrade() throws KeeperException, InterruptedException, public void testUpgrade() throws Exception {
DeserializationException {
String peerId = "1"; String peerId = "1";
final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1"); final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2"); final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
@ -100,13 +106,13 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
ReplicationPeerConfig rpc = new ReplicationPeerConfig(); ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(zkw.getQuorum()); rpc.setClusterKey(zkw.getQuorum());
String peerNode = getPeerNode(peerId); String peerNode = zkStorageUtil.getPeerNode(peerId);
ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc));
String tableCFs = tableName1 + ":cf1,cf2;" + tableName2 + ":cf3;" + tableName3; String tableCFs = tableName1 + ":cf1,cf2;" + tableName2 + ":cf3;" + tableName3;
String tableCFsNode = getTableCFsNode(peerId); String tableCFsNode = getTableCFsNode(peerId);
LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId); LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs)); ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs));
ReplicationPeerConfig actualRpc = ReplicationPeerConfig actualRpc =
ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
@ -119,13 +125,13 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
peerId = "2"; peerId = "2";
rpc = new ReplicationPeerConfig(); rpc = new ReplicationPeerConfig();
rpc.setClusterKey(zkw.getQuorum()); rpc.setClusterKey(zkw.getQuorum());
peerNode = getPeerNode(peerId); peerNode = zkStorageUtil.getPeerNode(peerId);
ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc));
tableCFs = tableName1 + ":cf1,cf3;" + tableName2 + ":cf2"; tableCFs = tableName1 + ":cf1,cf3;" + tableName2 + ":cf2";
tableCFsNode = getTableCFsNode(peerId); tableCFsNode = getTableCFsNode(peerId);
LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId); LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs)); ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs));
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
@ -137,13 +143,13 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
peerId = "3"; peerId = "3";
rpc = new ReplicationPeerConfig(); rpc = new ReplicationPeerConfig();
rpc.setClusterKey(zkw.getQuorum()); rpc.setClusterKey(zkw.getQuorum());
peerNode = getPeerNode(peerId); peerNode = zkStorageUtil.getPeerNode(peerId);
ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc));
tableCFs = ""; tableCFs = "";
tableCFsNode = getTableCFsNode(peerId); tableCFsNode = getTableCFsNode(peerId);
LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId); LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs)); ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs));
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode)); actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
@ -155,7 +161,7 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
peerId = "4"; peerId = "4";
rpc = new ReplicationPeerConfig(); rpc = new ReplicationPeerConfig();
rpc.setClusterKey(zkw.getQuorum()); rpc.setClusterKey(zkw.getQuorum());
peerNode = getPeerNode(peerId); peerNode = zkStorageUtil.getPeerNode(peerId);
ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc)); ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc));
tableCFsNode = getTableCFsNode(peerId); tableCFsNode = getTableCFsNode(peerId);
@ -169,7 +175,7 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
copyTableCFs(); copyTableCFs();
peerId = "1"; peerId = "1";
peerNode = getPeerNode(peerId); peerNode = zkStorageUtil.getPeerNode(peerId);
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
Map<TableName, List<String>> tableNameListMap = actualRpc.getTableCFsMap(); Map<TableName, List<String>> tableNameListMap = actualRpc.getTableCFsMap();
@ -184,9 +190,8 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
assertEquals("cf3", tableNameListMap.get(tableName2).get(0)); assertEquals("cf3", tableNameListMap.get(tableName2).get(0));
assertNull(tableNameListMap.get(tableName3)); assertNull(tableNameListMap.get(tableName3));
peerId = "2"; peerId = "2";
peerNode = getPeerNode(peerId); peerNode = zkStorageUtil.getPeerNode(peerId);
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
tableNameListMap = actualRpc.getTableCFsMap(); tableNameListMap = actualRpc.getTableCFsMap();
@ -200,19 +205,17 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
assertEquals("cf2", tableNameListMap.get(tableName2).get(0)); assertEquals("cf2", tableNameListMap.get(tableName2).get(0));
peerId = "3"; peerId = "3";
peerNode = getPeerNode(peerId); peerNode = zkStorageUtil.getPeerNode(peerId);
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
tableNameListMap = actualRpc.getTableCFsMap(); tableNameListMap = actualRpc.getTableCFsMap();
assertNull(tableNameListMap); assertNull(tableNameListMap);
peerId = "4"; peerId = "4";
peerNode = getPeerNode(peerId); peerNode = zkStorageUtil.getPeerNode(peerId);
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode)); actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey()); assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
tableNameListMap = actualRpc.getTableCFsMap(); tableNameListMap = actualRpc.getTableCFsMap();
assertNull(tableNameListMap); assertNull(tableNameListMap);
} }
} }

View File

@ -70,8 +70,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers; import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage; import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy; import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory; import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker; import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests; import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@ -171,9 +171,9 @@ public abstract class TestReplicationSourceManager {
+ conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1")); + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state"); ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state", ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
ReplicationStateZKBase.ENABLED_ZNODE_BYTES); ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
ZKUtil.createWithParents(zkw, "/hbase/replication/state"); ZKUtil.createWithParents(zkw, "/hbase/replication/state");
ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES); ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
ZKClusterId.setClusterId(zkw, new ClusterId()); ZKClusterId.setClusterId(zkw, new ClusterId());
FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir()); FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());