HBASE-19617 Remove ReplicationQueues, use ReplicationQueueStorage directly
This commit is contained in:
parent
b40c426806
commit
d9b5eb3abb
|
@ -17,12 +17,11 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import org.apache.commons.lang3.reflect.ConstructorUtils;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.Stoppable;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* A factory class for instantiating replication objects that deal with replication state.
|
||||
|
@ -30,12 +29,6 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
|||
@InterfaceAudience.Private
|
||||
public class ReplicationFactory {
|
||||
|
||||
public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args)
|
||||
throws Exception {
|
||||
return (ReplicationQueues) ConstructorUtils.invokeConstructor(ReplicationQueuesZKImpl.class,
|
||||
args);
|
||||
}
|
||||
|
||||
public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf,
|
||||
Abortable abortable) {
|
||||
return getReplicationPeers(zk, conf, null, abortable);
|
||||
|
|
|
@ -1,161 +0,0 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.SortedSet;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
||||
/**
|
||||
* This provides an interface for maintaining a region server's replication queues. These queues
|
||||
* keep track of the WALs and HFile references (if hbase.replication.bulkload.enabled is enabled)
|
||||
* that still need to be replicated to remote clusters.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface ReplicationQueues {
|
||||
|
||||
/**
|
||||
* Initialize the region server replication queue interface.
|
||||
* @param serverName The server name of the region server that owns the replication queues this
|
||||
* interface manages.
|
||||
*/
|
||||
void init(String serverName) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Remove a replication queue.
|
||||
* @param queueId a String that identifies the queue.
|
||||
*/
|
||||
void removeQueue(String queueId);
|
||||
|
||||
/**
|
||||
* Add a new WAL file to the given queue. If the queue does not exist it is created.
|
||||
* @param queueId a String that identifies the queue.
|
||||
* @param filename name of the WAL
|
||||
*/
|
||||
void addLog(String queueId, String filename) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Remove an WAL file from the given queue.
|
||||
* @param queueId a String that identifies the queue.
|
||||
* @param filename name of the WAL
|
||||
*/
|
||||
void removeLog(String queueId, String filename);
|
||||
|
||||
/**
|
||||
* Set the current position for a specific WAL in a given queue.
|
||||
* @param queueId a String that identifies the queue
|
||||
* @param filename name of the WAL
|
||||
* @param position the current position in the file
|
||||
*/
|
||||
void setLogPosition(String queueId, String filename, long position);
|
||||
|
||||
/**
|
||||
* Get the current position for a specific WAL in a given queue.
|
||||
* @param queueId a String that identifies the queue
|
||||
* @param filename name of the WAL
|
||||
* @return the current position in the file
|
||||
*/
|
||||
long getLogPosition(String queueId, String filename) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Remove all replication queues for this region server.
|
||||
*/
|
||||
void removeAllQueues();
|
||||
|
||||
/**
|
||||
* Get a list of all WALs in the given queue.
|
||||
* @param queueId a String that identifies the queue
|
||||
* @return a list of WALs, null if no such queue exists for this server
|
||||
*/
|
||||
List<String> getLogsInQueue(String queueId);
|
||||
|
||||
/**
|
||||
* Get a list of all queues for this region server.
|
||||
* @return a list of queueIds, an empty list if this region server is dead and has no outstanding queues
|
||||
*/
|
||||
List<String> getAllQueues();
|
||||
|
||||
/**
|
||||
* Get queueIds from a dead region server, whose queues has not been claimed by other region
|
||||
* servers.
|
||||
* @return empty if the queue exists but no children, null if the queue does not exist.
|
||||
*/
|
||||
List<String> getUnClaimedQueueIds(String regionserver);
|
||||
|
||||
/**
|
||||
* Take ownership for the queue identified by queueId and belongs to a dead region server.
|
||||
* @param regionserver the id of the dead region server
|
||||
* @param queueId the id of the queue
|
||||
* @return the new PeerId and A SortedSet of WALs in its queue, and null if no unclaimed queue.
|
||||
*/
|
||||
Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId);
|
||||
|
||||
/**
|
||||
* Remove the znode of region server if the queue is empty.
|
||||
* @param regionserver
|
||||
*/
|
||||
void removeReplicatorIfQueueIsEmpty(String regionserver);
|
||||
|
||||
/**
|
||||
* Get a list of all region servers that have outstanding replication queues. These servers could
|
||||
* be alive, dead or from a previous run of the cluster.
|
||||
* @return a list of server names
|
||||
* @throws ReplicationException
|
||||
*/
|
||||
List<String> getListOfReplicators() throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Checks if the provided znode is the same as this region server's
|
||||
* @param regionserver the id of the region server
|
||||
* @return if this is this rs's znode
|
||||
*/
|
||||
boolean isThisOurRegionServer(String regionserver);
|
||||
|
||||
/**
|
||||
* Add a peer to hfile reference queue if peer does not exist.
|
||||
* @param peerId peer cluster id to be added
|
||||
* @throws ReplicationException if fails to add a peer id to hfile reference queue
|
||||
*/
|
||||
void addPeerToHFileRefs(String peerId) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Remove a peer from hfile reference queue.
|
||||
* @param peerId peer cluster id to be removed
|
||||
*/
|
||||
void removePeerFromHFileRefs(String peerId);
|
||||
|
||||
/**
|
||||
* Add new hfile references to the queue.
|
||||
* @param peerId peer cluster id to which the hfiles need to be replicated
|
||||
* @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
|
||||
* will be added in the queue }
|
||||
* @throws ReplicationException if fails to add a hfile reference
|
||||
*/
|
||||
void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException;
|
||||
|
||||
/**
|
||||
* Remove hfile references from the queue.
|
||||
* @param peerId peer cluster id from which this hfile references needs to be removed
|
||||
* @param files list of hfile references to be removed
|
||||
*/
|
||||
void removeHFileRefs(String peerId, List<String> files);
|
||||
}
|
|
@ -1,70 +0,0 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Wrapper around common arguments used to construct ReplicationQueues. Used to construct various
|
||||
* ReplicationQueues Implementations with different constructor arguments by reflection.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ReplicationQueuesArguments {
|
||||
|
||||
private ZKWatcher zk;
|
||||
private Configuration conf;
|
||||
private Abortable abort;
|
||||
|
||||
public ReplicationQueuesArguments(Configuration conf, Abortable abort) {
|
||||
this.conf = conf;
|
||||
this.abort = abort;
|
||||
}
|
||||
|
||||
public ReplicationQueuesArguments(Configuration conf, Abortable abort, ZKWatcher zk) {
|
||||
this(conf, abort);
|
||||
setZk(zk);
|
||||
}
|
||||
|
||||
public ZKWatcher getZk() {
|
||||
return zk;
|
||||
}
|
||||
|
||||
public void setZk(ZKWatcher zk) {
|
||||
this.zk = zk;
|
||||
}
|
||||
|
||||
public Configuration getConf() {
|
||||
return conf;
|
||||
}
|
||||
|
||||
public void setConf(Configuration conf) {
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
public Abortable getAbortable() {
|
||||
return abort;
|
||||
}
|
||||
|
||||
public void setAbortable(Abortable abort) {
|
||||
this.abort = abort;
|
||||
}
|
||||
}
|
|
@ -1,417 +0,0 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.SortedSet;
|
||||
import java.util.TreeSet;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* This class provides an implementation of the
|
||||
* interface using ZooKeeper. The
|
||||
* base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of
|
||||
* all outstanding WAL files on this region server that need to be replicated. The myQueuesZnode is
|
||||
* the regionserver name (a concatenation of the region server’s hostname, client port and start
|
||||
* code). For example:
|
||||
*
|
||||
* /hbase/replication/rs/hostname.example.org,6020,1234
|
||||
*
|
||||
* Within this znode, the region server maintains a set of WAL replication queues. These queues are
|
||||
* represented by child znodes named using there give queue id. For example:
|
||||
*
|
||||
* /hbase/replication/rs/hostname.example.org,6020,1234/1
|
||||
* /hbase/replication/rs/hostname.example.org,6020,1234/2
|
||||
*
|
||||
* Each queue has one child znode for every WAL that still needs to be replicated. The value of
|
||||
* these WAL child znodes is the latest position that has been replicated. This position is updated
|
||||
* every time a WAL entry is replicated. For example:
|
||||
*
|
||||
* /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
|
||||
|
||||
/** Znode containing all replication queues for this region server. */
|
||||
private String myQueuesZnode;
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueuesZKImpl.class);
|
||||
|
||||
public ReplicationQueuesZKImpl(ReplicationQueuesArguments args) {
|
||||
this(args.getZk(), args.getConf(), args.getAbortable());
|
||||
}
|
||||
|
||||
public ReplicationQueuesZKImpl(final ZKWatcher zk, Configuration conf,
|
||||
Abortable abortable) {
|
||||
super(zk, conf, abortable);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init(String serverName) throws ReplicationException {
|
||||
this.myQueuesZnode = ZNodePaths.joinZNode(this.queuesZNode, serverName);
|
||||
try {
|
||||
if (ZKUtil.checkExists(this.zookeeper, this.myQueuesZnode) < 0) {
|
||||
ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Could not initialize replication queues.", e);
|
||||
}
|
||||
if (conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
|
||||
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT)) {
|
||||
try {
|
||||
if (ZKUtil.checkExists(this.zookeeper, this.hfileRefsZNode) < 0) {
|
||||
ZKUtil.createWithParents(this.zookeeper, this.hfileRefsZNode);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Could not initialize hfile references replication queue.",
|
||||
e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getListOfReplicators() throws ReplicationException {
|
||||
try {
|
||||
return super.getListOfReplicatorsZK();
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("getListOfReplicators() from ZK failed", e);
|
||||
throw new ReplicationException("getListOfReplicators() from ZK failed", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeQueue(String queueId) {
|
||||
try {
|
||||
ZKUtil.deleteNodeRecursively(this.zookeeper,
|
||||
ZNodePaths.joinZNode(this.myQueuesZnode, queueId));
|
||||
} catch (KeeperException e) {
|
||||
this.abortable.abort("Failed to delete queue (queueId=" + queueId + ")", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addLog(String queueId, String filename) throws ReplicationException {
|
||||
String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
|
||||
znode = ZNodePaths.joinZNode(znode, filename);
|
||||
try {
|
||||
ZKUtil.createWithParents(this.zookeeper, znode);
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException(
|
||||
"Could not add log because znode could not be created. queueId=" + queueId
|
||||
+ ", filename=" + filename);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeLog(String queueId, String filename) {
|
||||
try {
|
||||
String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
|
||||
znode = ZNodePaths.joinZNode(znode, filename);
|
||||
ZKUtil.deleteNode(this.zookeeper, znode);
|
||||
} catch (KeeperException e) {
|
||||
this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename="
|
||||
+ filename + ")", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setLogPosition(String queueId, String filename, long position) {
|
||||
try {
|
||||
String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
|
||||
znode = ZNodePaths.joinZNode(znode, filename);
|
||||
// Why serialize String of Long and not Long as bytes?
|
||||
ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position));
|
||||
} catch (KeeperException e) {
|
||||
this.abortable.abort("Failed to write replication wal position (filename=" + filename
|
||||
+ ", position=" + position + ")", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getLogPosition(String queueId, String filename) throws ReplicationException {
|
||||
String clusterZnode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
|
||||
String znode = ZNodePaths.joinZNode(clusterZnode, filename);
|
||||
byte[] bytes = null;
|
||||
try {
|
||||
bytes = ZKUtil.getData(this.zookeeper, znode);
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Internal Error: could not get position in log for queueId="
|
||||
+ queueId + ", filename=" + filename, e);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
return 0;
|
||||
}
|
||||
try {
|
||||
return ZKUtil.parseWALPositionFrom(bytes);
|
||||
} catch (DeserializationException de) {
|
||||
LOG.warn("Failed to parse WALPosition for queueId=" + queueId + " and wal=" + filename
|
||||
+ " znode content, continuing.");
|
||||
}
|
||||
// if we can not parse the position, start at the beginning of the wal file
|
||||
// again
|
||||
return 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isThisOurRegionServer(String regionserver) {
|
||||
return ZNodePaths.joinZNode(this.queuesZNode, regionserver).equals(this.myQueuesZnode);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getUnClaimedQueueIds(String regionserver) {
|
||||
if (isThisOurRegionServer(regionserver)) {
|
||||
return null;
|
||||
}
|
||||
String rsZnodePath = ZNodePaths.joinZNode(this.queuesZNode, regionserver);
|
||||
List<String> queues = null;
|
||||
try {
|
||||
queues = ZKUtil.listChildrenNoWatch(this.zookeeper, rsZnodePath);
|
||||
} catch (KeeperException e) {
|
||||
this.abortable.abort("Failed to getUnClaimedQueueIds for RS" + regionserver, e);
|
||||
}
|
||||
return queues;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId) {
|
||||
LOG.info("Atomically moving " + regionserver + "/" + queueId + "'s WALs to my queue");
|
||||
return moveQueueUsingMulti(regionserver, queueId);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeReplicatorIfQueueIsEmpty(String regionserver) {
|
||||
String rsPath = ZNodePaths.joinZNode(this.queuesZNode, regionserver);
|
||||
try {
|
||||
List<String> list = ZKUtil.listChildrenNoWatch(this.zookeeper, rsPath);
|
||||
if (list != null && list.isEmpty()){
|
||||
ZKUtil.deleteNode(this.zookeeper, rsPath);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
LOG.warn("Got error while removing replicator", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeAllQueues() {
|
||||
try {
|
||||
ZKUtil.deleteNodeRecursively(this.zookeeper, this.myQueuesZnode);
|
||||
} catch (KeeperException e) {
|
||||
// if the znode is already expired, don't bother going further
|
||||
if (e instanceof KeeperException.SessionExpiredException) {
|
||||
return;
|
||||
}
|
||||
this.abortable.abort("Failed to delete replication queues for region server: "
|
||||
+ this.myQueuesZnode, e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getLogsInQueue(String queueId) {
|
||||
String znode = ZNodePaths.joinZNode(this.myQueuesZnode, queueId);
|
||||
List<String> result = null;
|
||||
try {
|
||||
result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
|
||||
} catch (KeeperException e) {
|
||||
this.abortable.abort("Failed to get list of wals for queueId=" + queueId, e);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getAllQueues() {
|
||||
List<String> listOfQueues = null;
|
||||
try {
|
||||
listOfQueues = ZKUtil.listChildrenNoWatch(this.zookeeper, this.myQueuesZnode);
|
||||
} catch (KeeperException e) {
|
||||
this.abortable.abort("Failed to get a list of queues for region server: "
|
||||
+ this.myQueuesZnode, e);
|
||||
}
|
||||
return listOfQueues == null ? new ArrayList<>() : listOfQueues;
|
||||
}
|
||||
|
||||
/**
|
||||
* It "atomically" copies one peer's wals queue from another dead region server and returns them
|
||||
* all sorted. The new peer id is equal to the old peer id appended with the dead server's znode.
|
||||
* @param znode pertaining to the region server to copy the queues from
|
||||
* @peerId peerId pertaining to the queue need to be copied
|
||||
*/
|
||||
private Pair<String, SortedSet<String>> moveQueueUsingMulti(String znode, String peerId) {
|
||||
try {
|
||||
// hbase/replication/rs/deadrs
|
||||
String deadRSZnodePath = ZNodePaths.joinZNode(this.queuesZNode, znode);
|
||||
List<ZKUtilOp> listOfOps = new ArrayList<>();
|
||||
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
|
||||
|
||||
String newPeerId = peerId + "-" + znode;
|
||||
String newPeerZnode = ZNodePaths.joinZNode(this.myQueuesZnode, newPeerId);
|
||||
// check the logs queue for the old peer cluster
|
||||
String oldClusterZnode = ZNodePaths.joinZNode(deadRSZnodePath, peerId);
|
||||
List<String> wals = ZKUtil.listChildrenNoWatch(this.zookeeper, oldClusterZnode);
|
||||
|
||||
if (!peerExists(replicationQueueInfo.getPeerId())) {
|
||||
LOG.warn("Peer " + replicationQueueInfo.getPeerId() +
|
||||
" didn't exist, will move its queue to avoid the failure of multi op");
|
||||
for (String wal : wals) {
|
||||
String oldWalZnode = ZNodePaths.joinZNode(oldClusterZnode, wal);
|
||||
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
|
||||
}
|
||||
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
|
||||
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
|
||||
return null;
|
||||
}
|
||||
|
||||
SortedSet<String> logQueue = new TreeSet<>();
|
||||
if (wals == null || wals.isEmpty()) {
|
||||
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
|
||||
} else {
|
||||
// create the new cluster znode
|
||||
ZKUtilOp op = ZKUtilOp.createAndFailSilent(newPeerZnode, HConstants.EMPTY_BYTE_ARRAY);
|
||||
listOfOps.add(op);
|
||||
// get the offset of the logs and set it to new znodes
|
||||
for (String wal : wals) {
|
||||
String oldWalZnode = ZNodePaths.joinZNode(oldClusterZnode, wal);
|
||||
byte[] logOffset = ZKUtil.getData(this.zookeeper, oldWalZnode);
|
||||
LOG.debug("Creating " + wal + " with data " + Bytes.toString(logOffset));
|
||||
String newLogZnode = ZNodePaths.joinZNode(newPeerZnode, wal);
|
||||
listOfOps.add(ZKUtilOp.createAndFailSilent(newLogZnode, logOffset));
|
||||
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldWalZnode));
|
||||
logQueue.add(wal);
|
||||
}
|
||||
// add delete op for peer
|
||||
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
|
||||
|
||||
if (LOG.isTraceEnabled())
|
||||
LOG.trace(" The multi list size is: " + listOfOps.size());
|
||||
}
|
||||
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
|
||||
|
||||
LOG.info("Atomically moved " + znode + "/" + peerId + "'s WALs to my queue");
|
||||
return new Pair<>(newPeerId, logQueue);
|
||||
} catch (KeeperException e) {
|
||||
// Multi call failed; it looks like some other regionserver took away the logs.
|
||||
LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Got exception in copyQueuesFromRSUsingMulti: ", e);
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
|
||||
throws ReplicationException {
|
||||
String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
|
||||
boolean debugEnabled = LOG.isDebugEnabled();
|
||||
if (debugEnabled) {
|
||||
LOG.debug("Adding hfile references " + pairs + " in queue " + peerZnode);
|
||||
}
|
||||
|
||||
int size = pairs.size();
|
||||
List<ZKUtilOp> listOfOps = new ArrayList<>(size);
|
||||
|
||||
for (int i = 0; i < size; i++) {
|
||||
listOfOps.add(ZKUtilOp.createAndFailSilent(
|
||||
ZNodePaths.joinZNode(peerZnode, pairs.get(i).getSecond().getName()),
|
||||
HConstants.EMPTY_BYTE_ARRAY));
|
||||
}
|
||||
if (debugEnabled) {
|
||||
LOG.debug(" The multi list size for adding hfile references in zk for node " + peerZnode
|
||||
+ " is " + listOfOps.size());
|
||||
}
|
||||
try {
|
||||
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Failed to create hfile reference znode=" + e.getPath(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removeHFileRefs(String peerId, List<String> files) {
|
||||
String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
|
||||
boolean debugEnabled = LOG.isDebugEnabled();
|
||||
if (debugEnabled) {
|
||||
LOG.debug("Removing hfile references " + files + " from queue " + peerZnode);
|
||||
}
|
||||
|
||||
int size = files.size();
|
||||
List<ZKUtilOp> listOfOps = new ArrayList<>(size);
|
||||
|
||||
for (int i = 0; i < size; i++) {
|
||||
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(ZNodePaths.joinZNode(peerZnode, files.get(i))));
|
||||
}
|
||||
if (debugEnabled) {
|
||||
LOG.debug(" The multi list size for removing hfile references in zk for node " + peerZnode
|
||||
+ " is " + listOfOps.size());
|
||||
}
|
||||
try {
|
||||
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, true);
|
||||
} catch (KeeperException e) {
|
||||
LOG.error("Failed to remove hfile reference znode=" + e.getPath(), e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addPeerToHFileRefs(String peerId) throws ReplicationException {
|
||||
String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
|
||||
try {
|
||||
if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
|
||||
LOG.info("Adding peer " + peerId + " to hfile reference queue.");
|
||||
ZKUtil.createWithParents(this.zookeeper, peerZnode);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
throw new ReplicationException("Failed to add peer " + peerId + " to hfile reference queue.",
|
||||
e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void removePeerFromHFileRefs(String peerId) {
|
||||
final String peerZnode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
|
||||
try {
|
||||
if (ZKUtil.checkExists(this.zookeeper, peerZnode) == -1) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Peer " + peerZnode + " not found in hfile reference queue.");
|
||||
}
|
||||
return;
|
||||
} else {
|
||||
LOG.info("Removing peer " + peerZnode + " from hfile reference queue.");
|
||||
ZKUtil.deleteNodeRecursively(this.zookeeper, peerZnode);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
LOG.error("Ignoring the exception to remove peer " + peerId + " from hfile reference queue.",
|
||||
e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,442 +0,0 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import org.apache.hadoop.hbase.CompareOperator;
|
||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.NamespaceDescriptor;
|
||||
import org.apache.hadoop.hbase.TableExistsException;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.client.Admin;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||
import org.apache.hadoop.hbase.client.Get;
|
||||
import org.apache.hadoop.hbase.client.Result;
|
||||
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||
import org.apache.hadoop.hbase.client.Scan;
|
||||
import org.apache.hadoop.hbase.client.Table;
|
||||
import org.apache.hadoop.hbase.filter.CompareFilter;
|
||||
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
|
||||
import org.apache.hadoop.hbase.regionserver.BloomType;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.RetryCounter;
|
||||
import org.apache.hadoop.hbase.util.RetryCounterFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
/*
|
||||
* Abstract class that provides an interface to the Replication Table. Which is currently
|
||||
* being used for WAL offset tracking.
|
||||
* The basic schema of this table will store each individual queue as a
|
||||
* seperate row. The row key will be a unique identifier of the creating server's name and the
|
||||
* queueId. Each queue must have the following two columns:
|
||||
* COL_QUEUE_OWNER: tracks which server is currently responsible for tracking the queue
|
||||
* COL_QUEUE_OWNER_HISTORY: a "|" delimited list of the previous server's that have owned this
|
||||
* queue. The most recent previous owner is the leftmost entry.
|
||||
* They will also have columns mapping [WAL filename : offset]
|
||||
* The most flexible method of interacting with the Replication Table is by calling
|
||||
* getOrBlockOnReplicationTable() which will return a new copy of the Replication Table. It is up
|
||||
* to the caller to close the returned table.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
abstract class ReplicationTableBase {
|
||||
|
||||
/** Name of the HBase Table used for tracking replication*/
|
||||
public static final TableName REPLICATION_TABLE_NAME =
|
||||
TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication");
|
||||
|
||||
// Column family and column names for Queues in the Replication Table
|
||||
public static final byte[] CF_QUEUE = Bytes.toBytes("q");
|
||||
public static final byte[] COL_QUEUE_OWNER = Bytes.toBytes("o");
|
||||
public static final byte[] COL_QUEUE_OWNER_HISTORY = Bytes.toBytes("h");
|
||||
|
||||
// Column Descriptor for the Replication Table
|
||||
private static final HColumnDescriptor REPLICATION_COL_DESCRIPTOR =
|
||||
new HColumnDescriptor(CF_QUEUE).setMaxVersions(1)
|
||||
.setInMemory(true)
|
||||
.setScope(HConstants.REPLICATION_SCOPE_LOCAL)
|
||||
// TODO: Figure out which bloom filter to use
|
||||
.setBloomFilterType(BloomType.NONE);
|
||||
|
||||
// The value used to delimit the queueId and server name inside of a queue's row key. Currently a
|
||||
// hyphen, because it is guaranteed that queueId (which is a cluster id) cannot contain hyphens.
|
||||
// See HBASE-11394.
|
||||
public static final String ROW_KEY_DELIMITER = "-";
|
||||
|
||||
// The value used to delimit server names in the queue history list
|
||||
public static final String QUEUE_HISTORY_DELIMITER = "|";
|
||||
|
||||
/*
|
||||
* Make sure that HBase table operations for replication have a high number of retries. This is
|
||||
* because the server is aborted if any HBase table operation fails. Each RPC will be attempted
|
||||
* 3600 times before exiting. This provides each operation with 2 hours of retries
|
||||
* before the server is aborted.
|
||||
*/
|
||||
private static final int CLIENT_RETRIES = 3600;
|
||||
private static final int RPC_TIMEOUT = 2000;
|
||||
private static final int OPERATION_TIMEOUT = CLIENT_RETRIES * RPC_TIMEOUT;
|
||||
|
||||
// We only need a single thread to initialize the Replication Table
|
||||
private static final int NUM_INITIALIZE_WORKERS = 1;
|
||||
|
||||
protected final Configuration conf;
|
||||
protected final Abortable abortable;
|
||||
private final Connection connection;
|
||||
private final Executor executor;
|
||||
private volatile CountDownLatch replicationTableInitialized;
|
||||
|
||||
public ReplicationTableBase(Configuration conf, Abortable abort) throws IOException {
|
||||
this.conf = new Configuration(conf);
|
||||
this.abortable = abort;
|
||||
decorateConf();
|
||||
this.connection = ConnectionFactory.createConnection(this.conf);
|
||||
this.executor = setUpExecutor();
|
||||
this.replicationTableInitialized = new CountDownLatch(1);
|
||||
createReplicationTableInBackground();
|
||||
}
|
||||
|
||||
/**
|
||||
* Modify the connection's config so that operations run on the Replication Table have longer and
|
||||
* a larger number of retries
|
||||
*/
|
||||
private void decorateConf() {
|
||||
this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, CLIENT_RETRIES);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets up the thread pool executor used to build the Replication Table in the background
|
||||
* @return the configured executor
|
||||
*/
|
||||
private Executor setUpExecutor() {
|
||||
ThreadPoolExecutor tempExecutor = new ThreadPoolExecutor(NUM_INITIALIZE_WORKERS,
|
||||
NUM_INITIALIZE_WORKERS, 100, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
|
||||
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
|
||||
tfb.setNameFormat("ReplicationTableExecutor-%d");
|
||||
tfb.setDaemon(true);
|
||||
tempExecutor.setThreadFactory(tfb.build());
|
||||
return tempExecutor;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get whether the Replication Table has been successfully initialized yet
|
||||
* @return whether the Replication Table is initialized
|
||||
*/
|
||||
public boolean getInitializationStatus() {
|
||||
return replicationTableInitialized.getCount() == 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Increases the RPC and operations timeouts for the Replication Table
|
||||
*/
|
||||
private Table setReplicationTableTimeOuts(Table replicationTable) {
|
||||
replicationTable.setRpcTimeout(RPC_TIMEOUT);
|
||||
replicationTable.setOperationTimeout(OPERATION_TIMEOUT);
|
||||
return replicationTable;
|
||||
}
|
||||
|
||||
/**
|
||||
* Build the row key for the given queueId. This will uniquely identify it from all other queues
|
||||
* in the cluster.
|
||||
* @param serverName The owner of the queue
|
||||
* @param queueId String identifier of the queue
|
||||
* @return String representation of the queue's row key
|
||||
*/
|
||||
protected String buildQueueRowKey(String serverName, String queueId) {
|
||||
return queueId + ROW_KEY_DELIMITER + serverName;
|
||||
}
|
||||
|
||||
/**
|
||||
* Parse the original queueId from a row key
|
||||
* @param rowKey String representation of a queue's row key
|
||||
* @return the original queueId
|
||||
*/
|
||||
protected String getRawQueueIdFromRowKey(String rowKey) {
|
||||
return rowKey.split(ROW_KEY_DELIMITER)[0];
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a queue's row key given either its raw or reclaimed queueId
|
||||
*
|
||||
* @param queueId queueId of the queue
|
||||
* @return byte representation of the queue's row key
|
||||
*/
|
||||
protected byte[] queueIdToRowKey(String serverName, String queueId) {
|
||||
// Cluster id's are guaranteed to have no hyphens, so if the passed in queueId has no hyphen
|
||||
// then this is not a reclaimed queue.
|
||||
if (!queueId.contains(ROW_KEY_DELIMITER)) {
|
||||
return Bytes.toBytes(buildQueueRowKey(serverName, queueId));
|
||||
// If the queueId contained some hyphen it was reclaimed. In this case, the queueId is the
|
||||
// queue's row key
|
||||
} else {
|
||||
return Bytes.toBytes(queueId);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a "|" delimited record of the queue's past region server owners.
|
||||
*
|
||||
* @param originalHistory the queue's original owner history
|
||||
* @param oldServer the name of the server that used to own the queue
|
||||
* @return the queue's new owner history
|
||||
*/
|
||||
protected String buildClaimedQueueHistory(String originalHistory, String oldServer) {
|
||||
return oldServer + QUEUE_HISTORY_DELIMITER + originalHistory;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a list of all region servers that have outstanding replication queues. These servers could
|
||||
* be alive, dead or from a previous run of the cluster.
|
||||
* @return a list of server names
|
||||
*/
|
||||
protected List<String> getListOfReplicators() {
|
||||
// scan all of the queues and return a list of all unique OWNER values
|
||||
Set<String> peerServers = new HashSet<>();
|
||||
ResultScanner allQueuesInCluster = null;
|
||||
try (Table replicationTable = getOrBlockOnReplicationTable()){
|
||||
Scan scan = new Scan();
|
||||
scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
|
||||
allQueuesInCluster = replicationTable.getScanner(scan);
|
||||
for (Result queue : allQueuesInCluster) {
|
||||
peerServers.add(Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER)));
|
||||
}
|
||||
} catch (IOException e) {
|
||||
String errMsg = "Failed getting list of replicators";
|
||||
abortable.abort(errMsg, e);
|
||||
} finally {
|
||||
if (allQueuesInCluster != null) {
|
||||
allQueuesInCluster.close();
|
||||
}
|
||||
}
|
||||
return new ArrayList<>(peerServers);
|
||||
}
|
||||
|
||||
protected List<String> getAllQueues(String serverName) {
|
||||
List<String> allQueues = new ArrayList<>();
|
||||
ResultScanner queueScanner = null;
|
||||
try {
|
||||
queueScanner = getQueuesBelongingToServer(serverName);
|
||||
for (Result queue : queueScanner) {
|
||||
String rowKey = Bytes.toString(queue.getRow());
|
||||
// If the queue does not have a Owner History, then we must be its original owner. So we
|
||||
// want to return its queueId in raw form
|
||||
if (Bytes.toString(queue.getValue(CF_QUEUE, COL_QUEUE_OWNER_HISTORY)).length() == 0) {
|
||||
allQueues.add(getRawQueueIdFromRowKey(rowKey));
|
||||
} else {
|
||||
allQueues.add(rowKey);
|
||||
}
|
||||
}
|
||||
return allQueues;
|
||||
} catch (IOException e) {
|
||||
String errMsg = "Failed getting list of all replication queues for serverName=" + serverName;
|
||||
abortable.abort(errMsg, e);
|
||||
return null;
|
||||
} finally {
|
||||
if (queueScanner != null) {
|
||||
queueScanner.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected List<String> getLogsInQueue(String serverName, String queueId) {
|
||||
String rowKey = queueId;
|
||||
if (!queueId.contains(ROW_KEY_DELIMITER)) {
|
||||
rowKey = buildQueueRowKey(serverName, queueId);
|
||||
}
|
||||
return getLogsInQueue(Bytes.toBytes(rowKey));
|
||||
}
|
||||
|
||||
protected List<String> getLogsInQueue(byte[] rowKey) {
|
||||
String errMsg = "Failed getting logs in queue queueId=" + Bytes.toString(rowKey);
|
||||
try (Table replicationTable = getOrBlockOnReplicationTable()) {
|
||||
Get getQueue = new Get(rowKey);
|
||||
Result queue = replicationTable.get(getQueue);
|
||||
if (queue == null || queue.isEmpty()) {
|
||||
abortable.abort(errMsg, new ReplicationException(errMsg));
|
||||
return null;
|
||||
}
|
||||
return readWALsFromResult(queue);
|
||||
} catch (IOException e) {
|
||||
abortable.abort(errMsg, e);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Read all of the WAL's from a queue into a list
|
||||
*
|
||||
* @param queue HBase query result containing the queue
|
||||
* @return a list of all the WAL filenames
|
||||
*/
|
||||
protected List<String> readWALsFromResult(Result queue) {
|
||||
List<String> wals = new ArrayList<>();
|
||||
Map<byte[], byte[]> familyMap = queue.getFamilyMap(CF_QUEUE);
|
||||
for (byte[] cQualifier : familyMap.keySet()) {
|
||||
// Ignore the meta data fields of the queue
|
||||
if (Arrays.equals(cQualifier, COL_QUEUE_OWNER) || Arrays.equals(cQualifier,
|
||||
COL_QUEUE_OWNER_HISTORY)) {
|
||||
continue;
|
||||
}
|
||||
wals.add(Bytes.toString(cQualifier));
|
||||
}
|
||||
return wals;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the queue id's and meta data (Owner and History) for the queues belonging to the named
|
||||
* server
|
||||
*
|
||||
* @param server name of the server
|
||||
* @return a ResultScanner over the QueueIds belonging to the server
|
||||
* @throws IOException
|
||||
*/
|
||||
protected ResultScanner getQueuesBelongingToServer(String server) throws IOException {
|
||||
Scan scan = new Scan();
|
||||
SingleColumnValueFilter filterMyQueues = new SingleColumnValueFilter(CF_QUEUE, COL_QUEUE_OWNER,
|
||||
CompareOperator.EQUAL, Bytes.toBytes(server));
|
||||
scan.setFilter(filterMyQueues);
|
||||
scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER);
|
||||
scan.addColumn(CF_QUEUE, COL_QUEUE_OWNER_HISTORY);
|
||||
try (Table replicationTable = getOrBlockOnReplicationTable()) {
|
||||
ResultScanner results = replicationTable.getScanner(scan);
|
||||
return results;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to acquire the Replication Table. This operation will block until it is assigned by
|
||||
* the CreateReplicationWorker thread. It is up to the caller of this method to close the
|
||||
* returned Table
|
||||
* @return the Replication Table when it is created
|
||||
* @throws IOException
|
||||
*/
|
||||
protected Table getOrBlockOnReplicationTable() throws IOException {
|
||||
// Sleep until the Replication Table becomes available
|
||||
try {
|
||||
replicationTableInitialized.await();
|
||||
} catch (InterruptedException e) {
|
||||
String errMsg = "Unable to acquire the Replication Table due to InterruptedException: " +
|
||||
e.getMessage();
|
||||
throw new InterruptedIOException(errMsg);
|
||||
}
|
||||
return getAndSetUpReplicationTable();
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new copy of the Replication Table and sets up the proper Table time outs for it
|
||||
*
|
||||
* @return the Replication Table
|
||||
* @throws IOException
|
||||
*/
|
||||
private Table getAndSetUpReplicationTable() throws IOException {
|
||||
Table replicationTable = connection.getTable(REPLICATION_TABLE_NAME);
|
||||
setReplicationTableTimeOuts(replicationTable);
|
||||
return replicationTable;
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds the Replication Table in a background thread. Any method accessing the Replication Table
|
||||
* should do so through getOrBlockOnReplicationTable()
|
||||
*
|
||||
* @return the Replication Table
|
||||
* @throws IOException if the Replication Table takes too long to build
|
||||
*/
|
||||
private void createReplicationTableInBackground() throws IOException {
|
||||
executor.execute(new CreateReplicationTableWorker());
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to build the Replication Table. Will continue blocking until we have a valid
|
||||
* Table for the Replication Table.
|
||||
*/
|
||||
private class CreateReplicationTableWorker implements Runnable {
|
||||
|
||||
private Admin admin;
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
admin = connection.getAdmin();
|
||||
if (!replicationTableExists()) {
|
||||
createReplicationTable();
|
||||
}
|
||||
int maxRetries = conf.getInt("hbase.replication.queues.createtable.retries.number",
|
||||
CLIENT_RETRIES);
|
||||
RetryCounterFactory counterFactory = new RetryCounterFactory(maxRetries, RPC_TIMEOUT);
|
||||
RetryCounter retryCounter = counterFactory.create();
|
||||
while (!replicationTableExists()) {
|
||||
retryCounter.sleepUntilNextRetry();
|
||||
if (!retryCounter.shouldRetry()) {
|
||||
throw new IOException("Unable to acquire the Replication Table");
|
||||
}
|
||||
}
|
||||
replicationTableInitialized.countDown();
|
||||
} catch (IOException | InterruptedException e) {
|
||||
abortable.abort("Failed building Replication Table", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the replication table with the provided HColumnDescriptor REPLICATION_COL_DESCRIPTOR
|
||||
* in TableBasedReplicationQueuesImpl
|
||||
*
|
||||
* @throws IOException
|
||||
*/
|
||||
private void createReplicationTable() throws IOException {
|
||||
HTableDescriptor replicationTableDescriptor = new HTableDescriptor(REPLICATION_TABLE_NAME);
|
||||
replicationTableDescriptor.addFamily(REPLICATION_COL_DESCRIPTOR);
|
||||
try {
|
||||
admin.createTable(replicationTableDescriptor);
|
||||
} catch (TableExistsException e) {
|
||||
// In this case we can just continue as normal
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks whether the Replication Table exists yet
|
||||
*
|
||||
* @return whether the Replication Table exists
|
||||
* @throws IOException
|
||||
*/
|
||||
private boolean replicationTableExists() {
|
||||
try {
|
||||
return admin.tableExists(REPLICATION_TABLE_NAME);
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -54,6 +53,8 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
|
|||
super(zookeeper, conf, abortable);
|
||||
this.stopper = stopper;
|
||||
this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
|
||||
// watch the changes
|
||||
refreshOtherRegionServersList(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -71,7 +72,7 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
|
|||
*/
|
||||
@Override
|
||||
public List<String> getListOfRegionServers() {
|
||||
refreshOtherRegionServersList();
|
||||
refreshOtherRegionServersList(false);
|
||||
|
||||
List<String> list = null;
|
||||
synchronized (otherRegionServers) {
|
||||
|
@ -137,7 +138,7 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
|
|||
if (!path.startsWith(this.watcher.znodePaths.rsZNode)) {
|
||||
return false;
|
||||
}
|
||||
return refreshOtherRegionServersList();
|
||||
return refreshOtherRegionServersList(true);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -157,8 +158,8 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
|
|||
* @return true if the local list of the other region servers was updated with the ZK data (even
|
||||
* if it was empty), false if the data was missing in ZK
|
||||
*/
|
||||
private boolean refreshOtherRegionServersList() {
|
||||
List<String> newRsList = getRegisteredRegionServers();
|
||||
private boolean refreshOtherRegionServersList(boolean watch) {
|
||||
List<String> newRsList = getRegisteredRegionServers(watch);
|
||||
if (newRsList == null) {
|
||||
return false;
|
||||
} else {
|
||||
|
@ -174,10 +175,14 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
|
|||
* Get a list of all the other region servers in this cluster and set a watch
|
||||
* @return a list of server nanes
|
||||
*/
|
||||
private List<String> getRegisteredRegionServers() {
|
||||
private List<String> getRegisteredRegionServers(boolean watch) {
|
||||
List<String> result = null;
|
||||
try {
|
||||
if (watch) {
|
||||
result = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.zookeeper.znodePaths.rsZNode);
|
||||
} else {
|
||||
result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.zookeeper.znodePaths.rsZNode);
|
||||
}
|
||||
} catch (KeeperException e) {
|
||||
this.abortable.abort("Get list of registered region servers", e);
|
||||
}
|
||||
|
|
|
@ -54,6 +54,28 @@ import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTe
|
|||
|
||||
/**
|
||||
* ZK based replication queue storage.
|
||||
* <p>
|
||||
* The base znode for each regionserver is the regionserver name. For example:
|
||||
*
|
||||
* <pre>
|
||||
* /hbase/replication/rs/hostname.example.org,6020,1234
|
||||
* </pre>
|
||||
*
|
||||
* Within this znode, the region server maintains a set of WAL replication queues. These queues are
|
||||
* represented by child znodes named using there give queue id. For example:
|
||||
*
|
||||
* <pre>
|
||||
* /hbase/replication/rs/hostname.example.org,6020,1234/1
|
||||
* /hbase/replication/rs/hostname.example.org,6020,1234/2
|
||||
* </pre>
|
||||
*
|
||||
* Each queue has one child znode for every WAL that still needs to be replicated. The value of
|
||||
* these WAL child znodes is the latest position that has been replicated. This position is updated
|
||||
* every time a WAL entry is replicated. For example:
|
||||
*
|
||||
* <pre>
|
||||
* /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
|
||||
* </pre>
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class ZKReplicationQueueStorage extends ZKReplicationStorageBase
|
||||
|
|
|
@ -42,9 +42,8 @@ import org.slf4j.LoggerFactory;
|
|||
*/
|
||||
public abstract class TestReplicationStateBasic {
|
||||
|
||||
protected ReplicationQueues rq1;
|
||||
protected ReplicationQueues rq2;
|
||||
protected ReplicationQueues rq3;
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class);
|
||||
|
||||
protected ReplicationQueueStorage rqs;
|
||||
protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345);
|
||||
protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345);
|
||||
|
@ -63,8 +62,6 @@ public abstract class TestReplicationStateBasic {
|
|||
protected static final int ZK_MAX_COUNT = 300;
|
||||
protected static final int ZK_SLEEP_INTERVAL = 100; // millis
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class);
|
||||
|
||||
@Test
|
||||
public void testReplicationQueueStorage() throws ReplicationException {
|
||||
// Test methods with empty state
|
||||
|
@ -76,15 +73,13 @@ public abstract class TestReplicationStateBasic {
|
|||
* Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each --
|
||||
* server2: zero queues
|
||||
*/
|
||||
rq1.init(server1.getServerName());
|
||||
rq2.init(server2.getServerName());
|
||||
rq1.addLog("qId1", "trash");
|
||||
rq1.removeLog("qId1", "trash");
|
||||
rq1.addLog("qId2", "filename1");
|
||||
rq1.addLog("qId3", "filename2");
|
||||
rq1.addLog("qId3", "filename3");
|
||||
rq2.addLog("trash", "trash");
|
||||
rq2.removeQueue("trash");
|
||||
rqs.addWAL(server1, "qId1", "trash");
|
||||
rqs.removeWAL(server1, "qId1", "trash");
|
||||
rqs.addWAL(server1,"qId2", "filename1");
|
||||
rqs.addWAL(server1,"qId3", "filename2");
|
||||
rqs.addWAL(server1,"qId3", "filename3");
|
||||
rqs.addWAL(server2,"trash", "trash");
|
||||
rqs.removeQueue(server2,"trash");
|
||||
|
||||
List<ServerName> reps = rqs.getListOfReplicators();
|
||||
assertEquals(2, reps.size());
|
||||
|
@ -105,62 +100,55 @@ public abstract class TestReplicationStateBasic {
|
|||
assertTrue(list.contains("qId3"));
|
||||
}
|
||||
|
||||
private void removeAllQueues(ServerName serverName) throws ReplicationException {
|
||||
for (String queue: rqs.getAllQueues(serverName)) {
|
||||
rqs.removeQueue(serverName, queue);
|
||||
}
|
||||
}
|
||||
@Test
|
||||
public void testReplicationQueues() throws ReplicationException {
|
||||
rq1.init(server1.getServerName());
|
||||
rq2.init(server2.getServerName());
|
||||
rq3.init(server3.getServerName());
|
||||
// Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
|
||||
rp.init();
|
||||
|
||||
// 3 replicators should exist
|
||||
assertEquals(3, rq1.getListOfReplicators().size());
|
||||
rq1.removeQueue("bogus");
|
||||
rq1.removeLog("bogus", "bogus");
|
||||
rq1.removeAllQueues();
|
||||
assertEquals(0, rq1.getAllQueues().size());
|
||||
assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
|
||||
assertNull(rq1.getLogsInQueue("bogus"));
|
||||
assertNull(rq1.getUnClaimedQueueIds(ServerName.valueOf("bogus", 1234, -1L).toString()));
|
||||
|
||||
rq1.setLogPosition("bogus", "bogus", 5L);
|
||||
rqs.removeQueue(server1, "bogus");
|
||||
rqs.removeWAL(server1, "bogus", "bogus");
|
||||
removeAllQueues(server1);
|
||||
assertEquals(0, rqs.getAllQueues(server1).size());
|
||||
assertEquals(0, rqs.getWALPosition(server1, "bogus", "bogus"));
|
||||
assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty());
|
||||
assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 1234, 12345)).isEmpty());
|
||||
|
||||
populateQueues();
|
||||
|
||||
assertEquals(3, rq1.getListOfReplicators().size());
|
||||
assertEquals(0, rq2.getLogsInQueue("qId1").size());
|
||||
assertEquals(5, rq3.getLogsInQueue("qId5").size());
|
||||
assertEquals(0, rq3.getLogPosition("qId1", "filename0"));
|
||||
rq3.setLogPosition("qId5", "filename4", 354L);
|
||||
assertEquals(354L, rq3.getLogPosition("qId5", "filename4"));
|
||||
assertEquals(3, rqs.getListOfReplicators().size());
|
||||
assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
|
||||
assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
|
||||
assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0"));
|
||||
rqs.setWALPosition(server3, "qId5", "filename4", 354L);
|
||||
assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4"));
|
||||
|
||||
assertEquals(5, rq3.getLogsInQueue("qId5").size());
|
||||
assertEquals(0, rq2.getLogsInQueue("qId1").size());
|
||||
assertEquals(0, rq1.getAllQueues().size());
|
||||
assertEquals(1, rq2.getAllQueues().size());
|
||||
assertEquals(5, rq3.getAllQueues().size());
|
||||
assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
|
||||
assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
|
||||
assertEquals(0, rqs.getAllQueues(server1).size());
|
||||
assertEquals(1, rqs.getAllQueues(server2).size());
|
||||
assertEquals(5, rqs.getAllQueues(server3).size());
|
||||
|
||||
assertEquals(0, rq3.getUnClaimedQueueIds(server1.getServerName()).size());
|
||||
rq3.removeReplicatorIfQueueIsEmpty(server1.getServerName());
|
||||
assertEquals(2, rq3.getListOfReplicators().size());
|
||||
assertEquals(0, rqs.getAllQueues(server1).size());
|
||||
rqs.removeReplicatorIfQueueIsEmpty(server1);
|
||||
assertEquals(2, rqs.getListOfReplicators().size());
|
||||
|
||||
List<String> queues = rq2.getUnClaimedQueueIds(server3.getServerName());
|
||||
List<String> queues = rqs.getAllQueues(server3);
|
||||
assertEquals(5, queues.size());
|
||||
for (String queue : queues) {
|
||||
rq2.claimQueue(server3.getServerName(), queue);
|
||||
rqs.claimQueue(server3, queue, server2);
|
||||
}
|
||||
rq2.removeReplicatorIfQueueIsEmpty(server3.getServerName());
|
||||
assertEquals(1, rq2.getListOfReplicators().size());
|
||||
rqs.removeReplicatorIfQueueIsEmpty(server3);
|
||||
assertEquals(1, rqs.getListOfReplicators().size());
|
||||
|
||||
// Try to claim our own queues
|
||||
assertNull(rq2.getUnClaimedQueueIds(server2.getServerName()));
|
||||
rq2.removeReplicatorIfQueueIsEmpty(server2.getServerName());
|
||||
|
||||
assertEquals(6, rq2.getAllQueues().size());
|
||||
|
||||
rq2.removeAllQueues();
|
||||
|
||||
assertEquals(0, rq2.getListOfReplicators().size());
|
||||
assertEquals(6, rqs.getAllQueues(server2).size());
|
||||
removeAllQueues(server2);
|
||||
rqs.removeReplicatorIfQueueIsEmpty(server2);
|
||||
assertEquals(0, rqs.getListOfReplicators().size());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -197,7 +185,6 @@ public abstract class TestReplicationStateBasic {
|
|||
@Test
|
||||
public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
|
||||
rp.init();
|
||||
rq1.init(server1.getServerName());
|
||||
|
||||
List<Pair<Path, Path>> files1 = new ArrayList<>(3);
|
||||
files1.add(new Pair<>(null, new Path("file_1")));
|
||||
|
@ -206,8 +193,8 @@ public abstract class TestReplicationStateBasic {
|
|||
assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
|
||||
assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
|
||||
rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
|
||||
rq1.addPeerToHFileRefs(ID_ONE);
|
||||
rq1.addHFileRefs(ID_ONE, files1);
|
||||
rqs.addPeerToHFileRefs(ID_ONE);
|
||||
rqs.addHFileRefs(ID_ONE, files1);
|
||||
assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
|
||||
assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
|
||||
List<String> hfiles2 = new ArrayList<>(files1.size());
|
||||
|
@ -215,43 +202,41 @@ public abstract class TestReplicationStateBasic {
|
|||
hfiles2.add(p.getSecond().getName());
|
||||
}
|
||||
String removedString = hfiles2.remove(0);
|
||||
rq1.removeHFileRefs(ID_ONE, hfiles2);
|
||||
rqs.removeHFileRefs(ID_ONE, hfiles2);
|
||||
assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size());
|
||||
hfiles2 = new ArrayList<>(1);
|
||||
hfiles2.add(removedString);
|
||||
rq1.removeHFileRefs(ID_ONE, hfiles2);
|
||||
rqs.removeHFileRefs(ID_ONE, hfiles2);
|
||||
assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size());
|
||||
rp.unregisterPeer(ID_ONE);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
|
||||
rq1.init(server1.getServerName());
|
||||
|
||||
rp.init();
|
||||
rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
|
||||
rq1.addPeerToHFileRefs(ID_ONE);
|
||||
rqs.addPeerToHFileRefs(ID_ONE);
|
||||
rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
|
||||
rq1.addPeerToHFileRefs(ID_TWO);
|
||||
rqs.addPeerToHFileRefs(ID_TWO);
|
||||
|
||||
List<Pair<Path, Path>> files1 = new ArrayList<>(3);
|
||||
files1.add(new Pair<>(null, new Path("file_1")));
|
||||
files1.add(new Pair<>(null, new Path("file_2")));
|
||||
files1.add(new Pair<>(null, new Path("file_3")));
|
||||
rq1.addHFileRefs(ID_ONE, files1);
|
||||
rq1.addHFileRefs(ID_TWO, files1);
|
||||
rqs.addHFileRefs(ID_ONE, files1);
|
||||
rqs.addHFileRefs(ID_TWO, files1);
|
||||
assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size());
|
||||
assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
|
||||
assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
|
||||
|
||||
rp.unregisterPeer(ID_ONE);
|
||||
rq1.removePeerFromHFileRefs(ID_ONE);
|
||||
rqs.removePeerFromHFileRefs(ID_ONE);
|
||||
assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
|
||||
assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
|
||||
assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
|
||||
|
||||
rp.unregisterPeer(ID_TWO);
|
||||
rq1.removePeerFromHFileRefs(ID_TWO);
|
||||
rqs.removePeerFromHFileRefs(ID_TWO);
|
||||
assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
|
||||
assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty());
|
||||
}
|
||||
|
@ -363,15 +348,15 @@ public abstract class TestReplicationStateBasic {
|
|||
* 3, 4, 5 log files respectively
|
||||
*/
|
||||
protected void populateQueues() throws ReplicationException {
|
||||
rq1.addLog("trash", "trash");
|
||||
rq1.removeQueue("trash");
|
||||
rqs.addWAL(server1, "trash", "trash");
|
||||
rqs.removeQueue(server1, "trash");
|
||||
|
||||
rq2.addLog("qId1", "trash");
|
||||
rq2.removeLog("qId1", "trash");
|
||||
rqs.addWAL(server2, "qId1", "trash");
|
||||
rqs.removeWAL(server2, "qId1", "trash");
|
||||
|
||||
for (int i = 1; i < 6; i++) {
|
||||
for (int j = 0; j < i; j++) {
|
||||
rq3.addLog("qId" + i, "filename" + j);
|
||||
rqs.addWAL(server3, "qId" + i, "filename" + j);
|
||||
}
|
||||
// Add peers for the corresponding queues so they are not orphans
|
||||
rp.registerPeer("qId" + i,
|
||||
|
|
|
@ -17,10 +17,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.replication;
|
||||
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.Abortable;
|
||||
|
@ -41,7 +37,6 @@ import org.junit.AfterClass;
|
|||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
@ -59,7 +54,6 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
|
|||
private static HBaseZKTestingUtility utility;
|
||||
private static ZKWatcher zkw;
|
||||
private static String replicationZNode;
|
||||
private ReplicationQueuesZKImpl rqZK;
|
||||
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
|
@ -89,23 +83,9 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
|
|||
@Before
|
||||
public void setUp() {
|
||||
zkTimeoutCount = 0;
|
||||
WarnOnlyAbortable abortable = new WarnOnlyAbortable();
|
||||
try {
|
||||
rq1 = ReplicationFactory
|
||||
.getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw));
|
||||
rq2 = ReplicationFactory
|
||||
.getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw));
|
||||
rq3 = ReplicationFactory
|
||||
.getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw));
|
||||
rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
|
||||
} catch (Exception e) {
|
||||
// This should not occur, because getReplicationQueues() only throws for
|
||||
// TableBasedReplicationQueuesImpl
|
||||
fail("ReplicationFactory.getReplicationQueues() threw an IO Exception");
|
||||
}
|
||||
rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
|
||||
rp = ReplicationFactory.getReplicationPeers(zkw, conf, new WarnOnlyAbortable());
|
||||
OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
|
||||
rqZK = new ReplicationQueuesZKImpl(zkw, conf, abortable);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -118,23 +98,6 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
|
|||
utility.shutdownMiniZKCluster();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsPeerPath_PathToParentOfPeerNode() {
|
||||
assertFalse(rqZK.isPeerPath(rqZK.peersZNode));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsPeerPath_PathToChildOfPeerNode() {
|
||||
String peerChild = ZNodePaths.joinZNode(ZNodePaths.joinZNode(rqZK.peersZNode, "1"), "child");
|
||||
assertFalse(rqZK.isPeerPath(peerChild));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIsPeerPath_ActualPeerPath() {
|
||||
String peerPath = ZNodePaths.joinZNode(rqZK.peersZNode, "1");
|
||||
assertTrue(rqZK.isPeerPath(peerPath));
|
||||
}
|
||||
|
||||
private static class WarnOnlyAbortable implements Abortable {
|
||||
|
||||
@Override
|
||||
|
|
|
@ -49,8 +49,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
|
|||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationTracker;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
|
||||
|
@ -307,14 +305,10 @@ public class DumpReplicationQueues extends Configured implements Tool {
|
|||
boolean hdfs) throws Exception {
|
||||
ReplicationQueueStorage queueStorage;
|
||||
ReplicationPeers replicationPeers;
|
||||
ReplicationQueues replicationQueues;
|
||||
ReplicationTracker replicationTracker;
|
||||
ReplicationQueuesArguments replicationArgs =
|
||||
new ReplicationQueuesArguments(getConf(), new WarnOnlyAbortable(), zkw);
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
||||
queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
|
||||
replicationQueues = ReplicationFactory.getReplicationQueues(replicationArgs);
|
||||
replicationPeers =
|
||||
ReplicationFactory.getReplicationPeers(zkw, getConf(), queueStorage, connection);
|
||||
replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(),
|
||||
|
@ -328,7 +322,6 @@ public class DumpReplicationQueues extends Configured implements Tool {
|
|||
}
|
||||
for (ServerName regionserver : regionservers) {
|
||||
List<String> queueIds = queueStorage.getAllQueues(regionserver);
|
||||
replicationQueues.init(regionserver.getServerName());
|
||||
if (!liveRegionServers.contains(regionserver.getServerName())) {
|
||||
deadRegionServers.add(regionserver.getServerName());
|
||||
}
|
||||
|
@ -338,17 +331,17 @@ public class DumpReplicationQueues extends Configured implements Tool {
|
|||
if (!peerIds.contains(queueInfo.getPeerId())) {
|
||||
deletedQueues.add(regionserver + "/" + queueId);
|
||||
sb.append(
|
||||
formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true, hdfs));
|
||||
formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs));
|
||||
} else {
|
||||
sb.append(
|
||||
formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false, hdfs));
|
||||
formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs));
|
||||
}
|
||||
}
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
private String formatQueue(ServerName regionserver, ReplicationQueues replicationQueues,
|
||||
private String formatQueue(ServerName regionserver, ReplicationQueueStorage queueStorage,
|
||||
ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
|
||||
boolean hdfs) throws Exception {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
|
@ -370,7 +363,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
|
|||
peersQueueSize.addAndGet(queueInfo.getPeerId(), wals.size());
|
||||
|
||||
for (String wal : wals) {
|
||||
long position = replicationQueues.getLogPosition(queueInfo.getPeerId(), wal);
|
||||
long position = queueStorage.getWALPosition(regionserver, queueInfo.getPeerId(), wal);
|
||||
sb.append(" Replication position for " + wal + ": " + (position > 0 ? position : "0"
|
||||
+ " (not started or nothing to replicate)") + "\n");
|
||||
}
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -29,15 +28,15 @@ import org.apache.hadoop.fs.FileSystem;
|
|||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.util.FSUtils;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Class that handles the recovered source of a replication stream, which is transfered from
|
||||
|
@ -52,10 +51,10 @@ public class RecoveredReplicationSource extends ReplicationSource {
|
|||
|
||||
@Override
|
||||
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
|
||||
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
|
||||
ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server,
|
||||
String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
|
||||
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
|
||||
super.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerClusterZnode,
|
||||
super.init(conf, fs, manager, queueStorage, replicationPeers, server, peerClusterZnode,
|
||||
clusterId, replicationEndpoint, walFileLengthProvider, metrics);
|
||||
this.actualPeerId = this.replicationQueueInfo.getPeerId();
|
||||
}
|
||||
|
@ -64,7 +63,7 @@ public class RecoveredReplicationSource extends ReplicationSource {
|
|||
protected void tryStartNewShipper(String walGroupId, PriorityBlockingQueue<Path> queue) {
|
||||
final RecoveredReplicationSourceShipper worker =
|
||||
new RecoveredReplicationSourceShipper(conf, walGroupId, queue, this,
|
||||
this.replicationQueues);
|
||||
this.queueStorage);
|
||||
ReplicationSourceShipper extant = workerThreads.putIfAbsent(walGroupId, worker);
|
||||
if (extant != null) {
|
||||
LOG.debug("Someone has beat us to start a worker thread for wal group " + walGroupId);
|
||||
|
|
|
@ -23,13 +23,13 @@ import java.util.concurrent.PriorityBlockingQueue;
|
|||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReader.WALEntryBatch;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
|
||||
/**
|
||||
* Used by a {@link RecoveredReplicationSource}.
|
||||
|
@ -40,14 +40,14 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
|
|||
LoggerFactory.getLogger(RecoveredReplicationSourceShipper.class);
|
||||
|
||||
protected final RecoveredReplicationSource source;
|
||||
private final ReplicationQueues replicationQueues;
|
||||
private final ReplicationQueueStorage replicationQueues;
|
||||
|
||||
public RecoveredReplicationSourceShipper(Configuration conf, String walGroupId,
|
||||
PriorityBlockingQueue<Path> queue, RecoveredReplicationSource source,
|
||||
ReplicationQueues replicationQueues) {
|
||||
ReplicationQueueStorage queueStorage) {
|
||||
super(conf, walGroupId, queue, source);
|
||||
this.source = source;
|
||||
this.replicationQueues = replicationQueues;
|
||||
this.replicationQueues = queueStorage;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -115,11 +115,11 @@ public class RecoveredReplicationSourceShipper extends ReplicationSourceShipper
|
|||
long startPosition = 0;
|
||||
String peerClusterZnode = source.getPeerClusterZnode();
|
||||
try {
|
||||
startPosition = this.replicationQueues.getLogPosition(peerClusterZnode,
|
||||
this.queue.peek().getName());
|
||||
startPosition = this.replicationQueues.getWALPosition(source.getServerWALsBelongTo(),
|
||||
peerClusterZnode, this.queue.peek().getName());
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position "
|
||||
+ startPosition);
|
||||
LOG.trace("Recovered queue started with log " + this.queue.peek() + " at position " +
|
||||
startPosition);
|
||||
}
|
||||
} catch (ReplicationException e) {
|
||||
terminate("Couldn't get the position of this recovered queue " + peerClusterZnode, e);
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -38,8 +37,8 @@ import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
|
|||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationTracker;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationUtils;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
|
@ -53,7 +52,6 @@ import org.slf4j.LoggerFactory;
|
|||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
|
||||
|
||||
/**
|
||||
* Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
|
||||
*/
|
||||
|
@ -63,7 +61,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
|
|||
LoggerFactory.getLogger(Replication.class);
|
||||
private boolean isReplicationForBulkLoadDataEnabled;
|
||||
private ReplicationSourceManager replicationManager;
|
||||
private ReplicationQueues replicationQueues;
|
||||
private ReplicationQueueStorage queueStorage;
|
||||
private ReplicationPeers replicationPeers;
|
||||
private ReplicationTracker replicationTracker;
|
||||
private Configuration conf;
|
||||
|
@ -106,10 +104,8 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
|
|||
}
|
||||
|
||||
try {
|
||||
this.replicationQueues =
|
||||
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, this.server,
|
||||
server.getZooKeeper()));
|
||||
this.replicationQueues.init(this.server.getServerName().toString());
|
||||
this.queueStorage =
|
||||
ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
|
||||
this.replicationPeers =
|
||||
ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
|
||||
this.replicationPeers.init();
|
||||
|
@ -125,8 +121,8 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
|
|||
} catch (KeeperException ke) {
|
||||
throw new IOException("Could not read cluster id", ke);
|
||||
}
|
||||
this.replicationManager = new ReplicationSourceManager(replicationQueues, replicationPeers,
|
||||
replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
|
||||
this.replicationManager = new ReplicationSourceManager(queueStorage, replicationPeers, replicationTracker, conf,
|
||||
this.server, fs, logDir, oldLogDir, clusterId,
|
||||
walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty());
|
||||
if (walProvider != null) {
|
||||
walProvider
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -31,7 +30,6 @@ import java.util.concurrent.PriorityBlockingQueue;
|
|||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
|
@ -49,7 +47,7 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
|
|||
import org.apache.hadoop.hbase.replication.ReplicationPeer;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
|
||||
import org.apache.hadoop.hbase.replication.WALEntryFilter;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -83,7 +81,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
private Map<String, PriorityBlockingQueue<Path>> queues = new HashMap<>();
|
||||
// per group queue size, keep no more than this number of logs in each wal group
|
||||
protected int queueSizePerGroup;
|
||||
protected ReplicationQueues replicationQueues;
|
||||
protected ReplicationQueueStorage queueStorage;
|
||||
private ReplicationPeers replicationPeers;
|
||||
|
||||
protected Configuration conf;
|
||||
|
@ -148,7 +146,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
*/
|
||||
@Override
|
||||
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
|
||||
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
|
||||
ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server,
|
||||
String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
|
||||
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
|
||||
this.server = server;
|
||||
|
@ -161,7 +159,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
this.maxRetriesMultiplier =
|
||||
this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
|
||||
this.queueSizePerGroup = this.conf.getInt("hbase.regionserver.maxlogs", 32);
|
||||
this.replicationQueues = replicationQueues;
|
||||
this.queueStorage = queueStorage;
|
||||
this.replicationPeers = replicationPeers;
|
||||
this.manager = manager;
|
||||
this.fs = fs;
|
||||
|
@ -230,7 +228,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
List<String> tableCfs = tableCFMap.get(tableName);
|
||||
if (tableCFMap.containsKey(tableName)
|
||||
&& (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
|
||||
this.replicationQueues.addHFileRefs(peerId, pairs);
|
||||
this.queueStorage.addHFileRefs(peerId, pairs);
|
||||
metrics.incrSizeOfHFileRefsQueue(pairs.size());
|
||||
} else {
|
||||
LOG.debug("HFiles will not be replicated belonging to the table " + tableName + " family "
|
||||
|
@ -239,7 +237,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
|
|||
} else {
|
||||
// user has explicitly not defined any table cfs for replication, means replicate all the
|
||||
// data
|
||||
this.replicationQueues.addHFileRefs(peerId, pairs);
|
||||
this.queueStorage.addHFileRefs(peerId, pairs);
|
||||
metrics.incrSizeOfHFileRefsQueue(pairs.size());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,6 @@ import java.io.IOException;
|
|||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -32,9 +31,10 @@ import org.apache.hadoop.hbase.TableName;
|
|||
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationException;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.wal.WAL.Entry;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* Interface that defines a replication source
|
||||
|
@ -47,15 +47,10 @@ public interface ReplicationSourceInterface {
|
|||
* @param conf the configuration to use
|
||||
* @param fs the file system to use
|
||||
* @param manager the manager to use
|
||||
* @param replicationQueues
|
||||
* @param replicationPeers
|
||||
* @param server the server for this region server
|
||||
* @param peerClusterZnode
|
||||
* @param clusterId
|
||||
* @throws IOException
|
||||
*/
|
||||
void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
|
||||
ReplicationQueues replicationQueues, ReplicationPeers replicationPeers, Server server,
|
||||
ReplicationQueueStorage queueStorage, ReplicationPeers replicationPeers, Server server,
|
||||
String peerClusterZnode, UUID clusterId, ReplicationEndpoint replicationEndpoint,
|
||||
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException;
|
||||
|
||||
|
|
|
@ -33,17 +33,20 @@ import java.util.TreeSet;
|
|||
import java.util.UUID;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.RejectedExecutionException;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.stream.Collectors;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableDescriptors;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
|
@ -56,7 +59,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
|
|||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationTracker;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
|
||||
|
@ -91,7 +94,7 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
private final List<ReplicationSourceInterface> sources;
|
||||
// List of all the sources we got from died RSs
|
||||
private final List<ReplicationSourceInterface> oldsources;
|
||||
private final ReplicationQueues replicationQueues;
|
||||
private final ReplicationQueueStorage queueStorage;
|
||||
private final ReplicationTracker replicationTracker;
|
||||
private final ReplicationPeers replicationPeers;
|
||||
// UUID for this cluster
|
||||
|
@ -124,7 +127,7 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
|
||||
/**
|
||||
* Creates a replication manager and sets the watch on all the other registered region servers
|
||||
* @param replicationQueues the interface for manipulating replication queues
|
||||
* @param queueStorage the interface for manipulating replication queues
|
||||
* @param replicationPeers
|
||||
* @param replicationTracker
|
||||
* @param conf the configuration to use
|
||||
|
@ -134,14 +137,14 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
* @param oldLogDir the directory where old logs are archived
|
||||
* @param clusterId
|
||||
*/
|
||||
public ReplicationSourceManager(ReplicationQueues replicationQueues,
|
||||
public ReplicationSourceManager(ReplicationQueueStorage queueStorage,
|
||||
ReplicationPeers replicationPeers, ReplicationTracker replicationTracker, Configuration conf,
|
||||
Server server, FileSystem fs, Path logDir, Path oldLogDir, UUID clusterId,
|
||||
WALFileLengthProvider walFileLengthProvider) throws IOException {
|
||||
//CopyOnWriteArrayList is thread-safe.
|
||||
//Generally, reading is more than modifying.
|
||||
this.sources = new CopyOnWriteArrayList<>();
|
||||
this.replicationQueues = replicationQueues;
|
||||
this.queueStorage = queueStorage;
|
||||
this.replicationPeers = replicationPeers;
|
||||
this.replicationTracker = replicationTracker;
|
||||
this.server = server;
|
||||
|
@ -174,6 +177,19 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
|
||||
}
|
||||
|
||||
@FunctionalInterface
|
||||
private interface ReplicationQueueOperation {
|
||||
void exec() throws ReplicationException;
|
||||
}
|
||||
|
||||
private void abortWhenFail(ReplicationQueueOperation op) {
|
||||
try {
|
||||
op.exec();
|
||||
} catch (ReplicationException e) {
|
||||
server.abort("Failed to operate on replication queue", e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Provide the id of the peer and a log key and this method will figure which
|
||||
* wal it belongs to and will log, for this region server, the current
|
||||
|
@ -185,10 +201,11 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
* @param queueRecovered indicates if this queue comes from another region server
|
||||
* @param holdLogInZK if true then the log is retained in ZK
|
||||
*/
|
||||
public void logPositionAndCleanOldLogs(Path log, String id, long position,
|
||||
boolean queueRecovered, boolean holdLogInZK) {
|
||||
public void logPositionAndCleanOldLogs(Path log, String id, long position, boolean queueRecovered,
|
||||
boolean holdLogInZK) {
|
||||
String fileName = log.getName();
|
||||
this.replicationQueues.setLogPosition(id, fileName, position);
|
||||
abortWhenFail(
|
||||
() -> this.queueStorage.setWALPosition(server.getServerName(), id, fileName, position));
|
||||
if (holdLogInZK) {
|
||||
return;
|
||||
}
|
||||
|
@ -221,32 +238,55 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
|
||||
private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
|
||||
SortedSet<String> walSet = wals.headSet(key);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
|
||||
}
|
||||
for (String wal : walSet) {
|
||||
this.replicationQueues.removeLog(id, wal);
|
||||
abortWhenFail(() -> this.queueStorage.removeWAL(server.getServerName(), id, wal));
|
||||
}
|
||||
walSet.clear();
|
||||
}
|
||||
|
||||
private void adoptAbandonedQueues() {
|
||||
List<ServerName> currentReplicators = null;
|
||||
try {
|
||||
currentReplicators = queueStorage.getListOfReplicators();
|
||||
} catch (ReplicationException e) {
|
||||
server.abort("Failed to get all replicators", e);
|
||||
return;
|
||||
}
|
||||
if (currentReplicators == null || currentReplicators.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
List<ServerName> otherRegionServers = replicationTracker.getListOfRegionServers().stream()
|
||||
.map(ServerName::valueOf).collect(Collectors.toList());
|
||||
LOG.info(
|
||||
"Current list of replicators: " + currentReplicators + " other RSs: " + otherRegionServers);
|
||||
|
||||
// Look if there's anything to process after a restart
|
||||
for (ServerName rs : currentReplicators) {
|
||||
if (!otherRegionServers.contains(rs)) {
|
||||
transferQueues(rs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a normal source per registered peer cluster and tries to process all
|
||||
* old region server wal queues
|
||||
* Adds a normal source per registered peer cluster and tries to process all old region server wal
|
||||
* queues
|
||||
* <p>
|
||||
* The returned future is for adoptAbandonedQueues task.
|
||||
*/
|
||||
void init() throws IOException, ReplicationException {
|
||||
Future<?> init() throws IOException, ReplicationException {
|
||||
for (String id : this.replicationPeers.getConnectedPeerIds()) {
|
||||
addSource(id);
|
||||
if (replicationForBulkLoadDataEnabled) {
|
||||
// Check if peer exists in hfile-refs queue, if not add it. This can happen in the case
|
||||
// when a peer was added before replication for bulk loaded data was enabled.
|
||||
this.replicationQueues.addPeerToHFileRefs(id);
|
||||
this.queueStorage.addPeerToHFileRefs(id);
|
||||
}
|
||||
}
|
||||
AdoptAbandonedQueuesWorker adoptionWorker = new AdoptAbandonedQueuesWorker();
|
||||
try {
|
||||
this.executor.execute(adoptionWorker);
|
||||
} catch (RejectedExecutionException ex) {
|
||||
LOG.info("Cancelling the adoption of abandoned queues because of " + ex.getMessage());
|
||||
}
|
||||
return this.executor.submit(this::adoptAbandonedQueues);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -254,15 +294,12 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
* need to enqueue the latest log of each wal group and do replication
|
||||
* @param id the id of the peer cluster
|
||||
* @return the source that was created
|
||||
* @throws IOException
|
||||
*/
|
||||
@VisibleForTesting
|
||||
ReplicationSourceInterface addSource(String id) throws IOException, ReplicationException {
|
||||
ReplicationPeerConfig peerConfig = replicationPeers.getReplicationPeerConfig(id);
|
||||
ReplicationPeer peer = replicationPeers.getConnectedPeer(id);
|
||||
ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this,
|
||||
this.replicationQueues, this.replicationPeers, server, id, this.clusterId, peerConfig, peer,
|
||||
walFileLengthProvider);
|
||||
ReplicationSourceInterface src = getReplicationSource(id, peerConfig, peer);
|
||||
synchronized (this.walsById) {
|
||||
this.sources.add(src);
|
||||
Map<String, SortedSet<String>> walsByGroup = new HashMap<>();
|
||||
|
@ -277,11 +314,10 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
logs.add(name);
|
||||
walsByGroup.put(walPrefix, logs);
|
||||
try {
|
||||
this.replicationQueues.addLog(id, name);
|
||||
this.queueStorage.addWAL(server.getServerName(), id, name);
|
||||
} catch (ReplicationException e) {
|
||||
String message =
|
||||
"Cannot add log to queue when creating a new source, queueId=" + id
|
||||
+ ", filename=" + name;
|
||||
String message = "Cannot add log to queue when creating a new source, queueId=" + id +
|
||||
", filename=" + name;
|
||||
server.stop(message);
|
||||
throw e;
|
||||
}
|
||||
|
@ -306,7 +342,7 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
* @param peerId Id of the peer cluster queue of wals to delete
|
||||
*/
|
||||
public void deleteSource(String peerId, boolean closeConnection) {
|
||||
this.replicationQueues.removeQueue(peerId);
|
||||
abortWhenFail(() -> this.queueStorage.removeQueue(server.getServerName(), peerId));
|
||||
if (closeConnection) {
|
||||
this.replicationPeers.peerDisconnected(peerId);
|
||||
}
|
||||
|
@ -366,8 +402,8 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
List<String> getAllQueues() {
|
||||
return replicationQueues.getAllQueues();
|
||||
List<String> getAllQueues() throws ReplicationException {
|
||||
return queueStorage.getAllQueues(server.getServerName());
|
||||
}
|
||||
|
||||
// public because of we call it in TestReplicationEmptyWALRecovery
|
||||
|
@ -403,10 +439,10 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
synchronized (replicationPeers) {
|
||||
for (String id : replicationPeers.getConnectedPeerIds()) {
|
||||
try {
|
||||
this.replicationQueues.addLog(id, logName);
|
||||
this.queueStorage.addWAL(server.getServerName(), id, logName);
|
||||
} catch (ReplicationException e) {
|
||||
throw new IOException("Cannot add log to replication queue"
|
||||
+ " when creating a new source, queueId=" + id + ", filename=" + logName, e);
|
||||
throw new IOException("Cannot add log to replication queue" +
|
||||
" when creating a new source, queueId=" + id + ", filename=" + logName, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -455,19 +491,11 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
|
||||
/**
|
||||
* Factory method to create a replication source
|
||||
* @param conf the configuration to use
|
||||
* @param fs the file system to use
|
||||
* @param manager the manager to use
|
||||
* @param server the server object for this region server
|
||||
* @param peerId the id of the peer cluster
|
||||
* @return the created source
|
||||
* @throws IOException
|
||||
*/
|
||||
private ReplicationSourceInterface getReplicationSource(Configuration conf, FileSystem fs,
|
||||
ReplicationSourceManager manager, ReplicationQueues replicationQueues,
|
||||
ReplicationPeers replicationPeers, Server server, String peerId, UUID clusterId,
|
||||
ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer,
|
||||
WALFileLengthProvider walFileLengthProvider) throws IOException {
|
||||
private ReplicationSourceInterface getReplicationSource(String peerId,
|
||||
ReplicationPeerConfig peerConfig, ReplicationPeer replicationPeer) throws IOException {
|
||||
RegionServerCoprocessorHost rsServerHost = null;
|
||||
TableDescriptors tableDescriptors = null;
|
||||
if (server instanceof HRegionServer) {
|
||||
|
@ -484,9 +512,8 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
// Default to HBase inter-cluster replication endpoint
|
||||
replicationEndpointImpl = HBaseInterClusterReplicationEndpoint.class.getName();
|
||||
}
|
||||
@SuppressWarnings("rawtypes")
|
||||
Class c = Class.forName(replicationEndpointImpl);
|
||||
replicationEndpoint = (ReplicationEndpoint) c.newInstance();
|
||||
replicationEndpoint = Class.forName(replicationEndpointImpl)
|
||||
.asSubclass(ReplicationEndpoint.class).newInstance();
|
||||
if(rsServerHost != null) {
|
||||
ReplicationEndpoint newReplicationEndPoint = rsServerHost
|
||||
.postCreateReplicationEndPoint(replicationEndpoint);
|
||||
|
@ -503,7 +530,7 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
|
||||
MetricsSource metrics = new MetricsSource(peerId);
|
||||
// init replication source
|
||||
src.init(conf, fs, manager, replicationQueues, replicationPeers, server, peerId, clusterId,
|
||||
src.init(conf, fs, this, queueStorage, replicationPeers, server, peerId, clusterId,
|
||||
replicationEndpoint, walFileLengthProvider, metrics);
|
||||
|
||||
// init replication endpoint
|
||||
|
@ -514,21 +541,21 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
}
|
||||
|
||||
/**
|
||||
* Transfer all the queues of the specified to this region server.
|
||||
* First it tries to grab a lock and if it works it will move the
|
||||
* znodes and finally will delete the old znodes.
|
||||
*
|
||||
* Transfer all the queues of the specified to this region server. First it tries to grab a lock
|
||||
* and if it works it will move the znodes and finally will delete the old znodes.
|
||||
* <p>
|
||||
* It creates one old source for any type of source of the old rs.
|
||||
* @param rsZnode
|
||||
*/
|
||||
private void transferQueues(String rsZnode) {
|
||||
NodeFailoverWorker transfer =
|
||||
new NodeFailoverWorker(rsZnode, this.replicationQueues, this.replicationPeers,
|
||||
this.clusterId);
|
||||
private void transferQueues(ServerName deadRS) {
|
||||
if (server.getServerName().equals(deadRS)) {
|
||||
// it's just us, give up
|
||||
return;
|
||||
}
|
||||
NodeFailoverWorker transfer = new NodeFailoverWorker(deadRS);
|
||||
try {
|
||||
this.executor.execute(transfer);
|
||||
} catch (RejectedExecutionException ex) {
|
||||
LOG.info("Cancelling the transfer of " + rsZnode + " because of " + ex.getMessage());
|
||||
LOG.info("Cancelling the transfer of " + deadRS + " because of " + ex.getMessage());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -565,7 +592,7 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
LOG.info("Peer " + id + " connected success, trying to start the replication source thread.");
|
||||
addSource(id);
|
||||
if (replicationForBulkLoadDataEnabled) {
|
||||
this.replicationQueues.addPeerToHFileRefs(id);
|
||||
this.queueStorage.addPeerToHFileRefs(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -618,12 +645,12 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
deleteSource(id, true);
|
||||
}
|
||||
// Remove HFile Refs znode from zookeeper
|
||||
this.replicationQueues.removePeerFromHFileRefs(id);
|
||||
abortWhenFail(() -> this.queueStorage.removePeerFromHFileRefs(id));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void regionServerRemoved(String regionserver) {
|
||||
transferQueues(regionserver);
|
||||
transferQueues(ServerName.valueOf(regionserver));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -632,32 +659,16 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
*/
|
||||
class NodeFailoverWorker extends Thread {
|
||||
|
||||
private String rsZnode;
|
||||
private final ReplicationQueues rq;
|
||||
private final ReplicationPeers rp;
|
||||
private final UUID clusterId;
|
||||
private final ServerName deadRS;
|
||||
|
||||
/**
|
||||
* @param rsZnode
|
||||
*/
|
||||
public NodeFailoverWorker(String rsZnode) {
|
||||
this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
|
||||
}
|
||||
|
||||
public NodeFailoverWorker(String rsZnode, final ReplicationQueues replicationQueues,
|
||||
final ReplicationPeers replicationPeers, final UUID clusterId) {
|
||||
super("Failover-for-"+rsZnode);
|
||||
this.rsZnode = rsZnode;
|
||||
this.rq = replicationQueues;
|
||||
this.rp = replicationPeers;
|
||||
this.clusterId = clusterId;
|
||||
@VisibleForTesting
|
||||
public NodeFailoverWorker(ServerName deadRS) {
|
||||
super("Failover-for-" + deadRS);
|
||||
this.deadRS = deadRS;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
if (this.rq.isThisOurRegionServer(rsZnode)) {
|
||||
return;
|
||||
}
|
||||
// Wait a bit before transferring the queues, we may be shutting down.
|
||||
// This sleep may not be enough in some cases.
|
||||
try {
|
||||
|
@ -673,12 +684,13 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
return;
|
||||
}
|
||||
Map<String, Set<String>> newQueues = new HashMap<>();
|
||||
List<String> peers = rq.getUnClaimedQueueIds(rsZnode);
|
||||
while (peers != null && !peers.isEmpty()) {
|
||||
Pair<String, SortedSet<String>> peer = this.rq.claimQueue(rsZnode,
|
||||
peers.get(ThreadLocalRandom.current().nextInt(peers.size())));
|
||||
long sleep = sleepBeforeFailover/2;
|
||||
if (peer != null) {
|
||||
try {
|
||||
List<String> peers = queueStorage.getAllQueues(deadRS);
|
||||
while (!peers.isEmpty()) {
|
||||
Pair<String, SortedSet<String>> peer = queueStorage.claimQueue(deadRS,
|
||||
peers.get(ThreadLocalRandom.current().nextInt(peers.size())), server.getServerName());
|
||||
long sleep = sleepBeforeFailover / 2;
|
||||
if (!peer.getSecond().isEmpty()) {
|
||||
newQueues.put(peer.getFirst(), peer.getSecond());
|
||||
sleep = sleepBeforeFailover;
|
||||
}
|
||||
|
@ -688,10 +700,14 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
LOG.warn("Interrupted while waiting before transferring a queue.");
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
peers = rq.getUnClaimedQueueIds(rsZnode);
|
||||
peers = queueStorage.getAllQueues(deadRS);
|
||||
}
|
||||
if (peers != null) {
|
||||
rq.removeReplicatorIfQueueIsEmpty(rsZnode);
|
||||
if (!peers.isEmpty()) {
|
||||
queueStorage.removeReplicatorIfQueueIsEmpty(deadRS);
|
||||
}
|
||||
} catch (ReplicationException e) {
|
||||
server.abort("Failed to claim queue from dead regionserver", e);
|
||||
return;
|
||||
}
|
||||
// Copying over the failed queue is completed.
|
||||
if (newQueues.isEmpty()) {
|
||||
|
@ -716,8 +732,8 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
+ ex);
|
||||
}
|
||||
if (peer == null || peerConfig == null) {
|
||||
LOG.warn("Skipping failover for peer:" + actualPeerId + " of node" + rsZnode);
|
||||
replicationQueues.removeQueue(peerId);
|
||||
LOG.warn("Skipping failover for peer:" + actualPeerId + " of node " + deadRS);
|
||||
abortWhenFail(() -> queueStorage.removeQueue(server.getServerName(), peerId));
|
||||
continue;
|
||||
}
|
||||
if (server instanceof ReplicationSyncUp.DummyServer
|
||||
|
@ -741,13 +757,11 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
}
|
||||
|
||||
// enqueue sources
|
||||
ReplicationSourceInterface src =
|
||||
getReplicationSource(conf, fs, ReplicationSourceManager.this, this.rq, this.rp,
|
||||
server, peerId, this.clusterId, peerConfig, peer, walFileLengthProvider);
|
||||
ReplicationSourceInterface src = getReplicationSource(peerId, peerConfig, peer);
|
||||
// synchronized on oldsources to avoid adding recovered source for the to-be-removed peer
|
||||
// see removePeer
|
||||
synchronized (oldsources) {
|
||||
if (!this.rp.getConnectedPeerIds().contains(src.getPeerId())) {
|
||||
if (!replicationPeers.getConnectedPeerIds().contains(src.getPeerId())) {
|
||||
src.terminate("Recovered queue doesn't belong to any current peer");
|
||||
closeRecoveredQueue(src);
|
||||
continue;
|
||||
|
@ -766,35 +780,6 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
}
|
||||
}
|
||||
|
||||
class AdoptAbandonedQueuesWorker extends Thread{
|
||||
|
||||
public AdoptAbandonedQueuesWorker() {}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
List<String> currentReplicators = null;
|
||||
try {
|
||||
currentReplicators = replicationQueues.getListOfReplicators();
|
||||
} catch (ReplicationException e) {
|
||||
server.abort("Failed to get all replicators", e);
|
||||
return;
|
||||
}
|
||||
if (currentReplicators == null || currentReplicators.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
List<String> otherRegionServers = replicationTracker.getListOfRegionServers();
|
||||
LOG.info("Current list of replicators: " + currentReplicators + " other RSs: "
|
||||
+ otherRegionServers);
|
||||
|
||||
// Look if there's anything to process after a restart
|
||||
for (String rs : currentReplicators) {
|
||||
if (!otherRegionServers.contains(rs)) {
|
||||
transferQueues(rs);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the directory where wals are archived
|
||||
* @return the directory where wals are archived
|
||||
|
@ -849,6 +834,10 @@ public class ReplicationSourceManager implements ReplicationListener {
|
|||
}
|
||||
|
||||
public void cleanUpHFileRefs(String peerId, List<String> files) {
|
||||
this.replicationQueues.removeHFileRefs(peerId, files);
|
||||
abortWhenFail(() -> this.queueStorage.removeHFileRefs(peerId, files));
|
||||
}
|
||||
|
||||
int activeFailoverTaskCount() {
|
||||
return executor.getActiveCount();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -83,11 +83,12 @@ public class ReplicationSyncUp extends Configured implements Tool {
|
|||
Replication replication = new Replication();
|
||||
replication.initialize(new DummyServer(zkw), fs, logDir, oldLogDir, null);
|
||||
ReplicationSourceManager manager = replication.getReplicationManager();
|
||||
manager.init();
|
||||
int numberOfOldSource = 1; // default wait once
|
||||
while (numberOfOldSource > 0) {
|
||||
manager.init().get();
|
||||
while (manager.activeFailoverTaskCount() > 0) {
|
||||
Thread.sleep(SLEEP_TIME);
|
||||
}
|
||||
while (manager.getOldSources().size() > 0) {
|
||||
Thread.sleep(SLEEP_TIME);
|
||||
numberOfOldSource = manager.getOldSources().size();
|
||||
}
|
||||
manager.join();
|
||||
} catch (InterruptedException e) {
|
||||
|
|
|
@ -46,9 +46,8 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException;
|
|||
import org.apache.hadoop.hbase.client.ClusterConnection;
|
||||
import org.apache.hadoop.hbase.client.Connection;
|
||||
import org.apache.hadoop.hbase.master.HMaster;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
|
@ -116,9 +115,8 @@ public class TestLogsCleaner {
|
|||
|
||||
HMaster.decorateMasterConfiguration(conf);
|
||||
Server server = new DummyServer();
|
||||
ReplicationQueues repQueues = ReplicationFactory.getReplicationQueues(
|
||||
new ReplicationQueuesArguments(conf, server, server.getZooKeeper()));
|
||||
repQueues.init(server.getServerName().toString());
|
||||
ReplicationQueueStorage queueStorage =
|
||||
ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
|
||||
final Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
|
||||
final Path oldProcedureWALDir = new Path(oldLogDir, "masterProcedureWALs");
|
||||
String fakeMachineName = URLEncoder.encode(server.getServerName().toString(), "UTF8");
|
||||
|
@ -149,7 +147,7 @@ public class TestLogsCleaner {
|
|||
// Case 4: put 3 WALs in ZK indicating that they are scheduled for replication so these
|
||||
// files would pass TimeToLiveLogCleaner but would be rejected by ReplicationLogCleaner
|
||||
if (i % (30 / 3) == 1) {
|
||||
repQueues.addLog(fakeMachineName, fileName.getName());
|
||||
queueStorage.addWAL(server.getServerName(), fakeMachineName, fileName.getName());
|
||||
LOG.info("Replication log file: " + fileName);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,8 +48,8 @@ import org.apache.hadoop.hbase.replication.ReplicationException;
|
|||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
|
@ -81,16 +81,13 @@ public class TestReplicationHFileCleaner {
|
|||
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationHFileCleaner.class);
|
||||
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
|
||||
private static Server server;
|
||||
private static ReplicationQueues rq;
|
||||
private static ReplicationQueueStorage rq;
|
||||
private static ReplicationPeers rp;
|
||||
private static final String peerId = "TestReplicationHFileCleaner";
|
||||
private static Configuration conf = TEST_UTIL.getConfiguration();
|
||||
static FileSystem fs = null;
|
||||
Path root;
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TEST_UTIL.startMiniZKCluster();
|
||||
|
@ -99,20 +96,10 @@ public class TestReplicationHFileCleaner {
|
|||
HMaster.decorateMasterConfiguration(conf);
|
||||
rp = ReplicationFactory.getReplicationPeers(server.getZooKeeper(), conf, server);
|
||||
rp.init();
|
||||
rq = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server, server.getZooKeeper()));
|
||||
rq.init(server.getServerName().toString());
|
||||
try {
|
||||
rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
|
||||
fs = FileSystem.get(conf);
|
||||
} finally {
|
||||
if (fs != null) {
|
||||
fs.close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @throws java.lang.Exception
|
||||
*/
|
||||
@AfterClass
|
||||
public static void tearDownAfterClass() throws Exception {
|
||||
TEST_UTIL.shutdownMiniZKCluster();
|
||||
|
|
|
@ -26,10 +26,8 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||
import org.apache.hadoop.hbase.testclassification.MasterTests;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
|
||||
|
@ -55,14 +53,12 @@ public class TestReplicationZKNodeCleaner {
|
|||
|
||||
private final Configuration conf;
|
||||
private final ZKWatcher zkw;
|
||||
private final ReplicationQueues repQueues;
|
||||
private final ReplicationQueueStorage repQueues;
|
||||
|
||||
public TestReplicationZKNodeCleaner() throws Exception {
|
||||
conf = TEST_UTIL.getConfiguration();
|
||||
zkw = new ZKWatcher(conf, "TestReplicationZKNodeCleaner", null);
|
||||
repQueues = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, null,
|
||||
zkw));
|
||||
assertTrue(repQueues instanceof ReplicationQueuesZKImpl);
|
||||
repQueues = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
|
||||
}
|
||||
|
||||
@BeforeClass
|
||||
|
@ -78,9 +74,8 @@ public class TestReplicationZKNodeCleaner {
|
|||
|
||||
@Test
|
||||
public void testReplicationZKNodeCleaner() throws Exception {
|
||||
repQueues.init(SERVER_ONE.getServerName());
|
||||
// add queue for ID_ONE which isn't exist
|
||||
repQueues.addLog(ID_ONE, "file1");
|
||||
repQueues.addWAL(SERVER_ONE, ID_ONE, "file1");
|
||||
|
||||
ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
|
||||
Map<ServerName, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
|
||||
|
@ -90,7 +85,7 @@ public class TestReplicationZKNodeCleaner {
|
|||
assertTrue(undeletedQueues.get(SERVER_ONE).contains(ID_ONE));
|
||||
|
||||
// add a recovery queue for ID_TWO which isn't exist
|
||||
repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
|
||||
repQueues.addWAL(SERVER_ONE, ID_TWO + "-" + SERVER_TWO, "file2");
|
||||
|
||||
undeletedQueues = cleaner.getUnDeletedQueues();
|
||||
assertEquals(1, undeletedQueues.size());
|
||||
|
@ -106,11 +101,10 @@ public class TestReplicationZKNodeCleaner {
|
|||
|
||||
@Test
|
||||
public void testReplicationZKNodeCleanerChore() throws Exception {
|
||||
repQueues.init(SERVER_ONE.getServerName());
|
||||
// add queue for ID_ONE which isn't exist
|
||||
repQueues.addLog(ID_ONE, "file1");
|
||||
repQueues.addWAL(SERVER_ONE, ID_ONE, "file1");
|
||||
// add a recovery queue for ID_TWO which isn't exist
|
||||
repQueues.addLog(ID_TWO + "-" + SERVER_TWO, "file2");
|
||||
repQueues.addWAL(SERVER_ONE, ID_TWO + "-" + SERVER_TWO, "file2");
|
||||
|
||||
// Wait the cleaner chore to run
|
||||
Thread.sleep(20000);
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -46,9 +45,10 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
|
|||
MetricsSource metrics;
|
||||
WALFileLengthProvider walFileLengthProvider;
|
||||
AtomicBoolean startup = new AtomicBoolean(false);
|
||||
|
||||
@Override
|
||||
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
|
||||
ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
|
||||
ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String peerClusterId,
|
||||
UUID clusterId, ReplicationEndpoint replicationEndpoint,
|
||||
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
|
||||
this.manager = manager;
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -70,7 +70,6 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
|
|||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
|
||||
HColumnDescriptor fam;
|
||||
|
||||
t1_syncupSource = new HTableDescriptor(t1_su);
|
||||
|
@ -182,7 +181,6 @@ public class TestReplicationSyncUpTool extends TestReplicationBase {
|
|||
* verify correctly replicated to Slave
|
||||
*/
|
||||
mimicSyncUpAfterPut();
|
||||
|
||||
}
|
||||
|
||||
protected void setupReplication() throws Exception {
|
||||
|
|
|
@ -68,10 +68,10 @@ import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
|
|||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationPeers;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
|
@ -338,18 +338,14 @@ public abstract class TestReplicationSourceManager {
|
|||
|
||||
@Test
|
||||
public void testClaimQueues() throws Exception {
|
||||
final Server server = new DummyServer("hostname0.example.org");
|
||||
|
||||
|
||||
ReplicationQueues rq =
|
||||
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
|
||||
server.getZooKeeper()));
|
||||
rq.init(server.getServerName().toString());
|
||||
Server server = new DummyServer("hostname0.example.org");
|
||||
ReplicationQueueStorage rq = ReplicationStorageFactory
|
||||
.getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
|
||||
// populate some znodes in the peer znode
|
||||
files.add("log1");
|
||||
files.add("log2");
|
||||
for (String file : files) {
|
||||
rq.addLog("1", file);
|
||||
rq.addWAL(server.getServerName(), "1", file);
|
||||
}
|
||||
// create 3 DummyServers
|
||||
Server s1 = new DummyServer("dummyserver1.example.org");
|
||||
|
@ -357,12 +353,9 @@ public abstract class TestReplicationSourceManager {
|
|||
Server s3 = new DummyServer("dummyserver3.example.org");
|
||||
|
||||
// create 3 DummyNodeFailoverWorkers
|
||||
DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(
|
||||
server.getServerName().getServerName(), s1);
|
||||
DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(
|
||||
server.getServerName().getServerName(), s2);
|
||||
DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(
|
||||
server.getServerName().getServerName(), s3);
|
||||
DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(server.getServerName(), s1);
|
||||
DummyNodeFailoverWorker w2 = new DummyNodeFailoverWorker(server.getServerName(), s2);
|
||||
DummyNodeFailoverWorker w3 = new DummyNodeFailoverWorker(server.getServerName(), s3);
|
||||
|
||||
latch = new CountDownLatch(3);
|
||||
// start the threads
|
||||
|
@ -381,11 +374,9 @@ public abstract class TestReplicationSourceManager {
|
|||
|
||||
@Test
|
||||
public void testCleanupFailoverQueues() throws Exception {
|
||||
final Server server = new DummyServer("hostname1.example.org");
|
||||
ReplicationQueues rq =
|
||||
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
|
||||
server.getZooKeeper()));
|
||||
rq.init(server.getServerName().toString());
|
||||
Server server = new DummyServer("hostname1.example.org");
|
||||
ReplicationQueueStorage rq = ReplicationStorageFactory
|
||||
.getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
|
||||
// populate some znodes in the peer znode
|
||||
SortedSet<String> files = new TreeSet<>();
|
||||
String group = "testgroup";
|
||||
|
@ -394,19 +385,14 @@ public abstract class TestReplicationSourceManager {
|
|||
files.add(file1);
|
||||
files.add(file2);
|
||||
for (String file : files) {
|
||||
rq.addLog("1", file);
|
||||
rq.addWAL(server.getServerName(), "1", file);
|
||||
}
|
||||
Server s1 = new DummyServer("dummyserver1.example.org");
|
||||
ReplicationQueues rq1 =
|
||||
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
|
||||
s1.getZooKeeper()));
|
||||
rq1.init(s1.getServerName().toString());
|
||||
ReplicationPeers rp1 =
|
||||
ReplicationFactory.getReplicationPeers(s1.getZooKeeper(), s1.getConfiguration(), s1);
|
||||
rp1.init();
|
||||
NodeFailoverWorker w1 =
|
||||
manager.new NodeFailoverWorker(server.getServerName().getServerName(), rq1, rp1, new UUID(
|
||||
new Long(1), new Long(2)));
|
||||
manager.new NodeFailoverWorker(server.getServerName());
|
||||
w1.run();
|
||||
assertEquals(1, manager.getWalsByIdRecoveredQueues().size());
|
||||
String id = "1-" + server.getServerName().getServerName();
|
||||
|
@ -418,17 +404,16 @@ public abstract class TestReplicationSourceManager {
|
|||
|
||||
@Test
|
||||
public void testCleanupUnknownPeerZNode() throws Exception {
|
||||
final Server server = new DummyServer("hostname2.example.org");
|
||||
ReplicationQueues rq = ReplicationFactory.getReplicationQueues(
|
||||
new ReplicationQueuesArguments(server.getConfiguration(), server, server.getZooKeeper()));
|
||||
rq.init(server.getServerName().toString());
|
||||
Server server = new DummyServer("hostname2.example.org");
|
||||
ReplicationQueueStorage rq = ReplicationStorageFactory
|
||||
.getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
|
||||
// populate some znodes in the peer znode
|
||||
// add log to an unknown peer
|
||||
String group = "testgroup";
|
||||
rq.addLog("2", group + ".log1");
|
||||
rq.addLog("2", group + ".log2");
|
||||
rq.addWAL(server.getServerName(), "2", group + ".log1");
|
||||
rq.addWAL(server.getServerName(), "2", group + ".log2");
|
||||
|
||||
NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName().getServerName());
|
||||
NodeFailoverWorker w1 = manager.new NodeFailoverWorker(server.getServerName());
|
||||
w1.run();
|
||||
|
||||
// The log of the unknown peer should be removed from zk
|
||||
|
@ -506,10 +491,8 @@ public abstract class TestReplicationSourceManager {
|
|||
.setClusterKey("localhost:" + utility.getZkCluster().getClientPort() + ":/hbase");
|
||||
try {
|
||||
DummyServer server = new DummyServer();
|
||||
final ReplicationQueues rq =
|
||||
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(
|
||||
server.getConfiguration(), server, server.getZooKeeper()));
|
||||
rq.init(server.getServerName().toString());
|
||||
ReplicationQueueStorage rq = ReplicationStorageFactory
|
||||
.getReplicationQueueStorage(server.getZooKeeper(), server.getConfiguration());
|
||||
// Purposely fail ReplicationSourceManager.addSource() by causing ReplicationSourceInterface
|
||||
// initialization to throw an exception.
|
||||
conf.set("replication.replicationsource.implementation",
|
||||
|
@ -523,11 +506,11 @@ public abstract class TestReplicationSourceManager {
|
|||
assertNull(manager.getSource(peerId));
|
||||
|
||||
// Create a replication queue for the fake peer
|
||||
rq.addLog(peerId, "FakeFile");
|
||||
rq.addWAL(server.getServerName(), peerId, "FakeFile");
|
||||
// Unregister peer, this should remove the peer and clear all queues associated with it
|
||||
// Need to wait for the ReplicationTracker to pick up the changes and notify listeners.
|
||||
removePeerAndWait(peerId);
|
||||
assertFalse(rq.getAllQueues().contains(peerId));
|
||||
assertFalse(rq.getAllQueues(server.getServerName()).contains(peerId));
|
||||
} finally {
|
||||
conf.set("replication.replicationsource.implementation", replicationSourceImplName);
|
||||
removePeerAndWait(peerId);
|
||||
|
@ -650,11 +633,12 @@ public abstract class TestReplicationSourceManager {
|
|||
}
|
||||
}
|
||||
Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
|
||||
@Override public boolean evaluate() throws Exception {
|
||||
@Override
|
||||
public boolean evaluate() throws Exception {
|
||||
List<String> peers = rp.getAllPeerIds();
|
||||
return (!manager.getAllQueues().contains(peerId)) && (rp.getConnectedPeer(peerId) == null)
|
||||
&& (!peers.contains(peerId))
|
||||
&& manager.getSource(peerId) == null;
|
||||
return (!manager.getAllQueues().contains(peerId)) &&
|
||||
(rp.getConnectedPeer(peerId) == null) && (!peers.contains(peerId)) &&
|
||||
manager.getSource(peerId) == null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
@ -697,25 +681,24 @@ public abstract class TestReplicationSourceManager {
|
|||
static class DummyNodeFailoverWorker extends Thread {
|
||||
private Map<String, Set<String>> logZnodesMap;
|
||||
Server server;
|
||||
private String deadRsZnode;
|
||||
ReplicationQueues rq;
|
||||
private ServerName deadRS;
|
||||
ReplicationQueueStorage rq;
|
||||
|
||||
public DummyNodeFailoverWorker(String znode, Server s) throws Exception {
|
||||
this.deadRsZnode = znode;
|
||||
public DummyNodeFailoverWorker(ServerName deadRS, Server s) throws Exception {
|
||||
this.deadRS = deadRS;
|
||||
this.server = s;
|
||||
this.rq =
|
||||
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(server.getConfiguration(), server,
|
||||
server.getZooKeeper()));
|
||||
this.rq.init(this.server.getServerName().toString());
|
||||
this.rq = ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(),
|
||||
server.getConfiguration());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
logZnodesMap = new HashMap<>();
|
||||
List<String> queues = rq.getUnClaimedQueueIds(deadRsZnode);
|
||||
for(String queue:queues){
|
||||
Pair<String, SortedSet<String>> pair = rq.claimQueue(deadRsZnode, queue);
|
||||
List<String> queues = rq.getAllQueues(deadRS);
|
||||
for (String queue : queues) {
|
||||
Pair<String, SortedSet<String>> pair =
|
||||
rq.claimQueue(deadRS, queue, server.getServerName());
|
||||
if (pair != null) {
|
||||
logZnodesMap.put(pair.getFirst(), pair.getSecond());
|
||||
}
|
||||
|
@ -754,7 +737,7 @@ public abstract class TestReplicationSourceManager {
|
|||
|
||||
@Override
|
||||
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
|
||||
ReplicationQueues rq, ReplicationPeers rp, Server server, String peerClusterId,
|
||||
ReplicationQueueStorage rq, ReplicationPeers rp, Server server, String peerClusterId,
|
||||
UUID clusterId, ReplicationEndpoint replicationEndpoint,
|
||||
WALFileLengthProvider walFileLengthProvider, MetricsSource metrics) throws IOException {
|
||||
throw new IOException("Failing deliberately");
|
||||
|
|
|
@ -25,11 +25,10 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
|
|||
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationFactory;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueues;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
|
||||
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
|
||||
import org.junit.BeforeClass;
|
||||
|
@ -42,7 +41,7 @@ import org.junit.experimental.categories.Category;
|
|||
* ReplicationQueuesClientZkImpl. Also includes extra tests outside of those in
|
||||
* TestReplicationSourceManager that test ReplicationQueueZkImpl-specific behaviors.
|
||||
*/
|
||||
@Category({ReplicationTests.class, MediumTests.class})
|
||||
@Category({ ReplicationTests.class, MediumTests.class })
|
||||
public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceManager {
|
||||
|
||||
@ClassRule
|
||||
|
@ -64,16 +63,14 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
|
|||
// Tests the naming convention of adopted queues for ReplicationQueuesZkImpl
|
||||
@Test
|
||||
public void testNodeFailoverDeadServerParsing() throws Exception {
|
||||
final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
|
||||
ReplicationQueues repQueues =
|
||||
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, server,
|
||||
server.getZooKeeper()));
|
||||
repQueues.init(server.getServerName().toString());
|
||||
Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
|
||||
ReplicationQueueStorage queueStorage =
|
||||
ReplicationStorageFactory.getReplicationQueueStorage(server.getZooKeeper(), conf);
|
||||
// populate some znodes in the peer znode
|
||||
files.add("log1");
|
||||
files.add("log2");
|
||||
for (String file : files) {
|
||||
repQueues.addLog("1", file);
|
||||
queueStorage.addWAL(server.getServerName(), "1", file);
|
||||
}
|
||||
|
||||
// create 3 DummyServers
|
||||
|
@ -82,30 +79,22 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
|
|||
Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
|
||||
|
||||
// simulate three servers fail sequentially
|
||||
ReplicationQueues rq1 =
|
||||
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
|
||||
s1.getZooKeeper()));
|
||||
rq1.init(s1.getServerName().toString());
|
||||
String serverName = server.getServerName().getServerName();
|
||||
List<String> unclaimed = rq1.getUnClaimedQueueIds(serverName);
|
||||
rq1.claimQueue(serverName, unclaimed.get(0)).getSecond();
|
||||
rq1.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
|
||||
ReplicationQueues rq2 =
|
||||
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s2.getConfiguration(), s2,
|
||||
s2.getZooKeeper()));
|
||||
rq2.init(s2.getServerName().toString());
|
||||
serverName = s1.getServerName().getServerName();
|
||||
unclaimed = rq2.getUnClaimedQueueIds(serverName);
|
||||
rq2.claimQueue(serverName, unclaimed.get(0)).getSecond();
|
||||
rq2.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
|
||||
ReplicationQueues rq3 =
|
||||
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s3.getConfiguration(), s3,
|
||||
s3.getZooKeeper()));
|
||||
rq3.init(s3.getServerName().toString());
|
||||
serverName = s2.getServerName().getServerName();
|
||||
unclaimed = rq3.getUnClaimedQueueIds(serverName);
|
||||
String queue3 = rq3.claimQueue(serverName, unclaimed.get(0)).getFirst();
|
||||
rq3.removeReplicatorIfQueueIsEmpty(unclaimed.get(0));
|
||||
ServerName serverName = server.getServerName();
|
||||
List<String> unclaimed = queueStorage.getAllQueues(serverName);
|
||||
queueStorage.claimQueue(serverName, unclaimed.get(0), s1.getServerName());
|
||||
queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
|
||||
|
||||
serverName = s1.getServerName();
|
||||
unclaimed = queueStorage.getAllQueues(serverName);
|
||||
queueStorage.claimQueue(serverName, unclaimed.get(0), s2.getServerName());
|
||||
queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
|
||||
|
||||
serverName = s2.getServerName();
|
||||
unclaimed = queueStorage.getAllQueues(serverName);
|
||||
String queue3 =
|
||||
queueStorage.claimQueue(serverName, unclaimed.get(0), s3.getServerName()).getFirst();
|
||||
queueStorage.removeReplicatorIfQueueIsEmpty(serverName);
|
||||
|
||||
ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(queue3);
|
||||
List<ServerName> result = replicationQueueInfo.getDeadRegionServers();
|
||||
// verify
|
||||
|
|
Loading…
Reference in New Issue