HBASE-19617 Remove ReplicationQueues, use ReplicationQueueStorage directly

This commit is contained in:
zhangduo 2017-12-27 22:03:51 +08:00
parent 5fc90244a8
commit f4703c6ed3
24 changed files with 384 additions and 1571 deletions

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.replication;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Stoppable;
@ -32,12 +31,6 @@ public final class ReplicationFactory {
private ReplicationFactory() {
}
public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args)
throws Exception {
return (ReplicationQueues) ConstructorUtils.invokeConstructor(ReplicationQueuesZKImpl.class,
args);
}
public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf,
Abortable abortable) {
return getReplicationPeers(zk, conf, null, abortable);

View File

@ -1,161 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.SortedSet;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.yetus.audience.InterfaceAudience;
/**
* This provides an interface for maintaining a region server's replication queues. These queues
* keep track of the WALs and HFile references (if hbase.replication.bulkload.enabled is enabled)
* that still need to be replicated to remote clusters.
*/
@InterfaceAudience.Private
public interface ReplicationQueues {
/**
* Initialize the region server replication queue interface.
* @param serverName The server name of the region server that owns the replication queues this
* interface manages.
*/
void init(String serverName) throws ReplicationException;
/**
* Remove a replication queue.
* @param queueId a String that identifies the queue.
*/
void removeQueue(String queueId);
/**
* Add a new WAL file to the given queue. If the queue does not exist it is created.
* @param queueId a String that identifies the queue.
* @param filename name of the WAL
*/
void addLog(String queueId, String filename) throws ReplicationException;
/**
* Remove an WAL file from the given queue.
* @param queueId a String that identifies the queue.
* @param filename name of the WAL
*/
void removeLog(String queueId, String filename);
/**
* Set the current position for a specific WAL in a given queue.
* @param queueId a String that identifies the queue
* @param filename name of the WAL
* @param position the current position in the file
*/
void setLogPosition(String queueId, String filename, long position);
/**
* Get the current position for a specific WAL in a given queue.
* @param queueId a String that identifies the queue
* @param filename name of the WAL
* @return the current position in the file
*/
long getLogPosition(String queueId, String filename) throws ReplicationException;
/**
* Remove all replication queues for this region server.
*/
void removeAllQueues();
/**
* Get a list of all WALs in the given queue.
* @param queueId a String that identifies the queue
* @return a list of WALs, null if no such queue exists for this server
*/
List<String> getLogsInQueue(String queueId);
/**
* Get a list of all queues for this region server.
* @return a list of queueIds, an empty list if this region server is dead and has no outstanding
* queues
*/
List<String> getAllQueues();
/**
* Get queueIds from a dead region server, whose queues has not been claimed by other region
* servers.
* @return empty if the queue exists but no children, null if the queue does not exist.
*/
List<String> getUnClaimedQueueIds(String regionserver);
/**
* Take ownership for the queue identified by queueId and belongs to a dead region server.
* @param regionserver the id of the dead region server
* @param queueId the id of the queue
* @return the new PeerId and A SortedSet of WALs in its queue, and null if no unclaimed queue.
*/
Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId);
/**
* Remove the znode of region server if the queue is empty.
* @param regionserver the id of the region server
*/
void removeReplicatorIfQueueIsEmpty(String regionserver);
/**
* Get a list of all region servers that have outstanding replication queues. These servers could
* be alive, dead or from a previous run of the cluster.
* @return a list of server names
*/
List<String> getListOfReplicators();
/**
* Checks if the provided znode is the same as this region server's
* @param regionserver the id of the region server
* @return if this is this rs's znode
*/
boolean isThisOurRegionServer(String regionserver);
/**
* Add a peer to hfile reference queue if peer does not exist.
* @param peerId peer cluster id to be added
* @throws ReplicationException if fails to add a peer id to hfile reference queue
*/
void addPeerToHFileRefs(String peerId) throws ReplicationException;
/**
* Remove a peer from hfile reference queue.
* @param peerId peer cluster id to be removed
*/
void removePeerFromHFileRefs(String peerId);
/**
* Add new hfile references to the queue.
* @param peerId peer cluster id to which the hfiles need to be replicated
* @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
* will be added in the queue }
* @throws ReplicationException if fails to add a hfile reference
*/
void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException;
/**
* Remove hfile references from the queue.
* @param peerId peer cluster id from which this hfile references needs to be removed
* @param files list of hfile references to be removed
*/
void removeHFileRefs(String peerId, List<String> files);
}

View File

@ -1,70 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Wrapper around common arguments used to construct ReplicationQueues. Used to construct various
* ReplicationQueues Implementations with different constructor arguments by reflection.
*/
@InterfaceAudience.Private
public class ReplicationQueuesArguments {
private ZKWatcher zk;
private Configuration conf;
private Abortable abort;
public ReplicationQueuesArguments(Configuration conf, Abortable abort) {
this.conf = conf;
this.abort = abort;
}
public ReplicationQueuesArguments(Configuration conf, Abortable abort, ZKWatcher zk) {
this(conf, abort);
setZk(zk);
}
public ZKWatcher getZk() {
return zk;
}
public void setZk(ZKWatcher zk) {
this.zk = zk;
}
public Configuration getConf() {
return conf;
}
public void setConf(Configuration conf) {
this.conf = conf;
}
public Abortable getAbortable() {
return abort;
}
public void setAbortable(Abortable abort) {
this.abort = abort;
}
}

View File

