HBASE-19599 Remove ReplicationQueuesClient, use ReplicationQueueStorage directly

zhangduo 2017-12-25 18:49:56 +08:00
parent 5e6c303528
commit c4fa568b47
23 changed files with 474 additions and 899 deletions
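At a glance, callers migrate from the reflectively constructed ReplicationQueuesClient to a directly built ReplicationQueueStorage. A before/after sketch based on the diffs below (a ZKWatcher zkw, Configuration conf, and Abortable abortable are assumed to exist; exception handling elided):

    // Before this commit: reflection-built client, explicit init(), String server names
    ReplicationQueuesClient client = ReplicationFactory
        .getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw));
    client.init();
    List<String> replicators = client.getListOfReplicators();      // throws KeeperException

    // After this commit: storage from ReplicationStorageFactory, no init() step
    ReplicationQueueStorage storage =
        ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
    List<ServerName> replicators2 = storage.getListOfReplicators(); // throws ReplicationException

Note that the storage API identifies region servers by ServerName rather than String and wraps ZooKeeper failures in ReplicationException, so callers no longer handle KeeperException directly.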

View File

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -39,20 +38,14 @@ public final class ReplicationFactory {
args);
}
public static ReplicationQueuesClient
getReplicationQueuesClient(ReplicationQueuesClientArguments args) throws Exception {
return (ReplicationQueuesClient) ConstructorUtils
.invokeConstructor(ReplicationQueuesClientZKImpl.class, args);
}
public static ReplicationPeers getReplicationPeers(final ZKWatcher zk, Configuration conf,
Abortable abortable) {
public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf,
Abortable abortable) {
return getReplicationPeers(zk, conf, null, abortable);
}
public static ReplicationPeers getReplicationPeers(final ZKWatcher zk, Configuration conf,
final ReplicationQueuesClient queuesClient, Abortable abortable) {
return new ReplicationPeersZKImpl(zk, conf, queuesClient, abortable);
public static ReplicationPeers getReplicationPeers(ZKWatcher zk, Configuration conf,
ReplicationQueueStorage queueStorage, Abortable abortable) {
return new ReplicationPeersZKImpl(zk, conf, queueStorage, abortable);
}
public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper,

View File

@ -32,6 +32,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
@ -80,17 +81,17 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
// Map of peer clusters keyed by their id
private Map<String, ReplicationPeerZKImpl> peerClusters;
private final ReplicationQueuesClient queuesClient;
private final ReplicationQueueStorage queueStorage;
private Abortable abortable;
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeersZKImpl.class);
public ReplicationPeersZKImpl(final ZKWatcher zk, final Configuration conf,
final ReplicationQueuesClient queuesClient, Abortable abortable) {
public ReplicationPeersZKImpl(ZKWatcher zk, Configuration conf,
ReplicationQueueStorage queueStorage, Abortable abortable) {
super(zk, conf, abortable);
this.abortable = abortable;
this.peerClusters = new ConcurrentHashMap<>();
this.queuesClient = queuesClient;
this.queueStorage = queueStorage;
}
@Override
@ -512,17 +513,16 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
private void checkQueuesDeleted(String peerId) throws ReplicationException {
if (queuesClient == null) {
if (queueStorage == null) {
return;
}
try {
List<String> replicators = queuesClient.getListOfReplicators();
List<ServerName> replicators = queueStorage.getListOfReplicators();
if (replicators == null || replicators.isEmpty()) {
return;
}
for (String replicator : replicators) {
List<String> queueIds = queuesClient.getAllQueues(replicator);
for (ServerName replicator : replicators) {
List<String> queueIds = queueStorage.getAllQueues(replicator);
for (String queueId : queueIds) {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
if (queueInfo.getPeerId().equals(peerId)) {
@ -533,7 +533,7 @@ public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements Re
}
// Check for hfile-refs queue
if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
&& queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) {
&& queueStorage.getAllPeersFromHFileRefsQueue().contains(peerId)) {
throw new IllegalArgumentException("Undeleted queue for peerId: " + peerId
+ ", found in hfile-refs node path " + hfileRefsZNode);
}

View File

@ -77,6 +77,14 @@ public interface ReplicationQueueStorage {
long getWALPosition(ServerName serverName, String queueId, String fileName)
throws ReplicationException;
/**
* Get a list of all WALs in the given queue on the given region server.
* @param serverName the server name of the region server that owns the queue
* @param queueId a String that identifies the queue
* @return a list of WALs
*/
List<String> getWALsInQueue(ServerName serverName, String queueId) throws ReplicationException;
/**
* Get a list of all queues for the specified region server.
* @param serverName the server name of the region server that owns the set of queues
@ -108,8 +116,8 @@ public interface ReplicationQueueStorage {
/**
* Load all wals in all replication queues. This method guarantees to return a snapshot which
* contains all WALs in the zookeeper at the start of this call even there is concurrent queue
* failover. However, some newly created WALs during the call may not be included.
* contains all WALs at the start of this call even if there is concurrent queue failover. However,
* some newly created WALs during the call may not be included.
*/
Set<String> getAllWALs() throws ReplicationException;
@ -142,13 +150,6 @@ public interface ReplicationQueueStorage {
*/
void removeHFileRefs(String peerId, List<String> files) throws ReplicationException;
/**
* Get the change version number of replication hfile references node. This can be used as
* optimistic locking to get a consistent snapshot of the replication queues of hfile references.
* @return change version number of hfile references node
*/
int getHFileRefsNodeChangeVersion() throws ReplicationException;
/**
* Get list of all peers from hfile reference queue.
* @return a list of peer ids
@ -161,4 +162,11 @@ public interface ReplicationQueueStorage {
* @return a list of hfile references
*/
List<String> getReplicableHFiles(String peerId) throws ReplicationException;
/**
* Load all hfile references in all replication queues. This method guarantees to return a
* snapshot which contains all hfile references at the start of this call. However, some newly
* created hfile references during the call may not be included.
*/
Set<String> getAllHFileRefs() throws ReplicationException;
}
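The interface now covers both per-queue reads and the whole-cluster snapshot views the cleaners rely on. A short usage sketch against the methods above (zkw, conf, and the loop structure are illustrative; ReplicationException handling elided):

    ReplicationQueueStorage storage =
        ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
    for (ServerName rs : storage.getListOfReplicators()) {
      for (String queueId : storage.getAllQueues(rs)) {
        List<String> wals = storage.getWALsInQueue(rs, queueId);
        // 'wals' are the WAL file names still pending replication for this queue
      }
    }
    Set<String> allWals = storage.getAllWALs();           // snapshot as of call start
    Set<String> allHFileRefs = storage.getAllHFileRefs(); // same guarantee for hfile refs

Because getAllWALs() and getAllHFileRefs() only guarantee a snapshot as of the start of the call, callers must treat entries created during the call as potentially missing.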

View File

@ -1,93 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.Set;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
/**
* This provides an interface for clients of replication to view replication queues. These queues
* keep track of the sources(WALs/HFile references) that still need to be replicated to remote
* clusters.
*/
@InterfaceAudience.Private
public interface ReplicationQueuesClient {
/**
* Initialize the replication queue client interface.
*/
public void init() throws ReplicationException;
/**
* Get a list of all region servers that have outstanding replication queues. These servers could
* be alive, dead or from a previous run of the cluster.
* @return a list of server names
* @throws KeeperException zookeeper exception
*/
List<String> getListOfReplicators() throws KeeperException;
/**
* Get a list of all WALs in the given queue on the given region server.
* @param serverName the server name of the region server that owns the queue
* @param queueId a String that identifies the queue
* @return a list of WALs, null if this region server is dead and has no outstanding queues
* @throws KeeperException zookeeper exception
*/
List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException;
/**
* Get a list of all queues for the specified region server.
* @param serverName the server name of the region server that owns the set of queues
* @return a list of queueIds, null if this region server is not a replicator.
*/
List<String> getAllQueues(String serverName) throws KeeperException;
/**
* Load all wals in all replication queues from ZK. This method guarantees to return a
* snapshot which contains all WALs in the zookeeper at the start of this call even there
* is concurrent queue failover. However, some newly created WALs during the call may
* not be included.
*/
Set<String> getAllWALs() throws KeeperException;
/**
* Get the change version number of replication hfile references node. This can be used as
* optimistic locking to get a consistent snapshot of the replication queues of hfile references.
* @return change version number of hfile references node
*/
int getHFileRefsNodeChangeVersion() throws KeeperException;
/**
* Get list of all peers from hfile reference queue.
* @return a list of peer ids
* @throws KeeperException zookeeper exception
*/
List<String> getAllPeersFromHFileRefsQueue() throws KeeperException;
/**
* Get a list of all hfile references in the given peer.
* @param peerId a String that identifies the peer
* @return a list of hfile references, null if not found any
* @throws KeeperException zookeeper exception
*/
List<String> getReplicableHFiles(String peerId) throws KeeperException;
}

View File

@ -1,40 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Wrapper around common arguments used to construct ReplicationQueuesClient. Used to construct
* various ReplicationQueuesClient Implementations with different constructor arguments by
* reflection.
*/
@InterfaceAudience.Private
public class ReplicationQueuesClientArguments extends ReplicationQueuesArguments {
public ReplicationQueuesClientArguments(Configuration conf, Abortable abort,
ZKWatcher zk) {
super(conf, abort, zk);
}
public ReplicationQueuesClientArguments(Configuration conf, Abortable abort) {
super(conf, abort);
}
}

View File

@ -1,177 +0,0 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
@InterfaceAudience.Private
public class ReplicationQueuesClientZKImpl extends ReplicationStateZKBase implements
ReplicationQueuesClient {
Logger LOG = LoggerFactory.getLogger(ReplicationQueuesClientZKImpl.class);
public ReplicationQueuesClientZKImpl(ReplicationQueuesClientArguments args) {
this(args.getZk(), args.getConf(), args.getAbortable());
}
public ReplicationQueuesClientZKImpl(final ZKWatcher zk, Configuration conf,
Abortable abortable) {
super(zk, conf, abortable);
}
@Override
public void init() throws ReplicationException {
try {
if (ZKUtil.checkExists(this.zookeeper, this.queuesZNode) < 0) {
ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
}
} catch (KeeperException e) {
throw new ReplicationException("Internal error while initializing a queues client", e);
}
}
@Override
public List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException {
String znode = ZNodePaths.joinZNode(this.queuesZNode, serverName);
znode = ZNodePaths.joinZNode(znode, queueId);
List<String> result = null;
try {
result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Failed to get list of wals for queueId=" + queueId
+ " and serverName=" + serverName, e);
throw e;
}
return result;
}
@Override
public List<String> getAllQueues(String serverName) throws KeeperException {
String znode = ZNodePaths.joinZNode(this.queuesZNode, serverName);
List<String> result = null;
try {
result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Failed to get list of queues for serverName=" + serverName, e);
throw e;
}
return result;
}
@Override
public Set<String> getAllWALs() throws KeeperException {
/**
* Load all wals in all replication queues from ZK. This method guarantees to return a
* snapshot which contains all WALs in the zookeeper at the start of this call even there
* is concurrent queue failover. However, some newly created WALs during the call may
* not be included.
*/
for (int retry = 0; ; retry++) {
int v0 = getQueuesZNodeCversion();
List<String> rss = getListOfReplicators();
if (rss == null || rss.isEmpty()) {
LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
return ImmutableSet.of();
}
Set<String> wals = Sets.newHashSet();
for (String rs : rss) {
List<String> listOfPeers = getAllQueues(rs);
// if rs just died, this will be null
if (listOfPeers == null) {
continue;
}
for (String id : listOfPeers) {
List<String> peersWals = getLogsInQueue(rs, id);
if (peersWals != null) {
wals.addAll(peersWals);
}
}
}
int v1 = getQueuesZNodeCversion();
if (v0 == v1) {
return wals;
}
LOG.info(String.format("Replication queue node cversion changed from %d to %d, retry = %d",
v0, v1, retry));
}
}
public int getQueuesZNodeCversion() throws KeeperException {
try {
Stat stat = new Stat();
ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
return stat.getCversion();
} catch (KeeperException e) {
this.abortable.abort("Failed to get stat of replication rs node", e);
throw e;
}
}
@Override
public int getHFileRefsNodeChangeVersion() throws KeeperException {
Stat stat = new Stat();
try {
ZKUtil.getDataNoWatch(this.zookeeper, this.hfileRefsZNode, stat);
} catch (KeeperException e) {
this.abortable.abort("Failed to get stat of replication hfile references node.", e);
throw e;
}
return stat.getCversion();
}
@Override
public List<String> getAllPeersFromHFileRefsQueue() throws KeeperException {
List<String> result = null;
try {
result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.hfileRefsZNode);
} catch (KeeperException e) {
this.abortable.abort("Failed to get list of all peers in hfile references node.", e);
throw e;
}
return result;
}
@Override
public List<String> getReplicableHFiles(String peerId) throws KeeperException {
String znode = ZNodePaths.joinZNode(this.hfileRefsZNode, peerId);
List<String> result = null;
try {
result = ZKUtil.listChildrenNoWatch(this.zookeeper, znode);
} catch (KeeperException e) {
this.abortable.abort("Failed to get list of hfile references for peerId=" + peerId, e);
throw e;
}
return result;
}
}

View File

@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.util.CollectionUtils.nullToEmpty;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
@ -49,7 +50,7 @@ import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Sets;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
/**
* ZK based replication queue storage.
@ -61,7 +62,7 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class);
public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
"zookeeper.znode.replication.hfile.refs";
"zookeeper.znode.replication.hfile.refs";
public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
/**
@ -256,11 +257,23 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
}
}
private List<String> getLogsInQueue0(ServerName serverName, String queueId)
private List<String> getWALsInQueue0(ServerName serverName, String queueId)
throws KeeperException {
return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, getQueueNode(serverName, queueId)));
}
@Override
public List<String> getWALsInQueue(ServerName serverName, String queueId)
throws ReplicationException {
try {
return getWALsInQueue0(serverName, queueId);
} catch (KeeperException e) {
throw new ReplicationException(
"Failed to get wals in queue (serverName=" + serverName + ", queueId=" + queueId + ")",
e);
}
}
private List<String> getAllQueues0(ServerName serverName) throws KeeperException {
return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, getRsNode(serverName)));
}
@ -274,7 +287,9 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
}
}
private int getQueuesZNodeCversion() throws KeeperException {
// will be overridden in UTs
@VisibleForTesting
protected int getQueuesZNodeCversion() throws KeeperException {
Stat stat = new Stat();
ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
return stat.getCversion();
@ -290,10 +305,10 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
LOG.debug("Didn't find any region server that replicates, won't prevent any deletions.");
return Collections.emptySet();
}
Set<String> wals = Sets.newHashSet();
Set<String> wals = new HashSet<>();
for (ServerName rs : rss) {
for (String queueId : getAllQueues0(rs)) {
wals.addAll(getLogsInQueue0(rs, queueId));
wals.addAll(getWALsInQueue0(rs, queueId));
}
}
int v1 = getQueuesZNodeCversion();
@ -356,9 +371,9 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
if (debugEnabled) {
LOG.debug("Adding hfile references " + pairs + " in queue " + peerNode);
}
List<ZKUtilOp> listOfOps =
pairs.stream().map(p -> p.getSecond().getName()).map(n -> getHFileNode(peerNode, n))
.map(f -> ZKUtilOp.createAndFailSilent(f, HConstants.EMPTY_BYTE_ARRAY)).collect(toList());
List<ZKUtilOp> listOfOps = pairs.stream().map(p -> p.getSecond().getName())
.map(n -> getHFileNode(peerNode, n))
.map(f -> ZKUtilOp.createAndFailSilent(f, HConstants.EMPTY_BYTE_ARRAY)).collect(toList());
if (debugEnabled) {
LOG.debug("The multi list size for adding hfile references in zk for node " + peerNode +
" is " + listOfOps.size());
@ -391,8 +406,37 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
}
}
private List<String> getAllPeersFromHFileRefsQueue0() throws KeeperException {
return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode));
}
@Override
public int getHFileRefsNodeChangeVersion() throws ReplicationException {
public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
try {
return getAllPeersFromHFileRefsQueue0();
} catch (KeeperException e) {
throw new ReplicationException("Failed to get list of all peers in hfile references node.",
e);
}
}
private List<String> getReplicableHFiles0(String peerId) throws KeeperException {
return nullToEmpty(ZKUtil.listChildrenNoWatch(this.zookeeper, getHFileRefsPeerNode(peerId)));
}
@Override
public List<String> getReplicableHFiles(String peerId) throws ReplicationException {
try {
return getReplicableHFiles0(peerId);
} catch (KeeperException e) {
throw new ReplicationException("Failed to get list of hfile references for peer " + peerId,
e);
}
}
// will be overridden in UTs
@VisibleForTesting
protected int getHFileRefsZNodeCversion() throws ReplicationException {
Stat stat = new Stat();
try {
ZKUtil.getDataNoWatch(zookeeper, hfileRefsZNode, stat);
@ -403,23 +447,29 @@ class ZKReplicationQueueStorage extends ZKReplicationStorageBase
}
@Override
public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
public Set<String> getAllHFileRefs() throws ReplicationException {
try {
return nullToEmpty(ZKUtil.listChildrenNoWatch(zookeeper, hfileRefsZNode));
for (int retry = 0;; retry++) {
int v0 = getHFileRefsZNodeCversion();
List<String> peers = getAllPeersFromHFileRefsQueue();
if (peers.isEmpty()) {
LOG.debug("Didn't find any peers with hfile references, won't prevent any deletions.");
return Collections.emptySet();
}
Set<String> hfileRefs = new HashSet<>();
for (String peer : peers) {
hfileRefs.addAll(getReplicableHFiles0(peer));
}
int v1 = getHFileRefsZNodeCversion();
if (v0 == v1) {
return hfileRefs;
}
LOG.debug(String.format(
"Replication hfile references node cversion changed from " + "%d to %d, retry = %d", v0,
v1, retry));
}
} catch (KeeperException e) {
throw new ReplicationException("Failed to get list of all peers in hfile references node.",
e);
throw new ReplicationException("Failed to get all hfile refs", e);
}
}
@Override
public List<String> getReplicableHFiles(String peerId) throws ReplicationException {
try {
return nullToEmpty(ZKUtil.listChildrenNoWatch(this.zookeeper, getHFileRefsPeerNode(peerId)));
} catch (KeeperException e) {
throw new ReplicationException("Failed to get list of hfile references for peer " + peerId,
e);
}
}
}
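The new getAllHFileRefs() reuses the optimistic snapshot technique that getAllWALs() already applied to the queues znode: read the parent node's cversion, list all children, read the cversion again, and retry if it moved. ZooKeeper bumps a znode's cversion whenever a child is created or deleted, so an unchanged cversion brackets a scan during which no queue or peer came or went. The distilled loop (a sketch; collectChildren() is a hypothetical stand-in for the child-listing code above):

    for (int retry = 0;; retry++) {
      int v0 = getHFileRefsZNodeCversion();     // child-change counter before the scan
      Set<String> snapshot = collectChildren(); // hypothetical: gather every peer's refs
      int v1 = getHFileRefsZNodeCversion();     // counter after the scan
      if (v0 == v1) {
        return snapshot; // nothing was added or removed mid-scan: consistent snapshot
      }
      // otherwise log the v0 -> v1 change and loop for another attempt
    }

This is also why getQueuesZNodeCversion() and getHFileRefsZNodeCversion() become protected @VisibleForTesting methods: the unit tests override them to simulate an unstable cversion and exercise the retry path.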

View File

@ -15,20 +15,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.zookeeper.KeeperException;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -42,10 +45,10 @@ public abstract class TestReplicationStateBasic {
protected ReplicationQueues rq1;
protected ReplicationQueues rq2;
protected ReplicationQueues rq3;
protected ReplicationQueuesClient rqc;
protected String server1 = ServerName.valueOf("hostname1.example.org", 1234, -1L).toString();
protected String server2 = ServerName.valueOf("hostname2.example.org", 1234, -1L).toString();
protected String server3 = ServerName.valueOf("hostname3.example.org", 1234, -1L).toString();
protected ReplicationQueueStorage rqs;
protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345);
protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345);
protected ServerName server3 = ServerName.valueOf("hostname3.example.org", 1234, 12345);
protected ReplicationPeers rp;
protected static final String ID_ONE = "1";
protected static final String ID_TWO = "2";
@ -62,25 +65,19 @@ public abstract class TestReplicationStateBasic {
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class);
@Before
public void setUp() {
zkTimeoutCount = 0;
}
@Test
public void testReplicationQueuesClient() throws ReplicationException, KeeperException {
rqc.init();
public void testReplicationQueueStorage() throws ReplicationException {
// Test methods with empty state
assertEquals(0, rqc.getListOfReplicators().size());
assertNull(rqc.getLogsInQueue(server1, "qId1"));
assertNull(rqc.getAllQueues(server1));
assertEquals(0, rqs.getListOfReplicators().size());
assertTrue(rqs.getWALsInQueue(server1, "qId1").isEmpty());
assertTrue(rqs.getAllQueues(server1).isEmpty());
/*
* Set up data. Two replicators: server1 has three queues with 0, 1 and 2 log files each;
* server2 has zero queues.
*/
rq1.init(server1);
rq2.init(server2);
rq1.init(server1.getServerName());
rq2.init(server2.getServerName());
rq1.addLog("qId1", "trash");
rq1.removeLog("qId1", "trash");
rq1.addLog("qId2", "filename1");
@ -89,20 +86,20 @@ public abstract class TestReplicationStateBasic {
rq2.addLog("trash", "trash");
rq2.removeQueue("trash");
List<String> reps = rqc.getListOfReplicators();
List<ServerName> reps = rqs.getListOfReplicators();
assertEquals(2, reps.size());
assertTrue(server1, reps.contains(server1));
assertTrue(server2, reps.contains(server2));
assertTrue(server1.getServerName(), reps.contains(server1));
assertTrue(server2.getServerName(), reps.contains(server2));
assertNull(rqc.getLogsInQueue("bogus", "bogus"));
assertNull(rqc.getLogsInQueue(server1, "bogus"));
assertEquals(0, rqc.getLogsInQueue(server1, "qId1").size());
assertEquals(1, rqc.getLogsInQueue(server1, "qId2").size());
assertEquals("filename1", rqc.getLogsInQueue(server1, "qId2").get(0));
assertTrue(rqs.getWALsInQueue(ServerName.valueOf("bogus", 12345, 12345), "bogus").isEmpty());
assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty());
assertEquals(0, rqs.getWALsInQueue(server1, "qId1").size());
assertEquals(1, rqs.getWALsInQueue(server1, "qId2").size());
assertEquals("filename1", rqs.getWALsInQueue(server1, "qId2").get(0));
assertNull(rqc.getAllQueues("bogus"));
assertEquals(0, rqc.getAllQueues(server2).size());
List<String> list = rqc.getAllQueues(server1);
assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 12345, -1L)).isEmpty());
assertEquals(0, rqs.getAllQueues(server2).size());
List<String> list = rqs.getAllQueues(server1);
assertEquals(3, list.size());
assertTrue(list.contains("qId2"));
assertTrue(list.contains("qId3"));
@ -110,10 +107,10 @@ public abstract class TestReplicationStateBasic {
@Test
public void testReplicationQueues() throws ReplicationException {
rq1.init(server1);
rq2.init(server2);
rq3.init(server3);
//Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
rq1.init(server1.getServerName());
rq2.init(server2.getServerName());
rq3.init(server3.getServerName());
// Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
rp.init();
// 3 replicators should exist
@ -124,8 +121,7 @@ public abstract class TestReplicationStateBasic {
assertEquals(0, rq1.getAllQueues().size());
assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
assertNull(rq1.getLogsInQueue("bogus"));
assertNull(rq1.getUnClaimedQueueIds(
ServerName.valueOf("bogus", 1234, -1L).toString()));
assertNull(rq1.getUnClaimedQueueIds(ServerName.valueOf("bogus", 1234, -1L).toString()));
rq1.setLogPosition("bogus", "bogus", 5L);
@ -144,21 +140,21 @@ public abstract class TestReplicationStateBasic {
assertEquals(1, rq2.getAllQueues().size());
assertEquals(5, rq3.getAllQueues().size());
assertEquals(0, rq3.getUnClaimedQueueIds(server1).size());
rq3.removeReplicatorIfQueueIsEmpty(server1);
assertEquals(0, rq3.getUnClaimedQueueIds(server1.getServerName()).size());
rq3.removeReplicatorIfQueueIsEmpty(server1.getServerName());
assertEquals(2, rq3.getListOfReplicators().size());
List<String> queues = rq2.getUnClaimedQueueIds(server3);
List<String> queues = rq2.getUnClaimedQueueIds(server3.getServerName());
assertEquals(5, queues.size());
for(String queue: queues) {
rq2.claimQueue(server3, queue);
for (String queue : queues) {
rq2.claimQueue(server3.getServerName(), queue);
}
rq2.removeReplicatorIfQueueIsEmpty(server3);
rq2.removeReplicatorIfQueueIsEmpty(server3.getServerName());
assertEquals(1, rq2.getListOfReplicators().size());
// Try to claim our own queues
assertNull(rq2.getUnClaimedQueueIds(server2));
rq2.removeReplicatorIfQueueIsEmpty(server2);
assertNull(rq2.getUnClaimedQueueIds(server2.getServerName()));
rq2.removeReplicatorIfQueueIsEmpty(server2.getServerName());
assertEquals(6, rq2.getAllQueues().size());
@ -174,8 +170,8 @@ public abstract class TestReplicationStateBasic {
try {
rp.registerPeer(ID_ONE,
new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase"));
fail("Should throw an IllegalArgumentException because "
+ "zookeeper.znode.parent is missing leading '/'.");
fail("Should throw an IllegalArgumentException because " +
"zookeeper.znode.parent is missing leading '/'.");
} catch (IllegalArgumentException e) {
// Expected.
}
@ -191,8 +187,8 @@ public abstract class TestReplicationStateBasic {
try {
rp.registerPeer(ID_ONE,
new ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase"));
fail("Should throw an IllegalArgumentException because "
+ "hbase.zookeeper.property.clientPort is missing.");
fail("Should throw an IllegalArgumentException because " +
"hbase.zookeeper.property.clientPort is missing.");
} catch (IllegalArgumentException e) {
// Expected.
}
@ -201,38 +197,36 @@ public abstract class TestReplicationStateBasic {
@Test
public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
rp.init();
rq1.init(server1);
rqc.init();
rq1.init(server1.getServerName());
List<Pair<Path, Path>> files1 = new ArrayList<>(3);
files1.add(new Pair<>(null, new Path("file_1")));
files1.add(new Pair<>(null, new Path("file_2")));
files1.add(new Pair<>(null, new Path("file_3")));
assertNull(rqc.getReplicableHFiles(ID_ONE));
assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
rq1.addPeerToHFileRefs(ID_ONE);
rq1.addHFileRefs(ID_ONE, files1);
assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
List<String> hfiles2 = new ArrayList<>(files1.size());
for (Pair<Path, Path> p : files1) {
hfiles2.add(p.getSecond().getName());
}
String removedString = hfiles2.remove(0);
rq1.removeHFileRefs(ID_ONE, hfiles2);
assertEquals(1, rqc.getReplicableHFiles(ID_ONE).size());
assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size());
hfiles2 = new ArrayList<>(1);
hfiles2.add(removedString);
rq1.removeHFileRefs(ID_ONE, hfiles2);
assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size());
assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size());
rp.unregisterPeer(ID_ONE);
}
@Test
public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
rq1.init(server1);
rqc.init();
rq1.init(server1.getServerName());
rp.init();
rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
@ -246,20 +240,20 @@ public abstract class TestReplicationStateBasic {
files1.add(new Pair<>(null, new Path("file_3")));
rq1.addHFileRefs(ID_ONE, files1);
rq1.addHFileRefs(ID_TWO, files1);
assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size());
assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size());
assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
rp.unregisterPeer(ID_ONE);
rq1.removePeerFromHFileRefs(ID_ONE);
assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
assertNull(rqc.getReplicableHFiles(ID_ONE));
assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
rp.unregisterPeer(ID_TWO);
rq1.removePeerFromHFileRefs(ID_TWO);
assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
assertNull(rqc.getReplicableHFiles(ID_TWO));
assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty());
}
@Test
@ -316,8 +310,14 @@ public abstract class TestReplicationStateBasic {
assertNumberOfPeers(2);
assertTrue(rp.getStatusOfPeer(ID_ONE));
rp.disablePeer(ID_ONE);
// now we do not rely on zk watcher to trigger the state change so we need to trigger it
// manually...
assertEquals(PeerState.DISABLED, rp.getConnectedPeer(ID_ONE).getPeerState(true));
assertConnectedPeerStatus(false, ID_ONE);
rp.enablePeer(ID_ONE);
// now we do not rely on zk watcher to trigger the state change so we need to trigger it
// manually...
assertEquals(PeerState.ENABLED, rp.getConnectedPeer(ID_ONE).getPeerState(true));
assertConnectedPeerStatus(true, ID_ONE);
// Disconnect peer
@ -340,8 +340,8 @@ public abstract class TestReplicationStateBasic {
return;
}
if (zkTimeoutCount < ZK_MAX_COUNT) {
LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
+ ", sleeping and trying again.");
LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status +
", sleeping and trying again.");
Thread.sleep(ZK_SLEEP_INTERVAL);
} else {
fail("Timed out waiting for ConnectedPeerStatus to be " + status);
@ -370,9 +370,9 @@ public abstract class TestReplicationStateBasic {
for (int j = 0; j < i; j++) {
rq3.addLog("qId" + i, "filename" + j);
}
//Add peers for the corresponding queues so they are not orphans
rp.registerPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i));
// Add peers for the corresponding queues so they are not orphans
rp.registerPeer("qId" + i,
new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i));
}
}
}

View File

@ -24,19 +24,12 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@ -52,24 +45,24 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Category({ReplicationTests.class, MediumTests.class})
@Category({ ReplicationTests.class, MediumTests.class })
public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateZKImpl.class);
private static Configuration conf;
private static HBaseTestingUtility utility;
private static HBaseZKTestingUtility utility;
private static ZKWatcher zkw;
private static String replicationZNode;
private ReplicationQueuesZKImpl rqZK;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
utility = new HBaseTestingUtility();
utility = new HBaseZKTestingUtility();
utility.startMiniZKCluster();
conf = utility.getConfiguration();
conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
zkw = utility.getZooKeeperWatcher();
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
replicationZNode = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, replicationZNodeName);
KEY_ONE = initPeerClusterState("/hbase1");
@ -89,18 +82,17 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
}
@Before
@Override
public void setUp() {
super.setUp();
DummyServer ds1 = new DummyServer(server1);
DummyServer ds2 = new DummyServer(server2);
DummyServer ds3 = new DummyServer(server3);
zkTimeoutCount = 0;
WarnOnlyAbortable abortable = new WarnOnlyAbortable();
try {
rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw));
rq2 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds2, zkw));
rq3 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds3, zkw));
rqc = ReplicationFactory.getReplicationQueuesClient(
new ReplicationQueuesClientArguments(conf, ds1, zkw));
rq1 = ReplicationFactory
.getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw));
rq2 = ReplicationFactory
.getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw));
rq3 = ReplicationFactory
.getReplicationQueues(new ReplicationQueuesArguments(conf, abortable, zkw));
rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
} catch (Exception e) {
// This should not occur, because getReplicationQueues() only throws for
// TableBasedReplicationQueuesImpl
@ -108,7 +100,7 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
}
rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1);
rqZK = new ReplicationQueuesZKImpl(zkw, conf, abortable);
}
@After
@ -138,90 +130,19 @@ public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
assertTrue(rqZK.isPeerPath(peerPath));
}
static class DummyServer implements Server {
private String serverName;
private boolean isAborted = false;
private boolean isStopped = false;
public DummyServer(String serverName) {
this.serverName = serverName;
}
@Override
public Configuration getConfiguration() {
return conf;
}
@Override
public ZKWatcher getZooKeeper() {
return zkw;
}
@Override
public CoordinatedStateManager getCoordinatedStateManager() {
return null;
}
@Override
public ClusterConnection getConnection() {
return null;
}
@Override
public MetaTableLocator getMetaTableLocator() {
return null;
}
@Override
public ServerName getServerName() {
return ServerName.valueOf(this.serverName);
}
private static class WarnOnlyAbortable implements Abortable {
@Override
public void abort(String why, Throwable e) {
LOG.info("Aborting " + serverName);
this.isAborted = true;
LOG.warn("TestReplicationStateZKImpl received abort, ignoring. Reason: " + why);
if (LOG.isDebugEnabled()) {
LOG.debug(e.toString(), e);
}
}
@Override
public boolean isAborted() {
return this.isAborted;
}
@Override
public void stop(String why) {
this.isStopped = true;
}
@Override
public boolean isStopped() {
return this.isStopped;
}
@Override
public ChoreService getChoreService() {
return null;
}
@Override
public ClusterConnection getClusterConnection() {
// TODO Auto-generated method stub
return null;
}
@Override
public FileSystem getFileSystem() {
return null;
}
@Override
public boolean isStopping() {
return false;
}
@Override
public Connection createConnection(Configuration conf) throws IOException {
return null;
}
}
}

View File

@ -23,15 +23,18 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.SortedSet;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -115,6 +118,15 @@ public class TestZKReplicationQueueStorage {
assertEquals(2, queueIds.size());
assertThat(queueIds, hasItems("1", "2"));
List<String> wals1 = STORAGE.getWALsInQueue(serverName1, queue1);
List<String> wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
assertEquals(10, wals1.size());
assertEquals(10, wals2.size());
for (int i = 0; i < 10; i++) {
assertThat(wals1, hasItems(getFileName("file1", i)));
assertThat(wals2, hasItems(getFileName("file2", i)));
}
for (int i = 0; i < 10; i++) {
assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
@ -157,10 +169,20 @@ public class TestZKReplicationQueueStorage {
queueIds = STORAGE.getAllQueues(serverName1);
assertEquals(1, queueIds.size());
assertThat(queueIds, hasItems("2"));
wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
assertEquals(5, wals2.size());
for (i = 0; i < 10; i += 2) {
assertThat(wals2, hasItems(getFileName("file2", i)));
}
queueIds = STORAGE.getAllQueues(serverName2);
assertEquals(1, queueIds.size());
assertThat(queueIds, hasItems(peer1.getFirst()));
wals1 = STORAGE.getWALsInQueue(serverName2, peer1.getFirst());
assertEquals(5, wals1.size());
for (i = 1; i < 10; i += 2) {
assertThat(wals1, hasItems(getFileName("file1", i)));
}
Set<String> allWals = STORAGE.getAllWALs();
assertEquals(10, allWals.size());
@ -168,4 +190,56 @@ public class TestZKReplicationQueueStorage {
assertThat(allWals, hasItems(i % 2 == 0 ? getFileName("file2", i) : getFileName("file1", i)));
}
}
// For HBASE-12865
@Test
public void testClaimQueueChangeCversion() throws ReplicationException, KeeperException {
ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
STORAGE.addWAL(serverName1, "1", "file");
int v0 = STORAGE.getQueuesZNodeCversion();
ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
STORAGE.claimQueue(serverName1, "1", serverName2);
int v1 = STORAGE.getQueuesZNodeCversion();
// cversion should increase by 1 since a child node is deleted
assertEquals(1, v1 - v0);
}
private ZKReplicationQueueStorage createWithUnstableCversion() throws IOException {
return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()) {
private int called = 0;
@Override
protected int getQueuesZNodeCversion() throws KeeperException {
if (called < 4) {
called++;
}
return called;
}
};
}
@Test
public void testGetAllWALsCversionChange() throws IOException, ReplicationException {
ZKReplicationQueueStorage storage = createWithUnstableCversion();
storage.addWAL(getServerName(0), "1", "file");
// This should return eventually when cversion stabilizes
Set<String> allWals = storage.getAllWALs();
assertEquals(1, allWals.size());
assertThat(allWals, hasItems("file"));
}
// For HBASE-14621
@Test
public void testGetAllHFileRefsCversionChange() throws IOException, ReplicationException {
ZKReplicationQueueStorage storage = createWithUnstableCversion();
storage.addPeerToHFileRefs("1");
Path p = new Path("/test");
storage.addHFileRefs("1", Arrays.asList(Pair.newPair(p, p)));
// This should return eventually when cversion stabilizes
Set<String> allHFileRefs = storage.getAllHFileRefs();
assertEquals(1, allHFileRefs.size());
assertThat(allHFileRefs, hasItems("test"));
}
}

View File

@ -23,21 +23,23 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -48,23 +50,19 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class ReplicationZKNodeCleaner {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationZKNodeCleaner.class);
private final ZKWatcher zkw;
private final ReplicationQueuesClient queuesClient;
private final ReplicationQueueStorage queueStorage;
private final ReplicationPeers replicationPeers;
private final ReplicationQueueDeletor queueDeletor;
public ReplicationZKNodeCleaner(Configuration conf, ZKWatcher zkw, Abortable abortable)
throws IOException {
try {
this.zkw = zkw;
this.queuesClient = ReplicationFactory
.getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf, abortable, zkw));
this.queuesClient.init();
this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf, this.queuesClient,
abortable);
this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
this.replicationPeers =
ReplicationFactory.getReplicationPeers(zkw, conf, this.queueStorage, abortable);
this.replicationPeers.init();
this.queueDeletor = new ReplicationQueueDeletor(zkw, conf, abortable);
} catch (Exception e) {
} catch (ReplicationException e) {
throw new IOException("failed to construct ReplicationZKNodeCleaner", e);
}
}
@ -73,16 +71,16 @@ public class ReplicationZKNodeCleaner {
* @return undeletedQueues replicator with its queueIds for removed peers
* @throws IOException
*/
public Map<String, List<String>> getUnDeletedQueues() throws IOException {
Map<String, List<String>> undeletedQueues = new HashMap<>();
public Map<ServerName, List<String>> getUnDeletedQueues() throws IOException {
Map<ServerName, List<String>> undeletedQueues = new HashMap<>();
Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
try {
List<String> replicators = this.queuesClient.getListOfReplicators();
List<ServerName> replicators = this.queueStorage.getListOfReplicators();
if (replicators == null || replicators.isEmpty()) {
return undeletedQueues;
}
for (String replicator : replicators) {
List<String> queueIds = this.queuesClient.getAllQueues(replicator);
for (ServerName replicator : replicators) {
List<String> queueIds = this.queueStorage.getAllQueues(replicator);
for (String queueId : queueIds) {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
if (!peerIds.contains(queueInfo.getPeerId())) {
@ -96,7 +94,7 @@ public class ReplicationZKNodeCleaner {
}
}
}
} catch (KeeperException ke) {
} catch (ReplicationException ke) {
throw new IOException("Failed to get the replication queues of all replicators", ke);
}
return undeletedQueues;
@ -105,25 +103,21 @@ public class ReplicationZKNodeCleaner {
/**
* @return undeletedHFileRefsQueue replicator with its undeleted queueIds for removed peers in
* hfile-refs queue
* @throws IOException
*/
public Set<String> getUnDeletedHFileRefsQueues() throws IOException {
Set<String> undeletedHFileRefsQueue = new HashSet<>();
Set<String> peerIds = new HashSet<>(this.replicationPeers.getAllPeerIds());
String hfileRefsZNode = queueDeletor.getHfileRefsZNode();
try {
if (-1 == ZKUtil.checkExists(zkw, hfileRefsZNode)) {
return null;
}
List<String> listOfPeers = this.queuesClient.getAllPeersFromHFileRefsQueue();
List<String> listOfPeers = this.queueStorage.getAllPeersFromHFileRefsQueue();
Set<String> peers = new HashSet<>(listOfPeers);
peers.removeAll(peerIds);
if (!peers.isEmpty()) {
undeletedHFileRefsQueue.addAll(peers);
}
} catch (KeeperException e) {
throw new IOException("Failed to get list of all peers from hfile-refs znode "
+ hfileRefsZNode, e);
} catch (ReplicationException e) {
throw new IOException(
"Failed to get list of all peers from hfile-refs znode " + hfileRefsZNode, e);
}
return undeletedHFileRefsQueue;
}
@ -137,21 +131,20 @@ public class ReplicationZKNodeCleaner {
/**
* @param replicator The regionserver which has undeleted queue
* @param queueId The undeleted queue id
* @throws IOException
*/
public void removeQueue(final String replicator, final String queueId) throws IOException {
String queueZnodePath =
ZNodePaths.joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator), queueId);
public void removeQueue(final ServerName replicator, final String queueId) throws IOException {
String queueZnodePath = ZNodePaths
.joinZNode(ZNodePaths.joinZNode(this.queuesZNode, replicator.getServerName()), queueId);
try {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
if (!replicationPeers.getAllPeerIds().contains(queueInfo.getPeerId())) {
ZKUtil.deleteNodeRecursively(this.zookeeper, queueZnodePath);
LOG.info("Successfully removed replication queue, replicator: " + replicator
+ ", queueId: " + queueId);
LOG.info("Successfully removed replication queue, replicator: " + replicator +
", queueId: " + queueId);
}
} catch (KeeperException e) {
throw new IOException("Failed to delete queue, replicator: " + replicator + ", queueId: "
+ queueId);
throw new IOException(
"Failed to delete queue, replicator: " + replicator + ", queueId: " + queueId);
}
}
@ -183,9 +176,9 @@ public class ReplicationZKNodeCleaner {
* @param undeletedQueues replicator with its queueIds for removed peers
* @throws IOException
*/
public void removeQueues(final Map<String, List<String>> undeletedQueues) throws IOException {
for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueues.entrySet()) {
String replicator = replicatorAndQueueIds.getKey();
public void removeQueues(final Map<ServerName, List<String>> undeletedQueues) throws IOException {
for (Entry<ServerName, List<String>> replicatorAndQueueIds : undeletedQueues.entrySet()) {
ServerName replicator = replicatorAndQueueIds.getKey();
for (String queueId : replicatorAndQueueIds.getValue()) {
queueDeletor.removeQueue(replicator, queueId);
}

View File

@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import java.io.IOException;
@ -23,6 +22,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@ -45,11 +45,10 @@ public class ReplicationZKNodeCleanerChore extends ScheduledChore {
@Override
protected void chore() {
try {
Map<String, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
Map<ServerName, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
cleaner.removeQueues(undeletedQueues);
} catch (IOException e) {
LOG.warn("Failed to clean replication zk node", e);
}
}
}

View File

@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.BaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfigBuilder;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
@ -147,28 +148,13 @@ public final class ReplicationPeerManager {
}
}
private ReplicationPeerConfig copy(ReplicationPeerConfig peerConfig) {
ReplicationPeerConfig copiedPeerConfig = new ReplicationPeerConfig();
copiedPeerConfig.getConfiguration().putAll(peerConfig.getConfiguration());
copiedPeerConfig.getPeerData().putAll(peerConfig.getPeerData());
copiedPeerConfig.setTableCFsMap(peerConfig.getTableCFsMap());
copiedPeerConfig.setNamespaces(peerConfig.getNamespaces());
copiedPeerConfig.setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap());
copiedPeerConfig.setExcludeNamespaces(peerConfig.getExcludeNamespaces());
copiedPeerConfig.setBandwidth(peerConfig.getBandwidth());
copiedPeerConfig.setReplicateAllUserTables(peerConfig.replicateAllUserTables());
copiedPeerConfig.setClusterKey(peerConfig.getClusterKey());
copiedPeerConfig.setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl());
return copiedPeerConfig;
}
public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
throws ReplicationException {
if (peers.containsKey(peerId)) {
// this should be a retry, just return
return;
}
ReplicationPeerConfig copiedPeerConfig = copy(peerConfig);
ReplicationPeerConfig copiedPeerConfig = ReplicationPeerConfig.newBuilder(peerConfig).build();
peerStorage.addPeer(peerId, copiedPeerConfig, enabled);
peers.put(peerId, new ReplicationPeerDescription(peerId, enabled, copiedPeerConfig));
}
@ -205,13 +191,14 @@ public final class ReplicationPeerManager {
// the checking rules are too complicated here so we give up checking whether this is a retry.
ReplicationPeerDescription desc = peers.get(peerId);
ReplicationPeerConfig oldPeerConfig = desc.getPeerConfig();
ReplicationPeerConfig newPeerConfig = copy(peerConfig);
ReplicationPeerConfigBuilder newPeerConfigBuilder =
ReplicationPeerConfig.newBuilder(peerConfig);
// we need to use the new conf to overwrite the old one.
newPeerConfig.getConfiguration().putAll(oldPeerConfig.getConfiguration());
newPeerConfig.getConfiguration().putAll(peerConfig.getConfiguration());
newPeerConfig.getPeerData().putAll(oldPeerConfig.getPeerData());
newPeerConfig.getPeerData().putAll(peerConfig.getPeerData());
newPeerConfigBuilder.putAllConfiguration(oldPeerConfig.getConfiguration());
newPeerConfigBuilder.putAllConfiguration(peerConfig.getConfiguration());
newPeerConfigBuilder.putAllPeerData(oldPeerConfig.getPeerData());
newPeerConfigBuilder.putAllPeerData(peerConfig.getPeerData());
ReplicationPeerConfig newPeerConfig = newPeerConfigBuilder.build();
peerStorage.updatePeerConfig(peerId, newPeerConfig);
peers.put(peerId, new ReplicationPeerDescription(peerId, desc.isEnabled(), newPeerConfig));
}
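The hand-written copy() gives way to ReplicationPeerConfig.newBuilder(peerConfig), and updatePeerConfig merges the stored and incoming maps through the builder. Since the last putAll wins, applying the old map first and the new map second is what makes "the new conf overwrite the old one". A compact sketch of that ordering (names as in the hunk above):

    ReplicationPeerConfigBuilder builder = ReplicationPeerConfig.newBuilder(peerConfig);
    builder.putAllConfiguration(oldPeerConfig.getConfiguration()); // stored entries first
    builder.putAllConfiguration(peerConfig.getConfiguration());    // incoming entries overwrite
    builder.putAllPeerData(oldPeerConfig.getPeerData());           // same ordering for peer data
    builder.putAllPeerData(peerConfig.getPeerData());
    ReplicationPeerConfig newPeerConfig = builder.build();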

View File

@ -1,42 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.master;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Predicate;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
/**
* Implementation of a file cleaner that checks if a hfile is still scheduled for replication before
* deleting it from hfile archive directory.
@ -45,7 +46,7 @@ import org.slf4j.LoggerFactory;
public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationHFileCleaner.class);
private ZKWatcher zkw;
private ReplicationQueuesClient rqc;
private ReplicationQueueStorage rqs;
private boolean stopped = false;
@Override
@ -60,8 +61,8 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
try {
// The concurrently created new hfile entries in ZK may not be included in the return list,
// but they won't be deleted because they're not in the checking set.
hfileRefs = loadHFileRefsFromPeers();
} catch (KeeperException e) {
hfileRefs = rqs.getAllHFileRefs();
} catch (ReplicationException e) {
LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable files");
return Collections.emptyList();
}
@ -82,37 +83,6 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
});
}
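The `});` above closes an Iterables.filter call whose body this hunk elides. A hedged sketch of that filter, assuming `files` is the Iterable<FileStatus> passed to getDeletableFiles and reusing the Guava Predicate/Iterables imports and the hfileRefs snapshot from the try block:
// Sketch of the elided filter body: a file is deletable only when no
// replication queue still references it in the snapshot read above.
return Iterables.filter(files, new Predicate<FileStatus>() {
  @Override
  public boolean apply(FileStatus file) {
    String hfile = file.getPath().getName();
    boolean referenced = hfileRefs.contains(hfile);
    if (referenced && LOG.isDebugEnabled()) {
      LOG.debug("Found hfile reference in replication queues, keeping: " + hfile);
    }
    return !referenced;
  }
});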
/**
* Load all hfile references in all replication queues from ZK. This method guarantees to return a
* snapshot which contains all hfile references in the zookeeper at the start of this call.
* However, some newly created hfile references during the call may not be included.
*/
private Set<String> loadHFileRefsFromPeers() throws KeeperException {
Set<String> hfileRefs = Sets.newHashSet();
List<String> listOfPeers;
for (int retry = 0;; retry++) {
int v0 = rqc.getHFileRefsNodeChangeVersion();
hfileRefs.clear();
listOfPeers = rqc.getAllPeersFromHFileRefsQueue();
if (listOfPeers == null) {
LOG.debug("Didn't find any peers with hfile references, won't prevent any deletions.");
return ImmutableSet.of();
}
for (String id : listOfPeers) {
List<String> peerHFileRefs = rqc.getReplicableHFiles(id);
if (peerHFileRefs != null) {
hfileRefs.addAll(peerHFileRefs);
}
}
int v1 = rqc.getHFileRefsNodeChangeVersion();
if (v0 == v1) {
return hfileRefs;
}
LOG.debug(String.format("Replication hfile references node cversion changed from "
+ "%d to %d, retry = %d", v0, v1, retry));
}
}
@Override
public void setConf(Configuration config) {
// If either replication or replication of bulk load hfiles is disabled, keep all members null
@ -139,17 +109,15 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
public void setConf(Configuration conf, ZKWatcher zk) {
super.setConf(conf);
try {
initReplicationQueuesClient(conf, zk);
initReplicationQueueStorage(conf, zk);
} catch (Exception e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
}
}
private void initReplicationQueuesClient(Configuration conf, ZKWatcher zk)
throws Exception {
private void initReplicationQueueStorage(Configuration conf, ZKWatcher zk) {
this.zkw = zk;
this.rqc = ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments(
conf, new WarnOnlyAbortable(), zkw));
this.rqs = ReplicationStorageFactory.getReplicationQueueStorage(zk, conf);
}
@Override
@ -179,25 +147,12 @@ public class ReplicationHFileCleaner extends BaseHFileCleanerDelegate {
}
try {
hfileRefsFromQueue = loadHFileRefsFromPeers();
} catch (KeeperException e) {
hfileRefsFromQueue = rqs.getAllHFileRefs();
} catch (ReplicationException e) {
LOG.warn("Failed to read hfile references from zookeeper, skipping checking deletable "
+ "file for " + fStat.getPath());
return false;
}
return !hfileRefsFromQueue.contains(fStat.getPath().getName());
}
private static class WarnOnlyAbortable implements Abortable {
@Override
public void abort(String why, Throwable e) {
LOG.warn("ReplicationHFileCleaner received abort, ignoring. Reason: " + why);
LOG.debug(e.toString(), e);
}
@Override
public boolean isAborted() {
return false;
}
}
}

View File

@ -1,5 +1,4 @@
/*
*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -24,16 +23,14 @@ import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,7 +46,7 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogCleaner.class);
private ZKWatcher zkw;
private ReplicationQueuesClient replicationQueues;
private ReplicationQueueStorage queueStorage;
private boolean stopped = false;
private Set<String> wals;
private long readZKTimestamp = 0;
@ -60,8 +57,8 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
try {
// The concurrently created new WALs may not be included in the return list,
// but they won't be deleted because they're not in the checking set.
wals = replicationQueues.getAllWALs();
} catch (KeeperException e) {
wals = queueStorage.getAllWALs();
} catch (ReplicationException e) {
LOG.warn("Failed to read zookeeper, skipping checking deletable files");
wals = null;
}
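Nulling out `wals` on failure is the fail-safe: with no reliable snapshot, the remainder of the method (elided here) must treat every file as non-deletable. A sketch of the guard this implies, assuming java.util.Collections is available as in the hfile cleaner above:
// Sketch only: bail out and delete nothing when the snapshot is unknown.
if (wals == null) {
  return Collections.emptyList();
}
// Otherwise a file is deletable only if its name is absent from `wals`.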
@ -110,9 +107,7 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
super.setConf(conf);
try {
this.zkw = zk;
this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(
new ReplicationQueuesClientArguments(conf, new WarnOnlyAbortable(), zkw));
this.replicationQueues.init();
this.queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zk, conf);
} catch (Exception e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
}
@ -132,18 +127,4 @@ public class ReplicationLogCleaner extends BaseLogCleanerDelegate {
public boolean isStopped() {
return this.stopped;
}
private static class WarnOnlyAbortable implements Abortable {
@Override
public void abort(String why, Throwable e) {
LOG.warn("ReplicationLogCleaner received abort, ignoring. Reason: " + why);
LOG.debug(e.toString(), e);
}
@Override
public boolean isAborted() {
return false;
}
}
}

View File

@ -21,13 +21,13 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
@ -48,17 +48,18 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap;
/**
@ -303,57 +304,53 @@ public class DumpReplicationQueues extends Configured implements Tool {
}
public String dumpQueues(ClusterConnection connection, ZKWatcher zkw, Set<String> peerIds,
boolean hdfs) throws Exception {
ReplicationQueuesClient queuesClient;
boolean hdfs) throws Exception {
ReplicationQueueStorage queueStorage;
ReplicationPeers replicationPeers;
ReplicationQueues replicationQueues;
ReplicationTracker replicationTracker;
ReplicationQueuesClientArguments replicationArgs =
new ReplicationQueuesClientArguments(getConf(), new WarnOnlyAbortable(), zkw);
ReplicationQueuesArguments replicationArgs =
new ReplicationQueuesArguments(getConf(), new WarnOnlyAbortable(), zkw);
StringBuilder sb = new StringBuilder();
queuesClient = ReplicationFactory.getReplicationQueuesClient(replicationArgs);
queuesClient.init();
queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
replicationQueues = ReplicationFactory.getReplicationQueues(replicationArgs);
replicationPeers = ReplicationFactory.getReplicationPeers(zkw, getConf(), queuesClient, connection);
replicationPeers =
ReplicationFactory.getReplicationPeers(zkw, getConf(), queueStorage, connection);
replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(),
new WarnOnlyAbortable(), new WarnOnlyStoppable());
List<String> liveRegionServers = replicationTracker.getListOfRegionServers();
Set<String> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers());
// Loops each peer on each RS and dumps the queues
try {
List<String> regionservers = queuesClient.getListOfReplicators();
if (regionservers == null || regionservers.isEmpty()) {
return sb.toString();
List<ServerName> regionservers = queueStorage.getListOfReplicators();
if (regionservers == null || regionservers.isEmpty()) {
return sb.toString();
}
for (ServerName regionserver : regionservers) {
List<String> queueIds = queueStorage.getAllQueues(regionserver);
replicationQueues.init(regionserver.getServerName());
if (!liveRegionServers.contains(regionserver.getServerName())) {
deadRegionServers.add(regionserver.getServerName());
}
for (String regionserver : regionservers) {
List<String> queueIds = queuesClient.getAllQueues(regionserver);
replicationQueues.init(regionserver);
if (!liveRegionServers.contains(regionserver)) {
deadRegionServers.add(regionserver);
}
for (String queueId : queueIds) {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
List<String> wals = queuesClient.getLogsInQueue(regionserver, queueId);
if (!peerIds.contains(queueInfo.getPeerId())) {
deletedQueues.add(regionserver + "/" + queueId);
sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true,
hdfs));
} else {
sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false,
hdfs));
}
for (String queueId : queueIds) {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
if (!peerIds.contains(queueInfo.getPeerId())) {
deletedQueues.add(regionserver + "/" + queueId);
sb.append(
formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true, hdfs));
} else {
sb.append(
formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false, hdfs));
}
}
} catch (KeeperException ke) {
throw new IOException(ke);
}
return sb.toString();
}
private String formatQueue(String regionserver, ReplicationQueues replicationQueues, ReplicationQueueInfo queueInfo,
String queueId, List<String> wals, boolean isDeleted, boolean hdfs) throws Exception {
private String formatQueue(ServerName regionserver, ReplicationQueues replicationQueues,
ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
boolean hdfs) throws Exception {
StringBuilder sb = new StringBuilder();
List<ServerName> deadServers;
@ -389,13 +386,14 @@ public class DumpReplicationQueues extends Configured implements Tool {
/**
* return total size in bytes from a list of WALs
*/
private long getTotalWALSize(FileSystem fs, List<String> wals, String server) throws IOException {
private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server)
throws IOException {
long size = 0;
FileStatus fileStatus;
for (String wal : wals) {
try {
fileStatus = (new WALLink(getConf(), server, wal)).getFileStatus(fs);
fileStatus = (new WALLink(getConf(), server.getServerName(), wal)).getFileStatus(fs);
} catch (IOException e) {
if (e instanceof FileNotFoundException) {
numWalsNotFound++;

View File

@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util.hbck;
import java.io.IOException;
@ -27,22 +26,23 @@ import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
/*
/**
* Check and fix undeleted replication queues for removed peerId.
*/
@InterfaceAudience.Private
public class ReplicationChecker {
private final ErrorReporter errorReporter;
// replicator with its queueIds for removed peers
private Map<String, List<String>> undeletedQueueIds = new HashMap<>();
private Map<ServerName, List<String>> undeletedQueueIds = new HashMap<>();
// replicator with its undeleted queueIds for removed peers in hfile-refs queue
private Set<String> undeletedHFileRefsQueueIds = new HashSet<>();
private final ReplicationZKNodeCleaner cleaner;
@ -60,8 +60,8 @@ public class ReplicationChecker {
public void checkUnDeletedQueues() throws IOException {
undeletedQueueIds = cleaner.getUnDeletedQueues();
for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
String replicator = replicatorAndQueueIds.getKey();
for (Entry<ServerName, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
ServerName replicator = replicatorAndQueueIds.getKey();
for (String queueId : replicatorAndQueueIds.getValue()) {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
String msg = "Undeleted replication queue for removed peer found: "

View File

@ -25,6 +25,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@ -34,12 +35,16 @@ import java.util.Set;
import java.util.concurrent.CompletionException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
@ -56,8 +61,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
private final String ID_ONE = "1";
private final String KEY_ONE = "127.0.0.1:2181:/hbase";
private final String ID_SECOND = "2";
private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
private final String ID_TWO = "2";
private final String KEY_TWO = "127.0.0.1:2181:/hbase2";
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@ -65,21 +70,27 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
TEST_UTIL.getConfiguration().setInt(ReadOnlyZKClient.RECOVERY_RETRY, 1);
TEST_UTIL.startMiniCluster();
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
}
@After
public void cleanupPeer() {
public void clearPeerAndQueues() throws IOException, ReplicationException {
try {
admin.removeReplicationPeer(ID_ONE).join();
} catch (Exception e) {
LOG.debug("Replication peer " + ID_ONE + " may already be removed");
}
try {
admin.removeReplicationPeer(ID_SECOND).join();
admin.removeReplicationPeer(ID_TWO).join();
} catch (Exception e) {
LOG.debug("Replication peer " + ID_SECOND + " may already be removed");
}
ReplicationQueueStorage queueStorage = ReplicationStorageFactory
.getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
for (ServerName serverName : queueStorage.getListOfReplicators()) {
for (String queue : queueStorage.getAllQueues(serverName)) {
queueStorage.removeQueue(serverName, queue);
}
}
}
@ -88,7 +99,7 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
rpc1.setClusterKey(KEY_ONE);
ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
rpc2.setClusterKey(KEY_SECOND);
rpc2.setClusterKey(KEY_TWO);
// Add a valid peer
admin.addReplicationPeer(ID_ONE, rpc1).join();
// try adding the same (fails)
@ -101,19 +112,19 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
assertEquals(1, admin.listReplicationPeers().get().size());
// Try to remove a nonexistent peer
try {
admin.removeReplicationPeer(ID_SECOND).join();
admin.removeReplicationPeer(ID_TWO).join();
fail("Test case should fail as removing a inexisting peer.");
} catch (CompletionException e) {
// OK!
}
assertEquals(1, admin.listReplicationPeers().get().size());
// Add a second since multi-slave is supported
admin.addReplicationPeer(ID_SECOND, rpc2).join();
admin.addReplicationPeer(ID_TWO, rpc2).join();
assertEquals(2, admin.listReplicationPeers().get().size());
// Remove the first peer we added
admin.removeReplicationPeer(ID_ONE).join();
assertEquals(1, admin.listReplicationPeers().get().size());
admin.removeReplicationPeer(ID_SECOND).join();
admin.removeReplicationPeer(ID_TWO).join();
assertEquals(0, admin.listReplicationPeers().get().size());
}

View File

@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -82,6 +83,7 @@ public class TestReplicationAdmin {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
TEST_UTIL.getConfiguration().setInt(ReadOnlyZKClient.RECOVERY_RETRY, 1);
TEST_UTIL.startMiniCluster();
admin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
hbaseAdmin = TEST_UTIL.getAdmin();

View File

@ -1,4 +1,4 @@
/*
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@ -24,16 +24,12 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.lang.reflect.Field;
import java.net.URLEncoder;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@ -51,7 +47,6 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@ -65,10 +60,11 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@Category({MasterTests.class, MediumTests.class})
public class TestLogsCleaner {
@ -195,24 +191,6 @@ public class TestLogsCleaner {
}
}
@Test(timeout=5000)
public void testZnodeCversionChange() throws Exception {
Configuration conf = TEST_UTIL.getConfiguration();
ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
cleaner.setConf(conf);
ReplicationQueuesClientZKImpl rqcMock = Mockito.mock(ReplicationQueuesClientZKImpl.class);
Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4);
Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues");
rqc.setAccessible(true);
rqc.set(cleaner, rqcMock);
// This should return eventually when cversion stabilizes
cleaner.getDeletableFiles(new LinkedList<>());
}
/**
* ReplicationLogCleaner should be able to ride over ZooKeeper errors without aborting.
*/

View File

@ -1,12 +1,19 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
* law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
* BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
* for the specific language governing permissions and limitations under the License.
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
@ -17,14 +24,10 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@ -45,7 +48,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
@ -63,10 +65,11 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@Category({ MasterTests.class, SmallTests.class })
public class TestReplicationHFileCleaner {
private static final Logger LOG = LoggerFactory.getLogger(TestReplicationHFileCleaner.class);
@ -188,32 +191,6 @@ public class TestReplicationHFileCleaner {
assertTrue(deletableFilesIterator.next().getPath().equals(deletablefile));
}
/*
* Test for HBASE-14621. This test does not directly assert anything. Without the fix the test
* would end up in an infinite loop, so it would time out.
*/
@Test(timeout = 15000)
public void testForDifferntHFileRefsZnodeVersion() throws Exception {
// 1. Create a file
Path file = new Path(root, "testForDifferntHFileRefsZnodeVersion");
fs.createNewFile(file);
// 2. Assert file is successfully created
assertTrue("Test file not created!", fs.exists(file));
ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
cleaner.setConf(conf);
ReplicationQueuesClient replicationQueuesClient = Mockito.mock(ReplicationQueuesClient.class);
//Return different znode version for each call
Mockito.when(replicationQueuesClient.getHFileRefsNodeChangeVersion()).thenReturn(1, 2);
Class<? extends ReplicationHFileCleaner> cleanerClass = cleaner.getClass();
Field rqc = cleanerClass.getDeclaredField("rqc");
rqc.setAccessible(true);
rqc.set(cleaner, replicationQueuesClient);
cleaner.isFileDeletable(fs.getFileStatus(file));
}
/**
* ReplicationHFileCleaner should be able to ride over ZooKeeper errors without aborting.
*/

View File

@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
import static org.junit.Assert.assertEquals;
@ -26,6 +25,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
@ -43,9 +43,9 @@ public class TestReplicationZKNodeCleaner {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final String ID_ONE = "1";
private final String SERVER_ONE = "server1";
private final ServerName SERVER_ONE = ServerName.valueOf("server1", 8000, 1234);
private final String ID_TWO = "2";
private final String SERVER_TWO = "server2";
private final ServerName SERVER_TWO = ServerName.valueOf("server2", 8000, 1234);
private final Configuration conf;
private final ZKWatcher zkw;
@ -72,12 +72,12 @@ public class TestReplicationZKNodeCleaner {
@Test
public void testReplicationZKNodeCleaner() throws Exception {
repQueues.init(SERVER_ONE);
repQueues.init(SERVER_ONE.getServerName());
// add queue for ID_ONE which isn't exist
repQueues.addLog(ID_ONE, "file1");
ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
Map<String, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
Map<ServerName, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
assertEquals(1, undeletedQueues.size());
assertTrue(undeletedQueues.containsKey(SERVER_ONE));
assertEquals(1, undeletedQueues.get(SERVER_ONE).size());
@ -100,7 +100,7 @@ public class TestReplicationZKNodeCleaner {
@Test
public void testReplicationZKNodeCleanerChore() throws Exception {
repQueues.init(SERVER_ONE);
repQueues.init(SERVER_ONE.getServerName());
// add queue for ID_ONE which isn't exist
repQueues.addLog(ID_ONE, "file1");
// add a recovery queue for ID_TWO which isn't exist

View File

@ -1,34 +1,34 @@
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertTrue;
import java.util.List;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@ -36,11 +36,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Tests the ReplicationSourceManager with ReplicationQueueZkImpl and
* ReplicationQueuesClientZkImpl. Also includes extra tests outside of those in
@ -114,41 +109,4 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
server.stop("");
}
@Test
public void testFailoverDeadServerCversionChange() throws Exception {
final Server s0 = new DummyServer("cversion-change0.example.org");
ReplicationQueues repQueues =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, s0,
s0.getZooKeeper()));
repQueues.init(s0.getServerName().toString());
// populate some znodes in the peer znode
files.add("log1");
files.add("log2");
for (String file : files) {
repQueues.addLog("1", file);
}
// simulate queue transfer
Server s1 = new DummyServer("cversion-change1.example.org");
ReplicationQueues rq1 =
ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
s1.getZooKeeper()));
rq1.init(s1.getServerName().toString());
ReplicationQueuesClientZKImpl client =
(ReplicationQueuesClientZKImpl)ReplicationFactory.getReplicationQueuesClient(
new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, s1.getZooKeeper()));
int v0 = client.getQueuesZNodeCversion();
List<String> queues = rq1.getUnClaimedQueueIds(s0.getServerName().getServerName());
for(String queue : queues) {
rq1.claimQueue(s0.getServerName().getServerName(), queue);
}
rq1.removeReplicatorIfQueueIsEmpty(s0.getServerName().getServerName());
int v1 = client.getQueuesZNodeCversion();
// cversion should increase by 1 since a child node is deleted
assertEquals(v0 + 1, v1);
s0.stop("");
}
}