@ -1,408 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.ArrayList;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class provides an implementation of the
* interface using ZooKeeper. The
* base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of
* all outstanding WAL files on this region server that need to be replicated. The myQueuesZnode is
* the regionserver name (a concatenation of the region servers hostname, client port and start
* code). For example:
*
* /hbase/replication/rs/hostname.example.org,6020,1234
*
* Within this znode, the region server maintains a set of WAL replication queues. These queues are
* represented by child znodes named using there give queue id. For example:
*
* /hbase/replication/rs/hostname.example.org,6020,1234/1
* /hbase/replication/rs/hostname.example.org,6020,1234/2
*
* Each queue has one child znode for every WAL that still needs to be replicated. The value of
* these WAL child znodes is the latest position that has been replicated. This position is updated
* every time a WAL entry is replicated. For example:
*
* /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
*/
@InterfaceAudience.Private
public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
/** Znode containing all replication queues for this region server. */
private String myQueuesZnode;
private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueuesZKImpl.class);
public ReplicationQueuesZKImpl(ReplicationQueuesArguments args) {
this(args.getZk(), args.getConf(), args.getAbortable());
}
public ReplicationQueuesZKImpl(final ZKWatcher zk, Configuration conf,
Abortable abortable) {
super(zk, conf, abortable);
}
@Override
public void init(String serverName) throws ReplicationException {
this.myQueuesZnode = ZNodePaths.joinZNode(this.queuesZNode, serverName);
try {
if (ZKUtil.checkExists(this.zookeeper, this.myQueuesZnode) < 0) {
ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
}
} catch (KeeperException e) {
throw new ReplicationException("Could not initialize replication queues.", e);
}
if (conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
try {
if (ZKUtil.checkExists(this.zookeeper, this.hfileRefsZNode) < 0) {
ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
}
} catch (KeeperException e) {
throw new ReplicationException("Could not initialize hfile references replication queue.",
e);
}
}
}
@Override
public void removeQueue(String queueId) {
try {
ZKUtil.deleteNodeRecursively(this.zookeeper,
ZNodePaths.joinZNode(this.myQueuesZnode, queueId));
} catch (KeeperException e) {
this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e);
}
}
@Override
public void addLog(String queueId, String filename) throws ReplicationException {
String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
znode = ZNodePaths.joinZNode(znode, filename);
try {
ZKUtil.createWithParents(this.zookeeper, znode);
} catch (KeeperException e) {
throw new ReplicationException(
"Could not add log because znode could not be created. queueId=" + queueId
+ ", filename=" + filename);
}
}
@Override
public void removeLog(String queueId, String filename) {
try {
String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
znode = ZNodePaths.joinZNode(znode, filename);
ZKUtil.deleteNode(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename="
+ filename + ")", e);
}
}
@Override
public void setLogPosition(String queueId, String filename, long position) {
try {
String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
znode = ZNodePaths.joinZNode(znode, filename);
// Why serialize String of Long and not Long as bytes?
ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position));
} catch (KeeperException e) {
this.abortable.abort("Failed to write replication wal position (filename=" + filename
+ ", position=" + position + ")", e);
}
}
@Override
public long getLogPosition(String queueId, String filename) throws ReplicationException {
String clusterZnode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
String znode = ZNodePaths.joinZNode(clusterZnode, filename);
byte[] bytes = null;
try {
bytes = ZKUtil.getData(this.zookeeper, znode);
} catch (KeeperException e) {
throw new ReplicationException("Internal Error: could not get position in log for queueId="
+ queueId + ", filename=" + filename, e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return 0;
}
try {
return ZKUtil.parseWALPositionFrom(bytes);
} catch (DeserializationException de) {
LOG.warn("Failed to parse WALPosition for queueId=" + queueId + " and wal=" + filename
+ " znode content, continuing.");
}
// if we can not parse the position, start at the beginning of the wal file
// again
return 0;
}
@Override
public boolean isThisOurRegionServer(String regionserver) {
return ZNodePaths.joinZNode(this.queuesZNode, regionserver).equals(this.myQueuesZnode);
}
@Override
public List<String> getUnClaimedQueueIds(String regionserver) {
if (isThisOurRegionServer(regionserver)) {
return null;
}
String rsZnodePath = ZNodePaths.joinZNode(this.queuesZNode, regionserver);
List<String> queues = null;
try {
queues = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZnodePath);
} catch (KeeperException e) {
this.abortable.abort("Failed to getUnClaimedQueueIds for RS" + regionserver, e);
}
return queues;
}
@Override
public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
LOG.info("Atomically moving " + regionserver + "/" + queueId + "'s WALs to my queue");
return moveQueueUsingMulti(regionserver, queueId);
}
@Override
public void removeReplicatorIfQueueIsEmpty(String regionserver) {
String rsPath = ZNodePaths.joinZNode(this.queuesZNode, regionserver);
try {
List<String> list = ZKUtil.listChildrenNoWatch(this.zookeeper, rsPath);
if (list != null && list.isEmpty()){
ZKUtil.deleteNode(this.zookeeper, rsPath);
}
} catch (KeeperException e) {
LOG.warn("Got error while removing replicator", e);
}
}
@Override
public void removeAllQueues() {
try {
ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode);
} catch (KeeperException e) {
// if the znode is already expired, don't bother going further
if (e instanceof KeeperException.SessionExpiredException) {
return;
}
this.abortable.abort("Failed to delete replication queues for region server: "
+ this.myQueuesZnode, e);
}
}
@Override
public List<String> getLogsInQueue(String queueId) {
String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
List<String> result = null;
try {
result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Failed to get list of wals for queueId=" + queueId, e);
}
return result;
}
@Override
public List<String> getAllQueues() {
List<String> listOfQueues = null;
try {
listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode);
} catch (KeeperException e) {
this.abortable.abort("Failed to get a list of queues for region server: "
+ this.myQueuesZnode, e);
}
return listOfQueues == null ? new ArrayList<>() : listOfQueues;
}
/**
* It "atomically" copies one peer's wals queue from another dead region server and returns them
* all sorted. The new peer id is equal to the old peer id appended with the dead server's znode.
* @param znode pertaining to the region server to copy the queues from
* @peerId peerId pertaining to the queue need to be copied
*/
private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) {
try {
// hbase/replication/rs/deadrs
String deadRSZnodePath = ZNodePaths.joinZNode(this.queuesZNode, znode);
List<ZKUtilOp> listOfOps = new ArrayList<>();
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
String newPeerId = peerId + "-" + znode;
String newPeerZnode = ZNodePaths.joinZNode(this.myQueuesZnode, newPeerId);
// check the logs queue for the old peer cluster
String oldClusterZnode = ZNodePaths.joinZNode(deadRSZnodePath, peerId);
List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
if (!peerExists(replicationQueueInfo.getPeerId())) {
LOG.warn("Peer " + replicationQueueInfo.getPeerId() +
" didn't exist, will move its queue to avoid the failure of multi op");
for (String wal : wals) {
String oldWalZnode = ZNodePaths.joinZNode(oldClusterZnode, wal);
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
}
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
return null;
}
SortedSet<String> logQueue = new TreeSet<>();
if (wals == null || wals.isEmpty()) {
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
} else {
// create the new cluster znode
ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
listOfOps.add(op);
// get the offset of the logs and set it to new znodes
for (String wal : wals) {
String oldWalZnode = ZNodePaths.joinZNode(oldClusterZnode, wal);
byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode);
LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset));
String newLogZnode = ZNodePaths.joinZNode(newPeerZnode, wal);
listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
logQueue.add(wal);
}
// add delete op for peer
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
if (LOG.isTraceEnabled()) {
LOG.trace(" The multi list size is: " + listOfOps.size());
}
}
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
LOG.info("Atomically moved " + znode + "/" + peerId + "'s WALs to my queue");
return new Pair<>(newPeerId, logQueue);
} catch (KeeperException e) {
// Multi call failed; it looks like some other regionserver took away the logs.
LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
} catch (InterruptedException e) {
LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
Thread.currentThread().interrupt();
}
return null;
}
@Override
public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
throws ReplicationException {
String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
boolean debugEnabled = LOG.isDebugEnabled();
if (debugEnabled) {
LOG.debug("Adding hfile references " + pairs + " in queue " + peerZnode);
}
int size = pairs.size();
List<ZKUtilOp> listOfOps = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
listOfOps.add(ZKUtilOp.createAndFailSilent(
ZNodePaths.joinZNode(peerZnode, pairs.get(i).getSecond().getName()),
HConstants.EMPTY_BYTE_ARRAY));
}
if (debugEnabled) {
LOG.debug(" The multi list size for adding hfile references in zk for node " + peerZnode
+ " is " + listOfOps.size());
}
try {
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
} catch (KeeperException e) {
throw new ReplicationException("Failed to create hfile reference znode=" + e.getPath(), e);
}
}
@Override
public void removeHFileRefs(String peerId, List<String> files) {
String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
boolean debugEnabled = LOG.isDebugEnabled();
if (debugEnabled) {
LOG.debug("Removing hfile references " + files + " from queue " + peerZnode);
}
int size = files.size();
List<ZKUtilOp> listOfOps = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZNodePaths.joinZNode(peerZnode, files.get(i))));
}
if (debugEnabled) {
LOG.debug(" The multi list size for removing hfile references in zk for node " + peerZnode
+ " is " + listOfOps.size());
}
try {
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
} catch (KeeperException e) {
LOG.error("Failed to remove hfile reference znode=" + e.getPath(), e);
}
}
@Override
public void addPeerToHFileRefs(String peerId) throws ReplicationException {
String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
try {
if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
LOG.info("Adding peer " + peerId + " to hfile reference queue.");
ZKUtil.createWithParents(this.zookeeper, peerZnode);
}
} catch (KeeperException e) {
throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.",
e);
}
}
@Override
public void removePeerFromHFileRefs(String peerId) {
final String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
try {
if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
if (LOG.isDebugEnabled()) {
LOG.debug("Peer " + peerZnode + " not found in hfile reference queue.");
}
return;
} else {
LOG.info("Removing peer " + peerZnode + " from hfile reference queue.");
ZKUtil.deleteNodeRecursively(this.zookeeper, peerZnode);
}
} catch (KeeperException e) {
LOG.error("Ignoring the exception to remove peer " + peerId + " from hfile reference queue.",
e);
}
}
}

View File

@ -1,441 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.regionserver.BloomType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;
import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/*
* Abstract class that provides an interface to the Replication Table. Which is currently
* being used for WAL offset tracking.
* The basic schema of this table will store each individual queue as a
* seperate row. The row key will be a unique identifier of the creating server's name and the
* queueId. Each queue must have the following two columns:
* COL_QUEUE_OWNER: tracks which server is currently responsible for tracking the queue
* COL_QUEUE_OWNER_HISTORY: a "|" delimited list of the previous server's that have owned this
* queue. The most recent previous owner is the leftmost entry.
* They will also have columns mapping [WAL filename : offset]
* The most flexible method of interacting with the Replication Table is by calling
* getOrBlockOnReplicationTable() which will return a new copy of the Replication Table. It is up
* to the caller to close the returned table.
*/
@InterfaceAudience.Private
abstract class ReplicationTableBase {
/** Name of the HBase Table used for tracking replication*/
public static final TableName REPLICATION_TABLE_NAME =
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication");
// Column family and column names for Queues in the Replication Table
public static final byte[] CF_QUEUE = Bytes.toBytes("q");
public static final byte[] COL_QUEUE_OWNER = Bytes.toBytes("o");
public static final byte[] COL_QUEUE_OWNER_HISTORY = Bytes.toBytes("h");
// Column Descriptor for the Replication Table
private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR =
new HColumnDescriptor(CF_QUEUE).setMaxVersions(1)
.setInMemory(true)
.setScope(HConstants.REPLICATION_SCOPE_LOCAL)
// TODO: Figure out which bloom filter to use
.setBloomFilterType(BloomType.NONE);
// The value used to delimit the queueId and server name inside of a queue's row key. Currently a
// hyphen, because it is guaranteed that queueId (which is a cluster id) cannot contain hyphens.
// See HBASE-11394.
public static final String ROW_KEY_DELIMITER = "-";
// The value used to delimit server names in the queue history list
public static final String QUEUE_HISTORY_DELIMITER = "|";
/*
* Make sure that HBase table operations for replication have a high number of retries. This is
* because the server is aborted if any HBase table operation fails. Each RPC will be attempted
* 3600 times before exiting. This provides each operation with 2 hours of retries
* before the server is aborted.
*/
private static final int CLIENT_RETRIES = 3600;
private static final int RPC_TIMEOUT = 2000;
private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT;
// We only need a single thread to initialize the Replication Table
private static final int NUM_INITIALIZE_WORKERS = 1;
protected final Configuration conf;
protected final Abortable abortable;
private final Connection connection;
private final Executor executor;
private volatile CountDownLatch replicationTableInitialized;
public ReplicationTableBase(Configuration conf, Abortable abort) throws IOException {
this.conf = new Configuration(conf);
this.abortable = abort;
decorateConf();
this.connection = ConnectionFactory.createConnection(this.conf);
this.executor = setUpExecutor();
this.replicationTableInitialized = new CountDownLatch(1);
createReplicationTableInBackground();
}
/**
* Modify the connection's config so that operations run on the Replication Table have longer and
* a larger number of retries
*/
private void decorateConf() {
this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES);
}
/**
* Sets up the thread pool executor used to build the Replication Table in the background
* @return the configured executor
*/
private Executor setUpExecutor() {
ThreadPoolExecutor tempExecutor = new ThreadPoolExecutor(NUM_INITIALIZE_WORKERS,
NUM_INITIALIZE_WORKERS, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setNameFormat("ReplicationTableExecutor-%d");
tfb.setDaemon(true);
tempExecutor.setThreadFactory(tfb.build());
return tempExecutor;
}
/**
* Get whether the Replication Table has been successfully initialized yet
* @return whether the Replication Table is initialized
*/
public boolean getInitializationStatus() {
return replicationTableInitialized.getCount() == 0;
}
/**
* Increases the RPC and operations timeouts for the Replication Table
*/
private Table setReplicationTableTimeOuts(Table replicationTable) {
replicationTable.setRpcTimeout(RPC_TIMEOUT);
replicationTable.setOperationTimeout(OPERATION_TIMEOUT);
return replicationTable;
}
/**
* Build the row key for the given queueId. This will uniquely identify it from all other queues
* in the cluster.
* @param serverName The owner of the queue
* @param queueId String identifier of the queue
* @return String representation of the queue's row key
*/
protected String buildQueueRowKey(String serverName, String queueId) {
return queueId + ROW_KEY_DELIMITER + serverName;
}
/**
* Parse the original queueId from a row key
* @param rowKey String representation of a queue's row key
* @return the original queueId
*/
protected String getRawQueueIdFromRowKey(String rowKey) {
return rowKey.split(ROW_KEY_DELIMITER)[0];
}
/**
* Returns a queue's row key given either its raw or reclaimed queueId
*
* @param queueId queueId of the queue
* @return byte representation of the queue's row key
*/
protected byte[] queueIdToRowKey(String serverName, String queueId) {
// Cluster id's are guaranteed to have no hyphens, so if the passed in queueId has no hyphen
// then this is not a reclaimed queue.
if (!queueId.contains(ROW_KEY_DELIMITER)) {
return Bytes.toBytes(buildQueueRowKey(serverName, queueId));
// If the queueId contained some hyphen it was reclaimed. In this case, the queueId is the
// queue's row key
} else {
return Bytes.toBytes(queueId);
}
}
/**
* Creates a "|" delimited record of the queue's past region server owners.
*
* @param originalHistory the queue's original owner history
* @param oldServer the name of the server that used to own the queue
* @return the queue's new owner history
*/
protected String buildClaimedQueueHistory(String originalHistory, String oldServer) {
return oldServer + QUEUE_HISTORY_DELIMITER + originalHistory;
}
/**
* Get a list of all region servers that have outstanding replication queues. These servers could
* be alive, dead or from a previous run of the cluster.
* @return a list of server names
*/
protected List<String> getListOfReplicators() {
// scan all of the queues and return a list of all unique OWNER values
Set<String> peerServers = new HashSet<>();
ResultScanner allQueuesInCluster = null;
try (Table replicationTable = getOrBlockOnReplicationTable()){
Scan scan = new Scan();
scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
allQueuesInCluster = replicationTable.getScanner(scan);
for (Result queue : allQueuesInCluster) {
peerServers.add(Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER)));
}
} catch (IOException e) {
String errMsg = "Failed getting list of replicators";
abortable.abort(errMsg, e);
} finally {
if (allQueuesInCluster != null) {
allQueuesInCluster.close();
}
}
return new ArrayList<>(peerServers);
}
protected List<String> getAllQueues(String serverName) {
List<String> allQueues = new ArrayList<>();
ResultScanner queueScanner = null;
try {
queueScanner = getQueuesBelongingToServer(serverName);
for (Result queue : queueScanner) {
String rowKey = Bytes.toString(queue.getRow());
// If the queue does not have a Owner History, then we must be its original owner. So we
// want to return its queueId in raw form
if (Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER_HISTORY)).length() == 0) {
allQueues.add(getRawQueueIdFromRowKey(rowKey));
} else {
allQueues.add(rowKey);
}
}
return allQueues;
} catch (IOException e) {
String errMsg = "Failed getting list of all replication queues for serverName=" + serverName;
abortable.abort(errMsg, e);
return null;
} finally {
if (queueScanner != null) {
queueScanner.close();
}
}
}
protected List<String> getLogsInQueue(String serverName, String queueId) {
String rowKey = queueId;
if (!queueId.contains(ROW_KEY_DELIMITER)) {
rowKey = buildQueueRowKey(serverName, queueId);
}
return getLogsInQueue(Bytes.toBytes(rowKey));
}
protected List<String> getLogsInQueue(byte[] rowKey) {
String errMsg = "Failed getting logs in queue queueId=" + Bytes.toString(rowKey);
try (Table replicationTable = getOrBlockOnReplicationTable()) {
Get getQueue = new Get(rowKey);
Result queue = replicationTable.get(getQueue);
if (queue == null || queue.isEmpty()) {
abortable.abort(errMsg, new ReplicationException(errMsg));
return null;
}
return readWALsFromResult(queue);
} catch (IOException e) {
abortable.abort(errMsg, e);
return null;
}
}
/**
* Read all of the WAL's from a queue into a list
*
* @param queue HBase query result containing the queue
* @return a list of all the WAL filenames
*/
protected List<String> readWALsFromResult(Result queue) {
List<String> wals = new ArrayList<>();
Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
for (byte[] cQualifier : familyMap.keySet()) {
// Ignore the meta data fields of the queue
if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier,
COL_QUEUE_OWNER_HISTORY)) {
continue;
}
wals.add(Bytes.toString(cQualifier));
}
return wals;
}
/**
* Get the queue id's and meta data (Owner and History) for the queues belonging to the named
* server
*
* @param server name of the server
* @return a ResultScanner over the QueueIds belonging to the server
* @throws IOException if getting the table or the scanner fails
*/
protected ResultScanner getQueuesBelongingToServer(String server) throws IOException {
Scan scan = new Scan();
SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
CompareOperator.EQUAL, Bytes.toBytes(server));
scan.setFilter(filterMyQueues);
scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY);
try (Table replicationTable = getOrBlockOnReplicationTable()) {
ResultScanner results = replicationTable.getScanner(scan);
return results;
}
}
/**
* Attempts to acquire the Replication Table. This operation will block until it is assigned by
* the CreateReplicationWorker thread. It is up to the caller of this method to close the
* returned Table
* @return the Replication Table when it is created
* @throws IOException if getting the table or the scanner fails
*/
protected Table getOrBlockOnReplicationTable() throws IOException {
// Sleep until the Replication Table becomes available
try {
replicationTableInitialized.await();
} catch (InterruptedException e) {
String errMsg = "Unable to acquire the Replication Table due to InterruptedException: " +
e.getMessage();
throw new InterruptedIOException(errMsg);
}
return getAndSetUpReplicationTable();
}
/**
* Creates a new copy of the Replication Table and sets up the proper Table time outs for it
*
* @return the Replication Table
* @throws IOException if getting the table fails
*/
private Table getAndSetUpReplicationTable() throws IOException {
Table replicationTable = connection.getTable(REPLICATION_TABLE_NAME);
setReplicationTableTimeOuts(replicationTable);
return replicationTable;
}
/**
* Builds the Replication Table in a background thread. Any method accessing the Replication Table
* should do so through getOrBlockOnReplicationTable()
*
* @return the Replication Table
* @throws IOException if the Replication Table takes too long to build
*/
private void createReplicationTableInBackground() throws IOException {
executor.execute(new CreateReplicationTableWorker());
}
/**
* Attempts to build the Replication Table. Will continue blocking until we have a valid
* Table for the Replication Table.
*/
private class CreateReplicationTableWorker implements Runnable {
private Admin admin;
@Override
public void run() {
try {
admin = connection.getAdmin();
if (!replicationTableExists()) {
createReplicationTable();
}
int maxRetries = conf.getInt("hbase.replication.queues.createtable.retries.number",
CLIENT_RETRIES);
RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, RPC_TIMEOUT);
RetryCounter retryCounter = counterFactory.create();
while (!replicationTableExists()) {
retryCounter.sleepUntilNextRetry();
if (!retryCounter.shouldRetry()) {
throw new IOException("Unable to acquire the Replication Table");
}
}
replicationTableInitialized.countDown();
} catch (IOException | InterruptedException e) {
abortable.abort("Failed building Replication Table", e);
}
}
/**
* Create the replication table with the provided HColumnDescriptor REPLICATION_COL_DESCRIPTOR
* in TableBasedReplicationQueuesImpl
*
* @throws IOException if creating the table fails
*/
private void createReplicationTable() throws IOException {
HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME);
replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR);
try {
admin.createTable(replicationTableDescriptor);
} catch (TableExistsException e) {
// In this case we can just continue as normal
}
}
/**
* Checks whether the Replication Table exists yet
*
* @return whether the Replication Table exists
*/
private boolean replicationTableExists() {
try {
return admin.tableExists(REPLICATION_TABLE_NAME);
} catch (IOException e) {
return false;
}
}
}
}

View File

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -54,6 +53,8 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
super(zookeeper, conf, abortable);
this.stopper = stopper;
this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
// watch the changes
refreshOtherRegionServersList(true);
}
@Override
@ -71,7 +72,7 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
*/
@Override
public List<String> getListOfRegionServers() {
refreshOtherRegionServersList();
refreshOtherRegionServersList(false);
List<String> list = null;
synchronized (otherRegionServers) {
@ -137,7 +138,7 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
if (!path.startsWith(this.watcher.znodePaths.rsZNode)) {
return false;
}
return refreshOtherRegionServersList();
return refreshOtherRegionServersList(true);
}
}
@ -157,8 +158,8 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
* @return true if the local list of the other region servers was updated with the ZK data (even
* if it was empty), false if the data was missing in ZK
*/
private boolean refreshOtherRegionServersList() {
List<String> newRsList = getRegisteredRegionServers();
private boolean refreshOtherRegionServersList(boolean watch) {
List<String> newRsList = getRegisteredRegionServers(watch);
if (newRsList == null) {
return false;
} else {
@ -174,10 +175,14 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
* Get a list of all the other region servers in this cluster and set a watch
* @return a list of server nanes
*/
private List<String> getRegisteredRegionServers() {
private List<String> getRegisteredRegionServers(boolean watch) {
List<String> result = null;
try {
result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.znodePaths.rsZNode);
if (watch) {
result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.znodePaths.rsZNode);
} else {
result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.zookeeper.znodePaths.rsZNode);
}
} catch (KeeperException e) {
this.abortable.abort("Get list of registered region servers", e);
}

View File

@ -54,6 +54,28 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
/**
* ZK based replication queue storage.
* <p>
* The base znode for each regionserver is the regionserver name. For example:
*
* <pre>
* /hbase/replication/rs/hostname.example.org,6020,1234
* </pre>
*
* Within this znode, the region server maintains a set of WAL replication queues. These queues are
* represented by child znodes named using there give queue id. For example:
*
* <pre>
* /hbase/replication/rs/hostname.example.org,6020,1234/1
* /hbase/replication/rs/hostname.example.org,6020,1234/2
* </pre>
*
* Each queue has one child znode for every WAL that still needs to be replicated. The value of
* these WAL child znodes is the latest position that has been replicated. This position is updated
* every time a WAL entry is replicated. For example:
*
* <pre>
* /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
* </pre>
*/
@InterfaceAudience.Private
class ZKReplicationQueueStorage extends ZKReplicationStorageBase

View File

@ -42,9 +42,8 @@ import org.slf4j.LoggerFactory;
*/
public abstract class TestReplicationStateBasic {
protected ReplicationQueues rq1;
protected ReplicationQueues rq2;
protected ReplicationQueues rq3;
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class);
protected ReplicationQueueStorage rqs;
protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345);
protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345);
@ -63,8 +62,6 @@ public abstract class TestReplicationStateBasic {
protected static final int ZK_MAX_COUNT = 300;
protected static final int ZK_SLEEP_INTERVAL = 100; // millis
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class);
@Test
public void testReplicationQueueStorage() throws ReplicationException {
// Test methods with empty state
@ -76,15 +73,13 @@ public abstract class TestReplicationStateBasic {
* Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each --
* server2: zero queues
*/
rq1.init(server1.getServerName());
rq2.init(server2.getServerName());
rq1.addLog("qId1", "trash");
rq1.removeLog("qId1", "trash");
rq1.addLog("qId2", "filename1");
rq1.addLog("qId3", "filename2");
rq1.addLog("qId3", "filename3");
rq2.addLog("trash", "trash");
rq2.removeQueue("trash");
rqs.addWAL(server1, "qId1", "trash");
rqs.removeWAL(server1, "qId1", "trash");
rqs.addWAL(server1,"qId2", "filename1");
rqs.addWAL(server1,"qId3", "filename2");
rqs.addWAL(server1,"qId3", "filename3");
rqs.addWAL(server2,"trash", "trash");
rqs.removeQueue(server2,"trash");
List<ServerName> reps = rqs.getListOfReplicators();
assertEquals(2, reps.size());
@ -105,62 +100,55 @@ public abstract class TestReplicationStateBasic {
assertTrue(list.contains("qId3"));
}
private void removeAllQueues(ServerName serverName) throws ReplicationException {
for (String queue: rqs.getAllQueues(serverName)) {
rqs.removeQueue(serverName, queue);
}
}
@Test
public void testReplicationQueues() throws ReplicationException {
rq1.init(server1.getServerName());
rq2.init(server2.getServerName());
rq3.init(server3.getServerName());
// Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
rp.init();
// 3 replicators should exist
assertEquals(3, rq1.getListOfReplicators().size());
rq1.removeQueue("bogus");
rq1.removeLog("bogus", "bogus");
rq1.removeAllQueues();
assertEquals(0, rq1.getAllQueues().size());
assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
assertNull(rq1.getLogsInQueue("bogus"));
assertNull(rq1.getUnClaimedQueueIds(ServerName.valueOf("bogus", 1234, -1L).toString()));
rq1.setLogPosition("bogus", "bogus", 5L);
rqs.removeQueue(server1, "bogus");
rqs.removeWAL(server1, "bogus", "bogus");
removeAllQueues(server1);
assertEquals(0, rqs.getAllQueues(server1).size());
assertEquals(0, rqs.getWALPosition(server1, "bogus", "bogus"));
assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty());
assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 1234, 12345)).isEmpty());
populateQueues();
assertEquals(3, rq1.getListOfReplicators().size());
assertEquals(0, rq2.getLogsInQueue("qId1").size());
assertEquals(5, rq3.getLogsInQueue("qId5").size());
assertEquals(0, rq3.getLogPosition("qId1", "filename0"));
rq3.setLogPosition("qId5", "filename4", 354L);
assertEquals(354L, rq3.getLogPosition("qId5", "filename4"));
assertEquals(3, rqs.getListOfReplicators().size());
assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0"));
rqs.setWALPosition(server3, "qId5", "filename4", 354L);
assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4"));
assertEquals(5, rq3.getLogsInQueue("qId5").size());
assertEquals(0, rq2.getLogsInQueue("qId1").size());
assertEquals(0, rq1.getAllQueues().size());
assertEquals(1, rq2.getAllQueues().size());
assertEquals(5, rq3.getAllQueues().size());
assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
assertEquals(0, rqs.getAllQueues(server1).size());
assertEquals(1, rqs.getAllQueues(server2).size());
assertEquals(5, rqs.getAllQueues(server3).size());
assertEquals(0, rq3.getUnClaimedQueueIds(server1.getServerName()).size());
rq3.removeReplicatorIfQueueIsEmpty(server1.getServerName());
assertEquals(2, rq3.getListOfReplicators().size());
assertEquals(0, rqs.getAllQueues(server1).size());
rqs.removeReplicatorIfQueueIsEmpty(server1);
assertEquals(2, rqs.getListOfReplicators().size());
List<String> queues = rq2.getUnClaimedQueueIds(server3.getServerName());
List<String> queues = rqs.getAllQueues(server3);
assertEquals(5, queues.size());
for (String queue : queues) {
rq2.claimQueue(server3.getServerName(), queue);
rqs.claimQueue(server3, queue, server2);
}
rq2.removeReplicatorIfQueueIsEmpty(server3.getServerName());
assertEquals(1, rq2.getListOfReplicators().size());
rqs.removeReplicatorIfQueueIsEmpty(server3);
assertEquals(1, rqs.getListOfReplicators().size());
// Try to claim our own queues
assertNull(rq2.getUnClaimedQueueIds(server2.getServerName()));
rq2.removeReplicatorIfQueueIsEmpty(server2.getServerName());
assertEquals(6, rq2.getAllQueues().size());
rq2.removeAllQueues();
assertEquals(0, rq2.getListOfReplicators().size());
assertEquals(6, rqs.getAllQueues(server2).size());
removeAllQueues(server2);
rqs.removeReplicatorIfQueueIsEmpty(server2);
assertEquals(0, rqs.getListOfReplicators().size());
}
@Test
@ -197,7 +185,6 @@ public abstract class TestReplicationStateBasic {
@Test
public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
rp.init();
rq1.init(server1.getServerName());
List<Pair<Path, Path>> files1 = new ArrayList<>(3);
files1.add(new Pair<>(null, new Path("file_1")));
@ -206,8 +193,8 @@ public abstract class TestReplicationStateBasic {
assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
rq1.addPeerToHFileRefs(ID_ONE);
rq1.addHFileRefs(ID_ONE, files1);
rqs.addPeerToHFileRefs(ID_ONE);
rqs.addHFileRefs(ID_ONE, files1);
assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
List<String> hfiles2 = new ArrayList<>(files1.size());
@ -215,43 +202,41 @@ public abstract class TestReplicationStateBasic {
hfiles2.add(p.getSecond().getName());
}
String removedString = hfiles2.remove(0);
rq1.removeHFileRefs(ID_ONE, hfiles2);
rqs.removeHFileRefs(ID_ONE, hfiles2);
assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size());
hfiles2 = new ArrayList<>(1);
hfiles2.add(removedString);
rq1.removeHFileRefs(ID_ONE, hfiles2);
rqs.removeHFileRefs(ID_ONE, hfiles2);
assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size());
rp.unregisterPeer(ID_ONE);
}
@Test
public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
rq1.init(server1.getServerName());
rp.init();
rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
rq1.addPeerToHFileRefs(ID_ONE);
rqs.addPeerToHFileRefs(ID_ONE);
rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
rq1.addPeerToHFileRefs(ID_TWO);
rqs.addPeerToHFileRefs(ID_TWO);
List<Pair<Path, Path>> files1 = new ArrayList<>(3);
files1.add(new Pair<>(null, new Path("file_1")));
files1.add(new Pair<>(null, new Path("file_2")));
files1.add(new Pair<>(null, new Path("file_3")));
rq1.addHFileRefs(ID_ONE, files1);
rq1.addHFileRefs(ID_TWO, files1);
rqs.addHFileRefs(ID_ONE, files1);
rqs.addHFileRefs(ID_TWO, files1);
assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size());
assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
rp.unregisterPeer(ID_ONE);
rq1.removePeerFromHFileRefs(ID_ONE);
rqs.removePeerFromHFileRefs(ID_ONE);
assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
rp.unregisterPeer(ID_TWO);
rq1.removePeerFromHFileRefs(ID_TWO);
rqs.removePeerFromHFileRefs(ID_TWO);
assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty());
}
@ -363,15 +348,15 @@ public abstract class TestReplicationStateBasic {
* 3, 4, 5 log files respectively
*/
protected void populateQueues() throws ReplicationException {
rq1.addLog("trash", "trash");
rq1.removeQueue("trash");
rqs.addWAL(server1, "trash", "trash");
rqs.removeQueue(server1, "trash");
rq2.addLog("qId1", "trash");
rq2.removeLog("qId1", "trash");
rqs.addWAL(server2, "qId1", "trash");
rqs.removeWAL(server2, "qId1", "trash");
for (int i = 1; i < 6; i++) {
for (int j = 0; j < i; j++) {
rq3.addLog("qId" + i, "filename" + j);
rqs.addWAL(server3, "qId" + i, "filename" + j);
}
// Add peers for the corresponding queues so they are not orphans
rp.registerPeer("qId" + i,

View File

@ -17,10 +17,6 @@
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
@ -40,7 +36,6 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -54,7 +49,6 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
private static HBaseZKTestingUtility utility;
private static ZKWatcher zkw;
private static String replicationZNode;
private ReplicationQueuesZKImpl rqZK;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@ -84,23 +78,9 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
@Before
public void setUp() {
zkTimeoutCount = 0;
WarnOnlyAbortable abortable = new WarnOnlyAbortable();
try {
rq1 = ReplicationFactory
.getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw));
rq2 = ReplicationFactory
.getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw));
rq3 = ReplicationFactory
.getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw));
rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
} catch (Exception e) {
// This should not occur, because getReplicationQueues() only throws for
// TableBasedReplicationQueuesImpl
fail("ReplicationFactory.getReplicationQueues() threw an IO Exception");
}
rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
rp = ReplicationFactory.getReplicationPeers(zkw, conf, new WarnOnlyAbortable());
OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
rqZK = new ReplicationQueuesZKImpl(zkw, conf, abortable);
}
@After
@ -113,23 +93,6 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
utility.shutdownMiniZKCluster();
}
@Test
public void testIsPeerPath_PathToParentOfPeerNode() {
assertFalse(rqZK.isPeerPath(rqZK.peersZNode));
}
@Test
public void testIsPeerPath_PathToChildOfPeerNode() {
String peerChild = ZNodePaths.joinZNode(ZNodePaths.joinZNode(rqZK.peersZNode, "1"), "child");
assertFalse(rqZK.isPeerPath(peerChild));
}
@Test
public void testIsPeerPath_ActualPeerPath() {
String peerPath = ZNodePaths.joinZNode(rqZK.peersZNode, "1");
assertTrue(rqZK.isPeerPath(peerPath));
}
private static class WarnOnlyAbortable implements Abortable {
@Override

View File

@ -49,8 +49,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -307,14 +305,10 @@ public class DumpReplicationQueues extends Configured implements Tool {
boolean hdfs) throws Exception {
ReplicationQueueStorage queueStorage;
ReplicationPeers replicationPeers;
ReplicationQueues replicationQueues;
ReplicationTracker replicationTracker;
ReplicationQueuesArguments replicationArgs =
new ReplicationQueuesArguments(getConf(), new WarnOnlyAbortable(), zkw);
StringBuilder sb = new StringBuilder();
queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
replicationQueues = ReplicationFactory.getReplicationQueues(replicationArgs);
replicationPeers =
ReplicationFactory.getReplicationPeers(zkw, getConf(), queueStorage, connection);
replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(),
@ -328,7 +322,6 @@ public class DumpReplicationQueues extends Configured implements Tool {
}
for (ServerName regionserver : regionservers) {
List<String> queueIds = queueStorage.getAllQueues(regionserver);
replicationQueues.init(regionserver.getServerName());
if (!liveRegionServers.contains(regionserver.getServerName())) {
deadRegionServers.add(regionserver.getServerName());
}
@ -338,17 +331,17 @@ public class DumpReplicationQueues extends Configured implements Tool {
if (!peerIds.contains(queueInfo.getPeerId())) {
deletedQueues.add(regionserver + "/" + queueId);
sb.append(
formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true, hdfs));
formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs));
} else {
sb.append(
formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false, hdfs));
formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs));
}
}
}
return sb.toString();
}
private String formatQueue(ServerName regionserver, ReplicationQueues replicationQueues,
private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage,
ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
boolean hdfs) throws Exception {
StringBuilder sb = new StringBuilder();
@ -370,7 +363,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());
for (String wal : wals) {
long position = replicationQueues.getLogPosition(queueInfo.getPeerId(), wal);
long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal);
sb.append(" Replication position for " + wal + ": " + (position > 0 ? position : "0"
+ " (not started or nothing to replicate)") + "\n");
}

View File

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -29,15 +28,15 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class that handles the recovered source of a replication stream, which is transfered from
@ -52,10 +51,10 @@ public class RecoveredReplicationSource extends ReplicationSource {
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server,
String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
super.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerClusterZnode,
super.init(conf, fs, manager, queueStorage, replicationPeers, server, peerClusterZnode,
clusterId, replicationEndpoint, walFileLengthProvider, metrics);
this.actualPeerId = this.replicationQueueInfo.getPeerId();
}
@ -64,7 +63,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
final RecoveredReplicationSourceShipper worker =
new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this,
this.replicationQueues);
this.queueStorage);
ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
if (extant != null) {
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);

View File

@ -23,13 +23,13 @@ import java.util.concurrent.PriorityBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
import org.apache.hadoop.hbase.util.Threads;
/**
* Used by a {@link RecoveredReplicationSource}.
@ -40,14 +40,14 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
LoggerFactory.getLogger(RecoveredReplicationSourceShipper.class);
protected final RecoveredReplicationSource source;
private final ReplicationQueues replicationQueues;
private final ReplicationQueueStorage replicationQueues;
public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
ReplicationQueues replicationQueues) {
ReplicationQueueStorage queueStorage) {
super(conf, walGroupId, queue, source);
this.source = source;
this.replicationQueues = replicationQueues;
this.replicationQueues = queueStorage;
}
@Override
@ -116,11 +116,11 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
long startPosition = 0;
String peerClusterZnode = source.getPeerClusterZnode();
try {
startPosition = this.replicationQueues.getLogPosition(peerClusterZnode,
this.queue.peek().getName());
startPosition = this.replicationQueues.getWALPosition(source.getServerWALsBelongTo(),
peerClusterZnode, this.queue.peek().getName());
if (LOG.isTraceEnabled()) {
LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
+ startPosition);
LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " +
startPosition);
}
} catch (ReplicationException e) {
terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);

View File

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -28,12 +27,6 @@ import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -43,27 +36,33 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
*/
@ -74,7 +73,7 @@ public class Replication implements
LoggerFactory.getLogger(Replication.class);
private boolean replicationForBulkLoadData;
private ReplicationSourceManager replicationManager;
private ReplicationQueues replicationQueues;
private ReplicationQueueStorage queueStorage;
private ReplicationPeers replicationPeers;
private ReplicationTracker replicationTracker;
private Configuration conf;
@ -127,10 +126,8 @@ public class Replication implements
}
try {
this.replicationQueues =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, this.server,
server.getZooKeeper()));
this.replicationQueues.init(this.server.getServerName().toString());
this.queueStorage =
ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
this.replicationPeers =
ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
this.replicationPeers.init();
@ -147,7 +144,7 @@ public class Replication implements
throw new IOException("Could not read cluster id", ke);
}
this.replicationManager =
new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker, conf,
new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf,
this.server, fs, logDir, oldLogDir, clusterId, walFileLengthProvider);
this.statsThreadPeriod =
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);

View File

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -31,7 +30,6 @@ import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@ -41,9 +39,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
@ -52,7 +47,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.util.Bytes;
@ -60,6 +55,10 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@ -83,7 +82,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
// per group queue size, keep no more than this number of logs in each wal group
protected int queueSizePerGroup;
protected ReplicationQueues replicationQueues;
protected ReplicationQueueStorage queueStorage;
private ReplicationPeers replicationPeers;
protected Configuration conf;
@ -148,7 +147,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
*/
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server,
String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
this.server = server;
@ -161,7 +160,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
this.maxRetriesMultiplier =
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
this.replicationQueues = replicationQueues;
this.queueStorage = queueStorage;
this.replicationPeers = replicationPeers;
this.manager = manager;
this.fs = fs;
@ -229,7 +228,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
List<String> tableCfs = tableCFMap.get(tableName);
if (tableCFMap.containsKey(tableName)
&& (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
this.replicationQueues.addHFileRefs(peerId, pairs);
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
} else {
LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family "
@ -238,7 +237,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
} else {
// user has explicitly not defined any table cfs for replication, means replicate all the
// data
this.replicationQueues.addHFileRefs(peerId, pairs);
this.queueStorage.addHFileRefs(peerId, pairs);
metrics.incrSizeOfHFileRefsQueue(pairs.size());
}
}

View File

@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.List;
import java.util.UUID;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -32,9 +31,10 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Interface that defines a replication source
@ -47,15 +47,10 @@ public interface ReplicationSourceInterface {
* @param conf the configuration to use
* @param fs the file system to use
* @param manager the manager to use
* @param replicationQueues
* @param replicationPeers
* @param server the server for this region server
* @param peerClusterZnode
* @param clusterId
* @throws IOException
*/
void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server,
String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;

View File

@ -34,19 +34,21 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
@ -60,7 +62,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
@ -68,6 +70,7 @@ import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
@ -95,7 +98,7 @@ public class ReplicationSourceManager implements ReplicationListener {
private final List<ReplicationSourceInterface> sources;
// List of all the sources we got from died RSs
private final List<ReplicationSourceInterface> oldsources;
private final ReplicationQueues replicationQueues;
private final ReplicationQueueStorage queueStorage;
private final ReplicationTracker replicationTracker;
private final ReplicationPeers replicationPeers;
// UUID for this cluster
@ -130,7 +133,7 @@ public class ReplicationSourceManager implements ReplicationListener {
/**
* Creates a replication manager and sets the watch on all the other registered region servers
* @param replicationQueues the interface for manipulating replication queues
* @param queueStorage the interface for manipulating replication queues
* @param replicationPeers
* @param replicationTracker
* @param conf the configuration to use
@ -140,14 +143,14 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param oldLogDir the directory where old logs are archived
* @param clusterId
*/
public ReplicationSourceManager(ReplicationQueues replicationQueues,
public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
WALFileLengthProvider walFileLengthProvider) throws IOException {
//CopyOnWriteArrayList is thread-safe.
//Generally, reading is more than modifying.
this.sources = new CopyOnWriteArrayList<>();
this.replicationQueues = replicationQueues;
this.queueStorage = queueStorage;
this.replicationPeers = replicationPeers;
this.replicationTracker = replicationTracker;
this.server = server;
@ -184,6 +187,19 @@ public class ReplicationSourceManager implements ReplicationListener {
connection = ConnectionFactory.createConnection(conf);
}
@FunctionalInterface
private interface ReplicationQueueOperation {
void exec() throws ReplicationException;
}
private void abortWhenFail(ReplicationQueueOperation op) {
try {
op.exec();
} catch (ReplicationException e) {
server.abort("Failed to operate on replication queue", e);
}
}
/**
* Provide the id of the peer and a log key and this method will figure which
* wal it belongs to and will log, for this region server, the current
@ -195,12 +211,13 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param queueRecovered indicates if this queue comes from another region server
* @param holdLogInZK if true then the log is retained in ZK
*/
public void logPositionAndCleanOldLogs(Path log, String id, long position,
boolean queueRecovered, boolean holdLogInZK) {
public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered,
boolean holdLogInZK) {
String fileName = log.getName();
this.replicationQueues.setLogPosition(id, fileName, position);
abortWhenFail(
() -> this.queueStorage.setWALPosition(server.getServerName(), id, fileName, position));
if (holdLogInZK) {
return;
return;
}
cleanOldLogs(fileName, id, queueRecovered);
}
@ -227,36 +244,59 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
}
}
}
private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
SortedSet<String> walSet = wals.headSet(key);
LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
if (LOG.isDebugEnabled()) {
LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
}
for (String wal : walSet) {
this.replicationQueues.removeLog(id, wal);
abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
}
walSet.clear();
}
private void adoptAbandonedQueues() {
List<ServerName> currentReplicators = null;
try {
currentReplicators = queueStorage.getListOfReplicators();
} catch (ReplicationException e) {
server.abort("Failed to get all replicators", e);
return;
}
if (currentReplicators == null || currentReplicators.isEmpty()) {
return;
}
List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
.map(ServerName::valueOf).collect(Collectors.toList());
LOG.info(
"Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);
// Look if there's anything to process after a restart
for (ServerName rs : currentReplicators) {
if (!otherRegionServers.contains(rs)) {
transferQueues(rs);
}
}
}
/**
* Adds a normal source per registered peer cluster and tries to process all
* old region server wal queues
* Adds a normal source per registered peer cluster and tries to process all old region server wal
* queues
* <p>
* The returned future is for adoptAbandonedQueues task.
*/
void init() throws IOException, ReplicationException {
Future<?> init() throws IOException, ReplicationException {
for (String id : this.replicationPeers.getConnectedPeerIds()) {
addSource(id);
if (replicationForBulkLoadDataEnabled) {
// Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
// when a peer was added before replication for bulk loaded data was enabled.
this.replicationQueues.addPeerToHFileRefs(id);
this.queueStorage.addPeerToHFileRefs(id);
}
}
AdoptAbandonedQueuesWorker adoptionWorker = new AdoptAbandonedQueuesWorker();
try {
this.executor.execute(adoptionWorker);
} catch (RejectedExecutionException ex) {
LOG.info("Cancelling the adoption of abandoned queues because of " + ex.getMessage());
}
return this.executor.submit(this::adoptAbandonedQueues);
}
/**
@ -264,15 +304,12 @@ public class ReplicationSourceManager implements ReplicationListener {
* need to enqueue the latest log of each wal group and do replication
* @param id the id of the peer cluster
* @return the source that was created
* @throws IOException
*/
@VisibleForTesting
ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this,
this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer,
walFileLengthProvider);
ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer);
synchronized (this.walsById) {
this.sources.add(src);
Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
@ -287,11 +324,10 @@ public class ReplicationSourceManager implements ReplicationListener {
logs.add(name);
walsByGroup.put(walPrefix, logs);
try {
this.replicationQueues.addLog(id, name);
this.queueStorage.addWAL(server.getServerName(), id, name);
} catch (ReplicationException e) {
String message =
"Cannot add log to queue when creating a new source, queueId=" + id
+ ", filename=" + name;
String message = "Cannot add log to queue when creating a new source, queueId=" + id +
", filename=" + name;
server.stop(message);
throw e;
}
@ -316,7 +352,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param peerId Id of the peer cluster queue of wals to delete
*/
public void deleteSource(String peerId, boolean closeConnection) {
this.replicationQueues.removeQueue(peerId);
abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId));
if (closeConnection) {
this.replicationPeers.peerDisconnected(peerId);
}
@ -376,8 +412,8 @@ public class ReplicationSourceManager implements ReplicationListener {
}
@VisibleForTesting
List<String> getAllQueues() {
return replicationQueues.getAllQueues();
List<String> getAllQueues() throws ReplicationException {
return queueStorage.getAllQueues(server.getServerName());
}
void preLogRoll(Path newLog) throws IOException {
@ -411,10 +447,10 @@ public class ReplicationSourceManager implements ReplicationListener {
synchronized (replicationPeers) {
for (String id : replicationPeers.getConnectedPeerIds()) {
try {
this.replicationQueues.addLog(id, logName);
this.queueStorage.addWAL(server.getServerName(), id, logName);
} catch (ReplicationException e) {
throw new IOException("Cannot add log to replication queue"
+ " when creating a new source, queueId=" + id + ", filename=" + logName, e);
throw new IOException("Cannot add log to replication queue" +
" when creating a new source, queueId=" + id + ", filename=" + logName, e);
}
}
}
@ -461,19 +497,11 @@ public class ReplicationSourceManager implements ReplicationListener {
/**
* Factory method to create a replication source
* @param conf the configuration to use
* @param fs the file system to use
* @param manager the manager to use
* @param server the server object for this region server
* @param peerId the id of the peer cluster
* @return the created source
* @throws IOException
*/
private ReplicationSourceInterface getReplicationSource(Configuration conf, FileSystem fs,
ReplicationSourceManager manager, ReplicationQueues replicationQueues,
ReplicationPeers replicationPeers, Server server, String peerId, UUID clusterId,
ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer,
WALFileLengthProvider walFileLengthProvider) throws IOException {
private ReplicationSourceInterface getReplicationSource(String peerId,
ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer) throws IOException {
RegionServerCoprocessorHost rsServerHost = null;
TableDescriptors tableDescriptors = null;
if (server instanceof HRegionServer) {
@ -490,9 +518,8 @@ public class ReplicationSourceManager implements ReplicationListener {
// Default to HBase inter-cluster replication endpoint
replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
}
@SuppressWarnings("rawtypes")
Class c = Class.forName(replicationEndpointImpl);
replicationEndpoint = (ReplicationEndpoint) c.newInstance();
replicationEndpoint = Class.forName(replicationEndpointImpl)
.asSubclass(ReplicationEndpoint.class).newInstance();
if(rsServerHost != null) {
ReplicationEndpoint newReplicationEndPoint = rsServerHost
.postCreateReplicationEndPoint(replicationEndpoint);
@ -509,7 +536,7 @@ public class ReplicationSourceManager implements ReplicationListener {
MetricsSource metrics = new MetricsSource(peerId);
// init replication source
src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, clusterId,
src.init(conf, fs, this, queueStorage, replicationPeers, server, peerId, clusterId,
replicationEndpoint, walFileLengthProvider, metrics);
// init replication endpoint
@ -520,21 +547,21 @@ public class ReplicationSourceManager implements ReplicationListener {
}
/**
* Transfer all the queues of the specified to this region server.
* First it tries to grab a lock and if it works it will move the
* znodes and finally will delete the old znodes.
*
* Transfer all the queues of the specified to this region server. First it tries to grab a lock
* and if it works it will move the znodes and finally will delete the old znodes.
* <p>
* It creates one old source for any type of source of the old rs.
* @param rsZnode
*/
private void transferQueues(String rsZnode) {
NodeFailoverWorker transfer =
new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
this.clusterId);
private void transferQueues(ServerName deadRS) {
if (server.getServerName().equals(deadRS)) {
// it's just us, give up
return;
}
NodeFailoverWorker transfer = new NodeFailoverWorker(deadRS);
try {
this.executor.execute(transfer);
} catch (RejectedExecutionException ex) {
LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
LOG.info("Cancelling the transfer of " + deadRS + " because of " + ex.getMessage());
}
}
@ -571,7 +598,7 @@ public class ReplicationSourceManager implements ReplicationListener {
LOG.info("Peer " + id + " connected success, trying to start the replication source thread.");
addSource(id);
if (replicationForBulkLoadDataEnabled) {
this.replicationQueues.addPeerToHFileRefs(id);
this.queueStorage.addPeerToHFileRefs(id);
}
}
}
@ -624,12 +651,12 @@ public class ReplicationSourceManager implements ReplicationListener {
deleteSource(id, true);
}
// Remove HFile Refs znode from zookeeper
this.replicationQueues.removePeerFromHFileRefs(id);
abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(id));
}
@Override
public void regionServerRemoved(String regionserver) {
transferQueues(regionserver);
transferQueues(ServerName.valueOf(regionserver));
}
/**
@ -638,37 +665,21 @@ public class ReplicationSourceManager implements ReplicationListener {
*/
class NodeFailoverWorker extends Thread {
private String rsZnode;
private final ReplicationQueues rq;
private final ReplicationPeers rp;
private final UUID clusterId;
private final ServerName deadRS;
/**
* @param rsZnode
*/
public NodeFailoverWorker(String rsZnode) {
this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
}
public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final UUID clusterId) {
super("Failover-for-"+rsZnode);
this.rsZnode = rsZnode;
this.rq = replicationQueues;
this.rp = replicationPeers;
this.clusterId = clusterId;
@VisibleForTesting
public NodeFailoverWorker(ServerName deadRS) {
super("Failover-for-" + deadRS);
this.deadRS = deadRS;
}
@Override
public void run() {
if (this.rq.isThisOurRegionServer(rsZnode)) {
return;
}
// Wait a bit before transferring the queues, we may be shutting down.
// This sleep may not be enough in some cases.
try {
Thread.sleep(sleepBeforeFailover +
(long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
(long) (ThreadLocalRandom.current().nextFloat() * sleepBeforeFailover));
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting before transferring a queue.");
Thread.currentThread().interrupt();
@ -679,25 +690,30 @@ public class ReplicationSourceManager implements ReplicationListener {
return;
}
Map<String, Set<String>> newQueues = new HashMap<>();
List<String> peers = rq.getUnClaimedQueueIds(rsZnode);
while (peers != null && !peers.isEmpty()) {
Pair<String, SortedSet<String>> peer = this.rq.claimQueue(rsZnode,
peers.get(ThreadLocalRandom.current().nextInt(peers.size())));
long sleep = sleepBeforeFailover/2;
if (peer != null) {
newQueues.put(peer.getFirst(), peer.getSecond());
sleep = sleepBeforeFailover;
try {
List<String> peers = queueStorage.getAllQueues(deadRS);
while (!peers.isEmpty()) {
Pair<String, SortedSet<String>> peer = queueStorage.claimQueue(deadRS,
peers.get(ThreadLocalRandom.current().nextInt(peers.size())), server.getServerName());
long sleep = sleepBeforeFailover / 2;
if (!peer.getSecond().isEmpty()) {
newQueues.put(peer.getFirst(), peer.getSecond());
sleep = sleepBeforeFailover;
}
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting before transferring a queue.");
Thread.currentThread().interrupt();
}
peers = queueStorage.getAllQueues(deadRS);
}
try {
Thread.sleep(sleep);
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting before transferring a queue.");
Thread.currentThread().interrupt();
if (!peers.isEmpty()) {
queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
}
peers = rq.getUnClaimedQueueIds(rsZnode);
}
if (peers != null) {
rq.removeReplicatorIfQueueIsEmpty(rsZnode);
} catch (ReplicationException e) {
server.abort("Failed to claim queue from dead regionserver", e);
return;
}
// Copying over the failed queue is completed.
if (newQueues.isEmpty()) {
@ -722,8 +738,8 @@ public class ReplicationSourceManager implements ReplicationListener {
+ ex);
}
if (peer == null || peerConfig == null) {
LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
replicationQueues.removeQueue(peerId);
LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS);
abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
continue;
}
// track sources in walsByIdRecoveredQueues
@ -740,13 +756,11 @@ public class ReplicationSourceManager implements ReplicationListener {
}
// enqueue sources
ReplicationSourceInterface src =
getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
server, peerId, this.clusterId, peerConfig, peer, walFileLengthProvider);
ReplicationSourceInterface src = getReplicationSource(peerId, peerConfig, peer);
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
// see removePeer
synchronized (oldsources) {
if (!this.rp.getConnectedPeerIds().contains(src.getPeerId())) {
if (!replicationPeers.getConnectedPeerIds().contains(src.getPeerId())) {
src.terminate("Recovered queue doesn't belong to any current peer");
closeRecoveredQueue(src);
continue;
@ -765,29 +779,6 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
class AdoptAbandonedQueuesWorker extends Thread{
public AdoptAbandonedQueuesWorker() {}
@Override
public void run() {
List<String> currentReplicators = replicationQueues.getListOfReplicators();
if (currentReplicators == null || currentReplicators.isEmpty()) {
return;
}
List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
+ otherRegionServers);
// Look if there's anything to process after a restart
for (String rs : currentReplicators) {
if (!otherRegionServers.contains(rs)) {
transferQueues(rs);
}
}
}
}
/**
* Get the directory where wals are archived
* @return the directory where wals are archived
@ -846,7 +837,11 @@ public class ReplicationSourceManager implements ReplicationListener {
}
public void cleanUpHFileRefs(String peerId, List<String> files) {
this.replicationQueues.removeHFileRefs(peerId, files);
abortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
}
int activeFailoverTaskCount() {
return executor.getActiveCount();
}
/**

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
@ -37,22 +36,19 @@ import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* In a scenario of Replication based Disaster/Recovery, when hbase
* Master-Cluster crashes, this tool is used to sync-up the delta from Master to
* Slave using the info from ZooKeeper. The tool will run on Master-Cluser, and
* assume ZK, Filesystem and NetWork still available after hbase crashes
* In a scenario of Replication based Disaster/Recovery, when hbase Master-Cluster crashes, this
* tool is used to sync-up the delta from Master to Slave using the info from ZooKeeper. The tool
* will run on Master-Cluser, and assume ZK, Filesystem and NetWork still available after hbase
* crashes
*
* <pre>
* hbase org.apache.hadoop.hbase.replication.regionserver.ReplicationSyncUp
* </pre>
*/
public class ReplicationSyncUp extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationSyncUp.class.getName());
private static Configuration conf;
private static final long SLEEP_TIME = 10000;
@ -105,13 +101,14 @@ public class ReplicationSyncUp extends Configured implements Tool {
System.out.println("Start Replication Server start");
replication = new Replication(new DummyServer(zkw), fs, logDir, oldLogDir);
manager = replication.getReplicationManager();
manager.init();
manager.init().get();
try {
int numberOfOldSource = 1; // default wait once
while (numberOfOldSource > 0) {
while (manager.activeFailoverTaskCount() > 0) {
Thread.sleep(SLEEP_TIME);
}
while (manager.getOldSources().size() > 0) {
Thread.sleep(SLEEP_TIME);
numberOfOldSource = manager.getOldSources().size();
}
} catch (InterruptedException e) {
System.err.println("didn't wait long enough:" + e);
@ -121,7 +118,7 @@ public class ReplicationSyncUp extends Configured implements Tool {
manager.join();
zkw.close();
return (0);
return 0;
}
static class DummyServer implements Server {

View File

@ -44,9 +44,8 @@ import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -111,9 +110,8 @@ public class TestLogsCleaner {
Replication.decorateMasterConfiguration(conf);
Server server = new DummyServer();
ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues(
new ReplicationQueuesArguments(conf, server, server.getZooKeeper()));
repQueues.init(server.getServerName().toString());
ReplicationQueueStorage queueStorage =
ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
final Path oldProcedureWALDir = new Path(oldLogDir, "masterProcedureWALs");
String fakeMachineName = URLEncoder.encode(server.getServerName().toString(), "UTF8");
@ -144,7 +142,7 @@ public class TestLogsCleaner {
// Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these
// files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner
if (i % (30 / 3) == 1) {
repQueues.addLog(fakeMachineName, fileName.getName());
queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName());
LOG.info("Replication log file: " + fileName);
}
}

View File

@ -46,9 +46,8 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -72,19 +71,16 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@Category({ MasterTests.class, SmallTests.class })
public class TestReplicationHFileCleaner {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueuesZKImpl.class);
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationHFileCleaner.class);
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static Server server;
private static ReplicationQueues rq;
private static ReplicationQueueStorage rq;
private static ReplicationPeers rp;
private static final String peerId = "TestReplicationHFileCleaner";
private static Configuration conf = TEST_UTIL.getConfiguration();
static FileSystem fs = null;
Path root;
/**
* @throws java.lang.Exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.startMiniZKCluster();
@ -93,20 +89,10 @@ public class TestReplicationHFileCleaner {
Replication.decorateMasterConfiguration(conf);
rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server);
rp.init();
rq = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, server.getZooKeeper()));
rq.init(server.getServerName().toString());
try {
fs = FileSystem.get(conf);
} finally {
if (fs != null) {
fs.close();
}
}
rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
fs = FileSystem.get(conf);
}
/**
* @throws java.lang.Exception
*/
@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniZKCluster();

View File

@ -26,10 +26,8 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
@ -49,14 +47,12 @@ public class TestReplicationZKNodeCleaner {
private final Configuration conf;
private final ZKWatcher zkw;
private final ReplicationQueues repQueues;
private final ReplicationQueueStorage repQueues;
public TestReplicationZKNodeCleaner() throws Exception {
conf = TEST_UTIL.getConfiguration();
zkw = new ZKWatcher(conf, "TestReplicationZKNodeCleaner", null);
repQueues = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null,
zkw));
assertTrue(repQueues instanceof ReplicationQueuesZKImpl);
repQueues = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
}
@BeforeClass
@ -72,9 +68,8 @@ public class TestReplicationZKNodeCleaner {
@Test
public void testReplicationZKNodeCleaner() throws Exception {
repQueues.init(SERVER_ONE.getServerName());
// add queue for ID_ONE which isn't exist
repQueues.addLog(ID_ONE, "file1");
repQueues.addWAL(SERVER_ONE, ID_ONE, "file1");
ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
Map<ServerName, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
@ -84,7 +79,7 @@ public class TestReplicationZKNodeCleaner {
assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
// add a recovery queue for ID_TWO which isn't exist
repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
repQueues.addWAL(SERVER_ONE, ID_TWO + "-" + SERVER_TWO, "file2");
undeletedQueues = cleaner.getUnDeletedQueues();
assertEquals(1, undeletedQueues.size());
@ -100,11 +95,10 @@ public class TestReplicationZKNodeCleaner {
@Test
public void testReplicationZKNodeCleanerChore() throws Exception {
repQueues.init(SERVER_ONE.getServerName());
// add queue for ID_ONE which isn't exist
repQueues.addLog(ID_ONE, "file1");
repQueues.addWAL(SERVER_ONE, ID_ONE, "file1");
// add a recovery queue for ID_TWO which isn't exist
repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
repQueues.addWAL(SERVER_ONE, ID_TWO + "-" + SERVER_TWO, "file2");
// Wait the cleaner chore to run
Thread.sleep(20000);

View File

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -46,9 +45,10 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
MetricsSource metrics;
WALFileLengthProvider walFileLengthProvider;
AtomicBoolean startup = new AtomicBoolean(false);
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String peerClusterId,
UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
this.manager = manager;

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -64,7 +64,6 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
@Before
public void setUp() throws Exception {
HColumnDescriptor fam;
t1_syncupSource = new HTableDescriptor(t1_su);
@ -100,7 +99,7 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
* check's gone Also check the puts and deletes are not replicated back to
* the originating cluster.
*/
@Test(timeout = 300000)
@Test
public void testSyncUpTool() throws Exception {
/**
@ -176,7 +175,6 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
* verify correctly replicated to Slave
*/
mimicSyncUpAfterPut();
}
protected void setupReplication() throws Exception {

View File

@ -40,7 +40,6 @@ import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -67,10 +66,10 @@ import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@ -95,11 +94,13 @@ import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
/**
* An abstract class that tests ReplicationSourceManager. Classes that extend this class should
* set up the proper config for this class and initialize the proper cluster using
@ -328,18 +329,14 @@ public abstract class TestReplicationSourceManager {
@Test
public void testClaimQueues() throws Exception {
final Server server = new DummyServer("hostname0.example.org");
ReplicationQueues rq =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
server.getZooKeeper()));
rq.init(server.getServerName().toString());
Server server = new DummyServer("hostname0.example.org");
ReplicationQueueStorage rq = ReplicationStorageFactory
.getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
// populate some znodes in the peer znode
files.add("log1");
files.add("log2");
for (String file : files) {
rq.addLog("1", file);
rq.addWAL(server.getServerName(), "1", file);
}
// create 3 DummyServers
Server s1 = new DummyServer("dummyserver1.example.org");
@ -347,12 +344,9 @@ public abstract class TestReplicationSourceManager {
Server s3 = new DummyServer("dummyserver3.example.org");
// create 3 DummyNodeFailoverWorkers
DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(
server.getServerName().getServerName(), s1);
DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(
server.getServerName().getServerName(), s2);
DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(
server.getServerName().getServerName(), s3);
DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(server.getServerName(), s1);
DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(server.getServerName(), s2);
DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(server.getServerName(), s3);
latch = new CountDownLatch(3);
// start the threads
@ -371,11 +365,9 @@ public abstract class TestReplicationSourceManager {
@Test
public void testCleanupFailoverQueues() throws Exception {
final Server server = new DummyServer("hostname1.example.org");
ReplicationQueues rq =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
server.getZooKeeper()));
rq.init(server.getServerName().toString());
Server server = new DummyServer("hostname1.example.org");
ReplicationQueueStorage rq = ReplicationStorageFactory
.getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
// populate some znodes in the peer znode
SortedSet<String> files = new TreeSet<>();
String group = "testgroup";
@ -384,19 +376,14 @@ public abstract class TestReplicationSourceManager {
files.add(file1);
files.add(file2);
for (String file : files) {
rq.addLog("1", file);
rq.addWAL(server.getServerName(), "1", file);
}
Server s1 = new DummyServer("dummyserver1.example.org");
ReplicationQueues rq1 =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
s1.getZooKeeper()));
rq1.init(s1.getServerName().toString());
ReplicationPeers rp1 =
ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
rp1.init();
NodeFailoverWorker w1 =
manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID(
new Long(1), new Long(2)));
manager.new NodeFailoverWorker(server.getServerName());
w1.run();
assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
String id = "1-" + server.getServerName().getServerName();
@ -408,17 +395,16 @@ public abstract class TestReplicationSourceManager {
@Test
public void testCleanupUnknownPeerZNode() throws Exception {
final Server server = new DummyServer("hostname2.example.org");
ReplicationQueues rq = ReplicationFactory.getReplicationQueues(
new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper()));
rq.init(server.getServerName().toString());
Server server = new DummyServer("hostname2.example.org");
ReplicationQueueStorage rq = ReplicationStorageFactory
.getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
// populate some znodes in the peer znode
// add log to an unknown peer
String group = "testgroup";
rq.addLog("2", group + ".log1");
rq.addLog("2", group + ".log2");
rq.addWAL(server.getServerName(), "2", group + ".log1");
rq.addWAL(server.getServerName(), "2", group + ".log2");
NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName());
NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName());
w1.run();
// The log of the unknown peer should be removed from zk
@ -481,10 +467,8 @@ public abstract class TestReplicationSourceManager {
.setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase");
try {
DummyServer server = new DummyServer();
final ReplicationQueues rq =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(
server.getConfiguration(), server, server.getZooKeeper()));
rq.init(server.getServerName().toString());
ReplicationQueueStorage rq = ReplicationStorageFactory
.getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
// Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
// initialization to throw an exception.
conf.set("replication.replicationsource.implementation",
@ -498,11 +482,11 @@ public abstract class TestReplicationSourceManager {
assertNull(manager.getSource(peerId));
// Create a replication queue for the fake peer
rq.addLog(peerId, "FakeFile");
rq.addWAL(server.getServerName(), peerId, "FakeFile");
// Unregister peer, this should remove the peer and clear all queues associated with it
// Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
removePeerAndWait(peerId);
assertFalse(rq.getAllQueues().contains(peerId));
assertFalse(rq.getAllQueues(server.getServerName()).contains(peerId));
} finally {
conf.set("replication.replicationsource.implementation", replicationSourceImplName);
removePeerAndWait(peerId);
@ -625,11 +609,12 @@ public abstract class TestReplicationSourceManager {
}
}
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
@Override public boolean evaluate() throws Exception {
@Override
public boolean evaluate() throws Exception {
List<String> peers = rp.getAllPeerIds();
return (!manager.getAllQueues().contains(peerId)) && (rp.getConnectedPeer(peerId) == null)
&& (!peers.contains(peerId))
&& manager.getSource(peerId) == null;
return (!manager.getAllQueues().contains(peerId)) &&
(rp.getConnectedPeer(peerId) == null) && (!peers.contains(peerId)) &&
manager.getSource(peerId) == null;
}
});
}
@ -672,25 +657,24 @@ public abstract class TestReplicationSourceManager {
static class DummyNodeFailoverWorker extends Thread {
private Map<String, Set<String>> logZnodesMap;
Server server;
private String deadRsZnode;
ReplicationQueues rq;
private ServerName deadRS;
ReplicationQueueStorage rq;
public DummyNodeFailoverWorker(String znode, Server s) throws Exception {
this.deadRsZnode = znode;
public DummyNodeFailoverWorker(ServerName deadRS, Server s) throws Exception {
this.deadRS = deadRS;
this.server = s;
this.rq =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
server.getZooKeeper()));
this.rq.init(this.server.getServerName().toString());
this.rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(),
server.getConfiguration());
}
@Override
public void run() {
try {
logZnodesMap = new HashMap<>();
List<String> queues = rq.getUnClaimedQueueIds(deadRsZnode);
for(String queue:queues){
Pair<String, SortedSet<String>> pair = rq.claimQueue(deadRsZnode, queue);
List<String> queues = rq.getAllQueues(deadRS);
for (String queue : queues) {
Pair<String, SortedSet<String>> pair =
rq.claimQueue(deadRS, queue, server.getServerName());
if (pair != null) {
logZnodesMap.put(pair.getFirst(), pair.getSecond());
}
@ -729,7 +713,7 @@ public abstract class TestReplicationSourceManager {
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String peerClusterId,
UUID clusterId, ReplicationEndpoint replicationEndpoint,
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
throw new IOException("Failing deliberately");

View File

@ -25,11 +25,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.junit.BeforeClass;
@ -41,8 +40,9 @@ import org.junit.experimental.categories.Category;
* ReplicationQueuesClientZkImpl. Also includes extra tests outside of those in
* TestReplicationSourceManager that test ReplicationQueueZkImpl-specific behaviors.
*/
@Category({ReplicationTests.class, MediumTests.class})
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceManager {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
conf = HBaseConfiguration.create();
@ -58,16 +58,14 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
// Tests the naming convention of adopted queues for ReplicationQueuesZkImpl
@Test
public void testNodeFailoverDeadServerParsing() throws Exception {
final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
ReplicationQueues repQueues =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server,
server.getZooKeeper()));
repQueues.init(server.getServerName().toString());
Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
ReplicationQueueStorage queueStorage =
ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
// populate some znodes in the peer znode
files.add("log1");
files.add("log2");
for (String file : files) {
repQueues.addLog("1", file);
queueStorage.addWAL(server.getServerName(), "1", file);
}
// create 3 DummyServers
@ -76,30 +74,22 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
// simulate three servers fail sequentially
ReplicationQueues rq1 =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
s1.getZooKeeper()));
rq1.init(s1.getServerName().toString());
String serverName = server.getServerName().getServerName();
List<String> unclaimed = rq1.getUnClaimedQueueIds(serverName);
rq1.claimQueue(serverName, unclaimed.get(0)).getSecond();
rq1.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
ReplicationQueues rq2 =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s2.getConfiguration(), s2,
s2.getZooKeeper()));
rq2.init(s2.getServerName().toString());
serverName = s1.getServerName().getServerName();
unclaimed = rq2.getUnClaimedQueueIds(serverName);
rq2.claimQueue(serverName, unclaimed.get(0)).getSecond();
rq2.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
ReplicationQueues rq3 =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s3.getConfiguration(), s3,
s3.getZooKeeper()));
rq3.init(s3.getServerName().toString());
serverName = s2.getServerName().getServerName();
unclaimed = rq3.getUnClaimedQueueIds(serverName);
String queue3 = rq3.claimQueue(serverName, unclaimed.get(0)).getFirst();
rq3.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
ServerName serverName = server.getServerName();
List<String> unclaimed = queueStorage.getAllQueues(serverName);
queueStorage.claimQueue(serverName, unclaimed.get(0), s1.getServerName());
queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
serverName = s1.getServerName();
unclaimed = queueStorage.getAllQueues(serverName);
queueStorage.claimQueue(serverName, unclaimed.get(0), s2.getServerName());
queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
serverName = s2.getServerName();
unclaimed = queueStorage.getAllQueues(serverName);
String queue3 =
queueStorage.claimQueue(serverName, unclaimed.get(0), s3.getServerName()).getFirst();
queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3);
List<ServerName> result = replicationQueueInfo.getDeadRegionServers();
// verify